在多个客户端(用户)需要实时通信的情况下,传统的单台服务器方法无法很好地扩展。随着客户端数量的增加,单一服务器将成为瓶颈,导致性能问题和潜在故障。此外,如果客户端连接到不同的服务器,它们将无法相互通信,从而导致信息孤岛。
解决方法
- 多服务器实例:部署多个服务器实例(服务器 1、服务器 2、服务器 3)来处理连接客户端的负载,而不是依赖单个服务器。
- Redis Pub/Sub:Redis 实例使用其 Pub/Sub 功能充当中央消息代理。每个服务器实例都会与 Redis 建立连接,同时充当发布者和订阅者。
- 发布消息:当客户端向所连接的服务器发送消息时,服务器会使用 Pub/Sub 机制将消息发布到 Redis 中的特定通道(如 “聊天”)。
- 订阅消息:所有其他服务器实例都订阅了 Redis 中的同一频道。发布消息时,Redis 会将消息分发到所有订阅的服务器。
- 广播消息:每个服务器实例从 Redis 接收消息后,会将消息广播给连接到该服务器实例的所有客户端。
通过利用 Redis Pub/Sub,该解决方案将消息分发与单个服务器分离开来。负载分布在多个服务器实例上,确保了可扩展性,因为可以在不影响通信流的情况下添加新的实例。连接到不同服务器实例的客户端可以进行无缝通信,消除消息孤岛,实现整个系统的实时通信。
步骤 1:基本设置(通信服务器-客户端)
参考代码库:https://github.com/Subham-Maity/scalable_chat_app_nest/tree/5adeea135b5010e44b30ea36bdc156ee929ed893
server
–api/src/chat/chat.gateway.ts
import {
WebSocketGateway,
SubscribeMessage,
MessageBody,
WebSocketServer,
OnGatewayConnection, OnGatewayDisconnect,
} from '@nestjs/websockets';
import { ChatService } from './chat.service';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';
/**
* @WebSocketGateway is a decorator that creates a WebSocket gateway
* and exposes a specified port (3002) for the WebSocket server.
* The `cors` option allows cross-origin requests.
*/
@WebSocketGateway(3002,{cors:true})
/*
✅//OnGatewayConnection
io.on("connection", (socket) => {
console.log("Client connected");
✅ //OnGatewayDisconnect
socket.on("disconnect", () => {
console.log("Client disconnected");
});
});
*//**
* This class implements the OnGatewayConnection and OnGatewayDisconnect interfaces,
* which provide methods to handle client connections and disconnections.
*
* The commented-out code demonstrates how to handle connections and disconnections
* using the socket.io event listeners `io.on("connection", ...)` and `socket.on("disconnect", ...)`.
*/
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect{
private readonly logger = new Logger(ChatGateway.name);
/**
* @WebSocketServer() is a decorator that injects the Socket.IO server instance.
* It allows you to access and interact with the WebSocket server.
*/
@WebSocketServer() server: Server
/**
* This method is called when a new client connects to the WebSocket server.
* It logs the client's connection, and emits a `user-joined` event to all connected clients,
* including the client's ID in the event payload.
*/
handleConnection(client: Socket){
this.logger.verbose('New client connected', client.id);
//The event data will only be broadcast to every socket but the sender.
//instead of this.server.emit we use client.broadcast.emit because we don't need to show the same user that he join the chat again
//so broadcast the message to all clients except the user who just joined
client.broadcast.emit( `user-joined`, {
message: `New user joined the chat: ${client.id}`,
id: client.id
})
}
/**
* This method is called when a client disconnects from the WebSocket server.
* It logs the client's disconnection, and emits a `user-left` event to all connected clients,
* including the client's ID in the event payload.
*/
handleDisconnect(client: Socket){
this.logger.verbose('Client disconnected', client.id);
this.server.emit( `user-left`, {
message: `User left the chat: ${client.id}`,
id: client.id
})
}
/**
* This method is called when a client sends a message with the event name 'newMessage'.
* It receives the client socket instance and the message payload.
*
* The method logs the received message, broadcasts the message to all connected clients
* using the 'message' event, and sends a reply to the current client with the "Hello from server" message.
*
* It also demonstrates how to broadcast a message to all clients using `this.server.emit("reply", "broadcasting...")`.
*/
@SubscribeMessage('newMessage')
handleNewMessage( client: Socket,@MessageBody() message: any) {
// Log the received message
this.logger.debug(message);
// Broadcast the received message to all clients (group chat)
this.server.emit('message', message); //broadcast to all clients (group chat)
/**
//socket.emit("reply", "Hello from server") - Reply to the current client with a "Hello from server" message
if (client) {
client.emit("reply", "Hello from server")
} else {
console.log('Client is undefined');
}
// io.emit("reply", "broadcasting...") - broadcast to all clients
//reply - event , broadcasting... - arguments
this.server.emit("reply", "broadcasting...")
*/
}
}
Client
–client/src/components/chat.tsx
'use client';
import React, { useState, useEffect, useRef } from 'react';
import io from 'socket.io-client';
const socket = io('http://localhost:3002');
const ChatComponent = () => {
const [message, setMessage] = useState('');
const [messages, setMessages] = useState<string[]>([]);
const messagesEndRef = useRef<HTMLDivElement | null>(null);
useEffect(() => {
// fn: this.server.emit('message', message)
// This event listener is triggered when the server broadcasts a message to all clients using 'this.server.emit('message', message)'
// It receives the broadcast message and appends it to the 'messages' state
socket.on('message', (data) => {
//If we directly push the new data into the messages array using the push method (e.g., messages.push(data)),
// we would be mutating the existing state array.
// Reacts state updates are designed to be immutable, so mutating the state directly can lead to unexpected behavior and potential bugs.
//By using the spread operator ...prevMessages, we create a new array that includes all the elements from the previous messages array (prevMessages).
// This way, we don't lose any existing messages when adding a new
//...prevMessages is the spread syntax, which spreads the elements of the prevMessages array into a new array.
//[...prevMessages, data] creates a new array by combining the elements from prevMessages and the new data element.
// The resulting array [...prevMessages, data] becomes the new value of the messages state.
setMessages((prevMessages) => [...prevMessages, data]);
});
// fn: this.server.emit('user-joined', { message: `New user joined the chat: ${client.id}`, id: client.id })
// This event listener is triggered when the server emits a 'user-joined' event with a message and client ID
// It receives the event payload and appends the message to the 'messages' state
socket.on('user-joined', (data) => {
setMessages((prevMessages) => [...prevMessages, data.message]);
});
// fn: this.server.emit('user-left', { message: `User left the chat: ${client.id}`, id: client.id })
// This event listener is triggered when the server emits a 'user-left' event with a message and client ID
// It receives the event payload and appends the message to the 'messages' state
socket.on('user-left', (data) => {
setMessages((prevMessages) => [...prevMessages, data.message]);
});
// This is a cleanup function that runs when the component is unmounted or when the effect is re-run due to a change in dependencies.
// It removes the event listeners for 'message', 'user-joined', and 'user-left' events to prevent memory leaks.
return () => {
socket.off('message');
socket.off('user-joined');
socket.off('user-left');
};
}, []);
// This useEffect hook is responsible for scrolling the message container to the bottom whenever the 'messages' state changes.
// It ensures that the latest messages are always visible without the user having to manually scroll down.
useEffect(() => {
// messagesEndRef.current is a reference to the div element at the end of the message container.
// The '?' is a safe navigation operator to ensure that messagesEndRef.current is not null or undefined before calling scrollIntoView().
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
}, [messages]);
// fn: @SubscribeMessage('newMessage')
// This function is triggered when the user submits a message
// It emits the 'newMessage' event with the message content to the server
const handleSubmit = (e: React.FormEvent<HTMLFormElement>) => {
e.preventDefault();
if (message.trim()) {
socket.emit('newMessage', message);
setMessage('');
}
};
return (
<div className="align">
<div className="heading">
Chat
</div>
<div className="msg">
{messages.map((msg, index) => (
<div key={index} className="msg-li">
{msg}
</div>
))}
<div ref={messagesEndRef} />
</div>
<form onSubmit={handleSubmit} className="form">
<input
type="text"
value={message}
onChange={(e) => setMessage(e.target.value)}
placeholder="Type your message..."
className="input"
/>
<button
type="submit"
className="btn"
>
Send
</button>
</form>
</div>
);
};
export default ChatComponent;
步骤 2:使用组件设置上下文
重组代码
Context
–client/src/context/socket-provider.tsx
"use client";
import React, { createContext, useCallback, useEffect, useState } from "react";
import io, { Socket } from "socket.io-client";
// Interface defining the shape of the SocketContext value
// This interface specifies the structure of the data that will be provided by the SocketContext
interface SocketContextValue {
// sendMessage is a function that takes a string argument and is used to send a message to the server
sendMessage: (msg: string) => void;
// messages is an array of strings representing the messages received from the server
messages: string[];
}
// Creating the SocketContext with a null initial value
// Creates a new React context with an initial value of null
// This context will be used to share the sendMessage function and messages array with other components
export const SocketContext = createContext<SocketContextValue | null>(null);
// SocketProvider component that manages the WebSocket connection and provides context value
// This is a React component that sets up the WebSocket connection, manages its state, and provides the context value
export const SocketProvider: React.FC<{ children: React.ReactNode }> = ({ children }) => {
// Using the useState hook to store the socket instance and the array of messages
const [socket, setSocket] = useState<Socket | null>(null);
const [messages, setMessages] = useState<string[]>([]);
// Memoized sendMessage function using useCallback
// The useCallback hook is used to memoize the sendMessage function
// This ensures that the function is only recreated when the socket instance changes
const sendMessage = useCallback((msg: string) => {
// If the socket instance is available, emit the 'newMessage' event with the provided message
if (socket) {
socket.emit("newMessage", msg);
}
}, [socket]);
// Setting up WebSocket connection and event listeners
// The useEffect hook is used to set up the WebSocket connection and event listeners
useEffect(() => {
// Create a new WebSocket connection to the server running at http://localhost:3002
const newSocket = io("http://localhost:3002");
// Update the socket state with the new socket instance
setSocket(newSocket);
// Event listener for 'message' event
// When the server sends a message, this event listener is triggered
// The received message is appended to the messages array using the setMessages function
newSocket.on("message", (data) => {
setMessages((prevMessages) => [...prevMessages, data]);
});
// Event listener for 'user-joined' event
// When a new user joins the chat, this event listener is triggered
// The join message is appended to the messages array
newSocket.on("user-joined", (data) => {
setMessages((prevMessages) => [...prevMessages, data.message]);
});
// Event listener for 'user-left' event
// When a user leaves the chat, this event listener is triggered
// The leave message is appended to the messages array
newSocket.on("user-left", (data) => {
setMessages((prevMessages) => [...prevMessages, data.message]);
});
// Cleanup function to remove event listeners and disconnect the socket
// This function is returned from the useEffect hook and will be called when the component unmounts
// It removes the event listeners and disconnects the WebSocket connection to prevent memory leaks
return () => {
newSocket.off("message");
newSocket.off("user-joined");
newSocket.off("user-left");
newSocket.disconnect();
};
}, []);
// Defining the context value object
// The context value object contains the sendMessage function and the messages array
const contextValue: SocketContextValue = {
sendMessage,
messages,
};
// Rendering the SocketContext.Provider with the context value
// The SocketContext.Provider component provides the context value to its children components
// Any component wrapped by this provider will have access to the sendMessage function and messages array
return <SocketContext.Provider value={contextValue}>{children}</SocketContext.Provider>;
};
// Custom hook to access the SocketContext value
// This is a custom hook that provides a convenient way to access the SocketContext value
export const useSocket = () => {
// Use the useContext hook to retrieve the context value
const context = React.useContext(SocketContext);
// If the context value is null, throw an error
// This ensures that the useSocket hook is only used within a SocketProvider
if (!context) {
throw new Error("useSocket must be used within a SocketProvider");
}
// Return the context value
return context;
};
chat.tsx
–client/src/components/chat.tsx
"use client";
import React, { useEffect, useRef } from "react";
import { useSocket } from "@/context/socket-provider";
import ChatInput from "@/components/chat-input";
const ChatComponent = () => {
// Accessing the messages state from the SocketContext using the useSocket hook
const { messages } = useSocket();
const messagesEndRef = useRef<HTMLDivElement | null>(null);
// Scrolling to the bottom of the message container whenever the messages state changes
useEffect(() => {
messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
}, [messages]);
return (
<div className="align">
<div className="heading">Chat</div>
<div className="msg">
{messages.map((msg, index) => (
<div key={index} className="msg-li">
{msg}
</div>
))}
<div ref={messagesEndRef} />
</div>
<ChatInput />
</div>
);
};
export default ChatComponent;
chat-input.tsx
–client/src/components/chat-input.tsx
import React from "react";
import { useSocket } from "@/context/socket-provider";
const ChatInput = () => {
const [message, setMessage] = React.useState("");
const { sendMessage } = useSocket();
// Handling the form submission and sending the message
const handleSubmit = (e: React.FormEvent<HTMLFormElement>) => {
e.preventDefault();
if (message.trim()) {
sendMessage(message);
setMessage("");
}
};
return (
<form onSubmit={handleSubmit} className="form">
<input
type="text"
value={message}
onChange={(e) => setMessage(e.target.value)}
placeholder="Type your message..."
className="input"
/>
<button type="submit" className="btn">
Send
</button>
</form>
);
};
export default ChatInput;
layout.tsx
–client/src/app/layout.tsx
export default function RootLayout({
children,
}: Readonly<{
children: React.ReactNode;
}>) {
return (
<html lang="en">
<SocketProvider>
<body className={inter.className}>{children}</body>
</SocketProvider>
</html>
);
}
步骤3:设置Redis(pub-sub)
参考代码库:https://github.com/Subham-Maity/scalable_chat_app_nest
.env
–scalable_chat_app_nest\api\.env
#Port
PORT=""
#Redis Host
REDIS_HOST=""
REDIS_PORT=
REDIS_USERNAME=""
REDIS_PASSWORD=""
pub-sub
–api/src/redis/pubsub.service.ts
import { Injectable } from '@nestjs/common';
import { Redis } from 'ioredis';
import { ConfigService } from '@nestjs/config';
@Injectable()
export class PubSubService {
private readonly publisher: Redis;
private readonly subscriber: Redis;
constructor(private configService: ConfigService) {
// Get the Redis configuration options from the ConfigService
const redisHost =
this.configService.get<string>('REDIS_HOST') || 'localhost';
const redisPort = this.configService.get<number>('REDIS_PORT') || 6379;
const redisUsername =
this.configService.get<string>('REDIS_USERNAME') || '';
const redisPassword =
this.configService.get<string>('REDIS_PASSWORD') || '';
// Connect to Redis with the configuration options
this.publisher = new Redis({
host: redisHost,
port: redisPort,
username: redisUsername,
password: redisPassword,
});
this.subscriber = new Redis({
host: redisHost,
port: redisPort,
username: redisUsername,
password: redisPassword,
});
}
publish(channel: string, message: string): void {
this.publisher.publish(channel, message);
}
subscribe(channel: string, callback: (message: string) => void): void {
this.subscriber.subscribe(channel);
this.subscriber.on('message', (receivedChannel, receivedMessage) => {
if (receivedChannel === channel) {
callback(receivedMessage);
}
});
}
}
modify the chat.gateway.ts
–api/src/chat/chat.gateway.ts
import {
MessageBody,
OnGatewayConnection,
OnGatewayDisconnect,
SubscribeMessage,
WebSocketGateway,
WebSocketServer,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';
import { PubSubService } from '../redis/pubsub.service';
@WebSocketGateway(3002, { cors: true })
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer() server: Server;
private readonly logger = new Logger(ChatGateway.name);
// Inject the RedisService
constructor(private pubSubService: PubSubService) {
this.pubSubService.subscribe('chat', (message) => {
this.server.emit('message', message);
});
}
handleConnection(client: Socket) {
this.logger.verbose('New client connected', client.id);
client.broadcast.emit(`user-joined`, {
message: `New user joined the chat: ${client.id}`,
id: client.id,
});
}
handleDisconnect(client: Socket) {
this.logger.verbose('Client disconnected', client.id);
this.server.emit(`user-left`, {
message: `User left the chat: ${client.id}`,
id: client.id,
});
}
@SubscribeMessage('newMessage')
async handleNewMessage(@MessageBody() message: any) {
this.logger.debug(message);
//Shift this to the redis service
// this.server.emit('message', message);
// Publish the message to the Redis channel
this.pubSubService.publish('chat', JSON.stringify(message));
}
}
测试
作者:Subham
原文:https://dev.to/codexam/scaling-real-time-communication-with-redis-pubsub-and-socketio-3p56
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/47992.html