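Ever wondered how WhatsApp, Slack, or Discord work? How do people send and receive messages in real time? These apps even tell you whether the recipient has received or read your message.
My first guess was that the client must keep polling the server for new updates. But that approach does not scale when you have millions of concurrent users. We need a way for the server to push messages to the client without the client asking for them.
Enter WebSockets! A WebSocket is a bidirectional, full-duplex, persistent connection between a web browser and a server, so either side can send a message at any time.
This article walks through building a simple 1-to-1 chat application with WebSockets.
It covers two basic scenarios in which user A sends a message to user B:
- If both are online, messages are exchanged in real time.
- If B is offline, A's message should be persisted on the backend, marked as "not delivered", and shown in B's inbox when B comes online.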
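To make the two scenarios concrete, here is a minimal sketch of the "push if online, otherwise store and hand over later" idea. It is not the project's code: PendingInbox, send, and drain are hypothetical names, an in-memory map stands in for the database, and a plain list stands in for the WebSocket push.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for the table that holds undelivered messages. */
class PendingInbox {
  private final Map<String, List<String>> pendingByRecipient = new ConcurrentHashMap<>();

  /** Returns true if the message was pushed live, false if it was stored as not delivered. */
  boolean send(String recipient, String message, boolean recipientOnline, List<String> liveChannel) {
    if (recipientOnline) {
      liveChannel.add(message); // stands in for a WebSocket push
      return true;
    }
    // compute() runs atomically per key, so a concurrent drain() cannot lose the message.
    pendingByRecipient.compute(
        recipient,
        (key, pending) -> {
          List<String> list = pending == null ? new ArrayList<>() : pending;
          list.add(message);
          return list;
        });
    return false;
  }

  /** Called when the recipient comes online: returns and clears everything that was waiting. */
  List<String> drain(String recipient) {
    List<String> pending = pendingByRecipient.remove(recipient);
    return pending == null ? new ArrayList<>() : pending;
  }
}
```

In a real backend the stored rows would also carry the delivery status, so the sender can be told once the recipient drains the inbox.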
Seems simple, right? Let's get started!
I cover only the WebSocket-related parts here. The complete source code is at https://github.com/SatvikNema/satchat.
Backend
Add the dependency:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
Configure WebSocketMessageBrokerConfigurer:
package com.satvik.satchat.config.websocket;
import com.satvik.satchat.filter.WebSocketTokenFilter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.*;
import org.springframework.web.socket.server.RequestUpgradeStrategy;
import org.springframework.web.socket.server.standard.TomcatRequestUpgradeStrategy;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
@Configuration
@EnableWebSocketMessageBroker
@Order(Ordered.HIGHEST_PRECEDENCE + 1)
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Value("${frontend.caller.host:http://localhost:3000}")
private String frontendCallerHost;
private final WebSocketTokenFilter webSocketTokenFilter;
public WebSocketConfig(WebSocketTokenFilter webSocketTokenFilter) {
this.webSocketTokenFilter = webSocketTokenFilter;
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(webSocketTokenFilter);
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
RequestUpgradeStrategy upgradeStrategy = new TomcatRequestUpgradeStrategy();
registry
.addEndpoint("/ws")
.setHandshakeHandler(new DefaultHandshakeHandler(upgradeStrategy))
.setAllowedOrigins(frontendCallerHost);
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry
.setApplicationDestinationPrefixes("/app")
.enableSimpleBroker("/topic")
.setTaskScheduler(heartBeatScheduler())
.setHeartbeatValue(new long[] {10000L, 10000L});
}
@Bean
public TaskScheduler heartBeatScheduler() {
return new ThreadPoolTaskScheduler();
}
}
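WebSocketTokenFilter intercepts WebSocket requests from the client and populates the Spring Security context. This is the most important piece, because it associates each WebSocket session with a logged-in user, which lets us track which users are online.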
WebSocketTokenFilter:
package com.satvik.satchat.filter;
import com.satvik.satchat.config.UserDetailsServiceImpl;
import com.satvik.satchat.utils.JWTUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;
@Component
public class WebSocketTokenFilter implements ChannelInterceptor {
private final JWTUtils jwtUtils;
private final UserDetailsServiceImpl userDetailsService;
public WebSocketTokenFilter(JWTUtils jwtUtils, UserDetailsServiceImpl userDetailsService) {
this.jwtUtils = jwtUtils;
this.userDetailsService = userDetailsService;
}
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
final StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
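// getAccessor can return null for messages without a mutable accessor, so guard against it.
if (accessor != null && StompCommand.CONNECT == accessor.getCommand()) {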
String jwt = jwtUtils.parseJwt(accessor);
if (jwt != null && jwtUtils.validateJwtToken(jwt)) {
String username = jwtUtils.getUserNameFromJwtToken(jwt);
UserDetails userDetails = userDetailsService.loadUserByUsername(username);
UsernamePasswordAuthenticationToken authentication =
new UsernamePasswordAuthenticationToken(
userDetails, null, userDetails.getAuthorities());
accessor.setUser(authentication);
}
}
return message;
}
}
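I won't cover how to set up Spring Security for REST in this article; see the GitHub repository linked above.
So how do two users actually send messages to each other? The two users and the server all need to listen on the same channel.
To do that, I concatenate the two users' IDs, so that any two users chatting always have a dedicated, unique channel.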
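One detail worth illustrating: for both users to end up on the same channel, the concatenation has to give the same result no matter who starts the chat. The sketch below sorts the two IDs before joining them. The class name ConversationIds, the method of, and the "_" separator are assumptions made for illustration; the repository may build its convId differently.

```java
import java.util.UUID;

/** Hypothetical helper that derives one channel ID from two user IDs. */
final class ConversationIds {
  private ConversationIds() {}

  /** Joins the IDs in a fixed (lexicographic) order so that (a, b) and (b, a) match. */
  static String of(UUID a, UUID b) {
    String first = a.toString();
    String second = b.toString();
    if (first.compareTo(second) > 0) {
      String tmp = first;
      first = second;
      second = tmp;
    }
    return first + "_" + second;
  }
}
```

With this, ConversationIds.of(alice, bob) and ConversationIds.of(bob, alice) return the same string, so either client can compute the destination to subscribe to without a lookup.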
package com.satvik.satchat.controller;
import com.satvik.satchat.model.ChatMessage;
import com.satvik.satchat.service.ChatService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.stereotype.Controller;
@Controller
@Slf4j
public class ChatController {
private final ChatService chatService;
@Autowired
public ChatController(ChatService chatService) {
this.chatService = chatService;
}
@MessageMapping("/chat/sendMessage/{convId}")
public ChatMessage sendMessageToConvId(
@Payload ChatMessage chatMessage,
SimpMessageHeaderAccessor headerAccessor,
@DestinationVariable("convId") String conversationId) {
chatService.sendMessageToConvId(chatMessage, conversationId, headerAccessor);
return chatMessage;
}
}
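The @MessageMapping annotation creates an endpoint to which WebSocket clients send messages. Here, convId represents the channel shared by the two users. Before sending a message, though, we need to know whether the recipient is online or offline, because that determines the message's delivery status.
There are three ways the recipient can be unavailable:
- The user is not on the network.
- The user is on the network, but the application is not open.
- The user is on the network and the application is open, but the specific chat view is not.
To tell these apart, we listen for WebSocket events with WebSocketEventListener. This lets us act when a user connects to or disconnects from the application, or subscribes to or unsubscribes from a channel.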
WebSocketEventListener:
package com.satvik.satchat.config.websocket;
import com.satvik.satchat.service.OnlineOfflineService;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import org.springframework.web.socket.messaging.SessionUnsubscribeEvent;
@Component
@Slf4j
public class WebSocketEventListener {
private final OnlineOfflineService onlineOfflineService;
private final Map<String, String> simpSessionIdToSubscriptionId;
public WebSocketEventListener(OnlineOfflineService onlineOfflineService) {
this.onlineOfflineService = onlineOfflineService;
this.simpSessionIdToSubscriptionId = new ConcurrentHashMap<>();
}
@EventListener
public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
onlineOfflineService.removeOnlineUser(event.getUser());
}
@EventListener
@SendToUser
public void handleSubscribeEvent(SessionSubscribeEvent sessionSubscribeEvent) {
String subscribedChannel =
(String) sessionSubscribeEvent.getMessage().getHeaders().get("simpDestination");
String simpSessionId =
(String) sessionSubscribeEvent.getMessage().getHeaders().get("simpSessionId");
if (subscribedChannel == null) {
log.error("SUBSCRIBED TO NULL?? WAT?!");
return;
}
simpSessionIdToSubscriptionId.put(simpSessionId, subscribedChannel);
onlineOfflineService.addUserSubscribed(sessionSubscribeEvent.getUser(), subscribedChannel);
}
@EventListener
public void handleUnSubscribeEvent(SessionUnsubscribeEvent unsubscribeEvent) {
String simpSessionId = (String) unsubscribeEvent.getMessage().getHeaders().get("simpSessionId");
String unSubscribedChannel = simpSessionIdToSubscriptionId.get(simpSessionId);
onlineOfflineService.removeUserSubscribed(unsubscribeEvent.getUser(), unSubscribedChannel);
}
@EventListener
public void handleConnectedEvent(SessionConnectedEvent sessionConnectedEvent) {
onlineOfflineService.addOnlineUser(sessionConnectedEvent.getUser());
}
}
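Note that we extract the current user from the message headers. This works because WebSocketTokenFilter populated the Spring Security context.
OnlineOfflineService keeps track of active users. It also broadcasts which users come online and which go offline: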
package com.satvik.satchat.service;
import com.satvik.satchat.config.UserDetailsImpl;
import com.satvik.satchat.entity.ConversationEntity;
import com.satvik.satchat.entity.UserEntity;
import com.satvik.satchat.model.ChatMessage;
import com.satvik.satchat.model.MessageDeliveryStatusEnum;
import com.satvik.satchat.model.MessageDeliveryStatusUpdate;
import com.satvik.satchat.model.MessageType;
import com.satvik.satchat.model.UserConnection;
import com.satvik.satchat.model.UserResponse;
import com.satvik.satchat.repository.UserRepository;
import java.security.Principal;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class OnlineOfflineService {
private final Set<UUID> onlineUsers;
private final Map<UUID, Set<String>> userSubscribed;
private final UserRepository userRepository;
private final SimpMessageSendingOperations simpMessageSendingOperations;
public OnlineOfflineService(
UserRepository userRepository, SimpMessageSendingOperations simpMessageSendingOperations) {
this.onlineUsers = new ConcurrentSkipListSet<>();
this.userSubscribed = new ConcurrentHashMap<>();
this.userRepository = userRepository;
this.simpMessageSendingOperations = simpMessageSendingOperations;
}
public void addOnlineUser(Principal user) {
if (user == null) return;
UserDetailsImpl userDetails = getUserDetails(user);
log.info("{} is online", userDetails.getUsername());
for (UUID id : onlineUsers) {
simpMessageSendingOperations.convertAndSend(
"/topic/" + id,
ChatMessage.builder()
.messageType(MessageType.FRIEND_ONLINE)
.userConnection(UserConnection.builder().connectionId(userDetails.getId()).build())
.build());
}
onlineUsers.add(userDetails.getId());
}
public void removeOnlineUser(Principal user) {
if (user != null) {
UserDetailsImpl userDetails = getUserDetails(user);
log.info("{} went offline", userDetails.getUsername());
onlineUsers.remove(userDetails.getId());
userSubscribed.remove(userDetails.getId());
for (UUID id : onlineUsers) {
simpMessageSendingOperations.convertAndSend(
"/topic/" + id,
ChatMessage.builder()
.messageType(MessageType.FRIEND_OFFLINE)
.userConnection(UserConnection.builder().connectionId(userDetails.getId()).build())
.build());
}
}
}
public boolean isUserOnline(UUID userId) {
return onlineUsers.contains(userId);
}
private UserDetailsImpl getUserDetails(Principal principal) {
UsernamePasswordAuthenticationToken user = (UsernamePasswordAuthenticationToken) principal;
Object object = user.getPrincipal();
return (UserDetailsImpl) object;
}
public List<UserResponse> getOnlineUsers() {
return userRepository.findAllById(onlineUsers).stream()
.map(
userEntity ->
new UserResponse(
userEntity.getId(), userEntity.getUsername(), userEntity.getEmail()))
.toList();
}
public void addUserSubscribed(Principal user, String subscribedChannel) {
UserDetailsImpl userDetails = getUserDetails(user);
log.info("{} subscribed to {}", userDetails.getUsername(), subscribedChannel);
Set<String> subscriptions = userSubscribed.getOrDefault(userDetails.getId(), new HashSet<>());
subscriptions.add(subscribedChannel);
userSubscribed.put(userDetails.getId(), subscriptions);
}
public void removeUserSubscribed(Principal user, String subscribedChannel) {
UserDetailsImpl userDetails = getUserDetails(user);
log.info("unsubscription! {} unsubscribed {}", userDetails.getUsername(), subscribedChannel);
Set<String> subscriptions = userSubscribed.getOrDefault(userDetails.getId(), new HashSet<>());
subscriptions.remove(subscribedChannel);
userSubscribed.put(userDetails.getId(), subscriptions);
}
public boolean isUserSubscribed(UUID username, String subscription) {
Set<String> subscriptions = userSubscribed.getOrDefault(username, new HashSet<>());
return subscriptions.contains(subscription);
}
public Map<String, Set<String>> getUserSubscribed() {
Map<String, Set<String>> result = new HashMap<>();
List<UserEntity> users = userRepository.findAllById(userSubscribed.keySet());
users.forEach(user -> result.put(user.getUsername(), userSubscribed.get(user.getId())));
return result;
}
public void notifySender(
UUID senderId,
List<ConversationEntity> entities,
MessageDeliveryStatusEnum messageDeliveryStatusEnum) {
if (!isUserOnline(senderId)) {
log.info(
"{} is not online. cannot inform the socket. will persist in database",
senderId.toString());
return;
}
List<MessageDeliveryStatusUpdate> messageDeliveryStatusUpdates =
entities.stream()
.map(
e ->
MessageDeliveryStatusUpdate.builder()
.id(e.getId())
.messageDeliveryStatusEnum(messageDeliveryStatusEnum)
.content(e.getContent())
.build())
.toList();
for (ConversationEntity entity : entities) {
simpMessageSendingOperations.convertAndSend(
"/topic/" + senderId,
ChatMessage.builder()
.id(entity.getId())
.messageDeliveryStatusUpdates(messageDeliveryStatusUpdates)
.messageType(MessageType.MESSAGE_DELIVERY_UPDATE)
.content(entity.getContent())
.build());
}
}
}
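SimpMessageSendingOperations is provided by the Spring Framework and helps send messages over the established channels.
OnlineOfflineService makes it very easy to determine:
- whether the user is using the application: isUserOnline(userId)
- whether the user is subscribed to a channel (the current chat view): isUserSubscribed(username, subscription)
If both return false, the user is offline, and the message delivery status should be "not delivered".
To distinguish offline from online (but not in the chat view), isUserSubscribed should be false while isUserOnline should be true. In that case, the delivery status should be "delivered".
If both return true, the two users are in the same chat view, and the delivery status should be "seen".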
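The three cases above boil down to a small decision function. The following is only a sketch of that mapping: DeliveryStatus and DeliveryStatusResolver are hypothetical names, not the repository's MessageDeliveryStatusEnum, and the two booleans stand for the results of isUserOnline and isUserSubscribed.

```java
/** Hypothetical status values mirroring the three cases described in the text. */
enum DeliveryStatus {
  NOT_DELIVERED,
  DELIVERED,
  SEEN
}

final class DeliveryStatusResolver {
  private DeliveryStatusResolver() {}

  /**
   * @param online result of an isUserOnline-style check for the recipient
   * @param subscribedToConversation result of an isUserSubscribed-style check for this chat
   */
  static DeliveryStatus resolve(boolean online, boolean subscribedToConversation) {
    if (!online) {
      // An offline user cannot have the chat view open, whatever the second flag says.
      return DeliveryStatus.NOT_DELIVERED;
    }
    return subscribedToConversation ? DeliveryStatus.SEEN : DeliveryStatus.DELIVERED;
  }
}
```

Checking the online flag first keeps the result sensible even if a stale subscription entry outlives a dropped connection.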
See https://github.com/SatvikNema/satchat/blob/main/src/main/java/com/satvik/satchat/service/ChatService.java for how these methods are used.
For the UI, we use React with stompjs 7.0.0. These are the dependencies:
"dependencies": {
"@stomp/stompjs": "^7.0.0",
"@testing-library/jest-dom": "^5.17.0",
"@testing-library/react": "^13.4.0",
"@testing-library/user-event": "^13.5.0",
"react": "^18.2.0",
"react-dom": "^18.2.0",
"react-scripts": "5.0.1",
"web-vitals": "^2.1.4",
"websocket": "^1.0.34",
"ws": "^8.16.0"
}
Configure the stompjs client:
import { Client } from "@stomp/stompjs";

class SocketClient {
  constructor(url, jwt) {
    this.url = url;
    // Stored under a separate name so the `jwt` getter below does not call itself.
    this._jwt = jwt;
    this.client = new Client();
    this.client.configure({
      brokerURL: url,
      connectHeaders: {
        Authorization: `Bearer ${jwt}`,
      },
      onConnect: () => {
        console.log("connected!");
      },
    });
    this.client.activate();
  }

  publish = ({ destination, body }) => {
    this.client.publish({
      destination: destination,
      body: JSON.stringify(body),
    });
  };

  deactivate = () => {
    this.client.deactivate();
  };

  // Calls back for every message, or only for the listed message types if any are given.
  subscribe = (topic, callback, ...forMessageTypes) => {
    return this.client.subscribe(topic, (message) => {
      if (
        forMessageTypes.length === 0 ||
        forMessageTypes.includes(JSON.parse(message.body).messageType)
      ) {
        callback(message);
      }
    });
  };

  // Resolves once the client is connected; rejects after `retries` failed checks.
  awaitConnect = async (awaitConnectConfig) => {
    const {
      retries = 3,
      curr = 0,
      timeinterval = 100,
    } = awaitConnectConfig || {};
    return new Promise((resolve, reject) => {
      setTimeout(() => {
        if (this.connected) {
          resolve();
          return;
        }
        if (curr >= retries) {
          console.log("failed to connect within the specified time interval");
          reject(new Error("failed to connect"));
          return;
        }
        console.log("failed to connect! retrying");
        // Chain the retry so the outer promise settles with the retry's outcome.
        this.awaitConnect({ ...awaitConnectConfig, curr: curr + 1 }).then(
          resolve,
          reject
        );
      }, timeinterval);
    });
  };

  get connected() {
    return this.client.connected;
  }

  get jwt() {
    return this._jwt;
  }

  // Intentionally a no-op: the token is read-only after construction.
  set jwt(value) {}
}

export default SocketClient;
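connectHeaders is where we pass the JWT. WebSocketTokenFilter intercepts every inbound message and reads the JWT when it sees the CONNECT frame.
To obtain the JWT, there is a separate /login REST endpoint. See AuthController and BackendClient in the source code.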
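On the server side, the value sent in connectHeaders arrives as a plain header string of the form "Bearer <token>". The sketch below shows one way to pull the token out of it. BearerTokens and extract are hypothetical names; the repository does this inside its own JWTUtils.parseJwt, whose exact behavior is not shown in this article.

```java
import java.util.Optional;

/** Hypothetical helper that extracts the token from an "Authorization: Bearer ..." value. */
final class BearerTokens {
  private static final String PREFIX = "Bearer ";

  private BearerTokens() {}

  /** Returns the token, or empty if the header is missing, not a Bearer value, or blank. */
  static Optional<String> extract(String authorizationHeader) {
    if (authorizationHeader == null || !authorizationHeader.startsWith(PREFIX)) {
      return Optional.empty();
    }
    String token = authorizationHeader.substring(PREFIX.length()).trim();
    return token.isEmpty() ? Optional.empty() : Optional.of(token);
  }
}
```

The returned token would still need to be validated (signature, expiry) before a user is attached to the session, as the filter does with validateJwtToken.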
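That covers all the major parts of building a real-time 1-to-1 chat application.
Things to consider
- Message security
There is no built-in message encryption. Anyone with database access can read every message sent. To prevent this, we should use public-key cryptography with a unique key for each conversation. The key itself can be stored in the same conversation table after being encrypted with a master key.
- Group chat
The application does not support group chat yet, but it can be extended easily. We need a common convId for a given group, and the backend should then work as is. One approach is to use a random UUID for each group and store it in the database.
- File attachments
Likewise, this can be implemented easily by first uploading the attachment to an S3 bucket and then passing its URI to the recipient as the message.
- Horizontal scaling
As the number of users grows, a single server will not be enough. Our OnlineOfflineService currently tracks active users in in-memory maps. Once there are multiple servers, that data can no longer live in one process's memory. We can use Redis pub/sub, as described in Hussein Nasser's video on scaling WebSockets, and maintain a global (server -> socket) map in a distributed cache.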
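To illustrate the message security item, here is a rough sketch of the "one key per conversation, stored encrypted under a master key" part. Note the swap: it uses symmetric AES-GCM from the JDK throughout rather than public-key cryptography, simply to keep the example short. ConversationCrypto and its methods are hypothetical names, and where the master key lives (for example a secrets manager) is left open.

```java
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical helper: per-conversation AES keys, wrapped with a master key for storage. */
final class ConversationCrypto {
  private static final int IV_BYTES = 12;
  private static final int TAG_BITS = 128;
  private static final SecureRandom RANDOM = new SecureRandom();

  private ConversationCrypto() {}

  /** Generates a fresh 256-bit AES key (usable as a master or a conversation key). */
  static SecretKey newKey() throws GeneralSecurityException {
    KeyGenerator generator = KeyGenerator.getInstance("AES");
    generator.init(256);
    return generator.generateKey();
  }

  /** Encrypts with AES-GCM; the output is the random IV followed by ciphertext and tag. */
  static byte[] encrypt(SecretKey key, byte[] plaintext) throws GeneralSecurityException {
    byte[] iv = new byte[IV_BYTES];
    RANDOM.nextBytes(iv);
    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
    cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
    byte[] ciphertext = cipher.doFinal(plaintext);
    byte[] out = new byte[iv.length + ciphertext.length];
    System.arraycopy(iv, 0, out, 0, iv.length);
    System.arraycopy(ciphertext, 0, out, iv.length, ciphertext.length);
    return out;
  }

  /** Reverses encrypt(); throws if the data was tampered with or the key is wrong. */
  static byte[] decrypt(SecretKey key, byte[] ivAndCiphertext) throws GeneralSecurityException {
    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
    cipher.init(
        Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, ivAndCiphertext, 0, IV_BYTES));
    return cipher.doFinal(ivAndCiphertext, IV_BYTES, ivAndCiphertext.length - IV_BYTES);
  }

  /** Encrypts the conversation key under the master key so it can sit in the conversation table. */
  static byte[] wrapKey(SecretKey masterKey, SecretKey conversationKey)
      throws GeneralSecurityException {
    return encrypt(masterKey, conversationKey.getEncoded());
  }

  /** Recovers the conversation key from its stored, wrapped form. */
  static SecretKey unwrapKey(SecretKey masterKey, byte[] wrappedKey)
      throws GeneralSecurityException {
    return new SecretKeySpec(decrypt(masterKey, wrappedKey), "AES");
  }
}
```

With this layout the database holds only wrapped keys and ciphertext, so database access alone is no longer enough to read messages. The server can still decrypt, so this is not end-to-end encryption.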
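Author: Satvik Nema
This article was contributed by the author, and the copyright belongs to the original author. If you repost it, please credit the source: https://www.nxrte.com/jishu/im/47561.html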