MqttInBoundService.java
2.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.bsth.data.mqtt;
import com.bsth.entity.kl.StationKl;
import com.bsth.service.kl.StationKlService;
import com.bsth.websocket.handler.SendUtils;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Caches the most recent passenger count reported by each MQTT device and periodically pushes
 * the counts of all matching stations through {@link SendUtils#sendKLPlan}.
 *
 * <p>{@link #handle(String, String)} writes the cache and {@link #TsList()} reads it. The two are
 * presumably invoked from different threads (MQTT callback vs. Spring scheduler — confirm), so
 * the cache is a concurrent map.
 *
 * @author 蔡王珏
 * @date 2025/2/15 09:04
 */
@Service
@EnableScheduling
public class MqttInBoundService {

    final Logger log = LoggerFactory.getLogger(this.getClass());

    @Autowired
    SendUtils sendUtils;

    @Autowired
    StationKlService stationKlService;

    /**
     * Latest reported count per device id, stored as the string form of the JSON value.
     * Visibility and declared type are kept as they were in case other classes in this
     * package read the map; only the implementation is now thread-safe.
     */
    static Map<String, String> message = new ConcurrentHashMap<>();

    final Gson gson = new Gson();

    /**
     * Records the count carried by one inbound MQTT message.
     *
     * <p>The payload is expected to be a JSON object containing {@code deviceId} and
     * {@code count}. Payloads that are empty or lack either field are logged and ignored
     * instead of failing with a {@link NullPointerException}. Malformed JSON still propagates
     * the parser's exception to the caller, as before.
     *
     * @param topic   topic the message arrived on; used only for diagnostics
     * @param payload JSON text of the message
     */
    public void handle(String topic, String payload) {
        Map<String, Object> fields =
                gson.fromJson(payload, new TypeToken<HashMap<String, Object>>() {}.getType());
        // Gson yields null for a null/empty document or a literal JSON null.
        if (fields == null) {
            log.warn("Ignoring empty MQTT payload, topic={}", topic);
            return;
        }

        Object deviceId = fields.get("deviceId");
        Object count = fields.get("count");
        if (deviceId == null || count == null) {
            log.warn("Ignoring MQTT payload without deviceId/count, topic={}, payload={}",
                    topic, payload);
            return;
        }

        // The raw string form is stored; TsList() converts it to an int when pushing.
        message.put(deviceId.toString(), count.toString());
    }

    /**
     * Pushes the cached passenger count of every station whose device has reported a value.
     *
     * <p>The cron expression {@code 15 * * * * ?} fires once per minute, at second 15.
     * NOTE(review): the original comment claimed a 30-second refresh, which would be
     * {@code 0/30 * * * * ?} — confirm which interval is intended.
     *
     * <p>Any failure is logged and never propagated to the scheduler. A cached value that is
     * not numeric only skips that station rather than aborting the whole push.
     */
    @Scheduled(cron = "15 * * * * ?")
    public void TsList() {
        try {
            Map<String, String> counts = message;
            if (counts.isEmpty()) {
                // Nothing has been reported yet, so there is nothing to push.
                return;
            }

            // Load all stations (empty filter map).
            List<StationKl> stations = (List<StationKl>) stationKlService.list(new HashMap<>());
            if (stations == null) {
                return;
            }

            for (StationKl station : stations) {
                String device = station.getDevice();
                if (device == null) {
                    // A station without a device can never match a reported device id.
                    continue;
                }
                String rawCount = counts.get(device);
                if (rawCount == null) {
                    continue;
                }

                int num;
                try {
                    // The cached text may carry a fractional part (e.g. "12.0"), so it is
                    // parsed as a double and truncated to an int.
                    num = (int) Double.parseDouble(rawCount);
                } catch (NumberFormatException e) {
                    log.warn("Skipping device {} with non-numeric count '{}'", device, rawCount);
                    continue;
                }

                Map<String, Object> push = new HashMap<>();
                push.put("device", device);
                push.put("lineCode", station.getLineCode());
                push.put("stationCode", station.getStationCode());
                push.put("dir", station.getDir());
                push.put("num", num);
                sendUtils.sendKLPlan(push);
            }
        } catch (Exception e) {
            log.error("站台客流推送异常", e);
        }
    }
}