在 Python 中使用 gRPC 创建实时聊天服务

gRPC 或谷歌的 RPC(开源)是一种允许通信的协议,使用 HTTP 2.0,它是 HTTP 1 的继承者。此外,gRPC 非常友好,因为它是跨平台的。本文介绍如何使用 gRPC 在 Python 中构建轻量级、可靠的聊天服务。

典型的聊天服务需要双向连接,这样发送信息和接收信息都可以独立完成。由于 HTTP/1 协议缺乏对服务器端推送请求的固有支持,这一点在 HTTP/1 协议中很难实现。此外,客户端和服务器之间的连接是短暂的,在客户端收到响应后就会中断。为了克服这一问题,我们可以使用服务器发送事件(SSE)。SSE 使用长效连接,服务器可以不断发送数据作为对客户端初始订阅请求的响应。虽然这可以解决问题,但使用 HTTP/1 管理客户端和服务器端的 HTTP 请求并不容易。 我们可以使用 gRPC 非常直观地实现类似的解决方案,下面的章节将对此进行说明。

在 Python 中使用 gRPC 创建实时聊天服务
使用 gRPC 的聊天服务概述

代码说明

首先克隆下面的代码库:

https://github.com/deepaks2112/python-grpc-chat-service

成功克隆后,将目录更改为 repo,然后使用以下命令安装需求:

pip3 install -r requirements.txt

该服务具有以下功能:

  • 双方聊天功能,发送信息后立即接收信息
  • 客户上线后立即接收未发送的信息

下面让我们一步步演示创建服务的过程。

原模式

syntax = "proto3";

message ChatMessageRequest {
    uint64 thread_id = 1;
    string message = 2;
    uint64 sender_id = 3;
    uint64 recipient_id = 4;
}

message ChatMessageResponse {
    uint64 id = 1;
}

message ChatMessage {
    uint64 id = 1;
    uint64 thread_id = 2;
    string message = 3;
    uint64 sender_id = 4;
    uint64 recipient_id = 5;
}

message ChatClient {
    uint64 recipient_id = 1;
}

service ChatService {
    rpc SendMessage(ChatMessageRequest) returns (ChatMessageResponse);
    rpc ReceiveMessages(ChatClient) returns (stream ChatMessage);
    // rpc ConnectDuplexStream(stream ChatMessageRequest) returns (stream ChatMessage);
}


// Command to generate python files:
// python3 -m grpc_tools.protoc -Iprotos --python_out=. --grpc_python_out=. protos/chatservice.proto

原模式定义了界面的外观。如第一行所示,我们使用 proto3 来定义模式。然后,我们定义从过程中传递和返回的对象。我们有一个 ChatService 服务,它提供了发送和接收消息的两个独立过程。请注意,ReceiveMessages 程序需要一个 ChatMessage 流。当我们从任何发送方接收到新的 ChatMessage 对象时,我们就用它向接收方推送新的 ChatMessage 对象。

要在 Python 中从 proto 模式生成模板代码,可以使用以下命令(请确保 grpc_tools 已按要求安装):

python3 -m grpc_tools.protoc -Iprotos --python_out=. --grpc_python_out=. protos/chatservice.proto

聊天服务、聊天客户端和消息队列

class ChatClient:
    client_id: int
    online: bool = True

    def __init__(self, client_id, online=True):
        self.client_id = client_id
        self.online = online

    def is_online(self) -> bool:
        return self.online

    def set_online(self, online) -> None:
        self.online = online

我们有一个 ChatClient 类,其中存储了 client_id 和在线状态。客户端只有在调用 ReceiveMessages 过程时才会处于在线状态。

import logging
from asyncio import Queue
from typing import Dict


class MessageQueue:

    topics: Dict[int, Queue] = {}

    def get_message_offset(self, recipient_id):
        recipient_queue = self.__get_or_create_queue(recipient_id)
        return recipient_queue.qsize()

    async def put_message(self, recipient_id, message_object):
        recipient_queue = self.__get_or_create_queue(recipient_id)
        message_id = recipient_queue.qsize()
        await recipient_queue.put(message_object)
        logging.info(f"Message inserted in queue for recipient={recipient_id}")
        return message_id

    async def get_message(self, recipient_id):
        recipient_queue = self.__get_or_create_queue(recipient_id)
        logging.info(f"Reading from queue for recipient={recipient_id}")
        unread_message = await recipient_queue.get()
        logging.info(f"Received new message for recipient={recipient_id}")
        return unread_message

    def __get_or_create_queue(self, recipient_id):
        if not self.topics.get(recipient_id):
            self.topics[recipient_id] = Queue()
        return self.topics[recipient_id]

我们使用异步队列按主题存储 ChatMessage 对象。每个主题由一个客户机拥有,该主题中的对象将被发送给该客户机。因此,当客户 A 向客户 B 发送信息时,服务会将信息插入客户 B 的主题。一旦客户端的主题中有消息可供读取,get 调用就会返回通过 ReceiveMessages 存储过程流式传输的对象。

import logging
from chatservice_pb2 import (
    ChatMessageRequest,
    ChatMessageResponse,
    ChatMessage,
    ChatClient,
)
from message_queue import MessageQueue


