使用 Scoket.io、React、NestJS 和 Redis Cluster 构建可扩展的实时聊天应用程序

本文将引导您使用现代技术构建一个可扩展的实时聊天应用程序。我们将重点讨论如何创建一个既能处理数千个并发用户,又能保持实时性能的系统。

架构概述

聊天应用程序采用可扩展架构,包含以下组件:

  • 带有 Socket.IO 客户端的 React 前端
  • 多个 NestJS 后端实例
  • NGINX 负载均衡器
  • 用于数据持久化和发布/订阅的 Redis 集群

为什么选择这些堆栈?

  1. React:提供对实时应用程序至关重要的高效 UI 更新
  2. NestJS:提供出色的 TypeScript 支持和可扩展的 WebSocket 处理
  3. Redis 集群:支持水平扩展和可靠的发布/订阅消息传递
  4. NGINX:管理负载平衡和 WebSocket 连接

实施细节

1. WebSocket 连接管理

前端通过 Socket.IO 建立 WebSocket 连接:

// SocketContext.tsx
import { createContext, useContext, useEffect, useState } from 'react';
import { io, Socket } from 'socket.io-client';
const SocketContext = createContext<{
  socket: Socket | null;
  username: string;
  setUsername: (name: string) => void;
}>({
  socket: null,
  username: '',
  setUsername: () => {},
});
export const SocketProvider = ({ children }) => {
  const [socket, setSocket] = useState<Socket | null>(null);
  const [username, setUsername] = useState('');
  useEffect(() => {
    const newSocket = io('http://localhost:8080');
    setSocket(newSocket);
    return () => {
      newSocket.close();
    };
  }, []);
  return (
    <SocketContext.Provider value={{ socket, username, setUsername }}>
      {children}
    </SocketContext.Provider>
  );
};

2. 使用 NGINX 进行负载平衡

NGINX 的 WebSocket 负载平衡配置:

