百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

使用EdgeX的eKuiper规则引擎,配置处理数据,并转发到http服务上

bigegpt 2024-08-04 11:28 1 浏览

#头条创作挑战赛#

目录

  • 前言
  • 1,关于 eKuiper 规则引擎
  • 2,使用 EdgeX 进行相关开发。
  • 3,在edgex 管理端上面进行操作:
  • 4,或者升级到最新的ui:2.2.0也可以解决问题
  • 5,主要是利用redis的topic订阅发布消息
  • 6,编写beego的http服务接收数据
  • 7,总结EdgeX规则引擎使用

1,关于 ekuiper 规则引擎


LF Edge eKuiper 是一个轻量级的物联网数据分析和流处理引擎,运行在资源受限的边缘设备上。
可以进行配置规则,使用的是golang 进行开发的。
使用 apache2.0 协议开源的。
https://ekuiper.org/zh
github 地址:
https://github.com/lf-edge/ekuiper

简单地介绍:


主要就是编写SQL Flow 语句,进行规则配置。
视频参考:
https://www.bilibili.com/video/BV11B4y1V7pQ/

EdgeX 规则引擎 eKuiper 实战

2,使用 EdgeX 进行相关开发。


可以增加官方的管理后台界面,修改docker-compose的配置。

  rulesengine:
    container_name: edgex-kuiper
    depends_on:
    - database
    environment:
      CONNECTION__EDGEX__REDISMSGBUS__PORT: 6379
      CONNECTION__EDGEX__REDISMSGBUS__PROTOCOL: redis
      CONNECTION__EDGEX__REDISMSGBUS__SERVER: edgex-redis
      CONNECTION__EDGEX__REDISMSGBUS__TYPE: redis
      EDGEX__DEFAULT__PORT: 6379
      EDGEX__DEFAULT__PROTOCOL: redis
      EDGEX__DEFAULT__SERVER: edgex-redis
      EDGEX__DEFAULT__TOPIC: rules-events
      EDGEX__DEFAULT__TYPE: redis
      KUIPER__BASIC__CONSOLELOG: "true"
      KUIPER__BASIC__RESTPORT: 59720
    hostname: edgex-kuiper
    image: lfedge/ekuiper:1.4.4-alpine
    networks:
      edgex-network: {}
    ports:
    - 127.0.0.1:59720:59720/tcp
    read_only: true
    restart: always
    security_opt:
    - no-new-privileges:true
    user: kuiper:kuiper
    volumes:
    - kuiper-data:/kuiper/data:z
  rulesengine-manager:
    container_name: edgex-kuiper-manager
    image: emqx/ekuiper-manager:1.6
    networks:
      edgex-network: {}
    ports:
    - 9082:9082/tcp

特别注意端口是9082 ,镜像地址是:emqx/ekuiper-manager:1.6,同时放到一个网络里面。就可以直接访问edgex的规则引擎了。

https://ekuiper.org/docs/zh/latest/operation/manager-ui/overview.html

# 地址:http://localhost:9082
用户名:admin
密码:public

默认进入管理端是空,需要增加服务:http://edgex-kuiper:59720



增加成功了,同时相关的配置也读取到了,操作也很方便。


3,在edgex 管理端上面进行操作:


http://127.0.0.1:4000/

创建一个 edgex 共享的流

# 简单配置
CREATE STREAM EdgexStream () WITH (
  FORMAT = "JSON",
  TYPE = "edgex"
)
# 或者复杂点的配置也加上
CREATE STREAM EdgexStream () WITH (
  DATASOURCE = "redis",
  KEY = "",
  FORMAT = "json",
  CONF_KEY = "default",
  TYPE = "edgex",
  STRICT_VALIDATION = "true",
  TIMESTAMP = "",
  TIMESTAMP_FORMAT = "",
  RETAIN_SIZE = "0",
  SHARED = "true"
)
SELECT * FROM  EdgexStream

创建规则


可以设置一个空消息:


也可以定制一个消息模板,配置好地址和服务:


结果服务没有启动成功:

报错:

