使用 Ktor 为 Android 或 JVM 应用构建 WebSocket 客户端

本文将使用 Ktor 为 Android 或 JVM 应用程序构建 WebSocket 客户端。WebSockets 可实现客户端与服务器之间的实时双向通信,因此非常适合聊天、实时更新和数据流等应用。Ktor 提供了在 Kotlin 代码中利用 WebSockets 的便捷方法。

设置依赖关系

首先,将所需的 Ktor 库添加到项目的 build.gradle 文件中:

val ktor_version = "3.0.0-beta-1"

implementation("io.ktor:ktor-client-core:$ktor_version")
implementation("io.ktor:ktor-client-okhttp:$ktor_version")
implementation("io.ktor:ktor-client-websockets:$ktor_version")
implementation("io.ktor:ktor-client-logging:$ktor_version")

这些依赖项提供了 Ktor 客户端的核心功能、用于高效网络通信的 OkHttp 引擎、WebSocket 支持和日志功能。

KtorWebsocketClient 类

现在,深入了解负责管理 WebSocket 连接的 KtorWebsocketClient 类:

class  KtorWebsocketClient ( 
    private  val url: String, 
    private  val listener: WebsocketEvents, 
) { 

    //... 该类的其余部分
}

此类有三个参数:

  • url:要连接的 WebSocket 服务器的 URL。
  • listener:定义连接事件和接收消息的回调的接口。

WebsocketEvents 接口

WebsocketEvents接口定义了连接和消息事件的回调:

interface WebsocketEvents {
    fun onReceive(data: String)
    fun onConnected()
    fun onDisconnected(reason: String)
}

该接口允许您实现监听器逻辑,以处理建立连接(onConnected)、接收消息(onReceive)和断开连接(onDisconnected)事件。

创建 WebSocket 客户端

KtorWebsocketClient 类利用了 Ktor 的 HttpClient 和 OkHttp 引擎:

private val client = HttpClient(OkHttp) {
    engine {
        config {
          pingInterval(PING_INTERVAL, TimeUnit.MILLISECONDS)
        }
    }

    install(Logging)
    install(WebSockets)
}

我们将 OkHttp 引擎的 pingInterval 设置为 5 秒,以保持与服务器的连接。此外,我们还安装了日志和 WebSockets 功能,用于记录通信细节并启用 WebSocket 支持。

协程设置

该类利用协程进行异步通信。使用 Dispatchers.IO 创建一个 coroutine 作用域,用于执行网络操作:

private val scope = CoroutineScope(Dispatchers.IO) + SupervisorJob() + CoroutineExceptionHandler { _, throwable ->
    Log.d(
        tag = TAG,
        message = "Error: ${throwable.message}",
    )
}

SupervisorJob 可确保一个协程中的故障不会影响其他例行程序。CoroutineExceptionHandler 会记录 WebSocket 通信过程中遇到的任何错误。

连接管理

connect 函数建立 WebSocket 连接:

suspend fun connect() {
    try {
        Log.d(
            tag = TAG,
            message = "Connecting to websocket at $url..."
        )

        session = client.webSocketSession(url)

        listener.onConnected()

        Log.d(
            tag = TAG,
            message = "Connected to websocket at $url"
        )

        // ... 接收消息
    } catch (e: Exception) {
        // ... 处理错误并重新连接
    }
}

该函数首先记录连接尝试,然后使用 client.webSocketSession 启动连接。连接成功后,将在监听器上触发 onConnected 回调,并记录连接详情。接下来我们将处理接收消息和出错情况。

接收消息

connect 函数继续处理从服务器传入的消息:

session!!.incoming
    .receiveAsFlow()
    .filterIsInstance<Frame.Text>()
    .filterNotNull()
    .collect { data ->
        val message = data.readText()
        listener.onReceive(message)

        Log.d(
            tag = TAG,
            message = "Received message: $message"
        )
    }

我们使用sessionincoming通道接收传入帧流。然后过滤Frame.Text,将它们转换为文本消息,并调用监听器上的onReceive回调来处理收到的消息。最后,我们记录收到的消息以进行调试。

错误处理和重新连接

connect函数还包括一个异常处理程序来管理连接错误:

