在之前的文章中我对WebSub协议做了简单的介绍。在本文中,我们将使用Ballerina Websubhub标准库实现一个 WebSub hub。
我们要实现的用例是一个天气通知 hub。新闻台可以订阅该 hub,以接收特定地点的每小时天气更新。在本例中,我们将主要关注 hub 与订阅者之间的关系。
实施过程中将涉及以下方面:
- 使用 Ballerian 语言实现基本 WebSub hub。
- 使用 Ballerina 语言实现 WebSub订阅者。
- 集成第 3 方 API 以检索某个位置的天气报告。
- 连接 Apache Kafka 消息代理作为 WebSub hub实现的持久层。
在本文中,我们将介绍前两点,其余部分将在接下来的博客文章中介绍。
WebSub Hub实施
Ballerina Websubhub 标准库提供了一个精简 API 层来实现符合 WebSub 的 hubs。如果我们仔细阅读Websubhub标准库的API规范,我们可以很容易地理解需要实现的API。
在 Ballerina 语言中,WebSub hub 以监听器和服务的形式设计。
websubhub:Listener
websubhub:Service
:可以附加的侦听器端点。websubhub:Service
:API 服务,接收 WebSub 事件。
它websubhub:Listener
充当 HTTP 侦听器的包装器,使订阅者和发布者可以向其发送请求的 HTTP 端点。每次向端点发出请求时,都会触发附加到 的websubhub:Listener
相应 API 。websubhub:Service
websubhub:Listener
以下是基本 hub 实现的示例:
import ballerina/websubhub;
service /hub on new websubhub:Listener(9000) {
remote function onRegisterTopic(websubhub:TopicRegistration msg)
returns websubhub:TopicRegistrationSuccess|websubhub:TopicRegistrationError {
// todo: implement logic here
}
remote function onDeregisterTopic(websubhub:TopicDeregistration msg)
returns websubhub:TopicDeregistrationSuccess|websubhub:TopicDeregistrationError {
// todo: implement logic here
}
// todo: implement other required remote methods
}
上面的代码将在端口上启动一个 HTTP 端点9000
并将其附加websubhub:Service
到/hub
服务路径。发布者和订阅者可以使用URL 向中心发送请求http://localhost:9000/hub
。
在我们的 hub 实现示例中,订阅者订阅特定位置的天气警报。订阅者可在订阅请求中发送地点名称作为 hub.topic 参数。
hub 应保持两种状态。
- 应发出天气警报的地点。
- 订阅天气警报的新闻接收者。
可用位置可以作为 string[]
进行管理,新闻接收器将作为 map<websubhub:VerifiedSubscription>
进行管理。新闻接收者将被分配一个 ID,该 ID 将使用订阅请求中的hub.topic
和hub.callback
参数导出。
有了上述限制,我们的 hub 实现将如下所示:
import ballerina/http;
import ballerina/websubhub;
isolated string[] locations = [];
isolated map<websubhub:VerifiedSubscription> newsReceiversCache = {};
service /hub on new websubhub:Listener(9000) {
// Topic registration is not supported by this `hub`
remote function onRegisterTopic(websubhub:TopicRegistration msg)
returns websubhub:TopicRegistrationError {
return error websubhub:TopicRegistrationError(
"Topic registration not supported", statusCode = http:STATUS_NOT_IMPLEMENTED);
}
// Topic deregistration is not supported by this `hub`
remote function onDeregisterTopic(websubhub:TopicDeregistration msg) returns websubhub:TopicDeregistrationError {
return error websubhub:TopicDeregistrationError(
"Topic deregistration not supported", statusCode = http:STATUS_NOT_IMPLEMENTED);
}
// Content update is not supported by this `hub`
remote function onUpdateMessage(websubhub:UpdateMessage msg) returns websubhub:UpdateMessageError {
return error websubhub:UpdateMessageError(
"Content update not supported", statusCode = http:STATUS_NOT_IMPLEMENTED);
}
remote function onSubscriptionValidation(readonly & websubhub:Subscription subscription) returns websubhub:SubscriptionDeniedError? {
string newsReceiverId = string `${subscription.hubTopic}-${subscription.hubCallback}`;
boolean newsReceiverAvailable = false;
lock {
newsReceiverAvailable = newsReceiversCache.hasKey(newsReceiverId);
}
if newsReceiverAvailable {
return error websubhub:SubscriptionDeniedError(
string `News receiver for location ${subscription.hubTopic} and endpoint ${subscription.hubCallback} already available`,
statusCode = http:STATUS_NOT_ACCEPTABLE
);
}
}
remote function onSubscriptionIntentVerified(readonly & websubhub:VerifiedSubscription subscription) returns error? {
lock {
if locations.indexOf(subscription.hubTopic) is () {
locations.push(subscription.hubTopic);
}
}
string newsReceiverId = string `${subscription.hubTopic}-${subscription.hubCallback}`;
lock {
newsReceiversCache[newsReceiverId] = subscription;
}
}
remote function onUnsubscriptionValidation(readonly & websubhub:Unsubscription unsubscription) returns websubhub:UnsubscriptionDeniedError? {
string newsReceiverId = string `${unsubscription.hubTopic}-${unsubscription.hubCallback}`;
boolean newsReceiverNotAvailable = false;
lock {
newsReceiverNotAvailable = !newsReceiversCache.hasKey(newsReceiverId);
}
if newsReceiverNotAvailable {
return error websubhub:UnsubscriptionDeniedError(
string `News receiver for location ${unsubscription.hubTopic} and endpoint ${unsubscription.hubCallback} not available`,
statusCode = http:STATUS_NOT_ACCEPTABLE
);
}
}
remote function onUnsubscriptionIntentVerified(readonly & websubhub:VerifiedUnsubscription unsubscription) returns error? {
string newsReceiverId = string `${unsubscription.hubTopic}-${unsubscription.hubCallback}`;
lock {
_ = newsReceiversCache.removeIfHasKey(newsReceiverId);
}
}
}
请注意,在我们的 hub 实现中,以下远程方法已被禁用,因为它们与发布者相关的功能相关。
onRegisterTopic
onDeregisterTopic
onUpdateMessage
为了提高代码的可读性,我们将在实现中添加以下附加实用方法。
isolated function removeNewsReceiver(string newsReceiverId) {
lock {
_ = newsReceiversCache.removeIfHasKey(newsReceiverId);
}
}
isolated function getNewsReceivers(string location) returns websubhub:VerifiedSubscription[] {
lock {
return newsReceiversCache
.filter(newsReceiver => newsReceiver.hubTopic == location)
.toArray().cloneReadOnly();
}
}
hub 实现能够记录位置和已注册的新闻接收器。现在,我们要实现定期向注册新闻接收器发送天气警报的功能。在初始状态下,我们将有一组预定义的警报模板,这些模板将根据位置进行定制。
import ballerina/random;
import ballerina/mime;
import ballerina/lang.runtime;
import ballerina/regex;
import ballerina/time;
import ballerina/log;
import ballerina/websubhub;
// predifined alert templates
final readonly & string[] alerts = [
"Severe weather alert for [LOCATION] until [TIME]. We will send updates as conditions develop. Please call this number 1919 for assistance or check local media.",
"TORNADO WATCH for [LOCATION] until [TIME]. Storm conditions have worsened, be prepared to move to a safe place. If you are outdoors, in a mobile home or in a vehicle, have a plan to seek shelter and protect yourself. Please call this number 1919 for assistance or check local media."
];
isolated function startSendingNotifications(string location) returns error? {
map<websubhub:HubClient> newsDispatchClients = {};
while true {
log:printInfo("Running news-alert dispatcher for ", location = location);
websubhub:VerifiedSubscription[] currentNewsReceivers = getNewsReceivers(location);
final readonly & string[] currentNewsReceiverIds = currentNewsReceivers
.'map(receiver => string `${receiver.hubTopic}-${receiver.hubCallback}`)
.cloneReadOnly();
// remove clients related to unsubscribed news-receivers
string[] unsubscribedReceivers = newsDispatchClients.keys().filter(dispatcherId => currentNewsReceiverIds.indexOf(dispatcherId) is ());
foreach string unsubscribedReceiver in unsubscribedReceivers {
_ = newsDispatchClients.removeIfHasKey(unsubscribedReceiver);
}
// add clients related to newly subscribed news-receivers
foreach var newsReceiver in currentNewsReceivers {
string newsReceiverId = string `${newsReceiver.hubTopic}-${newsReceiver.hubCallback}`;
if !newsDispatchClients.hasKey(newsReceiverId) {
newsDispatchClients[newsReceiverId] = check new (newsReceiver);
}
}
if newsDispatchClients.length() == 0 {
continue;
}
string alert = check retrieveAlert(location);
foreach var [newsReceiverId, clientEp] in newsDispatchClients.entries() {
websubhub:ContentDistributionSuccess|error response = clientEp->notifyContentDistribution({
contentType: mime:APPLICATION_JSON,
content: {
"weather-alert": alert
}
});
if response is websubhub:SubscriptionDeletedError {
removeNewsReceiver(newsReceiverId);
}
}
// wait for 1 minute befaore starting next notificaion dispatch round
runtime:sleep(60);
}
}
isolated function retrieveAlert(string location) returns string|error {
string alert = alerts[check random:createIntInRange(0, alerts.length())];
alert = regex:replace(alert, "\\[LOCATION\\]", location);
time:Utc alertExpiryTime = time:utcAddSeconds(time:utcNow(), 3600);
alert = regex:replace(alert, "\\[TIME\\]", time:utcToString(alertExpiryTime));
return alert;
}
在这里,我们使用 websubhub:HubClient
向新闻接收器传送内容。websubhub:HubClient
与 WebSub 订阅者(在本例中为新闻接收者)具有一对一的映射关系。
有了这些变化,我们就可以改进 hub 服务中的 onSubscriptionIntentVerified
远程方法。
remote function onSubscriptionIntentVerified(readonly & websubhub:VerifiedSubscription subscription) returns error? {
// flag to identify whether a particular location mentioned in the subscription request is a new location
boolean localtionUnavailble = false;
lock {
if locations.indexOf(subscription.hubTopic) is () {
locations.push(subscription.hubTopic);
localtionUnavailble = true;
}
}
string newsReceiverId = string `${subscription.hubTopic}-${subscription.hubCallback}`;
lock {
newsReceiversCache[newsReceiverId] = subscription;
}
// if the location mentioned in the subscription request is a new location, start sending notification
// `start startSendingNotifications(subscription.hubTopic)` will start a new worker to send notifications.
// this newly created worker will execute in parallel to other workers
if localtionUnavailble {
_ = start startSendingNotifications(subscription.hubTopic);
}
}
在实现过程中,我使用了 Ballerina 语言中的几种并发结构。
WebSub 订阅者的实现
在我们的示例中,WebSub 订阅者代表一个新闻接收器。我们可以使用以下代码实现一个简单的订阅器。
import ballerina/websub;
import ballerina/log;
@websub:SubscriberServiceConfig {
target: ["http://localhost:9000/hub", "Colombo"]
}
service /news\-receiver on new websub:Listener(9091) {
remote function onEventNotification(websub:ContentDistributionMessage event) returns websub:Acknowledgement {
log:printInfo("Recieved weather-alert ", alert = event);
return websub:ACKNOWLEDGEMENT;
}
}
这里需要注意的一个重要问题是,订阅者是另一个暴露 HTTP 端点的服务。
在 websub:SubscriberServiceConfig
注解中,我们需要提供目标参数(这是一个元组)。在目标参数中,第一个值代表 hub URL,第二个值代表订阅请求中应使用的 hub.topic 参数。
在本文中,我们讨论了如何使用 Ballerina 语言实现基本的 WebSub hub 和订阅者。在下一篇文章中,我们将介绍如何集成第 3 方 API 来检索天气通知。
示例代码: https: //github.com/ayeshLK/weather-reporter/tree/state-1
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/30670.html