标签 数据持久化 下的文章

目录

欢迎关注 EdgeX Foundry 中文社区公众号,及时获取最新资料和分享。
2023-10-26T03:15:05.png

日常情况下,在边缘计算端侧,一般不需要数据持久化到本地磁盘或数据库,但是,某些特殊应用下,需要做本地数据保存和分析时,这项功能就非常有必要。本文将介绍 EdgeX 边缘计算框架在本地保存 telemetry 数据到 InfluxDB 的具体方法和结果。

背景介绍

EdgeX 简介

2023-10-23T06:44:33.png
EdgeX Foundry 边缘计算框架的分层和服务为边缘设备/节点和云/企业应用之间提供了一个双向转换引擎。EdgeX 作为边缘计算平台是非常不错的选择。

2023-10-23T06:43:42.png
本文需要采用自定义开发的 EdgeX Application service 来完成数据持久化的工作。

InfluxDB 简介

InfluxDB 是一个由 InfluxData 公司开发的开源时序型数据库。它由Go写成,着力于高性能地查询与存储时序型数据。InfluxDB 被广泛应用于存储系统的监控数据,IoT行业的实时数据等场景。

2023-10-23T06:36:47.png

EdgeX 数据写入(存入)InfluxDB 是非常适合且有必要的,本文使用 MQTT 和 Telegraf 作为 InfluxDB plugin 来实现具体功能。

开发过程

架构设计

2023-10-23T06:48:36.png

EdgeX 应用程序服务(app-influxdb-export)通过 MQTT 将读数导出到 InfluxDB。此服务示例是使用 EdgeX v2.3.0 版本创建的,如需其他版本,请与我们联系或自行开发。

这个管道服务,有两部分组成:

  • 将 EdgeX 事件/读取对象转换为 Influx line 协议;
  • 将结果导出到 MQTT Broker,这是 Telegraf / Influx 用来监听并从中获取数据的 MQTT 主题;
  • InfluxDB 实现数据持久化入库工作;

EdgeX application service

前提条件

  • 需要启动 EdgeX 框架;
  • 启动 MQTT Broker 容器;

Application Service

目录结构很简单,文件不多;

├── Dockerfile // docker 文件
├── LICENSE
├── Makefile // make build
├── README.md
├── app-service-influx.png
├── go.mod // gomod 配置
├── go.sum
├── main.go // 主程序
├── pkg
│   └── transforms
│       └── conversions.go // 转换器
└── res
    └── configuration.toml // 配置文件

主程序,不复杂,遵循 SDK 原则即可;

这里省略源文件,请到github下载。

尝试编译,能成功生成二进制文件;

make build

Docker 容器

Dockerfile 描述文件;

这里省略源文件,请到github下载。

创建 docker image;

docker build . -t edgexfoundry/app-influxdb:2.3.0

docker compose 片段,加入到你的 docker-compose.yml ;

  app-service-influxdb-export:
    image: edgexfoundry/app-influxdb:2.3.0
    ports:
      - 127.0.0.1:59780:59780/tcp
    container_name: edgex-app-influxdb-export
    hostname: edgex-app-influxdb-export
    depends_on:
      - consul
      - data
    environment:
      CLIENTS_CORE_COMMAND_HOST: edgex-core-command
      CLIENTS_CORE_DATA_HOST: edgex-core-data
      CLIENTS_CORE_METADATA_HOST: edgex-core-metadata
      CLIENTS_SUPPORT_NOTIFICATIONS_HOST: edgex-support-notifications
      CLIENTS_SUPPORT_SCHEDULER_HOST: edgex-support-scheduler
      DATABASES_PRIMARY_HOST: edgex-redis
      EDGEX_SECURITY_SECRET_STORE: "false"
      MESSAGEQUEUE_HOST: edgex-redis
      REGISTRY_HOST: edgex-core-consul
      SERVICE_HOST: edgex-app-influxdb-export
      TRIGGER_EDGEXMESSAGEBUS_PUBLISHHOST_HOST: edgex-redis
      TRIGGER_EDGEXMESSAGEBUS_SUBSCRIBEHOST_HOST: edgex-redis
      MQTTCONFIG_BROKERADDRESS: edgex-mqtt-broker:1883
    read_only: true
    restart: always
    networks:
      - edgex-network
    security_opt:
      - no-new-privileges:true
    user: 2002:2001

InfluxDB 环境搭建

Telegraf 配置

配置文件,注意保留你需要的部分;

[[inputs.mqtt_consumer]]
  ## Broker URLs for the MQTT server or cluster.  To connect to multiple
  ## clusters or standalone servers, use a separate plugin instance.
  ##   example: servers = ["tcp://localhost:1883"]
  ##            servers = ["ssl://localhost:1883"]
  ##            servers = ["ws://localhost:1883"]
  servers = ["tcp://192.168.123.12:1883"]

  ## Topics that will be subscribed to.
  topics = [
    "telegraf/host01/cpu",
    "telegraf/+/mem",
    "sensors/#",
    "edgex/EdgeXEvents"
  ]

  ## The message topic will be stored in a tag specified by this value.  If set
  ## to the empty string no topic tag will be created.
  # topic_tag = "topic"

  ## QoS policy for messages
  ##   0 = at most once
  ##   1 = at least once
  ##   2 = exactly once
  ##
  ## When using a QoS of 1 or 2, you should enable persistent_session to allow
  ## resuming unacknowledged messages.
  # qos = 0

  ## Connection timeout for initial connection in seconds
  # connection_timeout = "30s"

  ## Max undelivered messages
  ## This plugin uses tracking metrics, which ensure messages are read to
  ## outputs before acknowledging them to the original broker to ensure data
  ## is not lost. This option sets the maximum messages to read from the
  ## broker that have not been written by an output.
  ##
  ## This value needs to be picked with awareness of the agent's
  ## metric_batch_size value as well. Setting max undelivered messages too high
  ## can result in a constant stream of data batches to the output. While
  ## setting it too low may never flush the broker's messages.
  # max_undelivered_messages = 1000

  ## Persistent session disables clearing of the client session on connection.
  ## In order for this option to work you must also set client_id to identify
  ## the client.  To receive messages that arrived while the client is offline,
  ## also set the qos option to 1 or 2 and don't forget to also set the QoS when
  ## publishing.
  # persistent_session = false

  ## If unset, a random client ID will be generated.
  # client_id = ""

  ## Username and password to connect MQTT server.
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Client trace messages
  ## When set to true, and debug mode enabled in the agent settings, the MQTT
  ## client's messages are included in telegraf logs. These messages are very
  ## noisey, but essential for debugging issues.
  # client_trace = false

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

  ## Enable extracting tag values from MQTT topics
  ## _ denotes an ignored entry in the topic path
  # [[inputs.mqtt_consumer.topic_parsing]]
  #   topic = ""
  #   measurement = ""
  #   tags = ""
  #   fields = ""
  ## Value supported is int, float, unit
  #   [[inputs.mqtt_consumer.topic.types]]
  #      key = type