Stopped: read properties map[method:GET retryInterval:1 sendSingle:false url:http://edgex-core-command:59882/edgex/api/getData] fail with error: 2 error(s) decoding: * 'retryInterval' expected type 'int', got unconvertible type 'string', value: '1' * 'sendSingle' expected type 'bool', got unconvertible type 'string', value: 'false'.

看样子是界面传递的 false 在 edgex 中转换错误了。

{
    "triggered": true, 
    "id": "rule1", 
    "sql": "SELECT * FROM  EdgexStream", 
    "actions": [
        {
            "rest": {
                "retryInterval": 1, 
                "sendSingle": false, 
                "url": "http://edgex-core-command:59882/edgex/api/getData", 
                "method": "GET"
            }
        }
    ], 
    "options": {
        "isEventTime": false, 
        "lateTolerance": 1000, 
        "concurrency": 1, 
        "bufferLength": 1024, 
        "sendMetaToSink": false, 
        "sendError": true, 
        "qos": 0, 
        "checkpointInterval": 300000
    }
}

直接使用 postman 提交数据:


然后可以了,规则服务终于可以了,估计需要升级 ui 和 规则引擎的版本不兼容造成的。


更多详细的配置参考:
https://ekuiper.org/docs/zh/latest/edgex/edgex_source_tutorial.html

SELECT * FROM EdgexStream WHERE meta (deviceName) = "device-virtual"

或者升级到最新的ui 版本也可以解决这个问题。

4,或者升级到最新的ui:2.2.0也可以解决问题



要保障 device-virtual 的设备启动成功,就开始往 edgex 当中发送数据。
配置好了规则引擎之后就可以接受触发的数据了。

写的假地址,这样就可以算接受数据了。

2022/10/29 14:00:19.874 [D] [server.go:2836]  |    172.18.0.10| 404 |   1.156644ms| nomatch| GET      /edgex/api/getData
2022/10/29 14:00:19.876 [D] [server.go:2836]  |    172.18.0.10| 404 |    257.062μs| nomatch| GET      /edgex/api/getData
2022/10/29 14:00:19.878 [D] [server.go:2836]  |    172.18.0.10| 404 |    297.857μs| nomatch| GET      /edgex/api/getData
2022/10/29 14:00:19.879 [D] [server.go:2836]  |    172.18.0.10| 404 |    258.943μs| nomatch| GET      /edgex/api/getData

也可以配置成发送到edgex的其他服务上:


然后再看处理的消息数量,有所增加。

5,主要是利用redis的topic订阅发布消息


通过订阅 SUBSCRIBE rules-events 消息是通过这个发布的。格式

$ docker exec -it edgex-redis sh
/data # redis-cli 
127.0.0.1:6379> SUBSCRIBE rules-events
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "rules-events"
3) (integer) 1
1) "message"
2) "rules-events"
3) "{\"ReceivedTopic\":\"\",\"CorrelationID\":\"9a7a86fd-eac7-43ea-881a-da4672592f1f\",\"Payload\":\"eyJhcGlWZXJzaW9uIjoidjIiLCJpZCI6IjgwZjBjMjgwLTQwNjEtNDRlNy04MjJmLTQ3MGNiZjQ0N2M2OCIsImRldmljZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwic291cmNlTmFtZSI6IkZsb2F0MzIiLCJvcmlnaW4iOjE2NjcwNTU2Nzk3OTM2MzYwNjEsInJlYWRpbmdzIjpbeyJpZCI6Ijg3MzI2ZDE5LWM2NTEtNGE3NC04YWJkLTkxOTkyMjQ0ZTYzZSIsIm9yaWdpbiI6MTY2NzA1NTY3OTc5MzYzNjA2MSwiZGV2aWNlTmFtZSI6IlJhbmRvbS1GbG9hdC1EZXZpY2UiLCJyZXNvdXJjZU5hbWUiOiJGbG9hdDMyIiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwidmFsdWVUeXBlIjoiRmxvYXQzMiIsInZhbHVlIjoiMS42NTkwNzllKzM4In1dfQ==\",\"ContentType\":\"application/json\"}"
1) "message"
2) "rules-events"
3) "{\"ReceivedTopic\":\"\",\"CorrelationID\":\"232d1863-be69-4cbc-971f-29c81e8b3d61\",\"Payload\":\"eyJhcGlWZXJzaW9uIjoidjIiLCJpZCI6IjQ2ZGU0ZGE2LTcwMWEtNGQ3ZC1iZmRiLTNmMGI4OTRjNmVjYiIsImRldmljZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwic291cmNlTmFtZSI6IkZsb2F0NjQiLCJvcmlnaW4iOjE2NjcwNTU2Nzk3OTQwOTg3MDUsInJlYWRpbmdzIjpbeyJpZCI6IjBjYTkyZGVkLTIyNjUtNDYyMC1hMzc1LWUxMDllNDU2MThjNSIsIm9yaWdpbiI6MTY2NzA1NTY3OTc5NDA5ODcwNSwiZGV2aWNlTmFtZSI6IlJhbmRvbS1GbG9hdC1EZXZpY2UiLCJyZXNvdXJjZU5hbWUiOiJGbG9hdDY0IiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwidmFsdWVUeXBlIjoiRmxvYXQ2NCIsInZhbHVlIjoiLTUuMzM1MTI4ZSszMDcifV19\",\"ContentType\":\"application/json\"}"
1) "message"