catch (e: Exception) {
    listener.onDisconnected(e.message ?: "Unknown error")

    Log.d(
        tag = TAG,
        message = "Error: ${e.message}",
    )

    reconnect()
}

该处理程序记录错误,通知侦听器断开连接的原因,并调用reconnect函数尝试重新建立连接。

reconnect函数取消任何正在进行的重新连接作业,并安排新的协程在指定的延迟后重新连接:

private fun reconnect() {
    job?.cancel()

    Log.d(
        tag = TAG,
        message = "Reconnecting to websocket in ${RECONNECT_DELAY}ms..."
    )

    job = scope.launch {
        stop()
        delay(RECONNECT_DELAY)
        connect()
    }
}

这种方法可确保每次只发生一次重新连接尝试,并避免服务器因连接请求而过载。

发送消息

send函数允许发送消息到服务器:

suspend fun send(message: String) {
    Log.d(
        tag = TAG,
        message = "Sending message: $message"
    )

    session?.send(Frame.Text(message))
}

此函数记录正在发送的消息,然后用于将包含该消息的帧session?.send发送Frame.Text到服务器。请注意,空检查可确保我们仅在建立连接后才发送消息。

该函数记录发送的信息,然后使用 session.send 向服务器发送包含信息的 Frame.Text 框架。请注意,空检查确保我们只在连接已建立的情况下发送消息。

按照这些步骤,您就可以创建一个健壮的 Ktor WebSocket 客户端,用于在 Kotlin 应用程序中进行实时通信。切记要替换占位符值(如 url),并根据具体用例实现 WebsocketEvents 监听器。Ktor 为在应用程序中利用 WebSockets 提供了便捷高效的方式,简化了实时数据交换。

完整代码:

class KtorWebsocketClient(
    private val url: String,
    private val listener: WebsocketEvents,
) {
    private val client = HttpClient(OkHttp) {
        engine {
            config {
              pingInterval(PING_INTERVAL, TimeUnit.MILLISECONDS)
            }
        }

        install(Logging)
        install(WebSockets)
    }

    private val scope = CoroutineScope(Dispatchers.IO) + SupervisorJob() + CoroutineExceptionHandler { _, throwable ->
        Log.d(
            tag = TAG,
            message = "Error: ${throwable.message}",
        )
    }

    private var job: Job? = null

    private var session: WebSocketSession? = null

    suspend fun connect() {
        try {
            Log.d(
                tag = TAG,
                message = "Connecting to websocket at $url..."
            )

            session = client.webSocketSession(url)

            listener.onConnected()

            Log.info(
                tag = TAG,
                message = "Connected to websocket at $url"
            )

            session!!.incoming
                .receiveAsFlow()
                .filterIsInstance<Frame.Text>()
                .filterNotNull()
                .collect { data ->
                    val message = data.readText()
                    listener.onReceive(message)

                    Log.info(
                        tag = TAG,
                        message = "Received message: $message"
                    )
                }
        } catch (e: Exception) {
            listener.onDisconnected(e.message ?: "Unknown error")

            Log.d(
                tag = TAG,
                message = "Error: ${e.message}",
            )

            reconnect()
        }
    }

    private fun reconnect() {
        job?.cancel()

        Log.d(
            tag = TAG,
            message = "Reconnecting to websocket in ${RECONNECT_DELAY}ms..."
        )

        job = scope.launch {
            stop()
            delay(RECONNECT_DELAY)
            connect()
        }
    }

    suspend fun stop() {
        Log.d(
            tag = TAG,
            message = "Closing websocket session..."
        )

        session?.close()
        session = null
    }

    suspend fun send(message: String) {
        Log.d(
            tag = TAG,
            message = "Sending message: $message"
        )

        session?.send(Frame.Text(message))
    }

    interface WebsocketEvents{
        fun onReceive(data: String)
        fun onConnected()
        fun onDisconnected(reason: String)
    }

    companion object {
        private const val RECONNECT_DELAY = 10_000L
        private val PING_INTERVAL = 5_000L

        private const val TAG = "EVChargingWebSocketClient"
    }
}

版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。

(0)

相关推荐

发表回复

登录后才能评论