本文将引导您使用现代技术构建一个可扩展的实时聊天应用程序。我们将重点讨论如何创建一个既能处理数千个并发用户,又能保持实时性能的系统。
架构概述
聊天应用程序采用可扩展架构,包含以下组件:
- 带有 Socket.IO 客户端的 React 前端
- 多个 NestJS 后端实例
- NGINX 负载均衡器
- 用于数据持久化和发布/订阅的 Redis 集群
为什么选择这些堆栈?
- React:提供对实时应用程序至关重要的高效 UI 更新
- NestJS:提供出色的 TypeScript 支持和可扩展的 WebSocket 处理
- Redis 集群:支持水平扩展和可靠的发布/订阅消息传递
- NGINX:管理负载平衡和 WebSocket 连接
实施细节
1. WebSocket 连接管理
前端通过 Socket.IO 建立 WebSocket 连接:
// SocketContext.tsx
import { createContext, useContext, useEffect, useState } from 'react';
import { io, Socket } from 'socket.io-client';
const SocketContext = createContext<{
socket: Socket | null;
username: string;
setUsername: (name: string) => void;
}>({
socket: null,
username: '',
setUsername: () => {},
});
export const SocketProvider = ({ children }) => {
const [socket, setSocket] = useState<Socket | null>(null);
const [username, setUsername] = useState('');
useEffect(() => {
const newSocket = io('http://localhost:8080');
setSocket(newSocket);
return () => {
newSocket.close();
};
}, []);
return (
<SocketContext.Provider value={{ socket, username, setUsername }}>
{children}
</SocketContext.Provider>
);
};
2. 使用 NGINX 进行负载平衡
NGINX 的 WebSocket 负载平衡配置:
upstream socket_nodes {
ip_hash; # 确保会话持久性
server backend1:3000;
server backend2:3000;
}
server {
listen 80;
location /socket.io/ {
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $host;
proxy_pass http://socket_nodes;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
3. Redis 集群的可扩展性
用 Redis 集群配置可实现数据分片和高可用性:
// app.module.ts
@Module({
imports: [
RedisModule.forRoot({
type: 'cluster',
nodes: [
{ host: 'redis-1', port: 6379 },
{ host: 'redis-2', port: 6380 },
{ host: 'redis-3', port: 6381 },
],
options: {
scaleReads: 'all',
clusterRetryStrategy: (times) => Math.min(times * 100, 3000),
},
}),
],
})
4. 消息处理和分发
后端使用 NestJS WebSocket decorators 处理消息:
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
OnGatewayConnection,
OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { InjectRedis } from '@nestjs-modules/ioredis';
import Redis from 'ioredis';
@WebSocketGateway({
cors: {
origin: '*',
},
})
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
constructor(@InjectRedis() private readonly redis: Redis) {}
private async updateUsers() {
const users = await this.redis.hgetall('users');
const usersList = Object.entries(users).map(([id, data]) => {
const userData = JSON.parse(data);
return {
id,
...userData,
};
});
this.server.emit('users:update', usersList);
}
async handleConnection(client: Socket) {
console.log(`Client connected: ${client.id}`);
}
async handleDisconnect(client: Socket) {
await this.redis.hdel('users', client.id);
await this.updateUsers();
}
@SubscribeMessage('user:join')
async handleUserJoin(client: Socket, username: string) {
await this.redis.hset(
'users',
client.id,
JSON.stringify({
username,
isActive: true,
}),
);
await this.updateUsers();
}
@SubscribeMessage('message:private')
async handlePrivateMessage(
client: Socket,
payload: { to: string; content: string; from: string },
) {
const users = await this.redis.hgetall('users');
const targetSocket = Object.entries(users).find(
([, data]) => JSON.parse(data).username === payload.to,
)?.[0];
if (targetSocket) {
this.server.to(targetSocket).emit('message:receive', payload);
client.emit('message:receive', payload);
}
}
@SubscribeMessage('room:create')
async handleRoomCreate(client: Socket, room: string) {
await this.redis.sadd('rooms', room);
client.join(room);
// 向所有客户广播房间更新信息
this.broadcastRoomUpdate();
}
@SubscribeMessage('room:join')
async handleRoomJoin(client: Socket, room: string) {
client.join(room);
// 向所有客户广播房间更新信息
this.broadcastRoomUpdate();
}
@SubscribeMessage('rooms:list')
async handleRoomsList() {
await this.broadcastRoomUpdate();
}
// 添加此方法以广播房间更新信息
private async broadcastRoomUpdate() {
const rooms = await this.redis.smembers('rooms');
const roomsList = rooms.map((room) => ({
name: room,
users: this.server.sockets.adapter.rooms.get(room)?.size || 0,
}));
this.server.emit('rooms:update', roomsList);
}
@SubscribeMessage('message:room')
async handleRoomMessage(
client: Socket,
payload: { room: string; content: string; from: string },
) {
this.server.to(payload.room).emit('message:receive', payload);
}
}
扩展注意事项
1. 水平扩展
- 多个 NestJS 实例处理 WebSocket 连接
- Redis Cluster 跨节点分发数据
- NGINX 负载均衡器分配流量
2. 数据持久性
- 消息存储在 Redis 中以实现容错
- 服务器重启后仍保留用户会话
- Redis 集群中保存的房间状态
3. 实时性能
- WebSocket 连接保持低延迟
- Redis pub/sub 确保消息传递
- 粘性会话可防止连接中断
架构可以处理:
- 单个嵌套实例可支持 10,000 个并发 WebSocket 连接
- 大规模实时消息传递
- 自动故障转移和恢复
- 服务器重启后会话仍然保持
部署(本地)
使用 Docker Compose 进行编排:
version: '3.8'
services:
frontend:
build:
context: ./frontend
dockerfile: Dockerfile
ports:
- "3000:3000"
networks:
- app-network
backend1:
build:
context: ./backend
dockerfile: Dockerfile
environment:
- REDIS_NODES=redis-1:6379,redis-2:6380,redis-3:6381
depends_on:
- redis-1
- redis-2
- redis-3
networks:
- app-network
backend2:
build:
context: ./backend
dockerfile: Dockerfile
environment:
- REDIS_NODES=redis-1:6379,redis-2:6380,redis-3:6381
depends_on:
- redis-1
- redis-2
- redis-3
networks:
- app-network
nginx:
image: nginx:alpine
ports:
- "8080:80"
volumes:
- ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- backend1
- backend2
networks:
- app-network
redis-1:
image: redis:alpine
command: redis-server /usr/local/etc/redis/redis.conf
ports:
- "6379:6379"
- "16379:16379"
volumes:
- ./redis/cluster/redis-node-1.conf:/usr/local/etc/redis/redis.conf
- redis1-data:/data
networks:
- app-network
redis-2:
image: redis:alpine
command: redis-server /usr/local/etc/redis/redis.conf
ports:
- "6380:6380"
- "16380:16380"
volumes:
- ./redis/cluster/redis-node-2.conf:/usr/local/etc/redis/redis.conf
- redis2-data:/data
networks:
- app-network
redis-3:
image: redis:alpine
command: redis-server /usr/local/etc/redis/redis.conf
ports:
- "6381:6381"
- "16381:16381"
volumes:
- ./redis/cluster/redis-node-3.conf:/usr/local/etc/redis/redis.conf
- redis3-data:/data
networks:
- app-network
redis-cluster-init:
image: redis:alpine
command: redis-cli --cluster create redis-1:6379 redis-2:6380 redis-3:6381 --cluster-yes
depends_on:
- redis-1
- redis-2
- redis-3
networks:
- app-network
networks:
app-network:
driver: bridge
volumes:
redis1-data:
redis2-data:
redis3-data:
输出
加入聊天:

群聊:

单聊:

结论
这种架构为可扩展的聊天应用程序提供了坚实的基础。关键要点:
- 用于实时通信的 WebSocket 连接
- Redis Cluster 用于数据持久性和发布/订阅
- 水平扩展的负载平衡
- Docker 实现一致部署完整的源代码可在 GitHub 上获取,您可以使用 Docker Compose 进行部署。
代码 :https://github.com/Rezowanur-Rahman-Robin/Scalable-Socket-App/tree/master
作者:Rezowanur Rahman Robin
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/56169.html