开源|EdgeX 边缘计算数据持久化之 InfluxDB
目录
欢迎关注 EdgeX Foundry 中文社区公众号,及时获取最新资料和分享。
日常情况下,在边缘计算端侧,一般不需要数据持久化到本地磁盘或数据库,但是,某些特殊应用下,需要做本地数据保存和分析时,这项功能就非常有必要。本文将介绍 EdgeX 边缘计算框架在本地保存 telemetry 数据到 InfluxDB 的具体方法和结果。
背景介绍
EdgeX 简介
EdgeX Foundry 边缘计算框架的分层和服务为边缘设备/节点和云/企业应用之间提供了一个双向转换引擎。EdgeX 作为边缘计算平台是非常不错的选择。
本文需要采用自定义开发的 EdgeX Application service
来完成数据持久化的工作。
InfluxDB 简介
InfluxDB 是一个由 InfluxData 公司开发的开源时序型数据库。它由Go写成,着力于高性能地查询与存储时序型数据。InfluxDB 被广泛应用于存储系统的监控数据,IoT行业的实时数据等场景。
EdgeX 数据写入(存入)InfluxDB 是非常适合且有必要的,本文使用 MQTT 和 Telegraf 作为 InfluxDB plugin 来实现具体功能。
开发过程
架构设计
EdgeX 应用程序服务(app-influxdb-export)通过 MQTT 将读数导出到 InfluxDB。此服务示例是使用 EdgeX v2.3.0 版本创建的,如需其他版本,请与我们联系或自行开发。
这个管道服务,有两部分组成:
- 将 EdgeX 事件/读取对象转换为 Influx line 协议;
- 将结果导出到 MQTT Broker,这是 Telegraf / Influx 用来监听并从中获取数据的 MQTT 主题;
- InfluxDB 实现数据持久化入库工作;
EdgeX application service
前提条件
- 需要启动 EdgeX 框架;
- 启动 MQTT Broker 容器;
Application Service
目录结构很简单,文件不多;
├── Dockerfile // docker 文件
├── LICENSE
├── Makefile // make build
├── README.md
├── app-service-influx.png
├── go.mod // gomod 配置
├── go.sum
├── main.go // 主程序
├── pkg
│ └── transforms
│ └── conversions.go // 转换器
└── res
└── configuration.toml // 配置文件
主程序,不复杂,遵循 SDK 原则即可;
这里省略源文件,请到github下载。
尝试编译,能成功生成二进制文件;
make build
Docker 容器
Dockerfile 描述文件;
这里省略源文件,请到github下载。
创建 docker image;
docker build . -t edgexfoundry/app-influxdb:2.3.0
docker compose 片段,加入到你的 docker-compose.yml ;
app-service-influxdb-export:
image: edgexfoundry/app-influxdb:2.3.0
ports:
- 127.0.0.1:59780:59780/tcp
container_name: edgex-app-influxdb-export
hostname: edgex-app-influxdb-export
depends_on:
- consul
- data
environment:
CLIENTS_CORE_COMMAND_HOST: edgex-core-command
CLIENTS_CORE_DATA_HOST: edgex-core-data
CLIENTS_CORE_METADATA_HOST: edgex-core-metadata
CLIENTS_SUPPORT_NOTIFICATIONS_HOST: edgex-support-notifications
CLIENTS_SUPPORT_SCHEDULER_HOST: edgex-support-scheduler
DATABASES_PRIMARY_HOST: edgex-redis
EDGEX_SECURITY_SECRET_STORE: "false"
MESSAGEQUEUE_HOST: edgex-redis
REGISTRY_HOST: edgex-core-consul
SERVICE_HOST: edgex-app-influxdb-export
TRIGGER_EDGEXMESSAGEBUS_PUBLISHHOST_HOST: edgex-redis
TRIGGER_EDGEXMESSAGEBUS_SUBSCRIBEHOST_HOST: edgex-redis
MQTTCONFIG_BROKERADDRESS: edgex-mqtt-broker:1883
read_only: true
restart: always
networks:
- edgex-network
security_opt:
- no-new-privileges:true
user: 2002:2001
InfluxDB 环境搭建
Telegraf 配置
配置文件,注意保留你需要的部分;
[[inputs.mqtt_consumer]]
## Broker URLs for the MQTT server or cluster. To connect to multiple
## clusters or standalone servers, use a separate plugin instance.
## example: servers = ["tcp://localhost:1883"]
## servers = ["ssl://localhost:1883"]
## servers = ["ws://localhost:1883"]
servers = ["tcp://192.168.123.12:1883"]
## Topics that will be subscribed to.
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
"edgex/EdgeXEvents"
]
## The message topic will be stored in a tag specified by this value. If set
## to the empty string no topic tag will be created.
# topic_tag = "topic"
## QoS policy for messages
## 0 = at most once
## 1 = at least once
## 2 = exactly once
##
## When using a QoS of 1 or 2, you should enable persistent_session to allow
## resuming unacknowledged messages.
# qos = 0
## Connection timeout for initial connection in seconds
# connection_timeout = "30s"
## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to
## outputs before acknowledging them to the original broker to ensure data
## is not lost. This option sets the maximum messages to read from the
## broker that have not been written by an output.
##
## This value needs to be picked with awareness of the agent's
## metric_batch_size value as well. Setting max undelivered messages too high
## can result in a constant stream of data batches to the output. While
## setting it too low may never flush the broker's messages.
# max_undelivered_messages = 1000
## Persistent session disables clearing of the client session on connection.
## In order for this option to work you must also set client_id to identify
## the client. To receive messages that arrived while the client is offline,
## also set the qos option to 1 or 2 and don't forget to also set the QoS when
## publishing.
# persistent_session = false
## If unset, a random client ID will be generated.
# client_id = ""
## Username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Client trace messages
## When set to true, and debug mode enabled in the agent settings, the MQTT
## client's messages are included in telegraf logs. These messages are very
## noisey, but essential for debugging issues.
# client_trace = false
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
# [[inputs.mqtt_consumer.topic_parsing]]
# topic = ""
# measurement = ""
# tags = ""
# fields = ""
## Value supported is int, float, unit
# [[inputs.mqtt_consumer.topic.types]]
# key = type
# Configuration for sending metrics to InfluxDB 2.0
[[outputs.influxdb_v2]]
## The URLs of the InfluxDB cluster nodes.
##
## Multiple URLs can be specified for a single cluster, only ONE of the
## urls will be written to each interval.
## ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
urls = ["http://192.168.123.12:8086"]
## Token for authentication.
token = "PlbXqGQqYrnmZh22q88C0eS7CUyVSa9n1t2CjDY1nkA9RbXwM3BHBm0FIK3f3hT6p3oJKOrtFWLqeLgY9WQmyQ=="
## Organization is the name of the organization you wish to write to.
organization = "yiqisoft"
## Destination bucket to write into.
bucket = "edgex"
## The value of this tag will be used to determine the bucket. If this
## tag is not set the 'bucket' option is used as the default.
# bucket_tag = ""
## If true, the bucket tag will not be added to the metric.
# exclude_bucket_tag = false
## Timeout for HTTP messages.
# timeout = "5s"
## Additional HTTP headers
# http_headers = {"X-Special-Header" = "Special-Value"}
## HTTP Proxy override, if unset values the standard proxy environment
## variables are consulted to determine which proxy, if any, should be used.
# http_proxy = "http://corporate.proxy:3128"
## HTTP User-Agent
# user_agent = "telegraf"
## Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "gzip"
## Enable or disable uint support for writing uints influxdb 2.0.
# influx_uint_support = false
## HTTP/2 Timeouts
## The following values control the HTTP/2 client's timeouts. These settings
## are generally not required unless a user is seeing issues with client
## disconnects. If a user does see issues, then it is suggested to set these
## values to "15s" for ping timeout and "30s" for read idle timeout and
## retry.
##
## Note that the timer for read_idle_timeout begins at the end of the last
## successful write and not at the beginning of the next write.
# ping_timeout = "0s"
# read_idle_timeout = "0s"
## Optional TLS Config for use on HTTP connections.
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
Docker 容器
docker-compose.yml
services:
influxdb:
image: influxdb:latest
container_name: influxdb2
volumes:
- /opt/devel/influxdb/data:/var/lib/influxdb2:rw
environment:
- DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=mytoken
ports:
- 8086:8086
restart: unless-stopped
telegraf:
image: telegraf:latest
container_name: telegraf
volumes:
# Sync timezone with host
- /etc/localtime:/etc/localtime:ro
# Map Telegraf configuration file
- /opt/devel/influxdb/telegraf.conf:/etc/telegraf/telegraf.conf:ro
# Map /tmp to permanent storage (this includes /tmp/metrics.out)
- /opt/devel/influxdb:/tmp:rw
environment:
- DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=mytoken
restart: unless-stopped
depends_on:
- influxdb
启动容器
docker compose up -d
实验结果
MQTT broker 内容查看
确认 mqtt 内容为 Influx line protocol;
InfluxDB Graph
确认 InfluxDB 数据接收正常。
开源代码
完整的源代码在:https://github.com/yiqisoft/edgex-app-influxdb-export.git
关于亿琪软件
上海亿琪软件有限公司成立于2016年,专注于 5G 通信、AI 人工智能、边缘计算和大数据网络安全多项技术领域,致力于物联网领域前沿技术的创新,为用户提供全方位、智能化和安全的物联网解决方案。
2023年,公司发布“YiFUSION|工业边缘智能融合一体机”产品,为工业客户提供一整套的边缘计算+AI能力:高性能数据采集、多类型数据融合、AI算法集成、云端业务对接。在边缘网关的基础上,集成了 IoT 平台的边缘协同能力、本地Web SCADA 和 HMI 功能、本地数据存储、边缘 AI 视频分析、行业应用集成等。
2022年,公司推出 “YiCLOUD|亿琪云”一站式物联网应用解决方案。公司的业务涵盖了智慧城市、智慧农业、智能工厂和智慧园区等多个领域,公司软硬件产品和解决方案获得华为技术认证,得到中国移动OCP认证,公司还是边缘计算产业联盟ECC成员。
感知物联,畅快无限。
联系我们
网站:http://yiqisoft.cn
邮件:support@yiqisoft.cn
电话:021-68863086
手机:186-1666-9123