使用 WebSockets 和 Redis 在 Node.js 分布式架构中进行实时消息传递

在管理许多有状态连接时,使用分布式系统会带来新的复杂性,就像在 Web 应用程序中实现实时消息传递功能时所必须的那样。本文旨在提供一个简洁的示例,说明如何使用 WebSockets 向客户端发送实时数据,并利用 Redis 作为 pub/sub 代理将传入数据导向正确的套接字。该示例是 MERN 网络应用(MongoDB、Express.js、React、Node.js)的一部分。

创建 WebSocketServer 以处理来自客户端的 WebSocket 连接

import ws from "ws";

interface WSS {
  server: ws.WebSocketServer;
  clients: Map<string, ws.WebSocket>;
}

const wss: WSS = {
  server: new ws.WebSocketServer({ noServer: true, clientTracking: false }),
  clients: new Map(),
}

第一步是在 HTTP 服务器旁创建一个 WebSocketServer。我们还需要跟踪打开的客户端连接,因此需要初始化一个新对象,该对象包含服务器和一个包含客户端套接字的映射。我们以 noServer 模式初始化,并将 clientTracking 设为 false,以确保它与 HTTP 服务器完全分离。

接下来,我们需要允许 WebSocketServer 处理对 HTTP 服务器的 “升级 “请求。如果您不熟悉,WebSocket 连接需要经过三个步骤:

  • 客户端通过向服务器发送 HTTP GET 请求来启动连接,请求中包含表示协议升级的特殊标头。
  • 服务器会处理请求,可能会执行身份验证,然后返回同意升级的回复。
  • 连接升级后,所有后续通信都将通过 WebSocket 协议进行。
使用 WebSockets 和 Redis 在 Node.js 分布式架构中进行实时消息传递

在下面的示例中,我使用了一个身份验证函数,它返回一个 IncomingRequestWithUser,这只是一个普通的请求对象,并附加了当前会话用户的 mongo 文档。

withSocket(server: http.Server): void {
  server.on("upgrade", async (req, socket, head) => {
    socket.on("error", console.error);
    try {
      const reqWithUser = await authenticateSocket(req); // Custom authentication function
      wss.server.handleUpgrade(reqWithUser, socket, head, (ws) => {
        wss.server.emit("connection", ws, reqWithUser);
      });
    } catch (err) {
      console.log("error", err);
      socket.write(err.message); // HTTP response string
      socket.destroy();
    }
  });

  wss.server.on("connection", (socket, req: IncomingMessageWithUser) => {
    const user = req.user;
    if (!user) return socket.close(1008, "Unauthorized");
    console.log(`Websocket opened (${user.email})`);

    const userId = user._id.toString();
    wss.clients.set(userId, socket);

    socket.on("close", () => {
      console.log(`Websocket closed (${user.email})`);
      wss.clients.delete(userId);
    });
  });
}

在这里,我们通过验证请求,然后使用 WebSocketServer 的 handleUpgrade 方法来处理 “升级 “事件。我们还为 “连接 “事件添加了一个监听器。在本例中,我使用 mongo 用户文档的 ObjectId 作为键,将套接字存储在客户端映射中。这样,每当该用户收到消息时,我们就可以轻松地返回到该套接字。我们还确保在套接字关闭时清理地图条目。

向客户端发送信息

现在我们已经建立了 WebSocketServer,可以通过打开的套接字向客户端发送消息。例如,你有一个 Webhook,可以监听来自外部的消息,并将其实时发送给你的用户。只要能从请求中识别出clients映射中的套接字,就能非常简单地检查该键是否存在开放的套接字,并在找到后发送数据。

webhook.post("/message", async (req, res) => {
  const { message, userId } = req.body;
  const socket = wss.clients.get(userId);
  socket?.send(JSON.stringify(message));
  return res.sendStatus(200);
})

现在,我们可以在前端轻松打开 WebSocket 连接并处理传入的信息。

const ws = new WebSocket("ws://example.com");
ws.onmessage = (e) => {
  const { message } = JSON.parse(e.data);
  console.log(message);
};

如果我们是在单台机器上的单个服务器实例上运行,那么我们就完成了!服务器存储打开的套接字连接,Webhook 逻辑能够访问用户连接的套接字(如果存在),并通过套接字发送消息数据。但是,如果我们在其中加入一点并行化,情况就会变得复杂一些。

假设你使用节点集群模块在同一台机器上运行多个后端实例。我们目前编写的代码将无法工作!因为现在有多个 WebSocketServers,每个 WebSocketServers 都会跟踪自己的客户端套接字,而 webhook 可能会在任何一个 HTTP 服务器实例上被触发,因此 webhook 被触发的实例很可能与存储我们所需套接字的实例不同。在节点集群模块的范围内,这个问题可以通过向主进程发送消息并转发给所有 Worker 来解决。

