本文主要总结下自己在直播领域中实践所遇到过的一些高并发技术问题,以及相关的解决思路。
其实在直播这类业务中,对于系统的实时性要求是非常高的,例如直播间里面的大哥送了某个礼物之后,要求在200ms内,必须将直播送礼的行为通知到直播间内的各个用户终端上。这一点的实现,通常都是需要使用实时推送系统去完成,这个实时推送系统,也常被人称之为IM系统。
IM系统的底层是什么?
我认为,IM系统就是一个持有非常多网络长连接的应用。这些网络长连接可以基于WebSocket去做,也可以基于Tcp/Ip协议去做,通常我们会使用Tcp/ip协议的长连接去做,因为它的网络传输可靠性要比WebSocket更强。
Im数据包中注入业务code区分
每个客户端都需要和IM服务建立长连接,那么仅仅是建立了连接就可以了嘛?我认为这样的设计不够严谨,如果单纯是确认了连接建立,此时客户端需要发送一个明确的数据包给到服务端,告知当前连接的更多详细信息(例如当前的登录userId,登录token)。例如我们可以定义一种专门的数据包,让其在登录的场景中使用。
同理,很多不同场景的数据包,它们所携带的包体内部字段都是不同的,所以这块可以使用一个code一样的标识来进行区分。然后不同的code类型的包交给不同的Handler去处理。
可以抽象一个消息体的类,如下:
消息内的code不同,代表接下来的请求逻辑交给不同的处理器去解决,这块可以采用策略模式去设计:
IM连接如何存储
所谓的IM连接,其实可以理解为就是Netty中的ChannelHandlerContext(如果你的IM服务是用Netty做的话)。因为这个对象中的writeAndFlush函数可以将数据写回给到客户端那段,从而实现这个服务端通知客户端的效果。
那么一旦建立连接之后,我们就需要将这个ChannelHandlerContext对象和用户id进行关联,然后保存起来。(如何关联可以采用它的attr方法)
这个保存,其实挺让人头疼的,因为这个对象比较特殊,不能通过redis进行存储。因为这个对象持有的一些引用是不能通过持久化的方式去存放的,所以只能存于本地的Map集合中。那么接下来又有个问题了:
高并发下,应该用HashMap存储还是ConcurrentHashMap存放ChannelHandlerContext?
通常我们考虑使用chm的原因,是因为多线程下同时可能会有对它进行读写同一个key的情况才使用。但是目前我们的IM服务并没有这种情况,同一个userId对于ChannelHandlerContext的读写操作不存在并发问题,所以使用HashMap性能更好(毕竟没有加锁)。
本地存储,也就意味着每个服务能持有的连接数会有固定上限,而且一旦服务崩溃,连接就会丢失了
是的,如果使用本地的map集合进行连接对象存储的话,确实每台机器持有的连接数是有限的。不过大家设想一下,假设我们的JVM进程堆空间设置了8gb,如果要把HashMap撑爆,得往里面塞入多少的数据呀。而且IM服务肯定不可能是单点架构,假设有100w人在线,我们部署500个节点,每个节点管理2000个链接,其实也算是挺正常的概念了。
如果我们的服务出现了崩溃,所有的链接断开了,那么此时需要在客户端实现一套重连机制,让客户端重新对其他机器的进行连接才行。
ChannelHandlerContext的有效性如何保证
毕竟本地内存存储对象,是一种比较奢侈的行为,但是目前这个场景,没有比它更合适的办法了,所以对于存储的对象一定要进行严格筛选。所以在设计IM服务的时候,需要考虑到:
- 无效的连接需要进行清除,释放空间
- 客户端主动断开链接后,要及时清除本地记录
第二点还比较好实现,和客户端进行约定,如果要进行链接断开,则需要发送一个断开的信号包,这样后台收到信号包之后,就进行连接对象移出。(这个设计思路参考了tcp的四次挥手中的FIN信号包机制)
不过上述的这种实现比较依赖于客户端,后边我通过实践发现,可以在Netty的org.qiyu.im.core.server.hanler.tcp.ImCoreHandler#channelUnregistered接口中进行监听,如果发现链接断开,则直接移除ChannelHandlerContext。在这里做监听判断的设计,其实可以减少上述的连接断开信号包的发送步骤。
心跳包检测机制如何设计
每次客户端发送心跳包之后,后台需要将心跳包的时间戳记录到一个zSet集合中,命令如下:
zadd im:heart:172.31.22.01:online 1684411969 1009279
其存储结构如下图所示:
而后台也需要有一个定时任务,每隔一段时间扫描zSet集合中指定时间段之外(例如最近15秒之外)的元素,并且进行删除操作,命令如下:
ZREMRANGEBYRANK im:heart:172.31.22.01:online 16844107769 16844110769
之所以是15秒,是因为我们定义心跳包是5秒一次发送,如果3个间隔时间段内都没有收到信号包,就视作是用户断线了。
采用这种思路进行设计的话,又会存在一个问题,假设有100w用户在线,那么单纯使用一个ZSet存储是否存在容量有限的问题呢?答案是肯定的。
所以建议我们在Redis的ZSet这块可以做一个分片,例如在每个容器服务内部按照 “容器ip” 作为片键的方式分成N组的zset集合。假设我们的netty服务部署了500个节点,那么这样下来,每个zset集合只需要存储2000个元素。
IM连接的对象如何保存
由于我们的IM服务建立连接之后,需要存放在内存的HashMap集合中。一般来说,map集合的上限控制在5k以内会比较合适。另外在连接建立和断开的时候,都需要往map集合进行添加和删除操作,同时可以设计一个相关的MQ消息,用于通知我们的业务下游服务。
如何确保IM服务消息通知的准确性
IM消息发送用于通知客户端们,但是消息到底是否触发了客户端,这一点我们是无法清楚得知的。万一消息发送出去之后,没有抵达客户端,就会导致数据丢失的情况。那么面对这类情况,我们应该如何进行优化呢?
所以我们还需要设计一个ACK机制,当IM通知了客户端之后,如果客户端在10秒内没有返回一个ACK确认信号包,就表示消息发送失败,需要重试。那么这种场景要如何设计呢?
这里我们可以尝试使用RocketMQ的延迟消息去进行实现,例如当我们发送了一条IM消息出去之后,同时发送一条RocketMQ的延迟消息,设置为10秒的延迟级别。然后同时将发送给客户端的消息id记录到redis的一个map集合中,key是消息id,value是一个ack的标记位。如果客户端在10秒内返回了ack标记,就把map中对应的记录给移除。如果客户端超时没有返回ack确认包,那么在延迟消息消费的时候,就可以从Redis的这个map中检索到记录,从而进行消息补偿发送。
一般来说,这个Redis的map只会存放消息发送后10秒内没有返回ack的记录,这样就可以极大的减少了它的存储空间。
IM服务器如何进行平滑发布
说实话,在设计这块功能的时候,我特定去看了关于Dubbo的平滑发布机。下边说说我的一个大概思考:
IM服务器在准备关闭的时候,需要发送一个信号包给到客户端,通知客户端当前连接准备关闭了,需要客户端重连服务器。此时需要重新请求服务端接口,获取新的连接url。
然后这个服务端获取链接url的接口内部 一定是要包含负载均衡算法的,每次请求返回的url不能是同一个,不然可能会有链接倾斜问题发生。建议可以考虑结合IM服务器的连接数进行判断,将连接数少的服务配备较大的权重。
作者: Danny idea | 来源:公众号——Idea的技术分享
版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。