在管理许多有状态连接时,使用分布式系统会带来新的复杂性,就像在 Web 应用程序中实现实时消息传递功能时所必须的那样。本文旨在提供一个简洁的示例,说明如何使用 WebSockets 向客户端发送实时数据,并利用 Redis 作为 pub/sub 代理将传入数据导向正确的套接字。该示例是 MERN 网络应用(MongoDB、Express.js、React、Node.js)的一部分。
创建 WebSocketServer 以处理来自客户端的 WebSocket 连接
import ws from "ws";
interface WSS {
server: ws.WebSocketServer;
clients: Map<string, ws.WebSocket>;
}
const wss: WSS = {
server: new ws.WebSocketServer({ noServer: true, clientTracking: false }),
clients: new Map(),
}
第一步是在 HTTP 服务器旁创建一个 WebSocketServer
。我们还需要跟踪打开的客户端连接,因此需要初始化一个新对象,该对象包含服务器和一个包含客户端套接字的映射。我们以 noServer
模式初始化,并将 clientTracking
设为 false,以确保它与 HTTP 服务器完全分离。
接下来,我们需要允许 WebSocketServer 处理对 HTTP 服务器的 “升级 “请求。如果您不熟悉,WebSocket 连接需要经过三个步骤:
- 客户端通过向服务器发送 HTTP GET 请求来启动连接,请求中包含表示协议升级的特殊标头。
- 服务器会处理请求,可能会执行身份验证,然后返回同意升级的回复。
- 连接升级后,所有后续通信都将通过 WebSocket 协议进行。
在下面的示例中,我使用了一个身份验证函数,它返回一个 IncomingRequestWithUser
,这只是一个普通的请求对象,并附加了当前会话用户的 mongo 文档。
withSocket(server: http.Server): void {
server.on("upgrade", async (req, socket, head) => {
socket.on("error", console.error);
try {
const reqWithUser = await authenticateSocket(req); // Custom authentication function
wss.server.handleUpgrade(reqWithUser, socket, head, (ws) => {
wss.server.emit("connection", ws, reqWithUser);
});
} catch (err) {
console.log("error", err);
socket.write(err.message); // HTTP response string
socket.destroy();
}
});
wss.server.on("connection", (socket, req: IncomingMessageWithUser) => {
const user = req.user;
if (!user) return socket.close(1008, "Unauthorized");
console.log(`Websocket opened (${user.email})`);
const userId = user._id.toString();
wss.clients.set(userId, socket);
socket.on("close", () => {
console.log(`Websocket closed (${user.email})`);
wss.clients.delete(userId);
});
});
}
在这里,我们通过验证请求,然后使用 WebSocketServer 的 handleUpgrade 方法来处理 “升级 “事件。我们还为 “连接 “事件添加了一个监听器。在本例中,我使用 mongo 用户文档的 ObjectId 作为键,将套接字存储在客户端映射中。这样,每当该用户收到消息时,我们就可以轻松地返回到该套接字。我们还确保在套接字关闭时清理地图条目。
向客户端发送信息
现在我们已经建立了 WebSocketServer
,可以通过打开的套接字向客户端发送消息。例如,你有一个 Webhook,可以监听来自外部的消息,并将其实时发送给你的用户。只要能从请求中识别出clients
映射中的套接字,就能非常简单地检查该键是否存在开放的套接字,并在找到后发送数据。
webhook.post("/message", async (req, res) => {
const { message, userId } = req.body;
const socket = wss.clients.get(userId);
socket?.send(JSON.stringify(message));
return res.sendStatus(200);
})
现在,我们可以在前端轻松打开 WebSocket 连接并处理传入的信息。
const ws = new WebSocket("ws://example.com");
ws.onmessage = (e) => {
const { message } = JSON.parse(e.data);
console.log(message);
};
如果我们是在单台机器上的单个服务器实例上运行,那么我们就完成了!服务器存储打开的套接字连接,Webhook 逻辑能够访问用户连接的套接字(如果存在),并通过套接字发送消息数据。但是,如果我们在其中加入一点并行化,情况就会变得复杂一些。
假设你使用节点集群模块在同一台机器上运行多个后端实例。我们目前编写的代码将无法工作!因为现在有多个 WebSocketServers,每个 WebSocketServers 都会跟踪自己的客户端套接字,而 webhook 可能会在任何一个 HTTP 服务器实例上被触发,因此 webhook 被触发的实例很可能与存储我们所需套接字的实例不同。在节点集群模块的范围内,这个问题可以通过向主进程发送消息并转发给所有 Worker 来解决。
但是,如果我们进一步扩展,接收 webhook 请求的实例与保存套接字的实例之间的距离就会拉大。现在,假设您使用 AWS ECS 等云容器编排服务在多台机器上运行节点集群。我们该如何在不同机器上运行的实例之间高效地通信消息数据呢?这就是 Redis 的用武之地。
Redis 作为 pub/sub 代理
Redis 是一种流行的开源内存数据存储,许多知名公司都在使用它,包括 GitHub、Twitter 和 Snapchat。它最常用于缓存和会话存储,但也提供了另一种功能:pub/sub 消息传递范例。
Pub/Sub 是发布/订阅的简称,是一种消息传递范例,发布者通过订阅者收听的频道发送数据。它效率高、健壮、扩展轻松,是通过分布式架构转发消息数据的最佳选择。
让我们在后端实现 pub/sub 代理。
import Redis from "ioredis";
interface ChannelRouter {
[channel: string]: (message: string) => Promise<void>;
}
export default class Broker {
private readonly pub: Redis;
private readonly sub: Redis;
private router: ChannelRouter;
constructor(pub: Redis, sub: Redis) {
this.pub = pub;
this.sub = sub;
this.router = {};
}
route(channel: string, handler: ChannelRouter[string]) {
this.router[channel] = handler;
}
async subscribe() {
this.sub.on(
"message",
async (channel, message) => await this.router[channel](message),
);
await this.sub.subscribe(...Object.keys(this.router));
}
async publish(channel: string, message: any) {
await this.pub.publish(channel, JSON.stringify(message));
}
}
在这里,我们创建了一个 Broker
类来管理 Redis 连接,并将通道消息路由到处理函数。我们提供了一个route
方法来处理建立新的 “路由”,或与处理通过这些通道传递的数据的逻辑相匹配的通道。我们还提供了一个 subscribe
方法来处理事件监听器的设置和将我们的 “子 “客户端订阅到我们建立的所有通道,以及一个通过指定通道发送消息对象的publish
方法。如你所见,Redis 为发布消息和订阅通道提供了一个非常简单的 API。
需要注意的是,由于每个服务器实例都能接收 webhook 请求并持有客户端套接字,因此每个实例都应同时是发布者和订阅者。Redis 客户端在订阅通道时无法发布数据,因此我们需要两个独立的 Redis 连接。
将一切整合在一起
现在有了Broker
类,让我们在应用程序中使用它!首先,我们将初始化一个 Broker
,并为传入的 Webhook 消息定义一个通道。
import Redis from "ioredis";
import Broker from "./broker";
// ...
const pub = new Redis(6379, "localhost"); // Replace with your real redis port and host
const sub = new Redis(6379, "localhost");
const broker = new Broker(pub, sub);
broker.route("webhook:message", async (message) => {
const { userId, data } = JSON.parse(message);
const socket = wss.clients.get(userId);
socket?.send(JSON.stringify(data));
});
await broker.subscribe();
console.log("Pub/Sub broker initialized.");
在这里,我们创建了两个新的 Redis 连接,将它们传递给 Broker
构造函数,并使用 route
方法定义了一个名为 “webhook:message
“的通道。现在,只要有消息通过 “webhook:message
“通道发送,我们的处理函数就会在数据上被调用!我们会在clients
映射中检查是否有以传递的 userId 为关键字的套接字。如果找到了,我们就通过它发送消息数据。
在本例中,我们只为 Broker
定义了一个路由,但由于我们定义了 Broker
类,添加和处理更多通道就像再次调用路由方法一样简单。
现在,让我们回到 webhook 路由,更新它以发布收到的消息。
webhook.post("/message", async (req, res) => {
const { message, userId } = req.body;
await broker.publish("webhook:message", {
userId,
data: message,
});
return res.sendStatus(200);
});
现在,我们不再直接检查 userId
的套接字,而是通过 “webhook:message
“通道发布数据。这将把数据转发给后端的所有实例,包括持有我们正在寻找的套接字的实例。
我们就大功告成了!现在,我们不必再担心哪个实例持有哪个套接字连接,我们可以享受 WebSockets 的强大功能和分布式架构的高效率。这是一个强大的解决方案,可以让你走得更远,当然也足以处理绝大多数情况。
(如果速度不够快,或者负载太大,代理成为瓶颈,那么更先进的解决方案是使用 ZMQ,它是一个非常强大的无代理消息库)。
作者:Ben Barber
译自medium.
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/38943.html