本文涉及 Angular、RxJS Subject & Observable 和 RxJS Websocket 的基础知识。下面文章分享使用 RxJS 在 Angular 应用程序中集成 WebSocket 的各个方面。
使用 JWT 令牌建立经过验证的 WebSocket 连接
让我们开发一个简单的 WebSocket 服务,通过包含 JWT 令牌建立 WebSocket 连接。然后,后端服务将通过身份验证服务验证该令牌,并相应地授权连接。
import { TokenService } from './token.service';
export const WS_ENDPOINT = 'wss://' + location.host + '/api/ws/';
@Injectable({
providedIn: 'root'
})
export class WebsocketService {
private ws: any;
public messages$: Subject<unknown> = new Subject<unknown>();
constructor(private tokenService: TokenService) {
}
public connect(): void {
this.create();
}
private create() {
if (this.ws) {
this.ws.unsubscribe();
}
let queryParams = `?token=${this.tokenService.getToken()}`;
const openObserver = new Subject<Event>();
openObserver.pipe(map((_) => true)).subscribe(this.status$);
const closeObserver = new Subject<CloseEvent>();
closeObserver.pipe(map((_) => false)).subscribe(this.status$);
this.ws = webSocket<any>(WS_ENDPOINT + queryParams);
this.ws.subscribe(this.messages$);
}
close() {
if (this.ws) {
this.ws.unsubscribe();
}
}
message(message: any) {
this.ws.next(message);
}
}
使用 RxJS 重试功能实施 WebSocket 重连策略
如果 WebSocket 连接中断,必须尝试重新连接。为此,我们需要监控连接状态并利用 RxJS 提供的重试功能。
import { TokenService } from './token.service';
export const WS_ENDPOINT = 'wss://' + location.host + '/api/ws/';
export const RECONNECT_INTERVAL = 5000;
@Injectable({
providedIn: 'root'
})
export class WebsocketService {
private observablesTopics: Record<string, ReplaySubject<any>> = {};
private status$: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
private ws: any;
public messages$: Subject<unknown> = new Subject<unknown>();
constructor(
private tokenService: TokenService
) {
}
public connect(): void {
this.create();
this.connectionStatus$.pipe(
skip(1),
filter(status => !status),
tap(() => this.create()),
).subscribe();
}
private create() {
if (this.ws) {
this.ws.unsubscribe();
}
let queryParams = `?token=${this.tokenService.getToken()}`;
const openObserver = new Subject<Event>();
openObserver.pipe(map((_) => true)).subscribe(this.status$);
const closeObserver = new Subject<CloseEvent>();
closeObserver.pipe(map((_) => false)).subscribe(this.status$);
this.ws = webSocket<any>({
url: WS_ENDPOINT + queryParams,
openObserver,
closeObserver,
});
this.ws.pipe(retry({
delay: (errs) => {
this.status$.next(false);
console.log(`Websocket connection down, will attempt reconnection in ${RECONNECT_INTERVAL}ms`);
return timer(RECONNECT_INTERVAL);
}
})).subscribe(this.messages$);
}
public get connectionStatus$(): Observable<boolean> {
return this.status$.pipe(distinctUntilChanged());
}
close() {
if (this.ws) {
this.ws.unsubscribe();
}
}
message(message: any) {
this.connectionStatus$.pipe(
filter(status => status),
tap(() => this.ws.next(message)),
take(1)
).subscribe();
}
}
解决绕过同一客户端发起的 WebSocket 消息的需求,以有效处理数据
这就产生了一个相关问题: 为什么我们需要绕过 WebSocket 消息?
请允许我详细说明一下。在多个用户界面实例连接到 WebSocket 服务器的情况下,一个用户界面实例可能会通过调用 API 来触发更改,并在 API 响应成功后更新其用户界面。更新后的数据会通过 WebSocket 连接进行广播,为其他未发起更改的用户界面实例提供有价值的信息。
为了区分这些更改的发起者,我们为每个 WebSocket 连接或用户界面实例分配了一个唯一的 WebSocket 会话 ID。在 HttpInterceptor 的帮助下,该会话 ID 会在每次 API 调用的头部传递。随后,服务器会在发送给所有已连接用户界面实例的 WebSocket 消息中包含相同的 WebSocket 会话 ID 和更新数据。
这种方法可以实现有效的数据过滤。用户界面实例可以使用与 WebSocket 会话 ID 不匹配的数据,或者,服务器可以排除发起者,并向所有其他用户界面实例广播更新。
AuthInterceptor:
@Injectable()
export class AuthInterceptor implements HttpInterceptor {
constructor(
private tokenService: TokenService,
private router: Router,
private websocketService: WebsocketService
) { }
intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
// Get the auth token from the service.
const authToken = this.tokenService.getToken();
const websocketID = this.websocketService.websocketSessionID;
const authHeader = 'Bearer ' + authToken;
let header: any = {};
if (authToken) {
header.Authorization = authHeader;
}
if (websocketID) {
header['Websocket-Session-Id'] = websocketID;
}
const authReq = req.clone({
setHeaders: header
});
// send cloned request with header to the next handler.
return next.handle(authReq).pipe(
catchError((error: any) => {
if (error.status == 401) {
this.tokenService.removeToken();
this.router.navigate(['login']);
throw new AuthInterceptor.AuthorizationError();
} else {
throw error;
}
})
);
}
}
this.websocketSessionID = uuid();// create refresh uuid for websocket session ID
let queryParams = `?WebsocketSessionID=${this.websocketSessionID}&token=${this.tokenService.getToken()}`;
通过 WebSocket 订阅主题,并在重新连接/未连接 WebSocket 时处理主题订阅
每个组件都将订阅感兴趣的主题以接收相关数据。例如,显示警报的图表将订阅 “topic/alerts “主题,以接收警报的持续更新。在此设置中,我们将利用 RxJS WebSocket 的多路复用功能。
websocketSubscribe(topic: string) {
return this.ws.multiplex(
() => ({ subscribe: topic }),
() => ({ unsubscribe: topic }),
(message: { topics: string[], message: any }) => {
return message.topics.includes(topic)
}).pipe(
map((message: { topics: string[], message: any }) => message.message),
finalize(() => {
if (!this.observablesTopics[topic].observed) {
this.observablesTopics[topic].complete();
delete this.observablesTopics[topic];
}
})
);
}
在连接中断或 WebSocket 未连接的情况下处理主题订阅
当连接中断时,每个组件都需要重新订阅新的连接并向服务器发送订阅消息,这就带来了挑战。为避免这一问题,我们将使用高阶 ReplaySubject 来实现解决方案。该主体将使用 RxJS 的 switchAll() 在每次重新连接时切换到新的 Observable。
每个组件都将订阅 ReplaySubject,以获取多路复用函数生成的最后一个观察对象。事实上,采用这种方法将有效地从各个组件中抽象出 WebSocket 连接依赖关系。每个组件都可以订阅各自的主题,而无需关心 WebSocket 连接的实时状态。
考虑到可能有两个组件对同一个主题表示兴趣,我们将返回上次为该特定主题创建的 ReplaySubject。因此,必须跟踪所有订阅的主题及其对应的 ReplaySubject。
如果 WebSocket 连接断开,而组件想要订阅一个主题,该怎么办?
在这种情况下,WebSocket 服务应允许使用具有虚假内部可观察对象的 ReplaySubject 进行订阅,然后在连接建立后用实际可观察对象(通过调用 WebSocket 连接上的多路复用函数给出)替换它。
private observablesTopics: Record<string, ReplaySubject<any>> = {};
websocketSubscribe(topic: string, reconnect: boolean = false) {
if (this.observablesTopics[topic] && !reconnect) {// check if we have replay subcject for the topic
return this.observablesTopics[topic].pipe(switchAll());
} else {
///if it is reconnection then call multiplex and switch to new observable
let messagesSubject$ = this.observablesTopics[topic] || new ReplaySubject<Observable<any>>(1);
if (this.status$.value) {
messagesSubject$.next(this.ws.multiplex(
() => ({ subscribe: [topic] }),
() => ({ unsubscribe: [topic] }),
(message: { topics: string[], message: any }) => {
return message.topics.includes(topic)
}).pipe(
map((message: { topics: string[], message: any }) => message.message),
finalize(() => {
if (!this.observablesTopics[topic].observed) {// Remove the ReplaySubject if all components are unsubscribed.
this.observablesTopics[topic].complete();
delete this.observablesTopics[topic];
}
})
));
} else {
messagesSubject$.next(of()) //dummy observable
}
this.observablesTopics[topic] = messagesSubject$;
return this.observablesTopics[topic].pipe(switchAll());
}
}
下面是Websocket服务的完整代码:
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable, ReplaySubject, Subject, of, timer } from 'rxjs';
import { distinctUntilChanged, filter, finalize, map, retry, skip, switchAll, take, tap } from 'rxjs/operators';
import { webSocket } from 'rxjs/webSocket';
import { forEach } from 'lodash-es';
import { v4 as uuid } from 'uuid';
import { TokenService } from './token.service';
export const WS_ENDPOINT = 'wss://' + location.host + '/api/ws/';
export const RECONNECT_INTERVAL = 5000;
@Injectable({
providedIn: 'root'
})
export class WebsocketService {
websocketSessionID!: string;
private observablesTopics: Record<string, ReplaySubject<any>> = {};
private status$: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
private ws: any;
public messages$: Subject<unknown> = new Subject<unknown>();
constructor(
private tokenService: TokenService
) {
}
public connect(): void {
this.create();
this.connectionStatus$.pipe(
skip(1),
filter(status => !status),
tap(() => this.create()),
).subscribe();
}
private create() {
if (this.ws) {
this.ws.unsubscribe();
}
this.websocketSessionID = uuid();
let queryParams = `?WebsocketSessionID=${this.websocketSessionID}&token=${this.tokenService.getToken()}`;
const openObserver = new Subject<Event>();
openObserver.subscribe(() => {
this.status$.next(true);
//check if we have any topics subscribed, if yes then call multiplex for each topic and switch to new observablegiven by multiplex function.
forEach(this.observablesTopics, (value, key) => {
this.websocketSubscribe(key, true);
});
});
const closeObserver = new Subject<CloseEvent>();
closeObserver.pipe(map((_) => false)).subscribe(this.status$);
this.ws = webSocket<any>({
url: WS_ENDPOINT + queryParams,
openObserver,
closeObserver,
});
this.ws.pipe(retry({
delay: (errs) => {
this.status$.next(false);
console.log(`Websocket connection down, will attempt reconnection in ${RECONNECT_INTERVAL}ms`);
return timer(RECONNECT_INTERVAL);
}
})).subscribe(this.messages$);
}
public get connectionStatus$(): Observable<boolean> {
return this.status$.pipe(distinctUntilChanged());
}
close() {
if (this.ws) {
this.ws.unsubscribe();
}
}
message(message: any) {
this.connectionStatus$.pipe(
filter(status => status),
tap(() => this.ws.next(message)),
take(1)
).subscribe();
}
websocketSubscribe(topic: string, reconnect: boolean = false) {
if (this.observablesTopics[topic] && !reconnect) {// check if we have replay subcject for the topic
return this.observablesTopics[topic].pipe(switchAll());
} else {
//if it is reconnection then call multiplex and switch to new observable
let messagesSubject$ = this.observablesTopics[topic] || new ReplaySubject<Observable<any>>(1);
if (this.status$.value) {
messagesSubject$.next(this.ws.multiplex(
() => ({ subscribe: [topic] }),
() => ({ unsubscribe: [topic] }),
(message: { topics: string[], message: any }) => {
return message.topics.includes(topic)
}).pipe(
map((message: { topics: string[], message: any }) => message.message),
finalize(() => {
if (!this.observablesTopics[topic].observed) {// Remove the ReplaySubject if all components are unsubscribed.
this.observablesTopics[topic].complete();
delete this.observablesTopics[topic];
}
})
));
} else {
messagesSubject$.next(of()) //dummy observable
}
this.observablesTopics[topic] = messagesSubject$;
return this.observablesTopics[topic].pipe(switchAll());
}
}
}
作者:Priyank Srivastava
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/38076.html