# Configuration for sending metrics to InfluxDB 2.0
[[outputs.influxdb_v2]]
  ## The URLs of the InfluxDB cluster nodes.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  ##   ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
  urls = ["http://192.168.123.12:8086"]

  ## Token for authentication.
  token = "PlbXqGQqYrnmZh22q88C0eS7CUyVSa9n1t2CjDY1nkA9RbXwM3BHBm0FIK3f3hT6p3oJKOrtFWLqeLgY9WQmyQ=="

  ## Organization is the name of the organization you wish to write to.
  organization = "yiqisoft"

  ## Destination bucket to write into.
  bucket = "edgex"

  ## The value of this tag will be used to determine the bucket.  If this
  ## tag is not set the 'bucket' option is used as the default.
  # bucket_tag = ""

  ## If true, the bucket tag will not be added to the metric.
  # exclude_bucket_tag = false

  ## Timeout for HTTP messages.
  # timeout = "5s"

  ## Additional HTTP headers
  # http_headers = {"X-Special-Header" = "Special-Value"}

  ## HTTP Proxy override, if unset values the standard proxy environment
  ## variables are consulted to determine which proxy, if any, should be used.
  # http_proxy = "http://corporate.proxy:3128"

  ## HTTP User-Agent
  # user_agent = "telegraf"

  ## Content-Encoding for write request body, can be set to "gzip" to
  ## compress body or "identity" to apply no encoding.
  # content_encoding = "gzip"

  ## Enable or disable uint support for writing uints influxdb 2.0.
  # influx_uint_support = false

  ## HTTP/2 Timeouts
  ## The following values control the HTTP/2 client's timeouts. These settings
  ## are generally not required unless a user is seeing issues with client
  ## disconnects. If a user does see issues, then it is suggested to set these
  ## values to "15s" for ping timeout and "30s" for read idle timeout and
  ## retry.
  ##
  ## Note that the timer for read_idle_timeout begins at the end of the last
  ## successful write and not at the beginning of the next write.
  # ping_timeout = "0s"
  # read_idle_timeout = "0s"

  ## Optional TLS Config for use on HTTP connections.
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

Docker 容器

docker-compose.yml

services:
  influxdb:
    image: influxdb:latest
    container_name: influxdb2
    volumes:
      - /opt/devel/influxdb/data:/var/lib/influxdb2:rw
    environment:
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=mytoken
    ports:
      - 8086:8086
    restart: unless-stopped

  telegraf:
    image: telegraf:latest
    container_name: telegraf
    volumes:
      #  Sync timezone with host
      - /etc/localtime:/etc/localtime:ro
      #  Map Telegraf configuration file
      - /opt/devel/influxdb/telegraf.conf:/etc/telegraf/telegraf.conf:ro
      #  Map /tmp to permanent storage  (this includes /tmp/metrics.out)
      - /opt/devel/influxdb:/tmp:rw
    environment:
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=mytoken
    restart: unless-stopped
    depends_on:
      - influxdb

启动容器

docker compose up -d

实验结果

MQTT broker 内容查看

2023-10-23T07:14:42.png

确认 mqtt 内容为 Influx line protocol;

InfluxDB Graph

2023-10-23T07:32:45.png

确认 InfluxDB 数据接收正常。

开源代码

完整的源代码在:https://github.com/yiqisoft/edgex-app-influxdb-export.git

关于亿琪软件

上海亿琪软件有限公司成立于2016年,专注于 5G 通信、AI 人工智能、边缘计算和大数据网络安全多项技术领域,致力于物联网领域前沿技术的创新,为用户提供全方位、智能化和安全的物联网解决方案。

2023年,公司发布“YiFUSION|工业边缘智能融合一体机”产品,为工业客户提供一整套的边缘计算+AI能力:高性能数据采集、多类型数据融合、AI算法集成、云端业务对接。在边缘网关的基础上,集成了 IoT 平台的边缘协同能力、本地Web SCADA 和 HMI 功能、本地数据存储、边缘 AI 视频分析、行业应用集成等。

2022年,公司推出 “YiCLOUD|亿琪云”一站式物联网应用解决方案。公司的业务涵盖了智慧城市、智慧农业、智能工厂和智慧园区等多个领域,公司软硬件产品和解决方案获得华为技术认证,得到中国移动OCP认证,公司还是边缘计算产业联盟ECC成员。

感知物联,畅快无限。
联系我们
网站:http://yiqisoft.cn
邮件:support@yiqisoft.cn
电话:021-68863086
手机:186-1666-9123