本文将使用 Ktor 为 Android 或 JVM 应用程序构建 WebSocket 客户端。WebSockets 可实现客户端与服务器之间的实时双向通信,因此非常适合聊天、实时更新和数据流等应用。Ktor 提供了在 Kotlin 代码中利用 WebSockets 的便捷方法。
设置依赖关系
首先,将所需的 Ktor 库添加到项目的 build.gradle
文件中:
val ktor_version = "3.0.0-beta-1"
implementation("io.ktor:ktor-client-core:$ktor_version")
implementation("io.ktor:ktor-client-okhttp:$ktor_version")
implementation("io.ktor:ktor-client-websockets:$ktor_version")
implementation("io.ktor:ktor-client-logging:$ktor_version")
这些依赖项提供了 Ktor 客户端的核心功能、用于高效网络通信的 OkHttp 引擎、WebSocket 支持和日志功能。
KtorWebsocketClient 类
现在,深入了解负责管理 WebSocket 连接的 KtorWebsocketClient
类:
class KtorWebsocketClient (
private val url: String,
private val listener: WebsocketEvents,
) {
//... 该类的其余部分
}
此类有三个参数:
url
:要连接的 WebSocket 服务器的 URL。listener
:定义连接事件和接收消息的回调的接口。
WebsocketEvents 接口
WebsocketEvents
接口定义了连接和消息事件的回调:
interface WebsocketEvents {
fun onReceive(data: String)
fun onConnected()
fun onDisconnected(reason: String)
}
该接口允许您实现监听器逻辑,以处理建立连接(onConnected)、接收消息(onReceive)和断开连接(onDisconnected)事件。
创建 WebSocket 客户端
KtorWebsocketClient
类利用了 Ktor 的 HttpClient 和 OkHttp 引擎:
private val client = HttpClient(OkHttp) {
engine {
config {
pingInterval(PING_INTERVAL, TimeUnit.MILLISECONDS)
}
}
install(Logging)
install(WebSockets)
}
我们将 OkHttp 引擎的 pingInterval 设置为 5 秒,以保持与服务器的连接。此外,我们还安装了日志和 WebSockets 功能,用于记录通信细节并启用 WebSocket 支持。
协程设置
该类利用协程进行异步通信。使用 Dispatchers.IO 创建一个 coroutine 作用域,用于执行网络操作:
private val scope = CoroutineScope(Dispatchers.IO) + SupervisorJob() + CoroutineExceptionHandler { _, throwable ->
Log.d(
tag = TAG,
message = "Error: ${throwable.message}",
)
}
SupervisorJob
可确保一个协程中的故障不会影响其他例行程序。CoroutineExceptionHandler
会记录 WebSocket 通信过程中遇到的任何错误。
连接管理
connect
函数建立 WebSocket 连接:
suspend fun connect() {
try {
Log.d(
tag = TAG,
message = "Connecting to websocket at $url..."
)
session = client.webSocketSession(url)
listener.onConnected()
Log.d(
tag = TAG,
message = "Connected to websocket at $url"
)
// ... 接收消息
} catch (e: Exception) {
// ... 处理错误并重新连接
}
}
该函数首先记录连接尝试,然后使用 client.webSocketSession
启动连接。连接成功后,将在监听器上触发 onConnected
回调,并记录连接详情。接下来我们将处理接收消息和出错情况。
接收消息
connect
函数继续处理从服务器传入的消息:
session!!.incoming
.receiveAsFlow()
.filterIsInstance<Frame.Text>()
.filterNotNull()
.collect { data ->
val message = data.readText()
listener.onReceive(message)
Log.d(
tag = TAG,
message = "Received message: $message"
)
}
我们使用session
的incoming
通道接收传入帧流。然后过滤Frame.Text
,将它们转换为文本消息,并调用监听器上的onReceive
回调来处理收到的消息。最后,我们记录收到的消息以进行调试。
错误处理和重新连接
connect
函数还包括一个异常处理程序来管理连接错误:
catch (e: Exception) {
listener.onDisconnected(e.message ?: "Unknown error")
Log.d(
tag = TAG,
message = "Error: ${e.message}",
)
reconnect()
}
该处理程序记录错误,通知侦听器断开连接的原因,并调用reconnect
函数尝试重新建立连接。
reconnect
函数取消任何正在进行的重新连接作业,并安排新的协程在指定的延迟后重新连接:
private fun reconnect() {
job?.cancel()
Log.d(
tag = TAG,
message = "Reconnecting to websocket in ${RECONNECT_DELAY}ms..."
)
job = scope.launch {
stop()
delay(RECONNECT_DELAY)
connect()
}
}
这种方法可确保每次只发生一次重新连接尝试,并避免服务器因连接请求而过载。
发送消息
send
函数允许发送消息到服务器:
suspend fun send(message: String) {
Log.d(
tag = TAG,
message = "Sending message: $message"
)
session?.send(Frame.Text(message))
}
此函数记录正在发送的消息,然后用于将包含该消息的帧session?.send
发送Frame.Text
到服务器。请注意,空检查可确保我们仅在建立连接后才发送消息。
该函数记录发送的信息,然后使用 session.send 向服务器发送包含信息的 Frame.Text 框架。请注意,空检查确保我们只在连接已建立的情况下发送消息。
按照这些步骤,您就可以创建一个健壮的 Ktor WebSocket 客户端,用于在 Kotlin 应用程序中进行实时通信。切记要替换占位符值(如 url),并根据具体用例实现 WebsocketEvents 监听器。Ktor 为在应用程序中利用 WebSockets 提供了便捷高效的方式,简化了实时数据交换。
完整代码:
class KtorWebsocketClient(
private val url: String,
private val listener: WebsocketEvents,
) {
private val client = HttpClient(OkHttp) {
engine {
config {
pingInterval(PING_INTERVAL, TimeUnit.MILLISECONDS)
}
}
install(Logging)
install(WebSockets)
}
private val scope = CoroutineScope(Dispatchers.IO) + SupervisorJob() + CoroutineExceptionHandler { _, throwable ->
Log.d(
tag = TAG,
message = "Error: ${throwable.message}",
)
}
private var job: Job? = null
private var session: WebSocketSession? = null
suspend fun connect() {
try {
Log.d(
tag = TAG,
message = "Connecting to websocket at $url..."
)
session = client.webSocketSession(url)
listener.onConnected()
Log.info(
tag = TAG,
message = "Connected to websocket at $url"
)
session!!.incoming
.receiveAsFlow()
.filterIsInstance<Frame.Text>()
.filterNotNull()
.collect { data ->
val message = data.readText()
listener.onReceive(message)
Log.info(
tag = TAG,
message = "Received message: $message"
)
}
} catch (e: Exception) {
listener.onDisconnected(e.message ?: "Unknown error")
Log.d(
tag = TAG,
message = "Error: ${e.message}",
)
reconnect()
}
}
private fun reconnect() {
job?.cancel()
Log.d(
tag = TAG,
message = "Reconnecting to websocket in ${RECONNECT_DELAY}ms..."
)
job = scope.launch {
stop()
delay(RECONNECT_DELAY)
connect()
}
}
suspend fun stop() {
Log.d(
tag = TAG,
message = "Closing websocket session..."
)
session?.close()
session = null
}
suspend fun send(message: String) {
Log.d(
tag = TAG,
message = "Sending message: $message"
)
session?.send(Frame.Text(message))
}
interface WebsocketEvents{
fun onReceive(data: String)
fun onConnected()
fun onDisconnected(reason: String)
}
companion object {
private const val RECONNECT_DELAY = 10_000L
private val PING_INTERVAL = 5_000L
private const val TAG = "EVChargingWebSocketClient"
}
}
版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。