百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

SpringBoot集成MQTT服务(springboot集成netty websocket)

bigegpt 2024-08-01 11:54 7 浏览

最近实现一个物联网小型应用,需要实现MQTT服务,在小型系统中尤其是通用产品中不宜依赖过多的独立服务,这样使得产品的分发与部署会变得复杂,这里采用内部集成MQTT服务方案,只需部署一个应用就可以实现应用的所有功能。

1、依赖包引入

在SpringBoot项目中集成MQTT服务,这里选择的是基于ActiveMQ提供的MQTT服务,ActiveMQ可以独立部署也可以嵌入到自有应用中,在SpringBoot中集成ActiveMQ比较容易,首先需要引入要用到的jar依赖包:

<!-- ActiveMQ 不推荐直接引入ActiveMQ all,会引入不必要的依赖 -->
<dependency>
  <!-- spring boot连接ActiveMQ的集成依赖 -->
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
   <!-- spring boot 集成ActiveMQ 核心依赖 -->
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-spring</artifactId>
</dependency>
<dependency>
   <!-- activemq stomp协议依赖 -->
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-stomp</artifactId>
</dependency>
<dependency>
   <!-- activemq amqp协议依赖 -->
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-amqp</artifactId>
</dependency>
<dependency>
   <!-- activemq mqtt协议依赖 -->
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-mqtt</artifactId>
</dependency>
<dependency>
   <!-- activemq kahadb持久化依赖 -->
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-kahadb-store</artifactId>
</dependency>
<!-- end ActiveMQ -->

2、ActiveMQ服务配置

依赖引入完成以后,需要建立ActiveMQ的配置文件,这里可以直接在原ActiveMQ配置文件基础上进行修改,由于是嵌入式的,所以就不需要原独立部署的web管理界面了,配置文件放在SpringBoot项目中的resources目录下,名为activemq.xml,配置内容如下:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="activemq-data">
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">" producerFlowControl="true">
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="1000"/>
                        </pendingMessageLimitStrategy>
                    </policyEntry>
                    <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>
        <persistenceAdapter>
            <kahaDB directory="activemq-data/kahadb"/><!-- kahabd持久化目录 -->
        </persistenceAdapter>
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="64 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
        <transportConnectors>
            <!-- 如果需要实现其它协议,需要引入对应的协议依赖jar包 -->
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>
    </broker>
</beans>

3、在SpringBoot应用加入ActiveMQ服务

在SpringBoot加入ActiveMQ工厂服务实例,以下代码加在SpringBoot启动类中。

/**
 * ActiveMQ服务初始化
 * @return ActiveMQ Broker工厂实例
 * @throws Exception
 */
 @Bean
 public BrokerFactoryBean brokerService() throws Exception {
      BrokerFactoryBean brokerFactoryBean = new BrokerFactoryBean();
      ClassPathResource res = new ClassPathResource("activemq.xml");//加载配置信息
      brokerFactoryBean.setConfig(res);//加载配置信息
      brokerFactoryBean.setStart(true);//启动服务
      return brokerFactoryBean;
 }

4、连接本地ActiveMQ服务

经过以上配置以后,就可以随应一起启动ActiveMQ服务了,这里实现了AMQP、STOMP、MQTT三种协议,由于是在本地连接同一JVM虚拟机内的服务,就不需要像连接外部ActiveMQ服务那样指定IP和端口,这里只需要指定其服务名就可以了,服务名是在ActiveMQ中的brokerName指定的,在application.properties的文加入以下配置:

#这里指向的连接是本应用内的localhost服务,这里采用vm连接
spring.activemq.broker-url=vm://localhost
#开启订阅模式,MQTT协议是以订阅模式存在的
spring.jms.pub-sub-domain=true

5、在应用中使用MQTT服务,进行业务研发

经过以上配置以后,就可以在应用中像使用普通JMS一样来与MQTT设备进行数据交互,MQTT订阅是可以使用通配符进行主题订阅的,通常来讲某一个设备上传数据会以编号+主题的方式实现,如topic:/run/0001/data,其中topic为固定协议头,run为数据分类,0001为设备的编号,data为具体的功能块数据,如果要订阅这一数据,显然不可能分每一个设备单独订阅一个主题,这个时候就可以订阅topic:/run/*/data这个主题,这样所有设备的这个主题都能订阅到。在java中分隔符"/"要用"."代替。

 //-----------订阅run下的data主题--------------------
/**
 * 订阅.ru.*.data主题
 * @param message 消息内容
 */
 @JmsListener(destination = ".ru.*.data")
 public void onRun(ActiveMQMessage message) throws Exception{
    String body = null;//消息内容
    String hid = null;//设备编号
    String type = message.getJMSDestination().toString();//主题类型,形如:topic:/run/0001/data
    //--------解析消息内容--------------------------
    if(null!=message.getContent()){
       body = new String(message.getContent().getData(),"UTF-8");//byte[]类型的消息内容
    }else if(message instanceof ActiveMQTextMessage) {
       ActiveMQTextMessage sg = (ActiveMQTextMessage) message;//String类型的消息内容
       body=sg.getText();
    }
    //-------从主题类型中解析出设备编号---------------
    hid = StringUtils.splitByWholeSeparator(type,".")[2];//设备编号
    //------业务处理代码----------------------------
 }