使用的是base64 加密了下,估计是防止json 数据问题。解析后的数据:
https://tool.oschina.net/encrypt?type=3

使用工具转换下,可以看到是virtual设备发送的信息。

{"apiVersion":"v2","id":"80f0c280-4061-44e7-822f-470cbf447c68","deviceName":"Random-Float-Device","profileName":"Random-Float-Device","sourceName":"Float32","origin":1667055679793636061,"readings":[{"id":"87326d19-c651-4a74-8abd-91992244e63e","origin":1667055679793636061,"deviceName":"Random-Float-Device","resourceName":"Float32","profileName":"Random-Float-Device","valueType":"Float32","value":"1.659079e+38"}]}

6,编写beego的http服务接收数据


配置好规则地址是 POST方法,然后数据格式是json。
使用beego 的POST 方法接受数据:

func (c *IndexController) GetEdgexData() {
	body := c.Ctx.Input.RequestBody

	log.Println("########### GetEdgexData :",string(body)," ###########")
	defer c.ServeJSON()
	return
}

本来以为是给json,但是是给数组,数组里面是个json。

2022/10/29 15:59:30 ########### GetEdgexData : [{"Bool":true}]  ###########
2022/10/29 15:59:30.385 [D] [server.go:2836]  |     172.17.0.1| 200 |    308.432μs|   match| POST     /edgex/api/getData   r:/edgex/api/getData
2022/10/29 15:59:35 filterAdmin request url  :  /edgex/api/getData
2022/10/29 15:59:35 ########### GetEdgexData : [{"Int8":-46}]  ###########
2022/10/29 15:59:35.163 [D] [server.go:2836]  |     172.17.0.1| 200 |    162.124μs|   match| POST     /edgex/api/getData   r:/edgex/api/getData 
2022/10/29 15:59:35 ########### GetEdgexData : [{"Int64":5618840669469908222}]  ###########
2022/10/29 15:59:35.164 [D] [server.go:2836]  |     172.17.0.1| 200 |    196.329μs|   match| POST     /edgex/api/getData   r:/edgex/api/getData
2022/10/29 15:59:35 ########### GetEdgexData : [{"Int16":-11546}]  ###########
2022/10/29 15:59:35.165 [D] [server.go:2836]  |     172.17.0.1| 200 |     132.69μs|   match| POST     /edgex/api/getData   r:/edgex/api/getData

7,总结EdgeX规则引擎使用


总体来说 EdgeX的eKuiper 规则引擎使用起来是非常的方便的。
通过和 EdgeX 深度整合,可以直接转换调用成各种方法,也可以自定义转发到 rest mqtt mq 等地方。
同时可以接收到device-virtual的数据。整个流程也非常清晰方便。

相关推荐

得物可观测平台架构升级:基于GreptimeDB的全新监控体系实践

