确保可扩展性已成为当今技术领域的一项标准要求。在微服务世界中开发缺乏可扩展性的应用程序/工具可能会面临各种挑战。在本文中,YıldızTech 团队将分享他们在使用 WebSocket 时发现的一种解决方案,并介绍如何使用 Spring 和 RabbitMQ 集成开发可扩展的 WebSocket 应用程序。在整篇文章中,我们将假设读者熟悉 Kubernetes 环境和概念。
WebSocket 是一种广泛用于满足实时通信需求的工具,如消息传递、位置共享、直播等。在涉及这些功能的应用程序中,确保将正确的信息无任何损失或延迟地传递给正确的人至关重要。为此,在分布式架构中运行的服务必须相互通信。为了有效地将问题形象化,我们不妨举例说明以下场景。
如果我们逐一考虑这些情况,第一种情况就是导致撰写本文的一个问题。当用户建立 WebSocket 连接时,他们基本上是在建立双向端到端连接。随着用户数量和连接数的增加,Kubernetes 环境(如果配置了水平 Pod 扩展 [HPA] 功能)会尝试扩展超负荷的服务。不过,在此过程中,如果用户从其连接的 WebSocket 服务以外的服务接收消息(由于扩展到另一个实例),他们将无法看到这些消息。同样,在有多个服务且需要向特定用户发送消息的情况下,如果在用户连接到第一个服务时向第二个服务发送了消息,用户将无法收到预期的消息。虽然可以扩展这些情况,但归根结底,我们的主要要求是确保信息的正确发送。
根据我们以前的研究结果,RabbitMQ 显然可以成功处理消息分发 [2]。正如下面属于我们演示项目 [3] 的代码片段所示,在 Spring 消息库的帮助下,RabbitMQ 可以被定义为消息代理。这样,即使建立了 WebSocket 连接,也可以通过 RabbitMQ 向客户端发送消息。通过这种配置,消息将通过 RabbitMQ 路由,从而实现 WebSocket 和客户端之间的无缝通信。
private static final String[]APP_PREFIXES=new String[]{"/app","/exchange"};
public static final String[]BROKER_PREFIXES=new String[]{"/queue","/topic","/exchange"};
@Override
public void configureMessageBroker(MessageBrokerRegistry registry){
registry.setPreservePublishOrder(true)
.setApplicationDestinationPrefixes(APP_PREFIXES)
.enableStompBrokerRelay(BROKER_PREFIXES)
.setRelayHost(environmentConfig.getRabbitUrl())
.setRelayPort(environmentConfig.getRabbitPort())
.setClientLogin(environmentConfig.getRabbitUser())
.setClientPasscode(environmentConfig.getRabbitPassword())
.setSystemLogin(environmentConfig.getRabbitUser())
.setSystemPasscode(environmentConfig.getRabbitPassword())
.setUserDestinationBroadcast("/topic/unresolved-user")
.setUserRegistryBroadcast("/topic/user-registry");
...
}
使用 RabbitMQ 作为消息代理有两方面的优势。首先,RabbitMQ 本身具有可扩展性,可以高效地处理大量消息流量。其次,使用 UserRegistryBroadcast
参数,无论用户连接到哪个 WebSocket pod,都可以通过 /topic/user-registry
队列发送消息。这样就能向所有连接的用户无缝发送信息,确保信息能送达预定的收件人,而不管他们具体连接的是哪个 WebSocket pod。
要根据示例项目的日志观察上述情况,可以使用环境变量 SERVER_PORT=8080
和 SERVER_PORT=8081
运行两个不同的应用程序进行模拟。举例来说,由于我们添加了安全层,因此需要通过以下请求获取token,并将相应的token分配给 src/main/resources/client
下客户端代码中的token参数。用户信息可通过 UserDetailsConfig 类访问。请注意,应将 USERNAME 和 PASSWORD 替换为 UserDetailsConfig 中的实际用户名和密码值。
curl --location 'http://localhost:8080/api/v1/token/provide' \
--header 'Content-Type: application/json' \
--data-raw '{
"username": "{USERNAME}",
"password": "{PASSWORD}"
}'
要建立 WebSocket 连接,可在浏览器中打开位于 src/main/resources/client
下的 two_user_twoo_pod.html
页面。连接建立后,您将在应用程序控制台中看到以下日志。值得注意的是,每个用户都分配了一个 UUID。Spring 会使用这些 UUID 向相应用户发送消息。
// Pod 1 (Port: 8080)
2023-06-21T17:45:27.922+03:00 TRACE 2316 --- [nboundChannel-2] o.s.m.s.u.UserDestinationMessageHandler : Translated /user/queue/live-feed-gps-stream -> [/queue/live-feed-gps-stream-user47c7efd6-7eaf-d40a-8d75-70464430f128]
// Pod 2 (Port: 8081)
2023-06-21T17:45:27.922+03:00 TRACE 2316 --- [nboundChannel-2] o.s.m.s.u.UserDestinationMessageHandler : Translated /user/queue/live-feed-gps-stream -> [/queue/live-feed-gps-stream-userba0eba33-91ce-12d7-72ec-4ed9305f19ae]
连接建立后,当发送以下请求时,可以看到信息已成功发送给第一个 pod 中的相应用户。
curl --location 'http://localhost:8080/web-socket/api/v1/queue/gps' \
--header 'Authorization: Bearer {TOKEN} \
--header 'Content-Type: application/json' \
--data '{
"plateNumber": "34AB1234",
"latitude": 40.12314,
"longitude":38.12314
}'
Pod 1 (Port: 8080)
2023-06-21T17:46:28.928+03:00 TRACE 2304 --- [brokerChannel-1] o.s.m.s.u.UserDestinationMessageHandler : Translated /user/izzet.kilic@yilditech.co/queue/live-feed-gps-stream -> [/queue/live-feed-gps-stream-user47c7efd6-7eaf-d40a-8d75-70464430f128]
2023-06-21T17:46:28.928+03:00 TRACE 2304 --- [brokerChannel-1] o.s.m.s.u.UserDestinationMessageHandler : Translated /user/emre.kiziltepe@yilditech.co/queue/live-feed-gps-stream -> [/queue/live-feed-gps-stream-userba0eba33-91ce-12d7-72ec-4ed9305f19ae]
这里需要注意的是,一条传入消息可以通过一个 pod 发送给两个不同的用户。RabbitMQ 使这成为可能,它允许通过提到的 /topic/user-registry
队列从每个 pod 访问用户。
当然,我们并不期望一切都如此简单。我们可以将遇到的一些挑战及其解决方案归纳如下:
未关闭队列
在将我们的项目转移到上述测试环境后,我们注意到 RabbitMQ 的内存开始迅速增加,最终达到了无法响应的程度。在调查该问题时,我们发现尽管应用程序中的活跃用户数量较少,但却存在大量打开的队列。
经过简单研究,我们发现这个问题与 RabbitMQ 有关,为了启用队列的自动关闭,需要在客户端和服务器端添加 auto-delete: true 参数作为头[1]。有了这个参数,当用户断开连接时,队列将自动开始关闭。
//////// client side
var header = {
'auto-delete': true
};
client.subscribe("/user/queue/{QUEUE_NAME}", function (message) {
...
}, header);
//////// server side
var header=new HashMap<String, Object>();
header.put("auto-delete","true");
simpMessagingTemplate.convertAndSendToUser(...,...,...,header);
队列中消息的累积
当存在不同的 WebSocket 队列时,会出现另一个问题。当用户同时连接到不同队列时,RabbitMQ 会为每个用户创建单独的队列。在 WebSocket 服务中,当消息到达队列时,Spring 会根据用户的会话 ID (UUID) 将该消息路由到用户。让我们以 live-feed-gps-stream 队列和 courier-connection-status-stream 队列为例进行说明。当用户连接到这两个队列时,会生成不同的 UUID,如 817083cf-9ccc-0a06-924b-51dea2700eb0 和 11530489-5963-ad10-4b84-f111811d1b45。如果有消息发送到 live-feed-gps-stream 队列,则会为另一个队列创建一个单独的 RabbitMQ 队列,其 UUID 为 817083cf-9ccc-0a06-924b-51dea2700eb0,并在该队列中累积消息。
即使 RabbitMQ 队列被定义为自动删除(AD),当队列中有消息且连接丢失时,它们也不会自动关闭。为了解决这个问题,我们通过在 RabbitMQ 中定义策略(管理 > 策略)找到了解决方法。通过以下策略,我们确保在连接丢失时保持打开状态的队列也被关闭:
通过应用该策略,我们可以确保在连接终止时关闭任何打开的队列,即使队列中还有报文。得益于上述解决方案,我们的系统长期以来一直运行平稳,没有出现任何中断。当与微服务架构结合使用时,扩展可以使系统更加灵活、快速和可用。每个微服务都可以独立扩展,即使在高流量或高需求的情况下也能优化性能。这可以加快业务流程,让用户获得无缝体验。我们希望这篇文章能对面临 WebSocket 类似问题的团队有所帮助,并协助他们找到解决方案。
参考资料
- user782220. (2014, January 21). RabbitMQ difference between exclusive and auto-delete? Stack Overflow. https://stackoverflow.com/questions/21248563/rabbitmq-difference-between-exclusive-and-auto-delete
- borist2/spring-rabbimq-test-queue. (2019, September 6). GitHub. https://github.com/borist2/spring-rabbimq-test-queue
- Yildiz-Tech/spring-boot-scalable-websocket-demo. (2023). GitHub. https://github.com/Yildiz-Tech/spring-boot-scalable-websocket-demo
作者:İzzet Kılıç
来源:Yıldız Tech
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/34078.html