作为一名对 Kafka 生态系统感兴趣的开发人员,尝试将 Kafka 与其他技术相结合总是非常有趣的。本文将演示如何结合 WebSocket 和 Kafka 来构建一个简单的聊天服务器。
在聊天服务器中采用 Kafka 的意义
- 当流量扩展时,消息的消费速度可能与生产速度不同
- 在消费者端切换消息分发策略,无需重新部署 WebSocket 服务器和关闭所有现有 WebSocket。
- 消息可以广播到多个目的地,可以是另一个活跃用户的 WebSocket、存储用户历史记录的数据库或数据仓库。
结合 Kafka 和 WebSocket 可能会很棘手
WebSocket 可以被抽象为持久保存在内存中的状态,这与 Kafka 并不兼容,以下是一些需要修复的难题:
- WebSocket 需要持久化在内存中,序列化之后传递是没有意义的,也不可能。
- 在消费者端,应将主题的分区与相应的 WebSocket 一起分配,例如,如果 ServerA 持有 UserA 的 WebSocket,则作为消费者的 ServerA 应声明所有发送给 UserA 的消息所在的分区。因此,应创建自定义的 PartitionAssignor,或者消费者需要使用
assign
而不是subscribe
,但assign
肯定需要更多的手动配置和调整。 - 基于第一点和第二点,操作工作量将会很大,例如随着流量的增加,分区数量也会扩大。
分离消息生产和消费可能会解决问题
受OpenIM的启发,一个潜在的解决方案是将 WebSocket 和 Kafka 结合起来,也许可以在中间添加一个消息重定向机制。OpenIM 提供了详细的图表,包括消息代理、消息推送器、消息传输等。为了提取核心组件,我在下图中抽象了核心组件和逻辑,模拟用户 A 向用户 B 发送消息。
有了这个简化的视图,这里的关键挑战是如何Locate Server
。实际上有几种解决方案,例如在 Zookeeper 或 Consul 中注册userId
和serverId
,或者采用一些一致性哈希算法来分发 WebSocket 和消息userId
。
示例演示
源代码可以在这里找到,此示例由以下部分组成:
- 基于 Ktor 的WebSocket 服务器(ChatServer ) 。
- 基于 Ktor 的 HTTP 服务器(SessionRegistry)充当 Consul/Zookeeper 的角色,保存所有用户会话的 ChatServer URL,例如,在 K8s 中哪个 pod 现在正在托管 websocket 会话。
- 普通 Kafka 消费者(MessageDispatcher)消费消息并根据注册的 websocket 会话信息,将消息发送到目标 ChatServer。
当UserA想要发送消息给UserB时,核心逻辑如下:
1. UserA 启动与 ChatServer 的 websocket 连接,ChatServer 将其 URL 注册到 UserA 的 SessionRegistry。(所有其他用户的启动过程相同,并且 1 个用户可能有多个活动的 websocket 会话)。并且当 websocket 关闭时,ChatServer 将在 SessionRegistry 中为该用户取消注册。
2. 在活动的 websocket 会话中,UserA 到达 ChatServer 的每条消息(或多条消息的组合)都将直接生成到 Kafka 消息主题。每条消息都包含 senderId、receiverId 和原始消息的信息。
3. MessageDispatcher 会持续从 Kafka 消息主题中消费消息。对于每条消息(或发送给同一接收者的多条消息),MessageDispatcher 会先从 SessionRegistry 中查找接收者的所有活动会话,然后将消息 POST 到 ChatServer。
4. 在步骤3中,如果会话注册信息由于某种原因过期,MessageDispatcher 向 ChatServer 发送的 POST 消息应该失败,并且 MessageDispatcher 负责从 SessionRegistry 中取消注册特定的会话。
5. 当消息发布到持有接收方 websocket 会话的 ChatServer 时,ChatServer 将通过会话将消息发送给接收方。如果目标 websocket 会话不存在,ChatServer 应在消息发布时发送 HTTP 错误(例如 404),因此 MessageDispatcher 可以处理步骤 4 中描述的这种情况。
6. 如果接收方没有任何活动的 websocket 会话,MessageDispatcher 将在调度时忽略它。这并不意味着消息被忽略。开发人员可以将消息主题中的消息沉入任何类型的持久存储中,并根据需要获取离线消息。
这个设计是非常高层次的,很多生产系统的细节尚未涉及,例如:
- 如何恢复或跟上离线信息或发送失败的信息。
- 向用户组发送信息。
- …
不过,鉴于本文更像是一次头脑风暴,您可以自己探索扩展系统的选项😃。
作者:Li Pei
原文:https://lip17.medium.com/building-chat-server-with-kafka-2c988f74beb8
版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。