In the previous article, we looked at how to integrate a third-party API with the hub implementation. In this article, we focus on improving the scalability, reliability, and stability of the hub implementation by integrating a message broker. Specifically, we will use Apache Kafka in this example.
Message brokers are a natural fit for implementing WebSub hubs because they serve a similar role. Before diving into the message broker integration, however, we should identify the limitations of the current implementation.
The following flowchart depicts the current notification dispatch flow.
As we can see, there is a Notification Sender component that delivers weather alerts to the news receivers (news channels). This approach has several problems.

- Every time the notification sender runs, it must query the hub's internal state to retrieve the active news receivers for a given location.
- If an error occurs while delivering content to a news receiver, that particular weather alert is lost.
- There is no way to query or inspect the weather alerts that have already been distributed.
Incorporating Apache Kafka into our notification distribution flow introduces a publish/subscribe pattern, which brings several benefits.

- Notification generation and notification distribution are decoupled.
- Kafka provides a persistence layer for the weather alerts.
- If an error occurs while delivering content to a news receiver, we can implement a proper retry mechanism, because the notification remains in Kafka.
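Because the alerts are persisted in Kafka topics, they can also be inspected or replayed after the fact. As a quick sketch (assuming a local broker on localhost:9092 and the topic-per-location convention described below, with a hypothetical location name "colombo"), the standard Kafka console consumer can read back every alert that was published:

```shell
# Replay all weather alerts persisted on the topic for the "colombo" location.
# The broker address and topic name here are illustrative assumptions.
bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic colombo \
    --from-beginning
```

This kind of inspection is exactly what the pre-Kafka design could not offer, since delivered alerts left no queryable trace.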
With Kafka integrated, the notification dispatch flow changes to the following.

As the flowchart shows, the hub now contains two components: the Notification Sender and the Notification Receiver. The Notification Sender component periodically publishes weather reports to a Kafka topic. The Notification Receiver component listens on a Kafka topic and, whenever an event arrives, forwards the message to its news receiver (news channel). Each Notification Receiver is directly associated with a particular news receiver, and each Notification Sender is directly associated with a particular location for which weather reports are generated.
The following diagram illustrates how the components inside the hub map to Kafka topics and consumer groups.

Since each Notification Sender is associated with a location, a corresponding Kafka topic is assigned to each location. Similarly, since each Notification Receiver is tied to a news receiver, a unique Kafka consumer group is assigned to each Notification Receiver.
The Kafka message publishing logic is straightforward to implement.
kafka:ProducerConfiguration messagePersistConfig = {
    clientId: "message-persist-client",
    acks: "1",
    retryCount: 3
};
final kafka:Producer messagePersistProducer = check new ("localhost:9092", messagePersistConfig);

public isolated function publishWeatherNotification(string location, WeatherReport weatherReport) returns error? {
    json payload = weatherReport.toJson();
    byte[] serializedContent = payload.toJsonString().toBytes();
    // the topic name is the location, so each location gets its own topic
    check messagePersistProducer->send({
        topic: location,
        value: serializedContent
    });
    check messagePersistProducer->'flush();
}
Since the notification sender needs to publish weather alerts periodically, we implement it as a scheduled task.
isolated class NotificationSender {
    *task:Job;

    private final string location;

    isolated function init(string location) {
        self.location = location;
    }

    public isolated function execute() {
        WeatherReport|error weatherReport = getWeatherReport(self.location);
        if weatherReport is error {
            log:printWarn(string `Error occurred while retrieving weather-report: ${weatherReport.message()}`, stackTrace = weatherReport.stackTrace());
            return;
        }
        error? persistResult = publishWeatherNotification(self.location, weatherReport);
        if persistResult is error {
            log:printWarn(string `Error occurred while persisting the weather-report: ${persistResult.message()}`, stackTrace = persistResult.stackTrace());
        }
    }
}

isolated function startNotificationSender(string location) returns task:JobId|error {
    NotificationSender notificationSender = new (location);
    // schedule the job to run every 600 seconds (10 minutes)
    return task:scheduleJobRecurByFrequency(notificationSender, 600.0);
}
Setting up a news receiver requires the ability to fetch messages from the Kafka server, so we need a Kafka consumer.
function createMessageConsumer(websubhub:VerifiedSubscription message) returns kafka:Consumer|error {
    // derive a unique consumer-group name from the topic and the callback URL
    string groupName = string `${message.hubTopic}_${message.hubCallback}`;
    kafka:ConsumerConfiguration consumerConfiguration = {
        groupId: groupName,
        topics: [message.hubTopic],
        // turn off the auto offset commit
        autoCommit: false
    };
    return check new ("localhost:9092", consumerConfiguration);
}
The notification receiver is implemented using a Ballerina asynchronous function.
type UpdateMessageConsumerRecord record {|
    *kafka:AnydataConsumerRecord;
    weatherApi:WeatherReport value;
|};

function startNotificationReceiver(websubhub:VerifiedSubscription newsReceiver) returns error? {
    kafka:Consumer kafkaConsumer = check createMessageConsumer(newsReceiver);
    websubhub:HubClient hubClient = check new (newsReceiver, {
        retryConfig: {
            interval: config:CLIENT_RETRY_INTERVAL,
            count: config:CLIENT_RETRY_COUNT,
            backOffFactor: 2.0,
            maxWaitInterval: 20
        },
        timeout: config:CLIENT_TIMEOUT
    });
    // start the async function for the `News Receiver`
    _ = start pollForNewUpdates(hubClient, kafkaConsumer, newsReceiver);
}
isolated function pollForNewUpdates(websubhub:HubClient hubClient, kafka:Consumer kafkaConsumer, websubhub:VerifiedSubscription newsReceiver) returns error? {
    string location = newsReceiver.hubTopic;
    string receiverId = string `${newsReceiver.hubTopic}-${newsReceiver.hubCallback}`;
    do {
        while true {
            UpdateMessageConsumerRecord[] records = check kafkaConsumer->poll(10.0);
            if !isValidNewsReceiver(location, receiverId) {
                fail error(string `Subscriber with Id ${receiverId} or topic ${location} is invalid`);
            }
            var result = notifySubscribers(records, hubClient, kafkaConsumer);
            if result is error {
                log:printError("Error occurred while sending notification to subscriber", err = result.message());
                check result;
            } else {
                // commit the Kafka offset only if the message delivery is successful
                check kafkaConsumer->'commit();
            }
        }
    } on fail var e {
        log:printError(string `Error occurred while sending notification to news-receiver: ${e.message()}`, stackTrace = e.stackTrace());
        removeNewsReceiver(receiverId);
        kafka:Error? result = kafkaConsumer->close(15.0);
        if result is kafka:Error {
            log:printError("Error occurred while gracefully closing kafka-consumer", err = result.message());
        }
    }
}
Looking at the code above, note that we have disabled automatic offset commits on the Kafka consumer and introduced a manual offset-commit mechanism instead. This change is deliberate: with manual offset commits we can tell Kafka whether a message has been processed successfully, which in turn enables a retry mechanism for failed messages.
With these changes, the state management of the hub implementation also changes.
// `notificationSenders` is used to identify already-started notification senders;
// the location name is the key for `notificationSenders`
isolated map<task:JobId> notificationSenders = {};
isolated map<websubhub:VerifiedSubscription> newsReceiversCache = {};
The `onSubscriptionIntentVerified` function of the hub service is now updated as follows.
remote function onSubscriptionIntentVerified(readonly & websubhub:VerifiedSubscription subscription) returns error? {
    if !validNotificationSenderExists(subscription.hubTopic) {
        task:JobId notificationService = check startNotificationSender(subscription.hubTopic);
        lock {
            notificationSenders[subscription.hubTopic] = notificationService;
        }
    }
    string newsReceiverId = string `${subscription.hubTopic}-${subscription.hubCallback}`;
    lock {
        newsReceiversCache[newsReceiverId] = subscription;
    }
    check startNotificationReceiver(subscription);
}
In this article, we discussed how to integrate a message broker into our hub implementation. This post concludes the WebSub series I have been writing. Happy coding!
Sample code: https://github.com/ayeshLK/weather-reporter/tree/state-3
Author: Ayesh Almeida

This article was contributed by the author, and the copyright belongs to the original author. For reprints, please credit the source: https://www.nxrte.com/jishu/im/30946.html