引入依赖

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

添加配置

在application.yml中添加如下:

mq:
  host: tcp://192.168.1.144:1883
  clientId: client_1
  topic: root/system
  qos: 2
  username:
  password:
  timeout: 1000
  keepalive: 20

参数说明:

host:你的mqtt服务地址
clientId:你的客户端ID(随意填写,不能重复)(对mq服务来说,订阅者和发布者都是客户端)
topic:订阅的主题
qos:QoS
username:用户名,可为空
password:密码,可为空
timeout:超时时长
keepalive: https://blog.csdn.net/solo_jm/article/details/103403534

QoS说明

 0只会发送一次,不管成不成功
 1未成功会继续发送,直到成功,可能会收到多次
 2未成功会继续发送,但会保证只收到一次

编写配置类

新建类,添加如下注解

@Component
@ConfigurationProperties("mq")

添加如下字段并完善Getter Setter

    private String host;
    private String clientId;
    private String topic;
    private Integer qos;
    private String username;
    private String password;
    private Integer timeout;
    private Integer keepalive;

这样就可以从配置文件中获取配置,或者可以直接在这里写死,不使用配置文件方式

新建MqttConnectionOptionsBean

    @Bean
    public MqttConnectOptions mqttConnectOptions(){
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);
        options.setAutomaticReconnect(true);
        if (StrUtil.isNotBlank(username)&&StrUtil.isNotBlank(password)){
            options.setUserName(username);
            options.setPassword(password.toCharArray());
        }
        options.setServerURIs(new String[]{host});
        return options;
    }

创建MqttPathClientFactoryBean

    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(mqttConnectOptions);
        return factory;
    }

创建用于发送消息的Channel

    @Bean
    public MessageChannel mqttOutputChannel() {
        DirectChannel directChannel = new DirectChannel();
        return directChannel;
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory) {
        //订阅者发布者ID相同会导致掉线重连
        MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(clientId+"_pub", factory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(topic);
        return messageHandler;
    }

创建用于接收消息的方法

    @Bean
    public MessageChannel mqttInputChannel() {
        DirectChannel directChannel = new DirectChannel();
        return directChannel;
    }
    @Bean
    public MessageProducer inbound(MqttPahoClientFactory factory, @Qualifier("mqttInputChannel") MessageChannel channel) {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId, factory,
                        topic);
        adapter.setCompletionTimeout(timeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(qos);
        adapter.setOutputChannel(channel);
        return adapter;
    }

创建接收到消息的Handler

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            String topic = (String)message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
            int qos = (int) message.getHeaders().get(MqttHeaders.RECEIVED_QOS);
            log.info("主题:{},数据:{},QOS:{}",topic, message.getPayload(),qos);
        };
    }

测试接收功能

我的mq使用的是mosquitto,提供了mosquitto_pub二进制文件,用于发送一次性消息

mosquitto_pub -t root/system -m test -q 1
解释:
-t 指定topic
-m 要发送的消息
-q 指定qos

实现发送功能

新建接口(名字可随意,我这里使用MqMessager)

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqMessager {
    void send(String data);
    void send(@Header(MqttHeaders.TOPIC)String topic,String payload);
    void send(@Header(MqttHeaders.TOPIC)String topic,@Header(MqttHeaders.QOS)int qos,String payload);
}

方法名可随意,参数顺序随意,但要注意添加对应注解,否则无法正常使用

发送消息

本人使用http请求来调用接口

    @Autowired
    MqMessager mqMessager;
    @GetMapping("test")
    public String test(@RequestParam(defaultValue = "就这?") String s){
        mqMessager.send("root/system",s);
        return "success";
    }

在handler中可接收到发送的消息即可


山内有樱名为良,树本无名只待春。