利用 eKuiper 与 EMQX 实现车联网 CAN Bus 数据实时流处理

在之前的文章中,我们对车联网 CAN Bus 协议进行了详细解读,同时介绍了目前 CAN Bus 数据在实时采集与处理中面临的挑战。我们也针对这些挑战提出了一个可行的方案:利用开源边缘计算引擎 LF Edge eKuiper 进行 CAN Bus 数据的实时处理和分析,再结合开源 MQTT 服务器 EMQX 搭建可靠且可扩展的 MQTT 基础设施,收集 CAN Bus 数据,从而实现 CAN bus 数据本地实时处理后上云。

本文将对这一方案的实现进行详细介绍。

解决 CAN Bus 数据本地处理挑战

eKuiper 是一款开源的边缘计算引擎,可以帮助您实时处理和分析 CAN Bus 数据。eKuiper 是为边缘流处理而设计的,适合对 CAN Bus 产生的典型流数据进行实时处理。eKuiper 可以应对以下挑战:

  • 具备高效实时处理 CAN Bus 大容量、高速度数据的能力。能够灵活过滤、处理和选择感兴趣的信号,从而降低传输数据所需的带宽。
  • 能够将二进制的 CAN 帧解析为有意义的信号,以进行规则处理和触发动作。它支持动态加载 DBC 文件,使用户能够灵活地更改 DBC 文件,无需重启引擎即可适配不同的 CAN Bus 设备。同时,它还确保 DBC 文件的私密性和安全性,无需暴露给开发团队。
  • 能够灵活地组合来自不同 CAN 帧的信号,可以通过规则构建完整的消息供应用程序使用。用户还可以自由地修改规则,以适应不同的用户场景或需求变化,并支持热加载。

使用 eKuiper 实现 CAN Bus 数据本地处理

第 1 步:连接到 CAN Bus

eKuiper 使用 CAN 源插件来连接 CAN Bus 并接收 CAN 帧。它支持两种连接 CAN Bus 的模式,如下图所示。

利用 eKuiper 与 EMQX 实现车联网 CAN Bus 数据实时流处理
  1. 通过 socketCan 直接连接到 CAN Bus 。SocketCAN 使用 Berkeley socket API、Linux 网络协议栈,并将 CAN 设备驱动程序实现为网络接口。一旦将 CAN Bus 连接到 Linux 系统,用户就可以获得 CAN 网络接口。在 eKuiper 中,用户可以通过指定 CAN 网络接口和 DBC 文件路径来创建 CAN 流。然后,可以对 CAN 流应用任何规则来处理 CAN Bus 数据。
  2. 利用 TCP/UDP 通过网关连接到 CAN Bus 。网关可以是一个 CAN 适配器,它将多个 CAN 帧组合成一个数据包,并通过 TCP 或 UDP 批量发送出去。注意,网关发送的数据包格式没有标准化,因此可能需要修改插件来适应它。在 eKuiper 中,用户可以通过指定 TCP/UDP 地址和 DBC 文件路径来创建 CAN 流。然后,可以对 CAN 流应用任何规则来处理 CAN Bus 数据。

第 2 步:解码 CAN Bus 数据

CAN Bus 数据是二进制数据并且以帧为单位,CAN 帧由多个字段组成。CAN 协议有多个版本,包括 CAN 2.0A、CAN 2.0B 和 CAN FD。不同版本的 CAN 帧格式略有不同。下图显示了 CAN 2.0A 的帧格式。

利用 eKuiper 与 EMQX 实现车联网 CAN Bus 数据实时流处理

其中,有两个字段对解码 CAN Bus 数据很重要:

  1. ID 字段:ID 字段用于标识 CAN 帧。CAN 2.0A 的 ID 字段是 11 位,CAN 2.0B 和 CANFD 是 29 位。
  2. 数据字段:数据字段是有效载荷,用于携带实际数据。CAN 2.0A 和 CAN 2.0B 是 0-8 字节,CAN FD 是 0-64 字节。

在有效载荷中,数据由一系列的信号组成,每个信号都有自己的名称、长度和位置。DBC 文件是一个文本文件,其中包含将原始 CAN Bus 数据解码为“物理值”的相关信息。该文件定义了信号的名称、长度、位置以及将原始数据转换为物理值所使用的转换公式。

在 eKuiper 中,用户可以在解析 CAN Bus 数据时指定要使用的 DBC 文件路径。在 eKuiper 中配置 DBC 非常灵活和安全。

  • 多个 DBC 文件:用户可以选择一个目录作为 DBC 路径,目录中的 DBC 文件将按字母顺序逐个加载并生效。这种方式可以让用户通过单独的 DBC 文件来增量添加或还原信号。
  • 动态 DBC 文件加载:DBC 文件在运行时加载,无需在开发时部署。这可以帮助用户保持 DBC 文件的私密性和安全性。
  • 热加载:用户无需重新启动引擎即可随时更改 DBC 文件,以适应不同的 CAN Bus 设备。

配置完 eKuiper CAN 源后,用户可以创建一个流来接收并解析 CAN Bus 数据,从中提取有意义的物理信号。例如,CAN 有效载荷 0x0000000000000000 可以解析为以下信号:


{
  "signal1": 0,
  "signal2": 0,
  "signal3": 0,
  "signal4": 0,
  "signal5": 0,
  "signal6": 0,
  "signal7": 0,
  "signal8": 0
}

接下来,用户可以像从 MQTT 接收数据一样,利用 eKuiper 强大的流处理能力,对解析后的数据进行灵活处理。

第 3 步:处理 CAN Bus 数据