upstream socket_nodes {
    ip_hash;  # 确保会话持久性
    server backend1:3000;
    server backend2:3000;
}
server {
    listen 80;
    
    location /socket.io/ {
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $host;
        proxy_pass http://socket_nodes;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}

3. Redis 集群的可扩展性

用 Redis 集群配置可实现数据分片和高可用性:

// app.module.ts
@Module({
  imports: [
    RedisModule.forRoot({
      type: 'cluster',
      nodes: [
        { host: 'redis-1', port: 6379 },
        { host: 'redis-2', port: 6380 },
        { host: 'redis-3', port: 6381 },
      ],
      options: {
        scaleReads: 'all',
        clusterRetryStrategy: (times) => Math.min(times * 100, 3000),
      },
    }),
  ],
})

4. 消息处理和分发

后端使用 NestJS WebSocket decorators 处理消息:

import {
  WebSocketGateway,
  WebSocketServer,
  SubscribeMessage,
  OnGatewayConnection,
  OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { InjectRedis } from '@nestjs-modules/ioredis';
import Redis from 'ioredis';
@WebSocketGateway({
  cors: {
    origin: '*',
  },
})
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;
  constructor(@InjectRedis() private readonly redis: Redis) {}
  private async updateUsers() {
    const users = await this.redis.hgetall('users');
    const usersList = Object.entries(users).map(([id, data]) => {
      const userData = JSON.parse(data);
      return {
        id,
        ...userData,
      };
    });
    this.server.emit('users:update', usersList);
  }
  async handleConnection(client: Socket) {
    console.log(`Client connected: ${client.id}`);
  }
  async handleDisconnect(client: Socket) {
    await this.redis.hdel('users', client.id);
    await this.updateUsers();
  }
  @SubscribeMessage('user:join')
  async handleUserJoin(client: Socket, username: string) {
    await this.redis.hset(
      'users',
      client.id,
      JSON.stringify({
        username,
        isActive: true,
      }),
    );
    await this.updateUsers();
  }
  @SubscribeMessage('message:private')
  async handlePrivateMessage(
    client: Socket,
    payload: { to: string; content: string; from: string },
  ) {
    const users = await this.redis.hgetall('users');
    const targetSocket = Object.entries(users).find(
      ([, data]) => JSON.parse(data).username === payload.to,
    )?.[0];
    if (targetSocket) {
      this.server.to(targetSocket).emit('message:receive', payload);
      client.emit('message:receive', payload);
    }
  }
  @SubscribeMessage('room:create')
  async handleRoomCreate(client: Socket, room: string) {
    await this.redis.sadd('rooms', room);
    client.join(room);
    // 向所有客户广播房间更新信息
    this.broadcastRoomUpdate();
  }
  @SubscribeMessage('room:join')
  async handleRoomJoin(client: Socket, room: string) {
    client.join(room);
    // 向所有客户广播房间更新信息
    this.broadcastRoomUpdate();
  }
  @SubscribeMessage('rooms:list')
  async handleRoomsList() {
    await this.broadcastRoomUpdate();
  }
  // 添加此方法以广播房间更新信息
  private async broadcastRoomUpdate() {
    const rooms = await this.redis.smembers('rooms');
    const roomsList = rooms.map((room) => ({
      name: room,
      users: this.server.sockets.adapter.rooms.get(room)?.size || 0,
    }));
    this.server.emit('rooms:update', roomsList);
  }
  @SubscribeMessage('message:room')
  async handleRoomMessage(
    client: Socket,
    payload: { room: string; content: string; from: string },
  ) {
    this.server.to(payload.room).emit('message:receive', payload);
  }
}

扩展注意事项

1. 水平扩展

  • 多个 NestJS 实例处理 WebSocket 连接
  • Redis Cluster 跨节点分发数据
  • NGINX 负载均衡器分配流量

2. 数据持久性

  • 消息存储在 Redis 中以实现容错
  • 服务器重启后仍保留用户会话
  • Redis 集群中保存的房间状态

3. 实时性能

  • WebSocket 连接保持低延迟
  • Redis pub/sub 确保消息传递
  • 粘性会话可防止连接中断

架构可以处理:

  • 单个嵌套实例可支持 10,000 个并发 WebSocket 连接
  • 大规模实时消息传递
  • 自动故障转移和恢复
  • 服务器重启后会话仍然保持

部署(本地)

使用 Docker Compose 进行编排:

version: '3.8'
services:
  frontend:
    build:
      context: ./frontend
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
    networks:
      - app-network
  backend1:
    build:
      context: ./backend
      dockerfile: Dockerfile
    environment:
      - REDIS_NODES=redis-1:6379,redis-2:6380,redis-3:6381
    depends_on:
      - redis-1
      - redis-2
      - redis-3
    networks:
      - app-network
  backend2:
    build:
      context: ./backend
      dockerfile: Dockerfile
    environment:
      - REDIS_NODES=redis-1:6379,redis-2:6380,redis-3:6381
    depends_on:
      - redis-1
      - redis-2
      - redis-3
    networks:
      - app-network
  nginx:
    image: nginx:alpine
    ports:
      - "8080:80"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - backend1
      - backend2
    networks:
      - app-network
  redis-1:
    image: redis:alpine
    command: redis-server /usr/local/etc/redis/redis.conf
    ports:
      - "6379:6379"
      - "16379:16379"
    volumes:
      - ./redis/cluster/redis-node-1.conf:/usr/local/etc/redis/redis.conf
      - redis1-data:/data
    networks:
      - app-network
  redis-2:
    image: redis:alpine
    command: redis-server /usr/local/etc/redis/redis.conf
    ports:
      - "6380:6380"
      - "16380:16380"
    volumes:
      - ./redis/cluster/redis-node-2.conf:/usr/local/etc/redis/redis.conf
      - redis2-data:/data
    networks:
      - app-network
  redis-3:
    image: redis:alpine
    command: redis-server /usr/local/etc/redis/redis.conf
    ports:
      - "6381:6381"
      - "16381:16381"
    volumes:
      - ./redis/cluster/redis-node-3.conf:/usr/local/etc/redis/redis.conf
      - redis3-data:/data
    networks:
      - app-network
  redis-cluster-init:
    image: redis:alpine
    command: redis-cli --cluster create redis-1:6379 redis-2:6380 redis-3:6381 --cluster-yes
    depends_on:
      - redis-1
      - redis-2
      - redis-3
    networks:
      - app-network
networks:
  app-network:
    driver: bridge
volumes:
  redis1-data:
  redis2-data:
  redis3-data:

输出

加入聊天:

使用 Scoket.io、React、NestJS 和 Redis Cluster 构建可扩展的实时聊天应用程序

群聊:

使用 Scoket.io、React、NestJS 和 Redis Cluster 构建可扩展的实时聊天应用程序

单聊:

使用 Scoket.io、React、NestJS 和 Redis Cluster 构建可扩展的实时聊天应用程序

结论

这种架构为可扩展的聊天应用程序提供了坚实的基础。关键要点:

  • 用于实时通信的 WebSocket 连接
  • Redis Cluster 用于数据持久性和发布/订阅
  • 水平扩展的负载平衡
  • Docker 实现一致部署完整的源代码可在 GitHub 上获取,您可以使用 Docker Compose 进行部署。

代码 :https://github.com/Rezowanur-Rahman-Robin/Scalable-Socket-App/tree/master

作者:Rezowanur Ra​​hman Robin

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/56169.html

(0)

相关推荐

发表回复

登录后才能评论