技术|在 RV1126 之上使用 rknn_toolkit_lite 和 eKuiper pipeline 进行图片识别推理
本文使用 Base64,OPENCV,Numpy 和 Image 组件进行数据读取转换。
图片格式
一般在边缘计算端进行推理,不会使用一张一张图片,而是采用视频流读帧的方式。
二进制文件格式
file_bytes = "binary file" // 初始化图片二进制
encoded = base64.decodebytes(file_bytes.encode("ascii")) // 转换成base64
img = Image.open(io.BytesIO(encoded)).resize((width, height)) // 打开图片
input_data = np.array(img) // 转换成 numpy
image = cv2.cvtColor(input_data, cv2.COLOR_BGR2RGB) // OPENCV 读取图片
图片(二进制
)-->base64-->Image-->numpy array-->opencv
Base64 编码格式
如果是 Base64 格式输入,处理方法一样,但是,在输入流的地方,要注意 source 采用 json 编码。
// 代码通用
// 输入格式: {"image": "/9j/4AAQSk....X//2Q=="}
图片(json base64
)-->Image-->numpy array-->opencv
eKuiper source 读取源(输入输入)
eKuiper 定义 source 唯一区别就是 format: binary/json
,假设都是采用 mqtt broker 作为图片输入。
Binary 二进制格式
输入源定义:
{
"Name" : "binaryimage",
"Options" : {
"confKey" : "edgex_mqtt_broker",
"datasource" : "image",
"format" : "binary",
"type" : "mqtt"
},
"Statement" : null,
"StreamFields" : null,
"StreamType" : 0
}
输入图片示例代码, GOLANG,这里的 payload
直接是 ReadFile
,二进制:
package main
import (
"fmt"
"io/ioutil"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
func main() {
const TOPIC = "binaryimage"
images := []string{
"parrot.jpg",
"owl.jpg",
"out.jpg",
// 其他你需要的图像
}
opts := mqtt.NewClientOptions().AddBroker("tcp://192.168.123.184:1883")
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
for _, image := range images {
fmt.Println("Publishing " + image)
payload, err := ioutil.ReadFile(image)
if err != nil {
fmt.Println(err)
continue
}
if token := client.Publish(TOPIC, 0, false, payload); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
} else {
fmt.Println("Published " + image)
}
time.Sleep(1 * time.Second)
}
client.Disconnect(0)
}
Json 格式
输入源定义:
{
"Name" : "jsonimage",
"Options" : {
"confKey" : "edgex_mqtt_broker",
"datasource" : "image",
"format" : "json",
"type" : "mqtt"
},
"Statement" : null,
"StreamFields" : null,
"StreamType" : 0
}
输入图片为 base64 编码后的结果,json 格式如下,发送到 mqtt broker 即可:
image.json
{
"image" : "/9j/4AAQSkZJRgABAQEA8ADwAAD/7Ukit.....ucen55+KEpqqGC7pTdx3X//2Q=="
}
eKuiper rule 规则定义
labelImageRknn
函数是由 pyairknn
插件(portable)来实现的逻辑处理。
Json 输入格式处理
这里的 FROM jsonimage
来自上面定义的 source,其他根据自身需要修改即可。labelImageRknn(image) 为 MQTT broker 输出内容作为 rule 的输入,这个有些难以理解,多试几遍就明白了。
{
"actions" : [
{
"mqtt" : {
"bufferLength" : 1024,
"enableCache" : false,
"format" : "json",
"insecureSkipVerify" : false,
"omitIfEmpty" : false,
"protocolVersion" : "3.1.1",
"qos" : 0,
"resourceId" : "edgex_mqtt_broker",
"runAsync" : false,
"sendSingle" : true,
"server" : "tcp://edgex-mqtt-broker:1883",
"topic" : "label"
}
}
],
"id" : "rule-mqtt-image",
"options" : {
"bufferLength" : 1024,
"checkpointInterval" : 300000,
"concurrency" : 1,
"isEventTime" : false,
"lateTolerance" : 1000,
"qos" : 0,
"restartStrategy" : {
"attempts" : 0,
"delay" : 1000,
"jitter" : 0.1,
"maxDelay" : 30000,
"multiplier" : 2
},
"sendError" : true,
"sendMetaToSink" : false
},
"sql" : "SELECT labelImageRknn(image) as label FROM jsonimage"
}
二进制输入格式处理
与 Json 格式处理差别只有 labelImageRknn(image
) --> labelImageRknn(self
),pyairknn 逻辑并没有变化。
{
"sql" : "SELECT labelImageRknn(self) as label FROM jsonimage"
}
pyairknn 核心处理模块
ekuiper 插件 portable
参照官方 pyai 插件,编写特定格式文件一堆,改几个字符就可以。
实现原理
主要逻辑分析如下几个方面:
初始化 rknn
- load model;
- 检查是否符合 RV1126 格式;
- model 载入时间,如果太长,就需要用整机来
预编译
model; - init runtime,不出错即可。
尤其要注意,这些过程要在 5 秒内完成
,因为 eKuiper 超时时间是 5 秒,目前版本还不能对开发者可见修改。这里写一个函数 load_rknn_mode() 来实现清晰的代码。
class LabelImageFunc(Function):
def __init__(self):
load_rknn_model()
pass
def validate(self, args: List[Any]):
if len(args) != 1:
return "invalid arg length"
return ""
def exec(self, args: List[Any], ctx: Context):
logging.debug("executing label")
return label(args[0])
def is_aggregate(self):
return False
核心推理逻辑: label 函数
- 注意这里的
return top5
格式就是返回给 rule 里面的结果,不支持动态解析,要做好准备; - 中间业务逻辑,就是前面说的载入图片到 numpy;
- 然后给 rknn 模块进行 inference;
- 最后对推理结果进行格式化处理;
def label(file_bytes):
logging.info('image object detecting...')
encoded = base64.decodebytes(file_bytes.encode("ascii"))
input_data = np.array(Image.open(io.BytesIO(encoded)))
image = cv2.cvtColor(input_data, cv2.COLOR_BGR2RGB)
image = cv2.resize(image, (224, 224))
t0 = time.time()
outputs = rknn_lite.inference(inputs=[image])
print('rknn inference time: {:.3f}ms'.format((time.time() - t0) * 1000))
show_outputs(outputs)
output = outputs[0][0]
output_sorted = sorted(output, reverse=True)
labels = load_labels(LABELS)
top5 = []
for i in range(5):
value = output_sorted[i]
index = np.where(output == value)
for j in range(len(index)):
if (i + j) >= 5:
break
if value > 0:
for k in range(len(index[j])):
top5.append({"index": labels[index[j][k]], "value": float(value)})
else:
top5.append({"index": "-1", "value": 0.0})
return top5
最终效果
eKuiper 日志打印
如果有错也会在这里出现,便于查看。可以打开 DEBUG 日志,更加详细。
time="2023-02-17 13:16:23" level=info msg="Start source mqttimage instance 0 successfully" file="node/source_node.go:138" rule=rule-mqtt-image
time="2023-02-17 13:16:23" level=info msg="new subscription for topic image, reqId is rule-mqtt-image_mqttimage_0" file="mqtt/mqtt_wrapper.go:220" rule=rule-mqtt-image
time="2023-02-17 13:16:23" level=info msg="Successfully subscribed to topic image." file="source/mqtt_source.go:111" rule=rule-mqtt-image
I:root:image object detecting...
rknn inference time: 7.212ms
mobilenet_v1
-----TOP 5-----
[ 9 83]: 0.2010498046875
[ 9 83]: 0.2010498046875
2023-02-17 13:16:26,316 - /kuiper/plugins/portable/pyairknn/label.py[line:68] - I: image object detecting...
Mqtt 输出
测试阶段,使用 mqtt broker 可以实时看到结果,方便调试。
{
"label" : [
{
"index" : "84:草原鸡、草原松鸡、草原鸡",
"value" : 0.2236328125
},
{
"index" : "9:母鸡",
"value" : 0.2010498046875
},
{
"index" : "83:皱褶松鸡、鹧鸪、Bonasa umbellus",
"value" : 0.2010498046875
},
{
"index" : "9:母鸡",
"value" : 0.2010498046875
},
{
"index" : "83:皱褶松鸡、鹧鸪、Bonasa umbellus",
"value" : 0.2010498046875
},
{
"index" : "139:大鸨",
"value" : 0.10638427734375
},
{
"index" : "87:鹧鸪",
"value" : 0.0626220703125
}
]
}
YiCLOUD IoT 平台输出
还可以通过脚本或插件方式,将结果输出到 IoT 平台,这里以 YiCLOUD 平台为例。