在得到解析过的数据后,我们可以利用 eKuiper 对它进行各种操作。为了节省传输数据的带宽,我们可以只挑选感兴趣的信号。比如,可以仅选择 signal1 和 signal2 这两个信号。


{
  "signal1": 0,
  "signal2": 0
}

用 eKuiper SQL 语句可以很容易实现这一点:

SELECT signal1, signal2 FROM canStream

由于 CAN 帧的大小有限,我们可能需要从多个 CAN 帧中提取所需的信号。因此,我们可以根据需要使用算法灵活地组合不同 CAN 帧中的信号,以构建完整的消息供应用程序使用。您可以在数据合并(https://ekuiper.org/docs/en/latest/example/data_merge/merge_single_stream.html)的示例中了解更多详细信息。这里,我们用 signal1 作为主要条件来筛选数据。

SELECT signal1, latest(signal2) as signal2 FROM canStream WHERE isNull(signal1) = false

另一个示例是只在特定事件发生时才收集数据,这样可以显著减少带宽的消耗。举个例子,我们可以只在 signal1 超过 100 时才进行数据收集。

SELECT signal1, signal2 FROM canStream WHERE signal1 > 100

此外,这些处理规则非常灵活,可以随时进行修改。即使在最初阶段没有确定所需的信号,也无需担心,您可以根据需求的变化随时调整规则,并实现热加载。

除了数据采集这一常见用途,eKuiper 还可以应用于其它场景,例如:

  1. 车载端实时灵活的规则引擎:可以根据特定条件触发相应动作。比如,当车速超过 70 英里/小时时,自动关闭车窗。
  2. 灵活的智能分析:可以在零编码或不连接云端的情况下,对数据和 AI 模型(目前是 TF Lite)进行组合。这个功能可以实现实时数据分析,比如根据速度、轮胎压力等数据预测和建议驾驶模式(即使没有网络连接)。
  3. 边缘计算:可以减少传输带宽,降低云端计算压力,还能解析、重组和转换数据,比如计算一段时间内的平均速度并保存。
  4. 异构数据整合:可以解析来自不同协议(TCP、UDP、HTTP、MQTT)和不同格式(CAN、JSON、CSV 等)的数据,并通过灵活的规则进行合并。
  5. 消息路由:可以决定哪些数据发送到云端,哪些数据保存在本地供其它车载应用使用。比如,根据 GDPR 或某些白名单来确定路由。

利用 MQTT 采集 CAN Bus 数据

使用 EMQX 这类 MQTT Broker 收集 CAN Bus 数据有以下几个优势:

  • 降低网络开销:MQTT 采用二进制格式和极简的头部对消息进行编码,可以节省网络带宽,提升数据传输效率。
  • 提升扩展性:单个 MQTT Broker 能够支持上千个并发连接和每秒上百万条消息。这使得可以从多个 CAN Bus 设备进行大规模数据收集,而不会影响性能或可靠性。
  • 增强安全性:MQTT 支持多种安全机制,比如 TLS/SSL 加密、用户名/密码认证、访问控制列表(ACL),以防止未经授权的数据访问或篡改。
  • 改善互操作性:MQTT 基于开放标准,被各种平台和语言广泛支持。这有利于将 CAN Bus 数据与其它系统或应用进行集成。

除此之外,EMQX 还提供了许多其它功能,并能够与 eKuiper 协同工作,从而帮助用户在传输 CAN Bus 数据时节省带宽、减少延迟、提高可靠性。

节省带宽

在通过 MQTT 传输 CAN Bus 数据时,我们通常需要在带宽受限的弱网络环境下进行传输。因此,我们需要尽量减小数据的大小。

在 eKuiper sink 中,我们可以使用 format 选项来指定数据格式。默认格式是 JSON 。我们可以将其改为 protobuf ,将数据序列化为二进制格式,以显著减少数据大小。另外,我们可以使用 compress 选项来通过 gzip 或其它压缩方法来压缩数据。通过这些方式我们可以让数据的大小比原来的 JSON 数据小很多。在我们的一个测试用例中,在批量发送数据时数据的大小可以减少 90% 以上。

实时数据

对于云端应用而言,某些数据具有时间敏感性,例如用于车辆事故诊断的数据。在这种情况下,降低延迟非常重要。在 eKuiper 规则中,我们可以用 MQTT sink 把数据快速发送到 EMQX,以满足高效的数据传输需求。

为了在实时场景中节省带宽,我们可以像前文介绍的那样,在 eKuiper MQTT sink 中设置序列化格式和压缩方法。EMQX 提供了规则引擎,可以解压缩和反序列化数据,这样云端应用不用编写代码就可以实时地处理数据。

将数据批量存入文件

对于不急于处理的数据,我们可以保存在文件或本地数据库里,然后批量发送到云端。这样可以达到更高的压缩比,进而节省更多的带宽。在 eKuiper 规则中,我们可以使用文件 sink 来本地保存和压缩数据。文件 sink 支持设置文件滚动策略,例如每 10 分钟滚动一次,这样我们可以将数据批量保存在文件中。EMQX 正在开发一个新功能,实现用 MQTT 传输文件。等这个功能完成后,保存的文件就可以用 MQTT 传输了。目前,用户可以用其它工具把文件传输到云端。

结语

本文介绍了如何利用 eKuiper 和 EMQX 从车辆收集、处理、传输 CAN Bus 数据到云端。通过结合 eKuiper 和 EMQX 的优势,我们可以更好地收集与利用 CAN Bus 数据,在有效节省资源的同时充分发挥数据价值。

来源:EMQ中文社区
原文:https://mp.weixin.qq.com/s/Hh7qdCb7R3C1nscnG7XkHw

版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。

(0)

相关推荐

发表回复

登录后才能评论