最近,我在两个应用服务器之间的通信中遇到了一个重大挑战。我的想法是将队列作业从一台服务器(服务器一)分派到另一台处理复杂查询和计算的服务器(服务器二)。在完成所有数据库事务和计算后,服务器二应通知服务器一所有操作已成功完成。
起初,我考虑过 RabbitMQ。但是,我发现它并不适合我的使用案例,因此我选择了 Kafka。
Kafka 服务器
我决定使用 Upstash 的 Kafka,而不是部署服务器、安装和配置 Kafka。这是一个无服务器数据平台,具有低延迟的 Kafka 连接。有趣的是,他们采用的是 “即用即付 “模式,因此我不必一直使用专用服务器。
操作非常简单:
- 在 Upstash 上创建一个账户
- 打开 “Kafka “选项卡并创建一个集群(稍后可以创建主题)
- 将端点、端口、用户名和密码复制到一个安全的地方
- 点击 “主题”,创建一个主题(你可以把它们看作广播信息的通道)
我假设你已经有两个 Laravel 应用程序要进行测试。
配置两个应用程序
在两个应用程序上配置 .env 文件和 config/kafka.php 文件。
将这些内容添加到 .env 文件中:
KAFKA_URL=https://***************rest-kafka.upstash.io
KAFKA_USERNAME=ZXhjaXR**********************izHMAz_W6wmwJ8
KAFKA_PASSWORD=OWI1ODcz*****mZTY1N2VhMTNm
KAFKA_GROUP=default_group
KAFKA_INSTANCE=default_instance
KAFKA_URL 应不含端口号
创建文件 config/kafka.php,复制并粘贴以下代码:
<?php
return [
'url' => env('KAFKA_URL', ''),
'user' => env('KAFKA_USERNAME', ''),
'password' => env('KAFKA_PASSWORD', ''),
'group' => env('KAFKA_GROUP', 'default_group'),
'instance' => env('KAFKA_INSTANCE', 'default_instance'),
];
生成有关某个主题的消息
在通过主题生成消息之前,在 app/services/Kafka/KafkaService.php
文件中创建一个 Laravel 服务容器
<?php
namespace App\Services\Kafka;
use Illuminate\Support\Facades\Http;
class KafkaService
{
private $http;
private $url;
private $user;
private $password;
public function __construct(Http $http)
{
$this->http = $http;
$this->url = config('kafka.url');
$this->user = config('kafka.user');
$this->password = config('kafka.password');
}
public function produce($topic, $producer, $data, $headers = [])
{
$url = $this->getUrl($topic);
$defaultHeaders = $this->getHeaders();
$headers = array_merge($defaultHeaders, $headers);
$response = $this->http::withHeaders($headers)->post($url, [
'key' => $producer,
'value' => json_encode($data),
]);
return $response->json();
}
private function getUrl($topic)
{
return sprintf('%s/produce/%s', $this->url, $topic);
}
private function getHeaders()
{
$auth = base64_encode($this->user . ':' . $this->password);
return [
'Authorization' => 'Basic ' . $auth,
'Content-Type' => 'application/json',
];
}
}
produce()
方法有四个参数
$topic
是我们在 Upstash UI 上创建的主题名称$producer
是一个类,用于识别哪个类负责生成消息,我们将在稍后创建消费者时看到它的作用$data
包含需要生成的有效负载消息- 可选
$headers
在生成消息时发送。
其余方法是不言自明的。
现在,您可以在任何控制器的构造函数中注入此服务容器,并调用 Produce 方法。示例:
<?php
namespace App\Http\Controllers;
use App\Services\Kafka\KafkaService;
class YourController extends Controller
{
private $kafkaService;
public function __construct(KafkaService $kafkaService)
{
$this->kafkaService = $kafkaService;
}
public function yourMethod()
{
$topic = 'your-topic';
$producer = self::class;
$data = ['key' => 'value']; // your data
$response = $this->kafkaService->produce($topic, $producer, $data);
// handle the response
}
}
响应成功后,点击主题名称,然后点击消息选项卡,就能在 Upstash UI 上看到生成的消息。
消费消息
现在如何在另一个 Laravel 应用程序中消费该消息?由于 PHP 不允许我们持续订阅进程,我们需要一个变通方法来持续消费一个主题。
在之前创建的 KafkaService.php
中添加以下方法:
public function consume($group, $instance, $topic, $headers = [])
{
$url = sprintf('%s/consume/%s/%s', $this->url, $group, $instance);
$defaultHeaders = $this->getHeaders();
$headers = array_merge($defaultHeaders, $headers);
$response = $this->http::withHeaders($headers)->post($url, [
'topic' => $topic,
'timeout' => 10000,
]);
return $response->json();
}
下面是代码的详细说明:
- 它使用提供的
$group
和$instance
参数以及存储在 $this->url 中的基本 URL 构建一个 URL。URL 格式为 {$this->url}/consume/{$group}/{$instance}。 - 它通过调用
getHeaders
方法获取 HTTP 请求的默认头信息。 - 它会将默认头信息与作为参数提供的头信息合并。这样就能确保传递给
consume
方法的任何其他头信息都会包含在 HTTP 请求中。 - 它使用 http 类向构建的 URL 发出 HTTP POST 请求。请求包括 $topic 和 10000 的
timeout
值 (代表 10 秒超时)。 - 它会以 JSON 格式返回 HTTP 请求的响应。
通过以下方式创建一个 Laravel 命令:
php artisan make:command KafkaConsume
现在打开创建的文件 app/Console/Commands/KafkaConsume.php,复制并粘贴以下代码:
<?php
namespace App\Console\Commands;
use App\Services\Kafka\KafkaService;
use App\Services\Kafka\ProcessKafkaEvent;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Http;
class KafkaConsume extends Command
{
protected $signature = 'kafka:consume {--topic=default}';
protected $description = 'Consume Messages from Kafka by Upstash';
private $kafkaService;
public function __construct(KafkaService $kafkaService)
{
parent::__construct();
$this->kafkaService = $kafkaService;
}
public function handle()
{
$topic = $this->option('topic');
$group = config('kafka.group');
$instance = config('kafka.instance');
$headers = [
'Kafka-Auto-Offset-Reset' => 'latest',
'Kafka-Auto-Commit-Interval' => 5000,
];
$messages = $this->kafkaService->consume($group, $instance, $topic, $headers);
foreach ($messages as $message) {
$producer = $message['key'];
$payload = collect(json_decode($message['value']));
ProcessKafkaEvent::handle($topic, $producer, $payload);
}
}
}
在方法开始时,它会使用 $this->option(‘topic’) 从命令选项中获取主题。如果运行命令时没有提供主题,它将使用命令签名中指定的默认值。
接下来,它会使用 config 函数从应用程序的配置中获取 Kafka 组和实例。组和实例用于识别 Kafka 集群中的消费者。
然后,它会定义一个包含两个 Kafka 特定头的 $headers 数组: Kafka-Auto-Offset-Reset 和 Kafka-Auto-Commit-Interval。Kafka-Auto-Offset-Reset 头被设置为 latest,这意味着如果没有提供初始偏移量,消费者将从主题上最新的消息开始消费消息。Kafka-Auto-Commit-Interval 标头设置为 5000,这意味着消费者将每隔 5000 毫秒自动提交其消费的最后一条消息的偏移量。
然后,KafkaService 的消耗方法(consume method)就会被调用,其中包括组、实例、主题和标题。该方法负责向 Kafka 服务器发出 HTTP 请求,以消耗消息。消耗的消息以数组形式返回,并存储在 $messages 变量中。
然后,该方法会循环遍历 $messages 数组中的每条消息。对于每条消息,它都会检索键(代表消息的生产者)和值(代表消息的有效载荷)。使用 collect 函数将有效载荷从 JSON 格式解码并转换为 Laravel 集合。
最后,ProcessKafkaEvent 类的 handle 方法会被调用,其中包括 topic、producer 和 payload。该方法负责处理消耗的 Kafka 事件。
创建另一个文件app/Services/Kafka/ProcessKafkaEvent.php
<?php
namespace App\Services\Kafka;
use App\Services\Report\ReferralReport;
class ProcessKafkaEvent
{
public static function handle($topic, $producer, $payload)
{
$target = match ($producer . ':' . $topic) {
'App\\Http\\Controllers\\YourController:test' => SamplHandler::class,
default => UnhandledKafkaEvent::class
};
$instance = new $target;
$instance->handle($payload, $producer, $topic);
}
}
handle
方法是一个静态方法,它采用三个参数:$topic
、$producer
和$payload
。它们分别代表 Kafka 主题、消息生产者和消息负载。match
表达式用于确定应处理 Kafka 事件的目标类。它通过使用冒号 () 连接$producer
和$topic
并将结果与可能值列表进行匹配来实现此目的。在这种情况下,如果生产者和主题匹配'App\\Http\\Controllers\\YourController:test'
,它将使用SampleController
类。如果没有匹配,则默认为UnhandledKafkaEvent
类。- 然后创建目标类的一个新实例,并调用该实例中的
handle
方法,同时传递$payload
、$producer
和$topic
。
请记住,我们在生成消息时添加了一个生产者类 key
,这里我们过滤 producer
类以获得正确的类处理消息。ProcessKafkaEvent
类充当路由器。您可以在 match
中添加任意数量的类。
创建一个新文件 app/Services/Kafka/UnhandleKafkaEvent.php,如果没有为特定生产者定义处理程序类,该文件将充当后备处理程序:
<?php
namespace App\Services\Kafka;
use Illuminate\Support\Facades\Log;
class UnhandledKafkaEvent
{
public function handle($payload, $producer, $topic)
{
// log the unhandled messages here, customize it according to your need
Log::error('Unhandled Kafka queued job: ' . $producer, [
'level' => ActivityLevel::emergency->value,
'job' => $producer,
'queue' => $topic,
'connection' => 'kafka',
'job_arguments_payload' => $payload->toArray(),
]);
}
}
在 SampleHandler
类中,应该有一个 handle()
方法,可以执行以下操作:
public function handle($payload, $producer, $topic)
{
// process the payload here
}
现在运行我们在这里创建的 artisan 命令会发生什么?如果没有信息要处理,它会在 10 秒后关闭,或者在处理完信息后立即关闭。但这并不是我们想要的。我们希望命令始终运行或消耗信息。为此,我们需要一个 Worker。
设置 Worker
在 Ubuntu 上安装 supervisor。安装后创建并编辑文件 – sudo nano /etc/supervisor/conf.d/consumer-worker.conf。复制并粘贴以下内容:
[program:consumer-worker]
directory=/var/www/html ; laravel root directory
process_name=%(program_name)s_%(process_num)02d
command=php artisan kafka:consume --topic=test ; whatever your topic name is
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor-consumer-worker.log
stopwaitsecs=1800
现在保存并关闭文件。
运行:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start consumer-worker:*
现在,它将从第一个 Laravel 应用程序中持续运行并消耗信息。
所有这些都是一次性设置,你现在可以创建任意数量的生产者,并消费它们。
如果…
1. 从第一个应用程序生成信息后,如果想在第二个应用程序上生成一些 Excel 报告,该怎么办?就是这样
// first application
// ReportsController.php
$response = $this->kafkaService->produce('reports', self::class, [
'from' => now()->subMonth(),
'to' => now()
]);
// second application
// ProcessKafkaEvent.php
$target = match ($producer . ':' . $topic) {
'App\\Http\\Controllers\\YourController:test' => SamplHandler::class,
'App\\Http\\Controllers\\ReportsController:reports' => HandleReport::class,
default => UnhandledKafkaEvent::class
};
// HandleReport.php
public function handle($payload, $producer, $topic)
{
$from = $payload['from'];
$to = $payload['to'];
// generate the actual report
}
2. 如果想进一步通知第一台服务器报告已生成怎么办?可以遵循相同的生产者和消费者步骤,但现在相反,配置第二个应用程序以生成消息,并配置第一个应用程序以使用消息。
3. 如果想通知前端应用程序报告已生成怎么办?好吧,在这种情况下,我们可以使用网络套接字。
作者:Subham Chakraborty
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/39674.html