gRPC 或谷歌的 RPC(开源)是一种允许通信的协议,使用 HTTP 2.0,它是 HTTP 1 的继承者。此外,gRPC 非常友好,因为它是跨平台的。本文介绍如何使用 gRPC 在 Python 中构建轻量级、可靠的聊天服务。
典型的聊天服务需要双向连接,这样发送信息和接收信息都可以独立完成。由于 HTTP/1 协议缺乏对服务器端推送请求的固有支持,这一点在 HTTP/1 协议中很难实现。此外,客户端和服务器之间的连接是短暂的,在客户端收到响应后就会中断。为了克服这一问题,我们可以使用服务器发送事件(SSE)。SSE 使用长效连接,服务器可以不断发送数据作为对客户端初始订阅请求的响应。虽然这可以解决问题,但使用 HTTP/1 管理客户端和服务器端的 HTTP 请求并不容易。 我们可以使用 gRPC 非常直观地实现类似的解决方案,下面的章节将对此进行说明。
代码说明
首先克隆下面的代码库:
https://github.com/deepaks2112/python-grpc-chat-service
成功克隆后,将目录更改为 repo,然后使用以下命令安装需求:
pip3 install -r requirements.txt
该服务具有以下功能:
- 双方聊天功能,发送信息后立即接收信息
- 客户上线后立即接收未发送的信息
下面让我们一步步演示创建服务的过程。
原模式
syntax = "proto3";
message ChatMessageRequest {
uint64 thread_id = 1;
string message = 2;
uint64 sender_id = 3;
uint64 recipient_id = 4;
}
message ChatMessageResponse {
uint64 id = 1;
}
message ChatMessage {
uint64 id = 1;
uint64 thread_id = 2;
string message = 3;
uint64 sender_id = 4;
uint64 recipient_id = 5;
}
message ChatClient {
uint64 recipient_id = 1;
}
service ChatService {
rpc SendMessage(ChatMessageRequest) returns (ChatMessageResponse);
rpc ReceiveMessages(ChatClient) returns (stream ChatMessage);
// rpc ConnectDuplexStream(stream ChatMessageRequest) returns (stream ChatMessage);
}
// Command to generate python files:
// python3 -m grpc_tools.protoc -Iprotos --python_out=. --grpc_python_out=. protos/chatservice.proto
原模式定义了界面的外观。如第一行所示,我们使用 proto3 来定义模式。然后,我们定义从过程中传递和返回的对象。我们有一个 ChatService 服务,它提供了发送和接收消息的两个独立过程。请注意,ReceiveMessages 程序需要一个 ChatMessage 流。当我们从任何发送方接收到新的 ChatMessage 对象时,我们就用它向接收方推送新的 ChatMessage 对象。
要在 Python 中从 proto 模式生成模板代码,可以使用以下命令(请确保 grpc_tools 已按要求安装):
python3 -m grpc_tools.protoc -Iprotos --python_out=. --grpc_python_out=. protos/chatservice.proto
聊天服务、聊天客户端和消息队列
class ChatClient:
client_id: int
online: bool = True
def __init__(self, client_id, online=True):
self.client_id = client_id
self.online = online
def is_online(self) -> bool:
return self.online
def set_online(self, online) -> None:
self.online = online
我们有一个 ChatClient 类,其中存储了 client_id 和在线状态。客户端只有在调用 ReceiveMessages 过程时才会处于在线状态。
import logging
from asyncio import Queue
from typing import Dict
class MessageQueue:
topics: Dict[int, Queue] = {}
def get_message_offset(self, recipient_id):
recipient_queue = self.__get_or_create_queue(recipient_id)
return recipient_queue.qsize()
async def put_message(self, recipient_id, message_object):
recipient_queue = self.__get_or_create_queue(recipient_id)
message_id = recipient_queue.qsize()
await recipient_queue.put(message_object)
logging.info(f"Message inserted in queue for recipient={recipient_id}")
return message_id
async def get_message(self, recipient_id):
recipient_queue = self.__get_or_create_queue(recipient_id)
logging.info(f"Reading from queue for recipient={recipient_id}")
unread_message = await recipient_queue.get()
logging.info(f"Received new message for recipient={recipient_id}")
return unread_message
def __get_or_create_queue(self, recipient_id):
if not self.topics.get(recipient_id):
self.topics[recipient_id] = Queue()
return self.topics[recipient_id]
我们使用异步队列按主题存储 ChatMessage 对象。每个主题由一个客户机拥有,该主题中的对象将被发送给该客户机。因此,当客户 A 向客户 B 发送信息时,服务会将信息插入客户 B 的主题。一旦客户端的主题中有消息可供读取,get 调用就会返回通过 ReceiveMessages 存储过程流式传输的对象。
import logging
from chatservice_pb2 import (
ChatMessageRequest,
ChatMessageResponse,
ChatMessage,
ChatClient,
)
from message_queue import MessageQueue
class ChatService:
message_queue: MessageQueue = MessageQueue()
async def write_message(self, request: ChatMessageRequest) -> ChatMessageResponse:
recipient_id = request.recipient_id
message_id = self.message_queue.get_message_offset(recipient_id)
message_id = await self.message_queue.put_message(
recipient_id=request.recipient_id,
message_object=ChatMessage(
id=message_id,
thread_id=request.thread_id,
message=request.message,
sender_id=request.sender_id,
recipient_id=recipient_id,
),
)
return ChatMessageResponse(id=message_id)
async def read_next_message(self, request: ChatClient) -> ChatMessage:
return await self.message_queue.get_message(request.recipient_id)
我们使用 ChatService 类来处理消息的写入和读取。它将从发件人处收到的信息插入收件人的主题中,并从主题中读取信息流给收件人。
gRPC 服务器
import asyncio
import grpc
import logging
from typing import Dict
from chatservice_pb2_grpc import ChatServiceServicer, add_ChatServiceServicer_to_server
from chatservice_pb2 import (
ChatMessageRequest,
ChatMessageResponse,
ChatMessage,
ChatClient,
)
from chat_service import ChatService
from chat_client import ChatClient
class Server(ChatServiceServicer):
chat_service: ChatService = ChatService()
clients: Dict[int, ChatClient] = {}
async def SendMessage(
self, request: ChatMessageRequest, context: grpc.aio.ServicerContext
) -> ChatMessageResponse:
logging.info(f"SendMessage called with {request=}")
if request.message == "X":
logging.info(f"Sender={request.sender_id} going offline.")
self.__make_client_offline(request.sender_id)
self.__transform_recipient_id_for_going_offline_message(request)
await self.chat_service.write_message(request)
return ChatMessageRequest()
async def ReceiveMessages(
self, request: ChatClient, context: grpc.aio.ServicerContext
) -> ChatMessage:
logging.info(f"ReceiveMessages called with {request=}")
self.__add_to_clients(request.recipient_id)
while self.__is_client_online(request.recipient_id):
message_object = await self.chat_service.read_next_message(request)
if message_object.message == "X":
break
yield message_object
def __add_to_clients(self, client_id):
if not self.clients.get(client_id):
self.clients[client_id] = ChatClient(client_id, True)
return
self.clients[client_id].online = True
def __is_client_online(self, client_id):
if client_id not in self.clients.keys():
return False
return self.clients[client_id].is_online()
def __make_client_offline(self, client_id):
if client_id in self.clients.keys():
self.clients[client_id].set_online(False)
def __transform_recipient_id_for_going_offline_message(self, message_object):
message_object.recipient_id = message_object.sender_id
async def serve() -> None:
server = grpc.aio.server()
add_ChatServiceServicer_to_server(Server(), server)
listen_addr = "[::]:50051"
server.add_insecure_port(listen_addr)
logging.info(f"Starting server on {listen_addr}")
await server.start()
await server.wait_for_termination()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(serve())
主服务器类 Server 继承自 ChatServiceServicer(proto 中声明的服务),用于创建客户端使用的存根。服务器实现了 proto 模式中声明的两个 RPC。它还有额外的辅助函数来管理客户端的在线/离线状态。当客户端离线时,我们会显示消息 “X”,以便从容关闭连接。
gRPC 客户端
import logging
import threading
import grpc
from chatservice_pb2 import ChatMessageRequest, ChatClient
from chatservice_pb2_grpc import ChatServiceStub
def run():
with grpc.insecure_channel("localhost:50051") as channel:
stub = ChatServiceStub(channel)
client_id = int(input("Enter client id: "))
read_thread = threading.Thread(
target=read_handler,
args=(
client_id,
stub,
),
)
write_thread = threading.Thread(
target=write_handler,
args=(
client_id,
stub,
),
)
write_thread.start()
read_thread.start()
write_thread.join()
read_thread.join()
print("Quitting!")
def read_handler(client_id, stub):
read_stream = stub.ReceiveMessages(ChatClient(recipient_id=client_id))
for response in read_stream:
print(f"\r{response.sender_id}: {response.message}")
def write_handler(client_id, stub):
while True:
message = input()
recipient_id = 2 if client_id == 1 else 1
stub.SendMessage(
ChatMessageRequest(
thread_id=1,
message=message,
sender_id=client_id,
recipient_id=recipient_id,
)
)
if message == "X":
break
if __name__ == "__main__":
logging.basicConfig()
run()
这是一个使用存根并调用 RPC 的客户端实现。为了提供更好的体验,我们使用了两个独立的线程来处理发送和接收消息,而不会阻塞另一个线程。读取线程启动 ReceiveMessages 程序,然后继续监听服务器发送的 ChatMessage。另一方面,写线程会监听来自 stdin 的输入,并将其封装在 ChatMessageRequest 类中,然后调用 SendMessage RPC。
运行服务器和客户端
假设使用 requirements.txt 安装了依赖项,则可以使用以下命令启动服务器:
python3 chatservice_server.py
服务器启动后,我们可以设置两个可以相互通信的客户端。要启动两个客户端,请在两个单独的终端上运行以下命令:
python3 chatservice_client.py
感谢阅读!
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/32206.html