一、摘要在前端可观测分析场景中,需要实时观测并处理多地、多环境的运行情况,以保障Web应用和移动端的可用性与性能。传统方案往往依赖代理Agent→消息队列→流计算引擎→OLAP存储...

warm-flow新春版:网关直连和流程图重构

本期主要解决了网关直连和流程图重构,可以自此之后可支持各种复杂的网关混合、多网关直连使用。-新增Ruoyi-Vue-Plus优秀开源集成案例更新日志[feat]导入、导出和保存等新增json格式支持...

扣子空间体验报告

在数字化时代,智能工具的应用正不断拓展到我们工作和生活的各个角落。从任务规划到项目执行,再到任务管理,作者深入探讨了这款工具在不同场景下的表现和潜力。通过具体的应用实例,文章展示了扣子空间如何帮助用户...

spider-flow:开源的可视化方式定义爬虫方案

spider-flow简介spider-flow是一个爬虫平台,以可视化推拽方式定义爬取流程,无需代码即可实现一个爬虫服务。spider-flow特性支持css选择器、正则提取支持JSON/XML格式...

solon-flow 你好世界!

solon-flow是一个基础级的流处理引擎(可用于业务规则、决策处理、计算编排、流程审批等......)。提供有“开放式”驱动定制支持,像jdbc有mysql或pgsql等驱动,可...

新一代开源爬虫平台:SpiderFlow

SpiderFlow:新一代爬虫平台,以图形化方式定义爬虫流程,不写代码即可完成爬虫。-精选真开源,释放新价值。概览Spider-Flow是一个开源的、面向所有用户的Web端爬虫构建平台,它使用Ja...

通过 SQL 训练机器学习模型的引擎

关注薪资待遇的同学应该知道,机器学习相关的岗位工资普遍偏高啊。同时随着各种通用机器学习框架的出现,机器学习的门槛也在逐渐降低,训练一个简单的机器学习模型变得不那么难。但是不得不承认对于一些数据相关的工...

鼠须管输入法rime for Mac

鼠须管输入法forMac是一款十分新颖的跨平台输入法软件,全名是中州韵输入法引擎,鼠须管输入法mac版不仅仅是一个输入法,而是一个输入法算法框架。Rime的基础架构十分精良,一套算法支持了拼音、...

Go语言 1.20 版本正式发布:新版详细介绍

Go1.20简介最新的Go版本1.20在Go1.19发布六个月后发布。它的大部分更改都在工具链、运行时和库的实现中。一如既往,该版本保持了Go1的兼容性承诺。我们期望几乎所...

iOS 10平台SpriteKit新特性之Tile Maps(上)

简介苹果公司在WWDC2016大会上向人们展示了一大批新的好东西。其中之一就是SpriteKitTileEditor。这款工具易于上手,而且看起来速度特别快。在本教程中,你将了解关于TileE...

程序员简历例句—范例Java、Python、C++模板

个人简介通用简介:有良好的代码风格,通过添加注释提高代码可读性,注重代码质量,研读过XXX,XXX等多个开源项目源码从而学习增强代码的健壮性与扩展性。具备良好的代码编程习惯及文档编写能力,参与多个高...

Telerik UI for iOS Q3 2015正式发布

近日,TelerikUIforiOS正式发布了Q32015。新版本新增对XCode7、Swift2.0和iOS9的支持,同时还新增了对数轴、不连续的日期时间轴等;改进TKDataPoin...

ios使用ijkplayer+nginx进行视频直播

上两节,我们讲到使用nginx和ngixn的rtmp模块搭建直播的服务器,接着我们讲解了在Android使用ijkplayer来作为我们的视频直播播放器,整个过程中,需要注意的就是ijlplayer编...

IOS技术分享|iOS快速生成开发文档(一)

前言对于开发人员而言,文档的作用不言而喻。文档不仅可以提高软件开发效率,还能便于以后的软件开发、使用和维护。本文主要讲述Objective-C快速生成开发文档工具appledoc。简介apple...

macOS下配置VS Code C++开发环境

本文介绍在苹果macOS操作系统下,配置VisualStudioCode的C/C++开发环境的过程,本环境使用Clang/LLVM编译器和调试器。一、前置条件本文默认前置条件是,您的开发设备已...