WebSocket 是一种通信协议,可在客户端(通常是网络浏览器)和服务器之间通过单个长效连接进行全双工通信,这意味着在客户端连接服务器的整个过程中,连接一直处于打开状态。与遵循请求-响应模式的传统 HTTP 请求不同,WebSockets 允许实时双向通信,从而实现高效的交互式网络应用。
就 WebSockets 而言,垂直扩展有助于在单个服务器上处理更多的 WebSocket 连接,并随着内存和处理能力的增加而提高整体性能。但是,纵向扩展也有实际限制,在某些情况下,成本和性能收益可能无法证明进一步纵向扩展是合理的。通过横向扩展,我们必须将 WebSocket 连接分配到多个服务器,以提高性能、处理更多并发连接并确保高可用性。负载平衡器通常用于将传入的 WebSocket 连接平均分配到可用的服务器实例上。然而,由于连接的状态特性以及在多个服务器之间同步和管理状态的需要,WebSocket 应用程序的横向扩展可能具有挑战性。这时,我们可以利用 PUB/SUB 系统的强大功能来管理多个服务器实例之间的状态。
让我们来看看像 Google docs 这样的应用程序,我们有两个用户连接到服务器 server1。在这里,只要有新的更改日志可用,我们就可以通过为文档创建一个具有唯一标识符的通道,将数据发布给所有连接的用户,所有用户都将连接到该通道并接收更新。现在,由于该通道存在于单个实例中,我们可以很容易地将更改日志发布到连接到该服务器的 websockets 上,但如果其他用户编辑同一文档并连接到服务器 2,情况会怎样呢?他们根本无法收到用户 1 和用户 2 的更新,因为该通道只存在于服务器 1 中,与其他服务器没有任何联系。
在这里,我们可以应用的基本概念是为文档创建一个唯一的通道,该通道将在 PUB/SUB 系统(在我们的例子中是 Redis)中创建。连接到任何服务器的用户对文档所做的所有更改都将只发布到 Redis 频道。 所有正在处理该文档的用户都将是 Redis 频道的订阅者。因此,每当主题/通道中出现某些信息时,所有用户都会通过数据库收到有关更改日志的通知。唯一不同的是,PUB/SUB 将作为单点工作,有助于同步和建立不同服务器实例之间的链接。
下面我将使用FastAPI作为演示代码。
1、下面的 RedisPubSubManger
类将有助于创建与 Redis 的连接、订阅和取消订阅频道以及向频道发布消息。
import asyncio
import redis.asyncio as aioredis
import json
from fastapi import WebSocket
class RedisPubSubManager:
"""
Initializes the RedisPubSubManager.
Args:
host (str): Redis server host.
port (int): Redis server port.
"""
def __init__(self, host='localhost', port=6379):
self.redis_host = host
self.redis_port = port
self.pubsub = None
async def _get_redis_connection(self) -> aioredis.Redis:
"""
Establishes a connection to Redis.
Returns:
aioredis.Redis: Redis connection object.
"""
return aioredis.Redis(host=self.redis_host,
port=self.redis_port,
auto_close_connection_pool=False)
async def connect(self) -> None:
"""
Connects to the Redis server and initializes the pubsub client.
"""
self.redis_connection = await self._get_redis_connection()
self.pubsub = self.redis_connection.pubsub()
async def _publish(self, room_id: str, message: str) -> None:
"""
Publishes a message to a specific Redis channel.
Args:
room_id (str): Channel or room ID.
message (str): Message to be published.
"""
await self.redis_connection.publish(room_id, message)
async def subscribe(self, room_id: str) -> aioredis.Redis:
"""
Subscribes to a Redis channel.
Args:
room_id (str): Channel or room ID to subscribe to.
Returns:
aioredis.ChannelSubscribe: PubSub object for the subscribed channel.
"""
await self.pubsub.subscribe(room_id)
return self.pubsub
async def unsubscribe(self, room_id: str) -> None:
"""
Unsubscribes from a Redis channel.
Args:
room_id (str): Channel or room ID to unsubscribe from.
"""
await self.pubsub.unsubscribe(room_id)
2、WebSocketManger
类,它将处理创建房间/频道、订阅/订阅房间等的逻辑。
class WebSocketManager:
def __init__(self):
"""
Initializes the WebSocketManager.
Attributes:
rooms (dict): A dictionary to store WebSocket connections in different rooms.
pubsub_client (RedisPubSubManager): An instance of the RedisPubSubManager class for pub-sub functionality.
"""
self.rooms: dict = {}
self.pubsub_client = RedisPubSubManager()
async def add_user_to_room(self, room_id: str, websocket: WebSocket) -> None:
"""
Adds a user's WebSocket connection to a room.
Args:
room_id (str): Room ID or channel name.
websocket (WebSocket): WebSocket connection object.
"""
await websocket.accept()
if room_id in self.rooms:
self.rooms[room_id].append(websocket)
else:
self.rooms[room_id] = [websocket]
await self.pubsub_client.connect()
pubsub_subscriber = await self.pubsub_client.subscribe(room_id)
asyncio.create_task(self._pubsub_data_reader(pubsub_subscriber))
async def broadcast_to_room(self, room_id: str, message: str) -> None:
"""
Broadcasts a message to all connected WebSockets in a room.
Args:
room_id (str): Room ID or channel name.
message (str): Message to be broadcasted.
"""
await self.pubsub_client._publish(room_id, message)
async def remove_user_from_room(self, room_id: str, websocket: WebSocket) -> None:
"""
Removes a user's WebSocket connection from a room.
Args:
room_id (str): Room ID or channel name.
websocket (WebSocket): WebSocket connection object.
"""
self.rooms[room_id].remove(websocket)
if len(self.rooms[room_id]) == 0:
del self.rooms[room_id]
await self.pubsub_client.unsubscribe(room_id)
async def _pubsub_data_reader(self, pubsub_subscriber):
"""
Reads and broadcasts messages received from Redis PubSub.
Args:
pubsub_subscriber (aioredis.ChannelSubscribe): PubSub object for the subscribed channel.
"""
while True:
message = await pubsub_subscriber.get_message(ignore_subscribe_messages=True)
if message is not None:
room_id = message['channel'].decode('utf-8')
all_sockets = self.rooms[room_id]
for socket in all_sockets:
data = message['data'].decode('utf-8')
await socket.send_text(data)
3、连接到 WebSocket 服务器并建立连接的 API 路由。
import logging
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from websocket.socketManager import WebSocketManager
import json
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--port", default=8000, type=int)
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("FastAPI app")
app = FastAPI()
# Adding the CORS middleware to the app
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
socket_manager = WebSocketManager()
@app.websocket("/api/v1/ws/{room_id}/{user_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str, user_id: int):
await socket_manager.add_user_to_room(room_id, websocket)
message = {
"user_id": user_id,
"room_id": room_id,
"message": f"User {user_id} connected to room - {room_id}"
}
await socket_manager.broadcast_to_room(room_id, json.dumps(message))
try:
while True:
data = await websocket.receive_text()
message = {
"user_id": user_id,
"room_id": room_id,
"message": data
}
await socket_manager.broadcast_to_room(room_id, json.dumps(message))
except WebSocketDisconnect:
await socket_manager.remove_user_from_room(room_id, websocket)
message = {
"user_id": user_id,
"room_id": room_id,
"message": f"User {user_id} disconnected from room - {room_id}"
}
await socket_manager.broadcast_to_room(room_id, json.dumps(message))
if __name__ == "__main__":
uvicorn.run("main:app", host="127.0.0.1", port=args.port, reload=True)
当我们第一次收到带有房间 ID 的请求时,在接受 WebSocket 连接后,我们会检查是否已有可用的房间 ID。如果房间 ID 已经可用,我们就会将新的 WebSocket 连接添加到已经连接到房间 ID 的 WebSocket 连接列表中;如果房间 ID 不可用,我们就会先建立与 redis 的连接,然后以房间 ID 为名订阅 PUB/SUB 主题。订阅后,我们将附加一个阅读器 _pubsub_data_reader,它将不断从主题中提取数据,并在其中提取相关的 WebSocket 连接列表,然后将消息发送给客户端。这就是代码的整体工作概念。
remove_user_from_room 用于当 websocket 连接断开时,从与 room_id 关联的连接列表中删除该连接。如果某个房间 ID 没有可用的连接,我们就会取消订阅相应的主题。
结论
虽然这段代码小巧而简单,但使用 pub/sub 实现全面的 websocket 却相对复杂,需要考虑各种情况。我们需要考虑不同的故障安全机制,以避免任何中断,并在出现故障时进行连接重试等。
完整代码地址:https://github.com/NandaGopal56/websockets-pubsub
版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。