MqttConfiguration.java 6.2 KB
package com.bsth.data.mqtt;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.annotation.Async;

import javax.annotation.Resource;

/**
 * @author 蔡王珏
 * @date 2025/2/14 20:15
 */
@Slf4j
@Data
@Configuration
public class MqttConfiguration {

    /**
     * 连接地址
     */
    @Value("${mqtt.host-url}")
    private String hostUrl;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    /**
     * mqtt发送者客户端id
     */
    @Value("${mqtt.client-id-subscribe}")
    private String clientIdSubscribe;

    /**
     * mqtt订阅者主题
     */
    @Value("${mqtt.default-subscribe-topic}")
    private String subscribeTopic;

    /**
     * 超时时间
     */
    @Value("${mqtt.connection-timeout}")
    private int timeout;

    /**
     * 保持连接数
     */
    @Value("${mqtt.keep-alive-interval}")
    private int keepalive;

    /**
     * 是否清除会话
     */
    @Value("${mqtt.cleanSession}")
    private boolean cleanSession;

    /**
     * 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
     */
    private static final byte[] WILL_DATA = "offline".getBytes();

    /**
     * mqtt订阅者使用信道名称
     */
    public static final String MQTT_INBOUND_CHANNEL = "mqttInboundChannel";

    @Resource
    private MqttInBoundService mqttInBoundService;

    /**
     * 注册MQTT客户端工厂
     *
     * @return MqttPahoClientFactory
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        // 客户端工厂
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();

        // 连接配置
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置连接的地址
        options.setServerURIs(new String[]{hostUrl});
        //设置用户名
        options.setUserName(username);
        //设置密码
        options.setPassword(password.toCharArray());

        // 如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持:
        // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。
        // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。
        options.setCleanSession(false);

        // 设置超时时间,该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。
        // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。
        options.setConnectionTimeout(30);

        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
        // 此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0
        // 但这个方法并没有重连的机制
        options.setKeepAliveInterval(30);

        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        options.setWill("willTopic", WILL_DATA, 2, false);

        //自动重新连接
        options.setAutomaticReconnect(true);

        factory.setConnectionOptions(options);

        log.info("初始化 MQTT 配置成功");
        return factory;
    }

    /**
     * MQTT信息通道(消费者)
     *
     */
    @Bean(name = MQTT_INBOUND_CHANNEL)
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT消息订阅绑定(消费者)
     *
     */
    @Bean
    public MessageProducer inbound() {
        String[] topics = subscribeTopic.contains(",") ?
                subscribeTopic.split(",") : new String[]{subscribeTopic};

        // 可以同时消费(订阅)多个Topic
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        clientIdSubscribe, mqttClientFactory(), topics);
        adapter.setCompletionTimeout(5000);
        adapter.setQos(2);
        // 设置订阅通道
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * MQTT消息处理器(消费者)
     *
     */
    @Bean
    @ServiceActivator(inputChannel = MQTT_INBOUND_CHANNEL)
    public MessageHandler handler() {
        return message -> {
            try {
                String topic = message.getHeaders().get("mqtt_receivedTopic", String.class);
                String payload = message.getPayload().toString();

                //log.info("接收到主题: {}, 消息:{}", topic, payload);
                handleMqttMessageAsync(topic, payload);
            } catch (Exception e) {
                log.error("处理MQTT消息时发生异常: topic={}, payload={}",
                        message.getHeaders().get("mqtt_receivedTopic", String.class),
                        message.getPayload().toString(), e);
            }
        };
    }

    @Async
    public void handleMqttMessageAsync(String topic, String payload) {
        try {
            mqttInBoundService.handle(topic, payload);
        } catch (Exception e) {
            log.error("异步处理MQTT消息时发生异常: topic={}, payload={}", topic, payload, e);
        }
    }
}