class ChatService:
    message_queue: MessageQueue = MessageQueue()

    async def write_message(self, request: ChatMessageRequest) -> ChatMessageResponse:
        recipient_id = request.recipient_id
        message_id = self.message_queue.get_message_offset(recipient_id)
        message_id = await self.message_queue.put_message(
            recipient_id=request.recipient_id,
            message_object=ChatMessage(
                id=message_id,
                thread_id=request.thread_id,
                message=request.message,
                sender_id=request.sender_id,
                recipient_id=recipient_id,
            ),
        )
        return ChatMessageResponse(id=message_id)

    async def read_next_message(self, request: ChatClient) -> ChatMessage:
        return await self.message_queue.get_message(request.recipient_id)

我们使用 ChatService 类来处理消息的写入和读取。它将从发件人处收到的信息插入收件人的主题中,并从主题中读取信息流给收件人。

gRPC 服务器

import asyncio
import grpc
import logging

from typing import Dict

from chatservice_pb2_grpc import ChatServiceServicer, add_ChatServiceServicer_to_server
from chatservice_pb2 import (
    ChatMessageRequest,
    ChatMessageResponse,
    ChatMessage,
    ChatClient,
)
from chat_service import ChatService
from chat_client import ChatClient


class Server(ChatServiceServicer):
    chat_service: ChatService = ChatService()
    clients: Dict[int, ChatClient] = {}

    async def SendMessage(
        self, request: ChatMessageRequest, context: grpc.aio.ServicerContext
    ) -> ChatMessageResponse:
        logging.info(f"SendMessage called with {request=}")
        if request.message == "X":
            logging.info(f"Sender={request.sender_id} going offline.")
            self.__make_client_offline(request.sender_id)
            self.__transform_recipient_id_for_going_offline_message(request)
        await self.chat_service.write_message(request)
        return ChatMessageRequest()

    async def ReceiveMessages(
        self, request: ChatClient, context: grpc.aio.ServicerContext
    ) -> ChatMessage:
        logging.info(f"ReceiveMessages called with {request=}")
        self.__add_to_clients(request.recipient_id)
        while self.__is_client_online(request.recipient_id):
            message_object = await self.chat_service.read_next_message(request)
            if message_object.message == "X":
                break
            yield message_object

    def __add_to_clients(self, client_id):
        if not self.clients.get(client_id):
            self.clients[client_id] = ChatClient(client_id, True)
            return
        self.clients[client_id].online = True

    def __is_client_online(self, client_id):
        if client_id not in self.clients.keys():
            return False
        return self.clients[client_id].is_online()

    def __make_client_offline(self, client_id):
        if client_id in self.clients.keys():
            self.clients[client_id].set_online(False)

    def __transform_recipient_id_for_going_offline_message(self, message_object):
        message_object.recipient_id = message_object.sender_id


async def serve() -> None:
    server = grpc.aio.server()
    add_ChatServiceServicer_to_server(Server(), server)
    listen_addr = "[::]:50051"
    server.add_insecure_port(listen_addr)
    logging.info(f"Starting server on {listen_addr}")
    await server.start()
    await server.wait_for_termination()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(serve())

主服务器类 Server 继承自 ChatServiceServicer(proto 中声明的服务),用于创建客户端使用的存根。服务器实现了 proto 模式中声明的两个 RPC。它还有额外的辅助函数来管理客户端的在线/离线状态。当客户端离线时,我们会显示消息 “X”,以便从容关闭连接。

gRPC 客户端

import logging
import threading

import grpc
from chatservice_pb2 import ChatMessageRequest, ChatClient
from chatservice_pb2_grpc import ChatServiceStub


def run():
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = ChatServiceStub(channel)
        client_id = int(input("Enter client id: "))

        read_thread = threading.Thread(
            target=read_handler,
            args=(
                client_id,
                stub,
            ),
        )
        write_thread = threading.Thread(
            target=write_handler,
            args=(
                client_id,
                stub,
            ),
        )

        write_thread.start()
        read_thread.start()

        write_thread.join()
        read_thread.join()

        print("Quitting!")


def read_handler(client_id, stub):
    read_stream = stub.ReceiveMessages(ChatClient(recipient_id=client_id))
    for response in read_stream:
        print(f"\r{response.sender_id}: {response.message}")


def write_handler(client_id, stub):
    while True:
        message = input()
        recipient_id = 2 if client_id == 1 else 1

        stub.SendMessage(
            ChatMessageRequest(
                thread_id=1,
                message=message,
                sender_id=client_id,
                recipient_id=recipient_id,
            )
        )

        if message == "X":
            break


if __name__ == "__main__":
    logging.basicConfig()
    run()

这是一个使用存根并调用 RPC 的客户端实现。为了提供更好的体验,我们使用了两个独立的线程来处理发送和接收消息,而不会阻塞另一个线程。读取线程启动 ReceiveMessages 程序,然后继续监听服务器发送的 ChatMessage。另一方面,写线程会监听来自 stdin 的输入,并将其封装在 ChatMessageRequest 类中,然后调用 SendMessage RPC。

运行服务器和客户端

假设使用 requirements.txt 安装了依赖项,则可以使用以下命令启动服务器:

python3 chatservice_server.py

服务器启动后,我们可以设置两个可以相互通信的客户端。要启动两个客户端,请在两个单独的终端上运行以下命令:

python3 chatservice_client.py

感谢阅读!

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/32206.html

(0)

相关推荐

发表回复

登录后才能评论