但是,如果我们进一步扩展,接收 webhook 请求的实例与保存套接字的实例之间的距离就会拉大。现在,假设您使用 AWS ECS 等云容器编排服务在多台机器上运行节点集群。我们该如何在不同机器上运行的实例之间高效地通信消息数据呢?这就是 Redis 的用武之地。

Redis 作为 pub/sub 代理

Redis 是一种流行的开源内存数据存储,许多知名公司都在使用它,包括 GitHub、Twitter 和 Snapchat。它最常用于缓存和会话存储,但也提供了另一种功能:pub/sub 消息传递范例。

Pub/Sub 是发布/订阅的简称,是一种消息传递范例,发布者通过订阅者收听的频道发送数据。它效率高、健壮、扩展轻松,是通过分布式架构转发消息数据的最佳选择。

让我们在后端实现 pub/sub 代理。

import Redis from "ioredis";

interface ChannelRouter {
  [channel: string]: (message: string) => Promise<void>;
}

export default class Broker {
  private readonly pub: Redis;
  private readonly sub: Redis;
  private router: ChannelRouter;

  constructor(pub: Redis, sub: Redis) {
    this.pub = pub;
    this.sub = sub;
    this.router = {};
  }

  route(channel: string, handler: ChannelRouter[string]) {
    this.router[channel] = handler;
  }

  async subscribe() {
    this.sub.on(
      "message",
      async (channel, message) => await this.router[channel](message),
    );
    await this.sub.subscribe(...Object.keys(this.router));
  }

  async publish(channel: string, message: any) {
    await this.pub.publish(channel, JSON.stringify(message));
  }
}

在这里,我们创建了一个 Broker 类来管理 Redis 连接,并将通道消息路由到处理函数。我们提供了一个route方法来处理建立新的 “路由”,或与处理通过这些通道传递的数据的逻辑相匹配的通道。我们还提供了一个 subscribe 方法来处理事件监听器的设置和将我们的 “子 “客户端订阅到我们建立的所有通道,以及一个通过指定通道发送消息对象的publish方法。如你所见,Redis 为发布消息和订阅通道提供了一个非常简单的 API。

需要注意的是,由于每个服务器实例都能接收 webhook 请求并持有客户端套接字,因此每个实例都应同时是发布者和订阅者。Redis 客户端在订阅通道时无法发布数据,因此我们需要两个独立的 Redis 连接。

将一切整合在一起

现在有了Broker类,让我们在应用程序中使用它!首先,我们将初始化一个 Broker,并为传入的 Webhook 消息定义一个通道。

import Redis from "ioredis";
import Broker from "./broker";

// ...

const pub = new Redis(6379, "localhost"); // Replace with your real redis port and host
const sub = new Redis(6379, "localhost");

const broker = new Broker(pub, sub);

broker.route("webhook:message", async (message) => {
  const { userId, data } = JSON.parse(message);
  const socket = wss.clients.get(userId);
  socket?.send(JSON.stringify(data));
});

await broker.subscribe();

console.log("Pub/Sub broker initialized.");

在这里,我们创建了两个新的 Redis 连接,将它们传递给 Broker 构造函数,并使用 route 方法定义了一个名为 “webhook:message “的通道。现在,只要有消息通过 “webhook:message “通道发送,我们的处理函数就会在数据上被调用!我们会在clients映射中检查是否有以传递的 userId 为关键字的套接字。如果找到了,我们就通过它发送消息数据。

在本例中,我们只为 Broker 定义了一个路由,但由于我们定义了 Broker 类,添加和处理更多通道就像再次调用路由方法一样简单。

现在,让我们回到 webhook 路由,更新它以发布收到的消息。

webhook.post("/message", async (req, res) => {
  const { message, userId } = req.body;
  await broker.publish("webhook:message", {
    userId,
    data: message,
  });
  return res.sendStatus(200);
});

现在,我们不再直接检查 userId 的套接字,而是通过 “webhook:message“通道发布数据。这将把数据转发给后端的所有实例,包括持有我们正在寻找的套接字的实例。

我们就大功告成了!现在,我们不必再担心哪个实例持有哪个套接字连接,我们可以享受 WebSockets 的强大功能和分布式架构的高效率。这是一个强大的解决方案,可以让你走得更远,当然也足以处理绝大多数情况。

(如果速度不够快,或者负载太大,代理成为瓶颈,那么更先进的解决方案是使用 ZMQ,它是一个非常强大的无代理消息库)。

作者:Ben Barber
译自medium.

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/38943.html

(0)

相关推荐

发表回复

登录后才能评论