1.pom.xml
<!-- 连接MQTT-->
<!--mqtt-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2.yml配置
mqtt:
config:
username: admin
password: admin # 密码
hostUrl: tcp://xx.xx.xxx.xx:1883 # tcp://ip:端口
clientId: emq-ss-client # 客户端id
defaultTopic: topic_default # 订阅主题
timeout: 1000 # 超时时间 (单位:秒)
keepalive: 60 # 心跳 (单位:秒)
enabled: false # 是否使能mqtt功能
3.MqttProperties
package com.rfos.assistsilkworm.mqtt.properties;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @Author Michale
* @CreateDate 2022/9/4
* @Describe 读取mqtt配置信息
*/
@Data
@Component
@ConfigurationProperties(prefix = "mqtt.config")
public class MqttProperties {
@ApiModelProperty("用户名")
private String username;
@ApiModelProperty("密码")
private String password;
@ApiModelProperty("地址")
private String hostUrl;
@ApiModelProperty("客户端id")
private String clientId;
@ApiModelProperty("订阅主题")
private String defaultTopic;
@ApiModelProperty("超时时间")
private int timeout;
@ApiModelProperty("心跳")
private int keepalive;
@ApiModelProperty("mqtt开关")
private boolean enabled;
}
4.VO传输实体类
package com.rfos.assistsilkworm.mqtt.vo;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* @Author rfos
* @Date 2023/2/20 17:08
* @Description TODO 消息体
*/
@Data
public class MqttVO {
@ApiModelProperty("订阅的主题")
public String topic ;
@ApiModelProperty("发送的内容")
public String payload ;
}
5.MqttReceiveHandle
package com.rfos.assistsilkworm.mqtt.handler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
/**
* mqtt客户端消息处理类
**/
@Slf4j
@Component
public class MqttReceiveHandler implements MessageHandler {
@Override
@ServiceActivator(inputChannel = "MQTT_INPUT_CHANNEL")
public void handleMessage(Message<?> message) throws MessagingException {
log.info("收到订阅消息: {}", message);
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
log.info("消息主题:{}", topic);
}
}
6.MqttConfig
package com.rfos.assistsilkworm.mqtt.config;
import com.rfos.assistsilkworm.mqtt.handler.MqttReceiveHandler;
import com.rfos.assistsilkworm.mqtt.properties.MqttProperties;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.util.Arrays;
import java.util.List;
/**
* mqtt 推送and接收 消息类
**/
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttConfig {
@Autowired
private MqttProperties mqttProperties;
@Autowired
private MqttReceiveHandler mqttReceiveHandle;
/**
* MQTT连接器选项
**/
@Bean(value = "getMqttConnectOptions")
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
mqttConnectOptions.setCleanSession(true);
// 设置超时时间 单位为秒
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setUserName(mqttProperties.getUsername());
mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getHostUrl()});
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
mqttConnectOptions.setKeepAliveInterval(10);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
//mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
return mqttConnectOptions;
}
/**
* MQTT工厂
**/
@Bean("mqttClientFactor")
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
/**
* MQTT信息通道(生产者)
**/
@Bean("MQTT_OUT_BOUND_CHANNEL")
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息处理器(生产者)
**/
@Bean
@ServiceActivator(inputChannel = "MQTT_OUT_BOUND_CHANNEL")
public MqttPahoMessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "_producer", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
messageHandler.setAsyncEvents(true); // 消息发送和传输完成会有异步的通知回调
//设置转换器 发送bytes数据
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
return messageHandler;
}
/**
* 配置client,监听的topic
* MQTT消息订阅绑定(消费者)
**/
@Bean("inbound")
public MqttPahoMessageDrivenChannelAdapter inbound() {
List<String> topicList = Arrays.asList(mqttProperties.getDefaultTopic().trim().split(","));
String[] topics = new String[topicList.size()];
topicList.toArray(topics);
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId() + "_consumer", mqttClientFactory(), topics);
adapter.setCompletionTimeout(mqttProperties.getTimeout());
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);
adapter.setQos(2);
// adapter.addTopic("TOPIC1");
//设置订阅通道
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
// /**
// * 获取MQTT客户端
// * 自定义订阅消息
// */
// @Value("mqttClient")
// public IMqttClient getMqttClient() throws MqttException {
// IMqttClient mqttClient = mqttClientFactory().getClientInstance(mqttProperties.getHostUrl(), mqttProperties.getClientId());
// mqttClient.connect(getMqttConnectOptions());
// return mqttClient;
// }
/**
* MQTT信息通道(消费者)
**/
@Bean("MQTT_INPUT_CHANNEL")
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* MQTT消息处理器(消费者)
**/
@Bean
@ServiceActivator(inputChannel = "MQTT_INPUT_CHANNEL")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//处理接收消息
mqttReceiveHandle.handleMessage(message);
}
};
}
}
7.MqttPublishGateway
package com.rfos.assistsilkworm.mqtt.gateway;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
/**
* mqtt发送消息
* (defaultRequestChannel = "mqttOutboundChannel" 对应config配置)
* **/
@Service("mqttGateway")
@MessagingGateway(defaultRequestChannel = "MQTT_OUT_BOUND_CHANNEL")
public interface MqttPublishGateway {
/**
* 发送信息到MQTT服务器
*
* @param
*/
void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param qos 对消息处理的几种机制。
* 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
* 2 多了一次去重的动作,确保订阅者收到的消息有一次。
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
}
8.MqttListener
package com.rfos.assistsilkworm.mqtt.listenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MqttListener {
/**
* 连接失败的事件通知
* @param mqttConnectionFailedEvent
*/
@EventListener(classes = MqttConnectionFailedEvent.class)
public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) {
log.info("连接失败的事件通知");
}
/**
* 已发送的事件通知
* @param mqttMessageSentEvent
*/
@EventListener(classes = MqttMessageSentEvent.class)
public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) {
log.info("已发送的事件通知[{}]",mqttMessageSentEvent.toString());
}
/**
* 已传输完成的事件通知
* 1.QOS == 0,发送消息后会即可进行此事件回调,因为不需要等待回执
* 2.QOS == 1,发送消息后会等待ACK回执,ACK回执后会进行此事件通知
* 3.QOS == 2,发送消息后会等待PubRECV回执,知道收到PubCOMP后会进行此事件通知
* @param mqttMessageDeliveredEvent
*/
@EventListener(classes = MqttMessageDeliveredEvent.class)
public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) {
log.info("已传输完成的事件通知");
}
/**
* 消息订阅的事件通知
* @param mqttSubscribedEvent
*/
@EventListener(classes = MqttSubscribedEvent.class)
public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) {
log.info("消息订阅的事件通知");
log.info("成功订阅到主题: info={}", mqttSubscribedEvent.toString());
}
}
9.MqttClientController
package com.rfos.assistsilkworm.mqtt.client;
import com.rfos.assistsilkworm.common.CommonResult;
import com.rfos.assistsilkworm.mqtt.config.MqttConfig;
import com.rfos.assistsilkworm.mqtt.gateway.MqttPublishGateway;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author rfos
* @Date 2023/2/20 19:01
* @Description TODO MQTT客户端处理发送数据
*/
@RestController
@RequestMapping("/mqtt")
public class MqttClientController {
/**
* 发送网关
*/
@Autowired
private MqttPublishGateway mqttPublishGateway;
@Autowired
@Qualifier("inbound")
private MqttPahoMessageDrivenChannelAdapter messageProducer;
/**
* 测试:发布消息
*/
@GetMapping("/testPublish")
public CommonResult sentTest(@RequestParam("topic") String topic,
@RequestParam("payload") String payload){
mqttPublishGateway.sendToMqtt(topic,1,payload);
return CommonResult.success();
}
/**
* 测试订阅信息
*/
@GetMapping("/testSubscribe")
@ServiceActivator(inputChannel = "MQTT_INPUT_CHANNEL")
public CommonResult testSubscribe() throws MqttException {
messageProducer.addTopic("TOPICS");
return CommonResult.success();
}
}
本篇文章来源于微信公众号:作者:原创 rfos 小编骑鱼之学点知识 小编骑鱼之学点知识 微信号 rfospublic 功能介绍 知识是生活的明灯,更是一种快乐。好奇则是知识的萌芽,而这里正是你们的好奇之处,也是你们的获解之地。陶行知说过:“智慧是生成的,知识是学来的。很幸运,能把所学,所悟,所知,所得的东西分享给大家。希望大家关注,谢谢 ! 发表于
转载地址:https://mp.weixin.qq.com/s/Fwhle56yhHES9x3bcHBUuQ
微信扫描下方的二维码阅读本文
Comments NOTHING