最近实现一个物联网小型应用,需要实现MQTT服务,在小型系统中尤其是通用产品中不宜依赖过多的独立服务,这样使得产品的分发与部署会变得复杂,这里采用内部集成MQTT服务方案,只需部署一个应用就可以实现应用的所有功能。
1、依赖包引入
在SpringBoot项目中集成MQTT服务,这里选择的是基于ActiveMQ提供的MQTT服务,ActiveMQ可以独立部署也可以嵌入到自有应用中,在SpringBoot中集成ActiveMQ比较容易,首先需要引入要用到的jar依赖包:
<!-- ActiveMQ 不推荐直接引入ActiveMQ all,会引入不必要的依赖 -->
<dependency>
<!-- spring boot连接ActiveMQ的集成依赖 -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<!-- spring boot 集成ActiveMQ 核心依赖 -->
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
</dependency>
<dependency>
<!-- activemq stomp协议依赖 -->
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-stomp</artifactId>
</dependency>
<dependency>
<!-- activemq amqp协议依赖 -->
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-amqp</artifactId>
</dependency>
<dependency>
<!-- activemq mqtt协议依赖 -->
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-mqtt</artifactId>
</dependency>
<dependency>
<!-- activemq kahadb持久化依赖 -->
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
</dependency>
<!-- end ActiveMQ -->
2、ActiveMQ服务配置
依赖引入完成以后,需要建立ActiveMQ的配置文件,这里可以直接在原ActiveMQ配置文件基础上进行修改,由于是嵌入式的,所以就不需要原独立部署的web管理界面了,配置文件放在SpringBoot项目中的resources目录下,名为activemq.xml,配置内容如下:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="activemq-data">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<kahaDB directory="activemq-data/kahadb"/><!-- kahabd持久化目录 -->
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="64 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<!-- 如果需要实现其它协议,需要引入对应的协议依赖jar包 -->
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
</broker>
</beans>
3、在SpringBoot应用加入ActiveMQ服务
在SpringBoot加入ActiveMQ工厂服务实例,以下代码加在SpringBoot启动类中。
/**
* ActiveMQ服务初始化
* @return ActiveMQ Broker工厂实例
* @throws Exception
*/
@Bean
public BrokerFactoryBean brokerService() throws Exception {
BrokerFactoryBean brokerFactoryBean = new BrokerFactoryBean();
ClassPathResource res = new ClassPathResource("activemq.xml");//加载配置信息
brokerFactoryBean.setConfig(res);//加载配置信息
brokerFactoryBean.setStart(true);//启动服务
return brokerFactoryBean;
}
4、连接本地ActiveMQ服务
经过以上配置以后,就可以随应一起启动ActiveMQ服务了,这里实现了AMQP、STOMP、MQTT三种协议,由于是在本地连接同一JVM虚拟机内的服务,就不需要像连接外部ActiveMQ服务那样指定IP和端口,这里只需要指定其服务名就可以了,服务名是在ActiveMQ中的brokerName指定的,在application.properties的文加入以下配置:
#这里指向的连接是本应用内的localhost服务,这里采用vm连接
spring.activemq.broker-url=vm://localhost
#开启订阅模式,MQTT协议是以订阅模式存在的
spring.jms.pub-sub-domain=true
5、在应用中使用MQTT服务,进行业务研发
经过以上配置以后,就可以在应用中像使用普通JMS一样来与MQTT设备进行数据交互,MQTT订阅是可以使用通配符进行主题订阅的,通常来讲某一个设备上传数据会以编号+主题的方式实现,如topic:/run/0001/data,其中topic为固定协议头,run为数据分类,0001为设备的编号,data为具体的功能块数据,如果要订阅这一数据,显然不可能分每一个设备单独订阅一个主题,这个时候就可以订阅topic:/run/*/data这个主题,这样所有设备的这个主题都能订阅到。在java中分隔符"/"要用"."代替。
//-----------订阅run下的data主题--------------------
/**
* 订阅.ru.*.data主题
* @param message 消息内容
*/
@JmsListener(destination = ".ru.*.data")
public void onRun(ActiveMQMessage message) throws Exception{
String body = null;//消息内容
String hid = null;//设备编号
String type = message.getJMSDestination().toString();//主题类型,形如:topic:/run/0001/data
//--------解析消息内容--------------------------
if(null!=message.getContent()){
body = new String(message.getContent().getData(),"UTF-8");//byte[]类型的消息内容
}else if(message instanceof ActiveMQTextMessage) {
ActiveMQTextMessage sg = (ActiveMQTextMessage) message;//String类型的消息内容
body=sg.getText();
}
//-------从主题类型中解析出设备编号---------------
hid = StringUtils.splitByWholeSeparator(type,".")[2];//设备编号
//------业务处理代码----------------------------
}
6、向设备端发布主题
这里采用JSM的方式向设备端发布对应的主题及内容,具体代码如下:
private final JmsTemplate jmsTemplate;//JMS发送实例
/**
* 发布主题
* @param topic 主题,形如 .run.0001.data,这里的.相当于/
* @param body 消息内容
*/
public void sendTopic(String topic,String body){
jmsTemplate.convertAndSend(topic,body.getBytes());//这里以byte[]的内容发送
}