MqttConfiguration.java
6.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package com.bsth.data.mqtt;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.annotation.Async;
import javax.annotation.Resource;
/**
* @author 蔡王珏
* @date 2025/2/14 20:15
*/
@Slf4j
@Data
@Configuration
public class MqttConfiguration {
/**
* 连接地址
*/
@Value("${mqtt.host-url}")
private String hostUrl;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
/**
* mqtt发送者客户端id
*/
@Value("${mqtt.client-id-subscribe}")
private String clientIdSubscribe;
/**
* mqtt订阅者主题
*/
@Value("${mqtt.default-subscribe-topic}")
private String subscribeTopic;
/**
* 超时时间
*/
@Value("${mqtt.connection-timeout}")
private int timeout;
/**
* 保持连接数
*/
@Value("${mqtt.keep-alive-interval}")
private int keepalive;
/**
* 是否清除会话
*/
@Value("${mqtt.cleanSession}")
private boolean cleanSession;
/**
* 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
*/
private static final byte[] WILL_DATA = "offline".getBytes();
/**
* mqtt订阅者使用信道名称
*/
public static final String MQTT_INBOUND_CHANNEL = "mqttInboundChannel";
@Resource
private MqttInBoundService mqttInBoundService;
/**
* 注册MQTT客户端工厂
*
* @return MqttPahoClientFactory
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
// 客户端工厂
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
// 连接配置
MqttConnectOptions options = new MqttConnectOptions();
// 设置连接的地址
options.setServerURIs(new String[]{hostUrl});
//设置用户名
options.setUserName(username);
//设置密码
options.setPassword(password.toCharArray());
// 如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持:
// 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。
// 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。
options.setCleanSession(false);
// 设置超时时间,该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。
// 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。
options.setConnectionTimeout(30);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
// 此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0
// 但这个方法并没有重连的机制
options.setKeepAliveInterval(30);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
options.setWill("willTopic", WILL_DATA, 2, false);
//自动重新连接
options.setAutomaticReconnect(true);
factory.setConnectionOptions(options);
log.info("初始化 MQTT 配置成功");
return factory;
}
/**
* MQTT信息通道(消费者)
*
*/
@Bean(name = MQTT_INBOUND_CHANNEL)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息订阅绑定(消费者)
*
*/
@Bean
public MessageProducer inbound() {
String[] topics = subscribeTopic.contains(",") ?
subscribeTopic.split(",") : new String[]{subscribeTopic};
// 可以同时消费(订阅)多个Topic
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
clientIdSubscribe, mqttClientFactory(), topics);
adapter.setCompletionTimeout(5000);
adapter.setQos(2);
// 设置订阅通道
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
/**
* MQTT消息处理器(消费者)
*
*/
@Bean
@ServiceActivator(inputChannel = MQTT_INBOUND_CHANNEL)
public MessageHandler handler() {
return message -> {
try {
String topic = message.getHeaders().get("mqtt_receivedTopic", String.class);
String payload = message.getPayload().toString();
//log.info("接收到主题: {}, 消息:{}", topic, payload);
handleMqttMessageAsync(topic, payload);
} catch (Exception e) {
log.error("处理MQTT消息时发生异常: topic={}, payload={}",
message.getHeaders().get("mqtt_receivedTopic", String.class),
message.getPayload().toString(), e);
}
};
}
@Async
public void handleMqttMessageAsync(String topic, String payload) {
try {
mqttInBoundService.handle(topic, payload);
} catch (Exception e) {
log.error("异步处理MQTT消息时发生异常: topic={}, payload={}", topic, payload, e);
}
}
}