引入依赖
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
添加配置
在application.yml中添加如下:
mq:
host: tcp://192.168.1.144:1883
clientId: client_1
topic: root/system
qos: 2
username:
password:
timeout: 1000
keepalive: 20
参数说明:
host:你的mqtt服务地址
clientId:你的客户端ID(随意填写,不能重复)(对mq服务来说,订阅者和发布者都是客户端)
topic:订阅的主题
qos:QoS
username:用户名,可为空
password:密码,可为空
timeout:超时时长
keepalive: https://blog.csdn.net/solo_jm/article/details/103403534
QoS说明
0只会发送一次,不管成不成功
1未成功会继续发送,直到成功,可能会收到多次
2未成功会继续发送,但会保证只收到一次
编写配置类
新建类,添加如下注解
@Component
@ConfigurationProperties("mq")
添加如下字段并完善Getter Setter
private String host;
private String clientId;
private String topic;
private Integer qos;
private String username;
private String password;
private Integer timeout;
private Integer keepalive;
这样就可以从配置文件中获取配置,或者可以直接在这里写死,不使用配置文件方式
新建MqttConnectionOptionsBean
@Bean
public MqttConnectOptions mqttConnectOptions(){
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
options.setAutomaticReconnect(true);
if (StrUtil.isNotBlank(username)&&StrUtil.isNotBlank(password)){
options.setUserName(username);
options.setPassword(password.toCharArray());
}
options.setServerURIs(new String[]{host});
return options;
}
创建MqttPathClientFactoryBean
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions){
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectOptions);
return factory;
}
创建用于发送消息的Channel
@Bean
public MessageChannel mqttOutputChannel() {
DirectChannel directChannel = new DirectChannel();
return directChannel;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutputChannel")
public MessageHandler mqttOutbound(MqttPahoClientFactory factory) {
//订阅者发布者ID相同会导致掉线重连
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId+"_pub", factory);
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(topic);
return messageHandler;
}
创建用于接收消息的方法
@Bean
public MessageChannel mqttInputChannel() {
DirectChannel directChannel = new DirectChannel();
return directChannel;
}
@Bean
public MessageProducer inbound(MqttPahoClientFactory factory, @Qualifier("mqttInputChannel") MessageChannel channel) {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId, factory,
topic);
adapter.setCompletionTimeout(timeout);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(qos);
adapter.setOutputChannel(channel);
return adapter;
}
创建接收到消息的Handler
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String topic = (String)message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
int qos = (int) message.getHeaders().get(MqttHeaders.RECEIVED_QOS);
log.info("主题:{},数据:{},QOS:{}",topic, message.getPayload(),qos);
};
}
测试接收功能
我的mq使用的是mosquitto,提供了mosquitto_pub二进制文件,用于发送一次性消息
mosquitto_pub -t root/system -m test -q 1
解释:
-t 指定topic
-m 要发送的消息
-q 指定qos
实现发送功能
新建接口(名字可随意,我这里使用MqMessager)
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqMessager {
void send(String data);
void send(@Header(MqttHeaders.TOPIC)String topic,String payload);
void send(@Header(MqttHeaders.TOPIC)String topic,@Header(MqttHeaders.QOS)int qos,String payload);
}
方法名可随意,参数顺序随意,但要注意添加对应注解,否则无法正常使用
发送消息
本人使用http请求来调用接口
@Autowired
MqMessager mqMessager;
@GetMapping("test")
public String test(@RequestParam(defaultValue = "就这?") String s){
mqMessager.send("root/system",s);
return "success";
}
在handler中可接收到发送的消息即可