Scaling Real-Time Communication with Redis Pub/Sub and Socket.IO

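When many clients (users) need to communicate in real time, the traditional single-server approach does not scale well. As the number of clients grows, the single server becomes a bottleneck, leading to performance problems and potential failures. In addition, clients connected to different servers cannot communicate with each other, which creates message silos.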

Solution

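  • Multiple server instances: Instead of relying on a single server, deploy several server instances (Server 1, Server 2, Server 3) to share the load of connected clients.
  • Redis Pub/Sub: A Redis instance acts as the central message broker through its Pub/Sub feature. Every server instance connects to Redis and acts as both publisher and subscriber.
  • Publishing messages: When a client sends a message to the server it is connected to, that server publishes the message to a specific Redis channel (for example, "chat").
  • Subscribing to messages: Every server instance subscribes to the same Redis channel. When a message is published, Redis delivers it to all subscribed servers.
  • Broadcasting messages: After receiving a message from Redis, each server instance broadcasts it to all clients connected to that instance.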

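By using Redis Pub/Sub, this solution decouples message distribution from any individual server. The load is spread across multiple server instances, which makes the system scalable: new instances can be added without disrupting the communication flow. Clients connected to different server instances can communicate seamlessly, which removes message silos and enables real-time communication across the whole system.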
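
The flow described above can be sketched without Redis or Socket.IO at all. The following is a minimal in-memory model, not the article's implementation: `InMemoryBroker` stands in for Redis Pub/Sub and `ServerInstance` stands in for one gateway process. Both class names and all of their methods are hypothetical and exist only for illustration.

```typescript
type Handler = (message: string) => void;

// In-memory stand-in for Redis Pub/Sub: channel name -> subscriber callbacks.
class InMemoryBroker {
  private readonly channels = new Map<string, Handler[]>();

  subscribe(channel: string, handler: Handler): void {
    const handlers = this.channels.get(channel) ?? [];
    handlers.push(handler);
    this.channels.set(channel, handlers);
  }

  publish(channel: string, message: string): void {
    // Deliver to every subscriber, including the publisher's own server.
    for (const handler of this.channels.get(channel) ?? []) {
      handler(message);
    }
  }
}

// Stand-in for one server instance and its locally connected clients.
class ServerInstance {
  // clientId -> messages delivered to that client
  readonly inboxes = new Map<string, string[]>();
  private readonly broker: InMemoryBroker;

  constructor(broker: InMemoryBroker) {
    this.broker = broker;
    // Subscribe once at startup; broadcast whatever arrives to local clients.
    broker.subscribe("chat", (message) => this.broadcastLocal(message));
  }

  connect(clientId: string): void {
    this.inboxes.set(clientId, []);
  }

  // A client sent a message: publish it instead of broadcasting directly.
  receiveFromClient(message: string): void {
    this.broker.publish("chat", message);
  }

  private broadcastLocal(message: string): void {
    for (const inbox of this.inboxes.values()) {
      inbox.push(message);
    }
  }
}

const broker = new InMemoryBroker();
const server1 = new ServerInstance(broker);
const server2 = new ServerInstance(broker);
server1.connect("alice");
server2.connect("bob");

// Alice is connected to server 1, yet Bob on server 2 also gets the message.
server1.receiveFromClient("hello from alice");
console.log(server1.inboxes.get("alice"));
console.log(server2.inboxes.get("bob"));
```

Because each server only ever talks to the broker, adding a third `ServerInstance` requires no change to the existing ones, which is the property the real Redis-based setup relies on.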

Step 1: Basic setup (server-client communication)

Reference repository: https://github.com/Subham-Maity/scalable_chat_app_nest/tree/5adeea135b5010e44b30ea36bdc156ee929ed893

  • Server – api/src/chat/chat.gateway.ts
import {
  WebSocketGateway,
  SubscribeMessage,
  MessageBody,
  WebSocketServer,
  OnGatewayConnection, OnGatewayDisconnect,
} from '@nestjs/websockets';
import { ChatService } from './chat.service';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';

/**
 * @WebSocketGateway is a decorator that creates a WebSocket gateway
 * and exposes the specified port (3002) for the WebSocket server.
 * The `cors` option allows cross-origin requests.
 *
 * The class implements the OnGatewayConnection and OnGatewayDisconnect interfaces,
 * which provide methods to handle client connections and disconnections.
 * They take the place of these plain socket.io event listeners:
 *
 *   io.on("connection", (socket) => {       // OnGatewayConnection
 *     console.log("Client connected");
 *     socket.on("disconnect", () => {       // OnGatewayDisconnect
 *       console.log("Client disconnected");
 *     });
 *   });
 */
@WebSocketGateway(3002, { cors: true })
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
  private readonly logger = new Logger(ChatGateway.name);
  /**
   * @WebSocketServer() is a decorator that injects the Socket.IO server instance.
   * It allows you to access and interact with the WebSocket server.
   */
  @WebSocketServer() server: Server

  /**
   * This method is called when a new client connects to the WebSocket server.
   * It logs the connection and emits a `user-joined` event, with the client's ID
   * in the payload, to every connected client except the one that just joined.
   */
  handleConnection(client: Socket) {
    this.logger.verbose('New client connected', client.id);

    // client.broadcast.emit sends to every socket except the sender, whereas
    // this.server.emit would also notify the user who just joined.
    client.broadcast.emit('user-joined', {
      message: `New user joined the chat: ${client.id}`,
      id: client.id,
    });
  }

  /**
   * This method is called when a client disconnects from the WebSocket server.
   * It logs the client's disconnection, and emits a `user-left` event to all connected clients,
   * including the client's ID in the event payload.
   */
  handleDisconnect(client: Socket){
    this.logger.verbose('Client disconnected', client.id);
    this.server.emit( `user-left`, {
      message: `User left the chat: ${client.id}`,
      id: client.id
    })
  }
  /**
   * This method is called when a client sends a message with the event name 'newMessage'.
   * It logs the message payload and broadcasts it to all connected clients
   * using the 'message' event.
   *
   * The commented-out block shows how to reply only to the sending client.
   * It needs the socket injected as `@ConnectedSocket() client: Socket`
   * (ConnectedSocket is exported by '@nestjs/websockets'); a parameter without
   * a decorator is typically left undefined once @MessageBody() is used.
   */
  @SubscribeMessage('newMessage')
  handleNewMessage(@MessageBody() message: any) {
    // Log the received message
    this.logger.debug(message);

    // Broadcast the received message to all clients (group chat)
    this.server.emit('message', message);

    /*
     // socket.emit("reply", ...) - reply to the current client only
     client.emit("reply", "Hello from server");

     // io.emit("reply", ...) - broadcast to all clients
     // "reply" is the event name, "broadcasting..." is the argument
     this.server.emit("reply", "broadcasting...");
     */
  }
}
  • Client – client/src/components/chat.tsx
'use client';

import React, { useState, useEffect, useRef } from 'react';
import io from 'socket.io-client';

const socket = io('http://localhost:3002');

const ChatComponent = () => {
  const [message, setMessage] = useState('');
  const [messages, setMessages] = useState<string[]>([]);
  const messagesEndRef = useRef<HTMLDivElement | null>(null);

  useEffect(() => {
    // Server side: this.server.emit('message', message)
    // This listener fires when the server broadcasts a message to all clients.
    // It appends the received message to the 'messages' state.
    socket.on('message', (data) => {
      // React state must not be mutated, so messages.push(data) is avoided:
      // mutating the existing array can lead to unexpected behavior and bugs.
      // [...prevMessages, data] builds a new array containing every previous
      // message plus the new one, and that array becomes the new 'messages' state.
      setMessages((prevMessages) => [...prevMessages, data]);
    });

    // Server side: client.broadcast.emit('user-joined', { message: `New user joined the chat: ${client.id}`, id: client.id })
    // This listener fires when another user joins; the joining client itself does not receive the event.
    // It appends the payload's message to the 'messages' state.
    socket.on('user-joined', (data) => {
      setMessages((prevMessages) => [...prevMessages, data.message]);
    });

    // fn: this.server.emit('user-left', { message: `User left the chat: ${client.id}`, id: client.id })
    // This event listener is triggered when the server emits a 'user-left' event with a message and client ID
    // It receives the event payload and appends the message to the 'messages' state
    socket.on('user-left', (data) => {
      setMessages((prevMessages) => [...prevMessages, data.message]);
    });
    // This is a cleanup function that runs when the component is unmounted or when the effect is re-run due to a change in dependencies.
    // It removes the event listeners for 'message', 'user-joined', and 'user-left' events to prevent memory leaks.
    return () => {
      socket.off('message');
      socket.off('user-joined');
      socket.off('user-left');
    };
  }, []);
  // This useEffect hook is responsible for scrolling the message container to the bottom whenever the 'messages' state changes.
  // It ensures that the latest messages are always visible without the user having to manually scroll down.
  useEffect(() => {
    // messagesEndRef.current references the empty div at the end of the message container.
    // The optional chaining operator (?.) skips the scrollIntoView() call while the ref is still null.
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, [messages]);

  // fn: @SubscribeMessage('newMessage')
  // This function is triggered when the user submits a message
  // It emits the 'newMessage' event with the message content to the server
  const handleSubmit = (e: React.FormEvent<HTMLFormElement>) => {
    e.preventDefault();
    if (message.trim()) {
      socket.emit('newMessage', message);
      setMessage('');
    }
  };

  return (
    <div className="align">
      <div className="heading">
        Chat
      </div>
      <div className="msg">
        {messages.map((msg, index) => (
          <div key={index} className="msg-li">
            {msg}
          </div>
        ))}
        <div ref={messagesEndRef} />
      </div>
      <form onSubmit={handleSubmit} className="form">
        <input
          type="text"
          value={message}
          onChange={(e) => setMessage(e.target.value)}
          placeholder="Type your message..."
          className="input"
        />
        <button
          type="submit"
          className="btn"
        >
          Send
        </button>
      </form>
    </div>
  );
};

export default ChatComponent;
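
Step 1 relies on two emit styles: `this.server.emit` reaches every connected client, while `client.broadcast.emit` reaches everyone except the sender. The sketch below models that difference in memory so it can be run without a server. It is not Socket.IO itself; `FakeClient`, `FakeServer`, `emitToAll`, and `broadcastFrom` are hypothetical names introduced for illustration.

```typescript
class FakeClient {
  readonly id: string;
  readonly received: string[] = [];

  constructor(id: string) {
    this.id = id;
  }
}

class FakeServer {
  private readonly clients: FakeClient[] = [];

  connect(client: FakeClient): void {
    this.clients.push(client);
  }

  // Models this.server.emit(...): every connected client receives the payload.
  emitToAll(payload: string): void {
    for (const client of this.clients) {
      client.received.push(payload);
    }
  }

  // Models client.broadcast.emit(...): everyone except the sender receives it.
  broadcastFrom(sender: FakeClient, payload: string): void {
    for (const client of this.clients) {
      if (client.id !== sender.id) {
        client.received.push(payload);
      }
    }
  }
}

const chatServer = new FakeServer();
const alice = new FakeClient("alice");
const bob = new FakeClient("bob");
chatServer.connect(alice);
chatServer.connect(bob);

// Bob joins: only Alice is told about it.
chatServer.broadcastFrom(bob, "New user joined the chat: bob");
// A chat message goes to everyone, including its author.
chatServer.emitToAll("hi everyone");

console.log(alice.received); // join notice followed by the chat message
console.log(bob.received);   // the chat message only
```

This is why the gateway uses the broadcast form for `user-joined` (the new user does not need to be told that they joined) and the server-wide form for `message` and `user-left`.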

Step 2: Set up a context with components

Restructure the code

  • Context – client/src/context/socket-provider.tsx
"use client";

import React, { createContext, useCallback, useEffect, useState } from "react";
import io, { Socket } from "socket.io-client";

// Shape of the value provided by SocketContext
interface SocketContextValue {
  // sendMessage is a function that takes a string argument and is used to send a message to the server
  sendMessage: (msg: string) => void;
  // messages is an array of strings representing the messages received from the server
  messages: string[];
}

// Create the context with a null initial value.
// It shares the sendMessage function and the messages array with other components.
export const SocketContext = createContext<SocketContextValue | null>(null);

// SocketProvider sets up the Socket.IO connection, manages its state, and provides the context value
export const SocketProvider: React.FC<{ children: React.ReactNode }> = ({ children }) => {
  // Using the useState hook to store the socket instance and the array of messages
  const [socket, setSocket] = useState<Socket | null>(null);
  const [messages, setMessages] = useState<string[]>([]);

  // Memoize sendMessage with useCallback so that the function
  // is only recreated when the socket instance changes
  const sendMessage = useCallback((msg: string) => {
    // If the socket instance is available, emit the 'newMessage' event with the provided message
    if (socket) {
      socket.emit("newMessage", msg);
    }
  }, [socket]);

  // Set up the Socket.IO connection and its event listeners once, on mount
  useEffect(() => {
    // Connect to the Socket.IO server running at http://localhost:3002
    const newSocket = io("http://localhost:3002");
    // Update the socket state with the new socket instance
    setSocket(newSocket);

    // Event listener for 'message' event
    // When the server sends a message, this event listener is triggered
    // The received message is appended to the messages array using the setMessages function
    newSocket.on("message", (data) => {
      setMessages((prevMessages) => [...prevMessages, data]);
    });

    // Event listener for 'user-joined' event
    // When a new user joins the chat, this event listener is triggered
    // The join message is appended to the messages array
    newSocket.on("user-joined", (data) => {
      setMessages((prevMessages) => [...prevMessages, data.message]);
    });

    // Event listener for 'user-left' event
    // When a user leaves the chat, this event listener is triggered
    // The leave message is appended to the messages array
    newSocket.on("user-left", (data) => {
      setMessages((prevMessages) => [...prevMessages, data.message]);
    });

    // Cleanup function to remove event listeners and disconnect the socket
    // This function is returned from the useEffect hook and will be called when the component unmounts
    // It removes the event listeners and disconnects the WebSocket connection to prevent memory leaks
    return () => {
      newSocket.off("message");
      newSocket.off("user-joined");
      newSocket.off("user-left");
      newSocket.disconnect();
    };
  }, []);

  // Defining the context value object
  // The context value object contains the sendMessage function and the messages array
  const contextValue: SocketContextValue = {
    sendMessage,
    messages,
  };

  // Rendering the SocketContext.Provider with the context value
  // The SocketContext.Provider component provides the context value to its children components
  // Any component wrapped by this provider will have access to the sendMessage function and messages array
  return <SocketContext.Provider value={contextValue}>{children}</SocketContext.Provider>;
};

// Custom hook that provides convenient access to the SocketContext value
export const useSocket = () => {
  // Use the useContext hook to retrieve the context value
  const context = React.useContext(SocketContext);

  // If the context value is null, throw an error
  // This ensures that the useSocket hook is only used within a SocketProvider
  if (!context) {
    throw new Error("useSocket must be used within a SocketProvider");
  }

  // Return the context value
  return context;
};
  • chat.tsx – client/src/components/chat.tsx
"use client";
import React, { useEffect, useRef } from "react";
import { useSocket } from "@/context/socket-provider";
import ChatInput from "@/components/chat-input";

const ChatComponent = () => {
  // Access the messages state from SocketContext through the useSocket hook
  const { messages } = useSocket();
  const messagesEndRef = useRef<HTMLDivElement | null>(null);
  // Scroll to the bottom of the message container whenever the messages state changes
  useEffect(() => {
    messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
  }, [messages]);

  return (
    <div className="align">
      <div className="heading">Chat</div>
      <div className="msg">
        {messages.map((msg, index) => (
          <div key={index} className="msg-li">
            {msg}
          </div>
        ))}
        <div ref={messagesEndRef} />
      </div>
      <ChatInput />
    </div>
  );
};

export default ChatComponent;
  • chat-input.tsx – client/src/components/chat-input.tsx
import React from "react";
import { useSocket } from "@/context/socket-provider";

const ChatInput = () => {
  const [message, setMessage] = React.useState("");
  const { sendMessage } = useSocket();
  // Handling the form submission and sending the message
  const handleSubmit = (e: React.FormEvent<HTMLFormElement>) => {
    e.preventDefault();
    if (message.trim()) {
      sendMessage(message);
      setMessage("");
    }
  };

  return (
    <form onSubmit={handleSubmit} className="form">
      <input
        type="text"
        value={message}
        onChange={(e) => setMessage(e.target.value)}
        placeholder="Type your message..."
        className="input"
      />
      <button type="submit" className="btn">
        Send
      </button>
    </form>
  );
};

export default ChatInput;
  • layout.tsx – client/src/app/layout.tsx
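import { Inter } from "next/font/google";
import { SocketProvider } from "@/context/socket-provider";

// Assumed from the default create-next-app layout; the original snippet omits it
const inter = Inter({ subsets: ["latin"] });

export default function RootLayout({
  children,
}: Readonly<{
  children: React.ReactNode;
}>) {
  return (
    <html lang="en">
      <SocketProvider>
        <body className={inter.className}>{children}</body>
      </SocketProvider>
    </html>
  );
}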

Step 3: Set up Redis (Pub/Sub)

Reference repository: https://github.com/Subham-Maity/scalable_chat_app_nest

  • .env – scalable_chat_app_nest\api\.env
#Port

PORT=""

#Redis Host

REDIS_HOST=""

REDIS_PORT=

REDIS_USERNAME=""

REDIS_PASSWORD=""
  • pub-sub – api/src/redis/pubsub.service.ts
import { Injectable } from '@nestjs/common';
import { Redis } from 'ioredis';
import { ConfigService } from '@nestjs/config';

@Injectable()
export class PubSubService {
  private readonly publisher: Redis;
  private readonly subscriber: Redis;

  constructor(private configService: ConfigService) {
    // Get the Redis configuration options from the ConfigService
    const redisHost =
      this.configService.get<string>('REDIS_HOST') || 'localhost';
    // Environment variables are read as strings, so convert the port explicitly
    const redisPort =
      Number(this.configService.get<string>('REDIS_PORT')) || 6379;
    const redisUsername =
      this.configService.get<string>('REDIS_USERNAME') || '';
    const redisPassword =
      this.configService.get<string>('REDIS_PASSWORD') || '';

    // Connect to Redis with the configuration options
    this.publisher = new Redis({
      host: redisHost,
      port: redisPort,
      username: redisUsername,
      password: redisPassword,
    });
    this.subscriber = new Redis({
      host: redisHost,
      port: redisPort,
      username: redisUsername,
      password: redisPassword,
    });
  }

  publish(channel: string, message: string): void {
    this.publisher.publish(channel, message);
  }

  subscribe(channel: string, callback: (message: string) => void): void {
    this.subscriber.subscribe(channel);
    this.subscriber.on('message', (receivedChannel, receivedMessage) => {
      if (receivedChannel === channel) {
        callback(receivedMessage);
      }
    });
  }
}
  • Modify chat.gateway.ts – api/src/chat/chat.gateway.ts
import {
  MessageBody,
  OnGatewayConnection,
  OnGatewayDisconnect,
  SubscribeMessage,
  WebSocketGateway,
  WebSocketServer,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';
import { PubSubService } from '../redis/pubsub.service';

@WebSocketGateway(3002, { cors: true })
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer() server: Server;
  private readonly logger = new Logger(ChatGateway.name);
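  // Inject the PubSubService and subscribe to the 'chat' channel
  constructor(private pubSubService: PubSubService) {
    this.pubSubService.subscribe('chat', (message) => {
      // Messages are published as JSON strings, so parse them before emitting
      this.server.emit('message', JSON.parse(message));
    });
  }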

  handleConnection(client: Socket) {
    this.logger.verbose('New client connected', client.id);
    client.broadcast.emit(`user-joined`, {
      message: `New user joined the chat: ${client.id}`,
      id: client.id,
    });
  }

  handleDisconnect(client: Socket) {
    this.logger.verbose('Client disconnected', client.id);
    this.server.emit(`user-left`, {
      message: `User left the chat: ${client.id}`,
      id: client.id,
    });
  }

  @SubscribeMessage('newMessage')
  async handleNewMessage(@MessageBody() message: any) {
    this.logger.debug(message);

    // Do not emit directly with this.server.emit('message', message).
    // Publish to the Redis channel instead; the subscription set up in the
    // constructor delivers the message to the clients of every server instance.
    this.pubSubService.publish('chat', JSON.stringify(message));
  }
}
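
Redis Pub/Sub carries strings, which is why the gateway calls `JSON.stringify` before publishing; the receiving side then has to turn that string back into a value before emitting it to clients. The sketch below shows one possible way to make that round trip explicit and tolerant of malformed payloads. The `ChatEnvelope` shape (including the `serverId` field) and the helpers `encodeChatMessage` and `decodeChatMessage` are assumptions made for illustration and are not part of the referenced repository.

```typescript
// Hypothetical wire format for messages on the "chat" channel.
interface ChatEnvelope {
  text: string;
  serverId: string; // assumption: identifies the instance that published
}

// Serialize an envelope before handing it to publish(channel, message).
function encodeChatMessage(envelope: ChatEnvelope): string {
  return JSON.stringify(envelope);
}

// Parse a payload received from the channel.
// Returns null instead of throwing when the payload is not a valid envelope.
function decodeChatMessage(raw: string): ChatEnvelope | null {
  try {
    const parsed: unknown = JSON.parse(raw);
    if (
      typeof parsed === "object" &&
      parsed !== null &&
      typeof (parsed as ChatEnvelope).text === "string" &&
      typeof (parsed as ChatEnvelope).serverId === "string"
    ) {
      return parsed as ChatEnvelope;
    }
    return null;
  } catch {
    return null;
  }
}

const wire = encodeChatMessage({ text: "hello", serverId: "server-1" });
const decoded = decodeChatMessage(wire);
const rejected = decodeChatMessage("not json");

console.log(decoded?.text);
console.log(rejected);
```

In a subscriber callback, a `null` result could simply be logged and skipped, so that one malformed payload on the channel does not throw inside the message listener.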

Testing


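Author: Subham
Original: https://dev.to/codexam/scaling-real-time-communication-with-redis-pubsub-and-socketio-3p56

This article was submitted by the author, and the copyright belongs to the original author. If you repost it, please credit the source: https://www.nxrte.com/jishu/im/47992.html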
