如何在 Windows 上使用 Kafka-Python 进行实时流处理

如何在 Windows 上使用 Kafka-Python 进行实时流处理

Apache Kafka 是一个分布式流平台,可用于处理实时数据。Kafka 经常与 Python 结合使用,以构建实时流应用程序。

在本文中,我们将了解如何在 Windows 上开始使用 Kafka-Python 进行实时流处理。我们将讨论以下主题:

  • 在 Windows 上安装 Kafka
  • 安装 Kafka-Python
  • 创建主题和生成消息
  • 消费消息
  • Kafka 使用案例

在 Windows 上安装 Kafka

第一步是在 Windows 上安装 Kafka。您可以从 Apache Kafka 网站下载适用于 Windows 的 Kafka 二进制文件: https: //kafka.apache.org/downloads。

下载 Kafka 二进制文件后,将它们解压到您选择的目录中。Kafka 二进制文件包含许多用于管理 Kafka 的命令行工具。下面定义的步骤将有助于配置 Zookeeper 和 Kafka 服务器配置并在localhost上启动它。

1) 安装 Java JDK 8 或更高版本。
2) 从官方网站下载 Kafka 二进制文件,并将下载的文件解压缩到要保存 Kafka 文件的目录中。
3) 打开解压缩的 Kafka 文件内的配置文件夹,编辑 zookeeper.properties 文件。
4) 复制字段 dataDir 的路径,并在路径中添加 /zookeeper-data
5) 打开另一个命令提示符,使用以下命令启动 Zookeeper: .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
6) 打开另一个命令提示符,使用以下命令启动 Kafka: .\bin\windows\kafka-server-start.bat .\config\server.properties

安装 Kafka-Python

下一步是安装 Kafka-Python。Kafka-Python 是一个 Python 库,可以轻松地与 Kafka 进行交互。您可以使用 pip 安装 KafkaPython:

pip install kafka-python
or
conda install -c conda-forge kafka-python

创建主题并生成简单的消息

#KafkaProducer.py

from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable
import time

# Create a Kafka Producer on Localhost with Default Port
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Create a New Topic to Send Messages
topic = 'Test_Topic_1'

# Retry logic to handle connection issues
max_retries = 3
current_retry = 0
while current_retry < max_retries:
    try:
        for i in range(5):
            message = f'Message {i}'
            producer.send(topic, message.encode('utf-8'))
            print(f'Sent: {message}')
        # If the messages are sent successfully, break the loop
        break

    except NoBrokersAvailable as e:
        print(f"Error: {e}")
        print("Retrying in 5 seconds...")
        time.sleep(5)
        current_retry += 1

# Close the Producer and Close the connections
producer.close()

消费消息

#KafkaConsumer.py

from kafka import KafkaConsumer

# Create a Kafka Consumer
consumer = KafkaConsumer(
    'Test_Topic_1', # Topic Name
    bootstrap_servers='localhost:9092', #Host and Port
    group_id='your_consumer_group' # Group ID
)

# Continuously listen / poll for New Messages
for message in consumer:
    print(f'Received: {message.value.decode("utf-8")}')

Kafka 用于 JSON 数据

#Kafka_JSON_Producer.py

from kafka import KafkaProducer
import json

# Create a Kafka producer
producer = KafkaProducer(bootstrap_servers='localhost:9092', 
                         value_serializer=lambda m: json.dumps(m).encode('utf-8'))

# Create a JSON object
data = {'productname': 'Pen', 'price': 20, 'city': 'Chennai'}

# Send a JSON object via Kafka
producer.send('Test_Topic_1', value=data)

producer.close()

Kafka 使用案例

物联网设备之间的通信: Kafka 可用作物联网设备间通信的消息传递系统。这对于需要在设备间进行实时数据交换的应用非常有用,如智能家居、智能城市和工业物联网。

流式 ETL 中间件: Kafka 可用作流式 ETL(提取、转换、加载)中间件,用于不同系统之间的实时数据集成。这对于需要实时数据同步的应用非常有用,如数据仓库、数据湖和数据管道。

流分析: Kafka 可用于流式处理来自传感器、社交媒体和金融市场等各种来源的数据。然后可以对这些数据进行实时分析,以识别趋势、模式和异常。

实时事件处理: Kafka 可用于处理实时事件,如客户订单、网站流量和机器健康状况。这些数据可用于触发行动,如发送警报、更新用户配置文件或调整价格。

实时协作: Kafka 可用于实现用户之间的实时协作。例如,Kafka 可用于实时共享文档、图片和视频。这对于共同开展项目的团队,或者共同完成作业的学生来说,都是非常有用的。

实时游戏: Kafka 可用于支持实时多人游戏。例如,Kafka 可用于向玩家实时发送游戏状态更新。这有助于确保所有玩家始终站在同一起跑线上,确保游戏对每个人来说都是公平和愉快的。

最终,用于大众传播的最佳消息系统将取决于您应用程序的具体要求。如果您需要一个可扩展性强、可靠的消息传递系统来实时传递消息,那么 Kafka 就是一个不错的选择。如果您需要一个易于为小型设备和低带宽网络设置和管理的消息传递系统,那么 MQTT 是一个不错的选择。

作者:Rajesh Mani Kumar G

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/31082.html

(0)

相关推荐

发表回复

登录后才能评论