6、向设备端发布主题

这里采用JSM的方式向设备端发布对应的主题及内容,具体代码如下:

private final JmsTemplate jmsTemplate;//JMS发送实例

/**
 * 发布主题
 * @param topic 主题,形如 .run.0001.data,这里的.相当于/
 * @param body 消息内容
 */
 public void sendTopic(String topic,String body){
     jmsTemplate.convertAndSend(topic,body.getBytes());//这里以byte[]的内容发送
 }

相关推荐

得物可观测平台架构升级:基于GreptimeDB的全新监控体系实践

一、摘要在前端可观测分析场景中,需要实时观测并处理多地、多环境的运行情况,以保障Web应用和移动端的可用性与性能。传统方案往往依赖代理Agent→消息队列→流计算引擎→OLAP存储...

warm-flow新春版:网关直连和流程图重构

本期主要解决了网关直连和流程图重构,可以自此之后可支持各种复杂的网关混合、多网关直连使用。-新增Ruoyi-Vue-Plus优秀开源集成案例更新日志[feat]导入、导出和保存等新增json格式支持...

扣子空间体验报告

在数字化时代,智能工具的应用正不断拓展到我们工作和生活的各个角落。从任务规划到项目执行,再到任务管理,作者深入探讨了这款工具在不同场景下的表现和潜力。通过具体的应用实例,文章展示了扣子空间如何帮助用户...

spider-flow:开源的可视化方式定义爬虫方案

spider-flow简介spider-flow是一个爬虫平台,以可视化推拽方式定义爬取流程,无需代码即可实现一个爬虫服务。spider-flow特性支持css选择器、正则提取支持JSON/XML格式...

solon-flow 你好世界!

solon-flow是一个基础级的流处理引擎(可用于业务规则、决策处理、计算编排、流程审批等......)。提供有“开放式”驱动定制支持,像jdbc有mysql或pgsql等驱动,可...

新一代开源爬虫平台:SpiderFlow

SpiderFlow:新一代爬虫平台,以图形化方式定义爬虫流程,不写代码即可完成爬虫。-精选真开源,释放新价值。概览Spider-Flow是一个开源的、面向所有用户的Web端爬虫构建平台,它使用Ja...

通过 SQL 训练机器学习模型的引擎

关注薪资待遇的同学应该知道,机器学习相关的岗位工资普遍偏高啊。同时随着各种通用机器学习框架的出现,机器学习的门槛也在逐渐降低,训练一个简单的机器学习模型变得不那么难。但是不得不承认对于一些数据相关的工...

鼠须管输入法rime for Mac

鼠须管输入法forMac是一款十分新颖的跨平台输入法软件,全名是中州韵输入法引擎,鼠须管输入法mac版不仅仅是一个输入法,而是一个输入法算法框架。Rime的基础架构十分精良,一套算法支持了拼音、...

Go语言 1.20 版本正式发布:新版详细介绍

Go1.20简介最新的Go版本1.20在Go1.19发布六个月后发布。它的大部分更改都在工具链、运行时和库的实现中。一如既往,该版本保持了Go1的兼容性承诺。我们期望几乎所...

iOS 10平台SpriteKit新特性之Tile Maps(上)

简介苹果公司在WWDC2016大会上向人们展示了一大批新的好东西。其中之一就是SpriteKitTileEditor。这款工具易于上手,而且看起来速度特别快。在本教程中,你将了解关于TileE...

程序员简历例句—范例Java、Python、C++模板

个人简介通用简介:有良好的代码风格,通过添加注释提高代码可读性,注重代码质量,研读过XXX,XXX等多个开源项目源码从而学习增强代码的健壮性与扩展性。具备良好的代码编程习惯及文档编写能力,参与多个高...

Telerik UI for iOS Q3 2015正式发布

近日,TelerikUIforiOS正式发布了Q32015。新版本新增对XCode7、Swift2.0和iOS9的支持,同时还新增了对数轴、不连续的日期时间轴等;改进TKDataPoin...

ios使用ijkplayer+nginx进行视频直播

上两节,我们讲到使用nginx和ngixn的rtmp模块搭建直播的服务器,接着我们讲解了在Android使用ijkplayer来作为我们的视频直播播放器,整个过程中,需要注意的就是ijlplayer编...

IOS技术分享|iOS快速生成开发文档(一)

前言对于开发人员而言,文档的作用不言而喻。文档不仅可以提高软件开发效率,还能便于以后的软件开发、使用和维护。本文主要讲述Objective-C快速生成开发文档工具appledoc。简介apple...

macOS下配置VS Code C++开发环境

本文介绍在苹果macOS操作系统下,配置VisualStudioCode的C/C++开发环境的过程,本环境使用Clang/LLVM编译器和调试器。一、前置条件本文默认前置条件是,您的开发设备已...