Commit ef6693aabbbf12e83d09ad8749f6e60faacc012d

Authored by 648540858
Committed by GitHub
2 parents d6045256 3aabbd69

Merge branch 'wvp-28181-2.0' into Zafu-Dev-1127

Showing 17 changed files with 485 additions and 572 deletions
sql/mysql.sql
1   --- MySQL dump 10.13 Distrib 8.0.30, for Linux (x86_64)
  1 +-- MySQL dump 10.13 Distrib 8.0.31, for Linux (x86_64)
2 2 --
3 3 -- Host: 127.0.0.1 Database: wvp
4 4 -- ------------------------------------------------------
... ... @@ -34,13 +34,13 @@ CREATE TABLE `device` (
34 34 `online` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
35 35 `registerTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
36 36 `keepaliveTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
37   - `ip` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
  37 + `ip` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
38 38 `createTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
39 39 `updateTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
40 40 `port` int DEFAULT NULL,
41 41 `expires` int DEFAULT NULL,
42 42 `subscribeCycleForCatalog` int DEFAULT NULL,
43   - `hostAddress` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
  43 + `hostAddress` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
44 44 `charset` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
45 45 `subscribeCycleForMobilePosition` int DEFAULT NULL,
46 46 `mobilePositionSubmissionInterval` int DEFAULT '5',
... ... @@ -48,12 +48,13 @@ CREATE TABLE `device` (
48 48 `ssrcCheck` int DEFAULT '0',
49 49 `geoCoordSys` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
50 50 `treeType` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
51   - `mediaServerId` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT 'auto',
52   - `custom_name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
53   - `password` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  51 + `custom_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
  52 + `password` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
  53 + `sdpIp` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
  54 + `localIp` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
54 55 PRIMARY KEY (`id`),
55 56 UNIQUE KEY `device_deviceId_uindex` (`deviceId`)
56   -) ENGINE=InnoDB AUTO_INCREMENT=47 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
  57 +) ENGINE=InnoDB AUTO_INCREMENT=57 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
57 58 /*!40101 SET character_set_client = @saved_cs_client */;
58 59  
59 60 --
... ... @@ -145,7 +146,7 @@ CREATE TABLE `device_channel` (
145 146 PRIMARY KEY (`id`),
146 147 UNIQUE KEY `device_channel_id_uindex` (`id`),
147 148 UNIQUE KEY `device_channel_pk` (`channelId`,`deviceId`)
148   -) ENGINE=InnoDB AUTO_INCREMENT=60301 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
  149 +) ENGINE=InnoDB AUTO_INCREMENT=74416 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
149 150 /*!40101 SET character_set_client = @saved_cs_client */;
150 151  
151 152 --
... ... @@ -215,7 +216,7 @@ CREATE TABLE `gb_stream` (
215 216 PRIMARY KEY (`gbStreamId`) USING BTREE,
216 217 UNIQUE KEY `app` (`app`,`stream`) USING BTREE,
217 218 UNIQUE KEY `gbId` (`gbId`) USING BTREE
218   -) ENGINE=InnoDB AUTO_INCREMENT=301059 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
  219 +) ENGINE=InnoDB AUTO_INCREMENT=331060 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
219 220 /*!40101 SET character_set_client = @saved_cs_client */;
220 221  
221 222 --
... ... @@ -245,7 +246,7 @@ CREATE TABLE `log` (
245 246 `username` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
246 247 `createTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
247 248 PRIMARY KEY (`id`) USING BTREE
248   -) ENGINE=InnoDB AUTO_INCREMENT=733627 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
  249 +) ENGINE=InnoDB AUTO_INCREMENT=760908 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
249 250 /*!40101 SET character_set_client = @saved_cs_client */;
250 251  
251 252 --
... ... @@ -337,7 +338,7 @@ CREATE TABLE `parent_platform` (
337 338 PRIMARY KEY (`id`),
338 339 UNIQUE KEY `parent_platform_id_uindex` (`id`),
339 340 UNIQUE KEY `parent_platform_pk` (`serverGBId`)
340   -) ENGINE=InnoDB AUTO_INCREMENT=44 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
  341 +) ENGINE=InnoDB AUTO_INCREMENT=47 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
341 342 /*!40101 SET character_set_client = @saved_cs_client */;
342 343  
343 344 --
... ... @@ -389,7 +390,7 @@ CREATE TABLE `platform_gb_channel` (
389 390 `catalogId` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
390 391 `deviceChannelId` int NOT NULL,
391 392 PRIMARY KEY (`id`)
392   -) ENGINE=InnoDB AUTO_INCREMENT=102 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
  393 +) ENGINE=InnoDB AUTO_INCREMENT=3146 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
393 394 /*!40101 SET character_set_client = @saved_cs_client */;
394 395  
395 396 --
... ... @@ -415,7 +416,7 @@ CREATE TABLE `platform_gb_stream` (
415 416 `id` int NOT NULL AUTO_INCREMENT,
416 417 PRIMARY KEY (`id`),
417 418 UNIQUE KEY `platform_gb_stream_pk` (`platformId`,`catalogId`,`gbStreamId`)
418   -) ENGINE=InnoDB AUTO_INCREMENT=301766 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
  419 +) ENGINE=InnoDB AUTO_INCREMENT=391772 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
419 420 /*!40101 SET character_set_client = @saved_cs_client */;
420 421  
421 422 --
... ... @@ -457,7 +458,7 @@ CREATE TABLE `stream_proxy` (
457 458 `enable_disable_none_reader` bit(1) DEFAULT NULL,
458 459 PRIMARY KEY (`id`),
459 460 UNIQUE KEY `stream_proxy_pk` (`app`,`stream`)
460   -) ENGINE=InnoDB AUTO_INCREMENT=548 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
  461 +) ENGINE=InnoDB AUTO_INCREMENT=568 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
461 462 /*!40101 SET character_set_client = @saved_cs_client */;
462 463  
463 464 --
... ... @@ -494,7 +495,7 @@ CREATE TABLE `stream_push` (
494 495 `self` int DEFAULT NULL,
495 496 PRIMARY KEY (`id`),
496 497 UNIQUE KEY `stream_push_pk` (`app`,`stream`)
497   -) ENGINE=InnoDB AUTO_INCREMENT=310558 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
  498 +) ENGINE=InnoDB AUTO_INCREMENT=361492 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
498 499 /*!40101 SET character_set_client = @saved_cs_client */;
499 500  
500 501 --
... ... @@ -572,4 +573,4 @@ UNLOCK TABLES;
572 573 /*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
573 574 /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
574 575  
575   --- Dump completed on 2022-10-18 17:00:02
  576 +-- Dump completed on 2022-11-29 11:47:46
576 577 \ No newline at end of file
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
... ... @@ -29,8 +29,6 @@ public class SipLayer implements CommandLineRunner {
29 29 @Autowired
30 30 private ISIPProcessorObserver sipProcessorObserver;
31 31  
32   -
33   -
34 32 private final Map<String, SipProviderImpl> tcpSipProviderMap = new ConcurrentHashMap<>();
35 33 private final Map<String, SipProviderImpl> udpSipProviderMap = new ConcurrentHashMap<>();
36 34  
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/auth/DigestServerAuthenticationHelper.java
... ... @@ -25,10 +25,9 @@
25 25 */
26 26 package com.genersoft.iot.vmp.gb28181.auth;
27 27  
28   -import java.security.MessageDigest;
29   -import java.security.NoSuchAlgorithmException;
30   -import java.time.Instant;
31   -import java.util.Random;
  28 +import gov.nist.core.InternalErrorHandler;
  29 +import org.slf4j.Logger;
  30 +import org.slf4j.LoggerFactory;
32 31  
33 32 import javax.sip.address.URI;
34 33 import javax.sip.header.AuthorizationHeader;
... ... @@ -36,10 +35,10 @@ import javax.sip.header.HeaderFactory;
36 35 import javax.sip.header.WWWAuthenticateHeader;
37 36 import javax.sip.message.Request;
38 37 import javax.sip.message.Response;
39   -
40   -import gov.nist.core.InternalErrorHandler;
41   -import org.slf4j.Logger;
42   -import org.slf4j.LoggerFactory;
  38 +import java.security.MessageDigest;
  39 +import java.security.NoSuchAlgorithmException;
  40 +import java.time.Instant;
  41 +import java.util.Random;
43 42  
44 43 /**
45 44 * Implements the HTTP digest authentication method server side functionality.
... ... @@ -201,12 +200,13 @@ public class DigestServerAuthenticationHelper {
201 200 // String ncStr = new DecimalFormat("00000000").format(Integer.parseInt(nc + "", 16));
202 201  
203 202 String A1 = username + ":" + realm + ":" + pass;
  203 +
204 204 String A2 = request.getMethod().toUpperCase() + ":" + uri.toString();
  205 +
205 206 byte mdbytes[] = messageDigest.digest(A1.getBytes());
206 207 String HA1 = toHexString(mdbytes);
207 208 logger.debug("A1: " + A1);
208 209 logger.debug("A2: " + A2);
209   -
210 210 mdbytes = messageDigest.digest(A2.getBytes());
211 211 String HA2 = toHexString(mdbytes);
212 212 logger.debug("HA1: " + HA1);
... ... @@ -238,58 +238,4 @@ public class DigestServerAuthenticationHelper {
238 238  
239 239 }
240 240  
241   -// public static void main(String[] args) throws NoSuchAlgorithmException {
242   -// String realm = "3402000000";
243   -// String username = "44010000001180008012";
244   -
245   -
246   -// String nonce = "07cab60999fbf643264ace27d3b7de8b";
247   -// String uri = "sip:34020000002000000001@3402000000";
248   -// // qop 保护质量 包含auth(默认的)和auth-int(增加了报文完整性检测)两种策略
249   -// String qop = "auth";
250   -
251   -// // 客户端随机数,这是一个不透明的字符串值,由客户端提供,并且客户端和服务器都会使用,以避免用明文文本。
252   -// // 这使得双方都可以查验对方的身份,并对消息的完整性提供一些保护
253   -// //String cNonce = authHeader.getCNonce();
254   -
255   -// // nonce计数器,是一个16进制的数值,表示同一nonce下客户端发送出请求的数量
256   -// int nc = 1;
257   -// String ncStr = new DecimalFormat("00000000").format(nc);
258   -// // String ncStr = new DecimalFormat("00000000").format(Integer.parseInt(nc + "", 16));
259   -// MessageDigest messageDigest = MessageDigest.getInstance(DEFAULT_ALGORITHM);
260   -// String A1 = username + ":" + realm + ":" + "12345678";
261   -// String A2 = "REGISTER" + ":" + uri;
262   -// byte mdbytes[] = messageDigest.digest(A1.getBytes());
263   -// String HA1 = toHexString(mdbytes);
264   -// System.out.println("A1: " + A1);
265   -// System.out.println("A2: " + A2);
266   -
267   -// mdbytes = messageDigest.digest(A2.getBytes());
268   -// String HA2 = toHexString(mdbytes);
269   -// System.out.println("HA1: " + HA1);
270   -// System.out.println("HA2: " + HA2);
271   -// String cnonce = "0a4f113b";
272   -// System.out.println("nonce: " + nonce);
273   -// System.out.println("nc: " + ncStr);
274   -// System.out.println("cnonce: " + cnonce);
275   -// System.out.println("qop: " + qop);
276   -// String KD = HA1 + ":" + nonce;
277   -
278   -// if (qop != null && qop.equals("auth") ) {
279   -// if (nc != -1) {
280   -// KD += ":" + ncStr;
281   -// }
282   -// if (cnonce != null) {
283   -// KD += ":" + cnonce;
284   -// }
285   -// KD += ":" + qop;
286   -// }
287   -// KD += ":" + HA2;
288   -// System.out.println("KD: " + KD);
289   -// mdbytes = messageDigest.digest(KD.getBytes());
290   -// String mdString = toHexString(mdbytes);
291   -// System.out.println("mdString: " + mdString);
292   -// String response = "4f0507d4b87cdecff04bdaf4c96348f0";
293   -// System.out.println("response: " + response);
294   -// }
295 241 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
1 1 package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
2 2  
3 3 import com.alibaba.fastjson2.JSON;
4   -import com.alibaba.fastjson2.JSONObject;
5 4 import com.genersoft.iot.vmp.gb28181.SipLayer;
6 5 import com.genersoft.iot.vmp.gb28181.bean.*;
7 6 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
... ... @@ -9,13 +8,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
9 8 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
10 9 import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
11 10 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
12   -import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
13   -import com.genersoft.iot.vmp.utils.DateUtil;
14 11 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
15 12 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
16 13 import com.genersoft.iot.vmp.service.IMediaServerService;
17 14 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
18 15 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  16 +import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
  17 +import com.genersoft.iot.vmp.utils.DateUtil;
19 18 import gov.nist.javax.sip.message.MessageFactoryImpl;
20 19 import gov.nist.javax.sip.message.SIPRequest;
21 20 import org.slf4j.Logger;
... ... @@ -26,8 +25,10 @@ import org.springframework.lang.Nullable;
26 25 import org.springframework.stereotype.Component;
27 26 import org.springframework.util.ObjectUtils;
28 27  
29   -import javax.sip.*;
30   -import javax.sip.header.*;
  28 +import javax.sip.InvalidArgumentException;
  29 +import javax.sip.SipException;
  30 +import javax.sip.header.CallIdHeader;
  31 +import javax.sip.header.WWWAuthenticateHeader;
31 32 import javax.sip.message.Request;
32 33 import java.text.ParseException;
33 34 import java.util.ArrayList;
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
... ... @@ -11,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
11 11 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
12 12 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
13 13 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
14   -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
15 14 import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
16 15 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
17 16 import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
... ... @@ -31,7 +30,6 @@ import org.springframework.beans.factory.annotation.Qualifier;
31 30 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
32 31 import org.springframework.stereotype.Component;
33 32 import org.springframework.util.ObjectUtils;
34   -import org.springframework.util.StringUtils;
35 33  
36 34 import javax.sip.InvalidArgumentException;
37 35 import javax.sip.RequestEvent;
... ... @@ -77,8 +75,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
77 75 @Autowired
78 76 private IDeviceChannelService deviceChannelService;
79 77  
80   - private boolean taskQueueHandlerRun = false;
81   -
82 78 private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
83 79  
84 80 @Qualifier("taskExecutor")
... ... @@ -98,9 +94,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
98 94 }catch (SipException | InvalidArgumentException | ParseException e) {
99 95 e.printStackTrace();
100 96 }
  97 + boolean runed = !taskQueue.isEmpty();
101 98 taskQueue.offer(new HandlerCatchData(evt, null, null));
102   - if (!taskQueueHandlerRun) {
103   - taskQueueHandlerRun = true;
  99 + if (!runed) {
104 100 taskExecutor.execute(()-> {
105 101 while (!taskQueue.isEmpty()) {
106 102 try {
... ... @@ -128,7 +124,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
128 124 logger.error("处理NOTIFY消息时错误", e);
129 125 }
130 126 }
131   - taskQueueHandlerRun = false;
132 127 });
133 128 }
134 129 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
... ... @@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
9 9 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
10 10 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
11 11 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
12   -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
13 12 import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
14 13 import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
15 14 import com.genersoft.iot.vmp.service.IDeviceAlarmService;
... ... @@ -27,17 +26,15 @@ import org.springframework.beans.factory.annotation.Qualifier;
27 26 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
28 27 import org.springframework.stereotype.Component;
29 28 import org.springframework.util.ObjectUtils;
30   -import org.springframework.util.StringUtils;
31 29  
32 30 import javax.sip.InvalidArgumentException;
33 31 import javax.sip.RequestEvent;
34 32 import javax.sip.SipException;
35 33 import javax.sip.message.Response;
36   -
37 34 import java.text.ParseException;
38 35 import java.util.concurrent.ConcurrentLinkedQueue;
39 36  
40   -import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.*;
  37 +import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
41 38  
42 39 /**
43 40 * 报警事件的处理,参考:9.4
... ... @@ -72,8 +69,6 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
72 69 @Autowired
73 70 private IDeviceChannelService deviceChannelService;
74 71  
75   - private boolean taskQueueHandlerRun = false;
76   -
77 72 private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
78 73  
79 74 @Qualifier("taskExecutor")
... ... @@ -89,128 +84,128 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
89 84 @Override
90 85 public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
91 86 logger.info("[收到报警通知]设备:{}", device.getDeviceId());
92   -
  87 + boolean isEmpty = taskQueue.isEmpty();
93 88 taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
94   - if (!taskQueueHandlerRun) {
95   - taskQueueHandlerRun = true;
  89 + // 回复200 OK
  90 + try {
  91 + responseAck((SIPRequest) evt.getRequest(), Response.OK);
  92 + } catch (SipException | InvalidArgumentException | ParseException e) {
  93 + logger.error("[命令发送失败] 报警通知回复: {}", e.getMessage());
  94 + }
  95 + if (isEmpty) {
96 96 taskExecutor.execute(() -> {
97 97 logger.info("[处理报警通知]待处理数量:{}", taskQueue.size() );
98 98 while (!taskQueue.isEmpty()) {
99   - SipMsgInfo sipMsgInfo = taskQueue.poll();
100   - // 回复200 OK
101 99 try {
102   - responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.OK);
103   - } catch (SipException | InvalidArgumentException | ParseException e) {
104   - logger.error("[处理报警通知], 回复200OK失败", e);
105   - }
106   -
107   - Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID");
108   - String channelId = deviceIdElement.getText().toString();
109   -
110   - DeviceAlarm deviceAlarm = new DeviceAlarm();
111   - deviceAlarm.setCreateTime(DateUtil.getNow());
112   - deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
113   - deviceAlarm.setChannelId(channelId);
114   - deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority"));
115   - deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod"));
116   - String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime");
117   - if (alarmTime == null) {
118   - continue;
119   - }
120   - deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
121   - String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription");
122   - if (alarmDescription == null) {
123   - deviceAlarm.setAlarmDescription("");
124   - } else {
125   - deviceAlarm.setAlarmDescription(alarmDescription);
126   - }
127   - String longitude = getText(sipMsgInfo.getRootElement(), "Longitude");
128   - if (longitude != null && NumericUtil.isDouble(longitude)) {
129   - deviceAlarm.setLongitude(Double.parseDouble(longitude));
130   - } else {
131   - deviceAlarm.setLongitude(0.00);
132   - }
133   - String latitude = getText(sipMsgInfo.getRootElement(), "Latitude");
134   - if (latitude != null && NumericUtil.isDouble(latitude)) {
135   - deviceAlarm.setLatitude(Double.parseDouble(latitude));
136   - } else {
137   - deviceAlarm.setLatitude(0.00);
138   - }
  100 + SipMsgInfo sipMsgInfo = taskQueue.poll();
  101 +
  102 + Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID");
  103 + String channelId = deviceIdElement.getText().toString();
  104 +
  105 + DeviceAlarm deviceAlarm = new DeviceAlarm();
  106 + deviceAlarm.setCreateTime(DateUtil.getNow());
  107 + deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
  108 + deviceAlarm.setChannelId(channelId);
  109 + deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority"));
  110 + deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod"));
  111 + String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime");
  112 + if (alarmTime == null) {
  113 + continue;
  114 + }
  115 + deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
  116 + String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription");
  117 + if (alarmDescription == null) {
  118 + deviceAlarm.setAlarmDescription("");
  119 + } else {
  120 + deviceAlarm.setAlarmDescription(alarmDescription);
  121 + }
  122 + String longitude = getText(sipMsgInfo.getRootElement(), "Longitude");
  123 + if (longitude != null && NumericUtil.isDouble(longitude)) {
  124 + deviceAlarm.setLongitude(Double.parseDouble(longitude));
  125 + } else {
  126 + deviceAlarm.setLongitude(0.00);
  127 + }
  128 + String latitude = getText(sipMsgInfo.getRootElement(), "Latitude");
  129 + if (latitude != null && NumericUtil.isDouble(latitude)) {
  130 + deviceAlarm.setLatitude(Double.parseDouble(latitude));
  131 + } else {
  132 + deviceAlarm.setLatitude(0.00);
  133 + }
139 134  
140   - if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) {
141   - if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) {
142   - MobilePosition mobilePosition = new MobilePosition();
143   - mobilePosition.setCreateTime(DateUtil.getNow());
144   - mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
145   - mobilePosition.setTime(deviceAlarm.getAlarmTime());
146   - mobilePosition.setLongitude(deviceAlarm.getLongitude());
147   - mobilePosition.setLatitude(deviceAlarm.getLatitude());
148   - mobilePosition.setReportSource("GPS Alarm");
149   -
150   - // 更新device channel 的经纬度
151   - DeviceChannel deviceChannel = new DeviceChannel();
152   - deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
153   - deviceChannel.setChannelId(channelId);
154   - deviceChannel.setLongitude(mobilePosition.getLongitude());
155   - deviceChannel.setLatitude(mobilePosition.getLatitude());
156   - deviceChannel.setGpsTime(mobilePosition.getTime());
157   -
158   - deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice());
159   -
160   - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
161   - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
162   - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
163   - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
164   -
165   - if (userSetting.getSavePositionHistory()) {
166   - storager.insertMobilePosition(mobilePosition);
  135 + if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) {
  136 + if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) {
  137 + MobilePosition mobilePosition = new MobilePosition();
  138 + mobilePosition.setCreateTime(DateUtil.getNow());
  139 + mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
  140 + mobilePosition.setTime(deviceAlarm.getAlarmTime());
  141 + mobilePosition.setLongitude(deviceAlarm.getLongitude());
  142 + mobilePosition.setLatitude(deviceAlarm.getLatitude());
  143 + mobilePosition.setReportSource("GPS Alarm");
  144 +
  145 + // 更新device channel 的经纬度
  146 + DeviceChannel deviceChannel = new DeviceChannel();
  147 + deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
  148 + deviceChannel.setChannelId(channelId);
  149 + deviceChannel.setLongitude(mobilePosition.getLongitude());
  150 + deviceChannel.setLatitude(mobilePosition.getLatitude());
  151 + deviceChannel.setGpsTime(mobilePosition.getTime());
  152 +
  153 + deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice());
  154 +
  155 + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
  156 + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
  157 + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
  158 + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
  159 +
  160 + if (userSetting.getSavePositionHistory()) {
  161 + storager.insertMobilePosition(mobilePosition);
  162 + }
  163 + storager.updateChannelPosition(deviceChannel);
  164 +
  165 + // 发送redis消息。 通知位置信息的变化
  166 + JSONObject jsonObject = new JSONObject();
  167 + jsonObject.put("time", mobilePosition.getTime());
  168 + jsonObject.put("serial", deviceChannel.getDeviceId());
  169 + jsonObject.put("code", deviceChannel.getChannelId());
  170 + jsonObject.put("longitude", mobilePosition.getLongitude());
  171 + jsonObject.put("latitude", mobilePosition.getLatitude());
  172 + jsonObject.put("altitude", mobilePosition.getAltitude());
  173 + jsonObject.put("direction", mobilePosition.getDirection());
  174 + jsonObject.put("speed", mobilePosition.getSpeed());
  175 + redisCatchStorage.sendMobilePositionMsg(jsonObject);
167 176 }
168   - storager.updateChannelPosition(deviceChannel);
169   -
170   - // 发送redis消息。 通知位置信息的变化
171   - JSONObject jsonObject = new JSONObject();
172   - jsonObject.put("time", mobilePosition.getTime());
173   - jsonObject.put("serial", deviceChannel.getDeviceId());
174   - jsonObject.put("code", deviceChannel.getChannelId());
175   - jsonObject.put("longitude", mobilePosition.getLongitude());
176   - jsonObject.put("latitude", mobilePosition.getLatitude());
177   - jsonObject.put("altitude", mobilePosition.getAltitude());
178   - jsonObject.put("direction", mobilePosition.getDirection());
179   - jsonObject.put("speed", mobilePosition.getSpeed());
180   - redisCatchStorage.sendMobilePositionMsg(jsonObject);
181 177 }
182   - }
183   - if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) {
184   - if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) {
185   - deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType"));
  178 + if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) {
  179 + if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) {
  180 + deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType"));
  181 + }
  182 + }
  183 + logger.info("[收到报警通知]内容:{}", JSON.toJSONString(deviceAlarm));
  184 + if ("7".equals(deviceAlarm.getAlarmMethod()) ) {
  185 + // 发送给平台的报警信息。 发送redis通知
  186 + AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage();
  187 + alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod()));
  188 + alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription());
  189 + alarmChannelMessage.setGbId(channelId);
  190 + redisCatchStorage.sendAlarmMsg(alarmChannelMessage);
  191 + continue;
186 192 }
187   - }
188   - logger.info("[收到报警通知]内容:{}", JSON.toJSONString(deviceAlarm));
189   - if ("7".equals(deviceAlarm.getAlarmMethod()) ) {
190   - // 发送给平台的报警信息。 发送redis通知
191   - AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage();
192   - alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod()));
193   - alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription());
194   - alarmChannelMessage.setGbId(channelId);
195   - redisCatchStorage.sendAlarmMsg(alarmChannelMessage);
196   - continue;
197   - }
198 193  
199   - logger.debug("存储报警信息、报警分类");
200   - // 存储报警信息、报警分类
201   - if (sipConfig.isAlarm()) {
202   - deviceAlarmService.add(deviceAlarm);
203   - }
  194 + logger.debug("存储报警信息、报警分类");
  195 + // 存储报警信息、报警分类
  196 + if (sipConfig.isAlarm()) {
  197 + deviceAlarmService.add(deviceAlarm);
  198 + }
204 199  
205   - if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) {
206   - publisher.deviceAlarmEventPublish(deviceAlarm);
  200 + if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) {
  201 + publisher.deviceAlarmEventPublish(deviceAlarm);
  202 + }
  203 + }catch (Exception e) {
  204 + logger.warn("[收到报警通知] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest());
207 205 }
208 206 }
209   - taskQueueHandlerRun = false;
210 207 });
211 208 }
212   -
213   -
214 209 }
215 210  
216 211 @Override
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
... ... @@ -22,7 +22,6 @@ import org.springframework.beans.factory.annotation.Qualifier;
22 22 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
23 23 import org.springframework.stereotype.Component;
24 24 import org.springframework.util.ObjectUtils;
25   -import org.springframework.util.StringUtils;
26 25  
27 26 import javax.sip.InvalidArgumentException;
28 27 import javax.sip.RequestEvent;
... ... @@ -57,8 +56,6 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
57 56 @Autowired
58 57 private IDeviceChannelService deviceChannelService;
59 58  
60   - private boolean taskQueueHandlerRun = false;
61   -
62 59 private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
63 60  
64 61 @Qualifier("taskExecutor")
... ... @@ -73,21 +70,22 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
73 70 @Override
74 71 public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
75 72  
  73 + boolean isEmpty = taskQueue.isEmpty();
76 74 taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
77   - if (!taskQueueHandlerRun) {
78   - taskQueueHandlerRun = true;
  75 + // 回复200 OK
  76 + try {
  77 + responseAck((SIPRequest) evt.getRequest(), Response.OK);
  78 + } catch (SipException | InvalidArgumentException | ParseException e) {
  79 + logger.error("[命令发送失败] 移动位置通知回复: {}", e.getMessage());
  80 + }
  81 + if (isEmpty) {
79 82 taskExecutor.execute(() -> {
80 83 while (!taskQueue.isEmpty()) {
81 84 SipMsgInfo sipMsgInfo = taskQueue.poll();
82 85 try {
83 86 Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset());
84 87 if (rootElementAfterCharset == null) {
85   - try {
86   - logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest());
87   - responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.BAD_REQUEST);
88   - } catch (SipException | InvalidArgumentException | ParseException e) {
89   - logger.error("[命令发送失败] 移动设备位置数据通知 内容为空: {}", e.getMessage());
90   - }
  88 + logger.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId());
91 89 continue;
92 90 }
93 91 MobilePosition mobilePosition = new MobilePosition();
... ... @@ -137,12 +135,6 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
137 135 storager.insertMobilePosition(mobilePosition);
138 136 }
139 137 storager.updateChannelPosition(deviceChannel);
140   - //回复 200 OK
141   - try {
142   - responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.OK);
143   - } catch (SipException | InvalidArgumentException | ParseException e) {
144   - logger.error("[命令发送失败] 移动设备位置数据回复200: {}", e.getMessage());
145   - }
146 138  
147 139 // 发送redis消息。 通知位置信息的变化
148 140 JSONObject jsonObject = new JSONObject();
... ... @@ -158,14 +150,12 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
158 150  
159 151 } catch (DocumentException e) {
160 152 e.printStackTrace();
  153 + } catch (Exception e) {
  154 + logger.warn("[移动位置通知] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest());
161 155 }
162   -
163 156 }
164   - taskQueueHandlerRun = false;
165 157 });
166 158 }
167   -
168   -
169 159 }
170 160  
171 161 @Override
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
1 1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
2 2  
3   -import com.genersoft.iot.vmp.conf.SipConfig;
4   -import com.genersoft.iot.vmp.conf.UserSetting;
5 3 import com.genersoft.iot.vmp.gb28181.bean.*;
6   -import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
7 4 import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch;
8   -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
9 5 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
10 6 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
11 7 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
12   -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
13   -import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
14 8 import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
15   -import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
16 9 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
17 10 import gov.nist.javax.sip.message.SIPRequest;
18 11 import org.dom4j.DocumentException;
... ... @@ -24,7 +17,6 @@ import org.springframework.beans.factory.annotation.Autowired;
24 17 import org.springframework.beans.factory.annotation.Qualifier;
25 18 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
26 19 import org.springframework.stereotype.Component;
27   -import org.springframework.util.StringUtils;
28 20  
29 21 import javax.sip.InvalidArgumentException;
30 22 import javax.sip.RequestEvent;
... ... @@ -45,12 +37,10 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
45 37 private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class);
46 38 private final String cmdType = "Catalog";
47 39  
48   - private boolean taskQueueHandlerRun = false;
49   -
50 40 @Autowired
51 41 private ResponseMessageHandler responseMessageHandler;
52 42  
53   - private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
  43 + private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
54 44  
55 45 @Autowired
56 46 private IVideoManagerStorage storager;
... ... @@ -69,6 +59,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
69 59  
70 60 @Override
71 61 public void handForDevice(RequestEvent evt, Device device, Element element) {
  62 + boolean isEmpty = taskQueue.isEmpty();
72 63 taskQueue.offer(new HandlerCatchData(evt, device, element));
73 64 // 回复200 OK
74 65 try {
... ... @@ -76,67 +67,71 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
76 67 } catch (SipException | InvalidArgumentException | ParseException e) {
77 68 logger.error("[命令发送失败] 目录查询回复: {}", e.getMessage());
78 69 }
79   - if (!taskQueueHandlerRun) {
80   - taskQueueHandlerRun = true;
  70 + // 如果不为空则说明已经开启消息处理
  71 + if (isEmpty) {
81 72 taskExecutor.execute(() -> {
82 73 while (!taskQueue.isEmpty()) {
83   - HandlerCatchData take = taskQueue.poll();
84   - Element rootElement = null;
  74 + // 全局异常捕获,保证下一条可以得到处理
85 75 try {
86   - rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset());
87   - } catch (DocumentException e) {
88   - logger.error("[xml解析] 失败: ", e);
89   - continue;
90   - }
91   - if (rootElement == null) {
92   - logger.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest());
93   - continue;
94   - }
95   - Element deviceListElement = rootElement.element("DeviceList");
96   - Element sumNumElement = rootElement.element("SumNum");
97   - Element snElement = rootElement.element("SN");
98   - int sumNum = Integer.parseInt(sumNumElement.getText());
99   -
100   - if (sumNum == 0) {
101   - logger.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId());
102   - // 数据已经完整接收
103   - storager.cleanChannelsForDevice(take.getDevice().getDeviceId());
104   - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
105   - } else {
106   - Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
107   - if (deviceListIterator != null) {
108   - List<DeviceChannel> channelList = new ArrayList<>();
109   - // 遍历DeviceList
110   - while (deviceListIterator.hasNext()) {
111   - Element itemDevice = deviceListIterator.next();
112   - Element channelDeviceElement = itemDevice.element("DeviceID");
113   - if (channelDeviceElement == null) {
114   - continue;
  76 + HandlerCatchData take = taskQueue.poll();
  77 + Element rootElement = null;
  78 + try {
  79 + rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset());
  80 + } catch (DocumentException e) {
  81 + logger.error("[xml解析] 失败: ", e);
  82 + continue;
  83 + }
  84 + if (rootElement == null) {
  85 + logger.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest());
  86 + continue;
  87 + }
  88 + Element deviceListElement = rootElement.element("DeviceList");
  89 + Element sumNumElement = rootElement.element("SumNum");
  90 + Element snElement = rootElement.element("SN");
  91 + int sumNum = Integer.parseInt(sumNumElement.getText());
  92 +
  93 + if (sumNum == 0) {
  94 + logger.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId());
  95 + // 数据已经完整接收
  96 + storager.cleanChannelsForDevice(take.getDevice().getDeviceId());
  97 + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
  98 + } else {
  99 + Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
  100 + if (deviceListIterator != null) {
  101 + List<DeviceChannel> channelList = new ArrayList<>();
  102 + // 遍历DeviceList
  103 + while (deviceListIterator.hasNext()) {
  104 + Element itemDevice = deviceListIterator.next();
  105 + Element channelDeviceElement = itemDevice.element("DeviceID");
  106 + if (channelDeviceElement == null) {
  107 + continue;
  108 + }
  109 + DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice, device, null);
  110 + deviceChannel.setDeviceId(take.getDevice().getDeviceId());
  111 +
  112 + channelList.add(deviceChannel);
115 113 }
116   - DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice, device, null);
117   - deviceChannel.setDeviceId(take.getDevice().getDeviceId());
118   -
119   - channelList.add(deviceChannel);
120   - }
121   - int sn = Integer.parseInt(snElement.getText());
122   - catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList);
123   - logger.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 : catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum);
124   - if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) {
125   - // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理,
126   - // 目前支持设备通道上线通知时和设备上线时向上级通知
127   - boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId()));
128   - if (!resetChannelsResult) {
129   - String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条";
130   - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg);
131   - } else {
132   - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
  114 + int sn = Integer.parseInt(snElement.getText());
  115 + catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList);
  116 + logger.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 : catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum);
  117 + if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) {
  118 + // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理,
  119 + // 目前支持设备通道上线通知时和设备上线时向上级通知
  120 + boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId()));
  121 + if (!resetChannelsResult) {
  122 + String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条";
  123 + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg);
  124 + } else {
  125 + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
  126 + }
133 127 }
134 128 }
135   - }
136 129  
  130 + }
  131 + }catch (Exception e) {
  132 + logger.warn("[收到通道] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest());
137 133 }
138 134 }
139   - taskQueueHandlerRun = false;
140 135 });
141 136 }
142 137  
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
... ... @@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP
9 9 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
10 10 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
11 11 import com.genersoft.iot.vmp.utils.DateUtil;
12   -import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
13 12 import gov.nist.javax.sip.message.SIPRequest;
14 13 import org.dom4j.DocumentException;
15 14 import org.dom4j.Element;
... ... @@ -21,17 +20,17 @@ import org.springframework.beans.factory.annotation.Qualifier;
21 20 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
22 21 import org.springframework.stereotype.Component;
23 22 import org.springframework.util.ObjectUtils;
24   -import org.springframework.util.StringUtils;
25 23  
26 24 import javax.sip.InvalidArgumentException;
27 25 import javax.sip.RequestEvent;
28 26 import javax.sip.SipException;
29 27 import javax.sip.message.Response;
30 28 import java.text.ParseException;
31   -import java.util.*;
32   -import java.util.concurrent.BlockingQueue;
  29 +import java.util.ArrayList;
  30 +import java.util.Collections;
  31 +import java.util.Iterator;
  32 +import java.util.List;
33 33 import java.util.concurrent.ConcurrentLinkedQueue;
34   -import java.util.concurrent.LinkedBlockingQueue;
35 34  
36 35 import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
37 36  
... ... @@ -46,7 +45,6 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
46 45  
47 46 private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
48 47  
49   - private boolean taskQueueHandlerRun = false;
50 48 @Autowired
51 49 private ResponseMessageHandler responseMessageHandler;
52 50  
... ... @@ -70,6 +68,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
70 68  
71 69 @Override
72 70 public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
  71 + boolean isEmpty = taskQueue.isEmpty();
73 72 try {
74 73 // 回复200 OK
75 74 responseAck((SIPRequest) evt.getRequest(), Response.OK);
... ... @@ -77,8 +76,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
77 76 logger.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage());
78 77 }
79 78 taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
80   - if (!taskQueueHandlerRun) {
81   - taskQueueHandlerRun = true;
  79 + if (isEmpty) {
82 80 taskExecutor.execute(()->{
83 81 while (!taskQueue.isEmpty()) {
84 82 try {
... ... @@ -151,9 +149,10 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
151 149 }
152 150 } catch (DocumentException e) {
153 151 logger.error("xml解析异常: ", e);
  152 + } catch (Exception e) {
  153 + logger.warn("[国标录像] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest());
154 154 }
155 155 }
156   - taskQueueHandlerRun = false;
157 156 });
158 157 }
159 158 }
... ...
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
... ... @@ -38,8 +38,6 @@ public class RedisAlarmMsgListener implements MessageListener {
38 38 @Autowired
39 39 private IVideoManagerStorage storage;
40 40  
41   - private boolean taskQueueHandlerRun = false;
42   -
43 41 private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
44 42  
45 43 @Qualifier("taskExecutor")
... ... @@ -49,69 +47,68 @@ public class RedisAlarmMsgListener implements MessageListener {
49 47 @Override
50 48 public void onMessage(@NotNull Message message, byte[] bytes) {
51 49 logger.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody()));
52   -
  50 + boolean isEmpty = taskQueue.isEmpty();
53 51 taskQueue.offer(message);
54   - if (!taskQueueHandlerRun) {
55   - taskQueueHandlerRun = true;
  52 + if (isEmpty) {
56 53 logger.info("[线程池信息]活动线程数:{}, 最大线程数: {}", taskExecutor.getActiveCount(), taskExecutor.getMaxPoolSize());
57 54 taskExecutor.execute(() -> {
58 55 while (!taskQueue.isEmpty()) {
59 56 Message msg = taskQueue.poll();
60   -
61   - AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
62   - if (alarmChannelMessage == null) {
63   - logger.warn("[REDIS的ALARM通知]消息解析失败");
64   - continue;
65   - }
66   - String gbId = alarmChannelMessage.getGbId();
67   -
68   - DeviceAlarm deviceAlarm = new DeviceAlarm();
69   - deviceAlarm.setCreateTime(DateUtil.getNow());
70   - deviceAlarm.setChannelId(gbId);
71   - deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());
72   - deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
73   - deviceAlarm.setAlarmPriority("1");
74   - deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601());
75   - deviceAlarm.setAlarmType("1");
76   - deviceAlarm.setLongitude(0);
77   - deviceAlarm.setLatitude(0);
78   -
79   - if (ObjectUtils.isEmpty(gbId)) {
80   - // 发送给所有的上级
81   - List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true);
82   - if (parentPlatforms.size() > 0) {
83   - for (ParentPlatform parentPlatform : parentPlatforms) {
84   - try {
85   - commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
86   - } catch (SipException | InvalidArgumentException | ParseException e) {
87   - logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
88   - }
89   - }
  57 + try {
  58 + AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
  59 + if (alarmChannelMessage == null) {
  60 + logger.warn("[REDIS的ALARM通知]消息解析失败");
  61 + continue;
90 62 }
91   - }else {
92   - Device device = storage.queryVideoDevice(gbId);
93   - ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId);
94   - if (device != null && platform == null) {
95   - try {
96   - commander.sendAlarmMessage(device, deviceAlarm);
97   - } catch (InvalidArgumentException | SipException | ParseException e) {
98   - logger.error("[命令发送失败] 发送报警: {}", e.getMessage());
99   - }
100   - }else if (device == null && platform != null){
101   - try {
102   - commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
103   - } catch (InvalidArgumentException | SipException | ParseException e) {
104   - logger.error("[命令发送失败] 发送报警: {}", e.getMessage());
  63 + String gbId = alarmChannelMessage.getGbId();
  64 +
  65 + DeviceAlarm deviceAlarm = new DeviceAlarm();
  66 + deviceAlarm.setCreateTime(DateUtil.getNow());
  67 + deviceAlarm.setChannelId(gbId);
  68 + deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());
  69 + deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
  70 + deviceAlarm.setAlarmPriority("1");
  71 + deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601());
  72 + deviceAlarm.setAlarmType("1");
  73 + deviceAlarm.setLongitude(0);
  74 + deviceAlarm.setLatitude(0);
  75 +
  76 + if (ObjectUtils.isEmpty(gbId)) {
  77 + // 发送给所有的上级
  78 + List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true);
  79 + if (parentPlatforms.size() > 0) {
  80 + for (ParentPlatform parentPlatform : parentPlatforms) {
  81 + try {
  82 + commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
  83 + } catch (SipException | InvalidArgumentException | ParseException e) {
  84 + logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
  85 + }
  86 + }
105 87 }
106 88 }else {
107   - logger.warn("无法确定" + gbId + "是平台还是设备");
  89 + Device device = storage.queryVideoDevice(gbId);
  90 + ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId);
  91 + if (device != null && platform == null) {
  92 + try {
  93 + commander.sendAlarmMessage(device, deviceAlarm);
  94 + } catch (InvalidArgumentException | SipException | ParseException e) {
  95 + logger.error("[命令发送失败] 发送报警: {}", e.getMessage());
  96 + }
  97 + }else if (device == null && platform != null){
  98 + try {
  99 + commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
  100 + } catch (InvalidArgumentException | SipException | ParseException e) {
  101 + logger.error("[命令发送失败] 发送报警: {}", e.getMessage());
  102 + }
  103 + }else {
  104 + logger.warn("无法确定" + gbId + "是平台还是设备");
  105 + }
108 106 }
  107 + }catch (Exception e) {
  108 + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
109 109 }
110 110 }
111   - taskQueueHandlerRun = false;
112 111 });
113 112 }
114   -
115   -
116 113 }
117 114 }
... ...
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
... ... @@ -88,8 +88,6 @@ public class RedisGbPlayMsgListener implements MessageListener {
88 88 @Autowired
89 89 private ZlmHttpHookSubscribe subscribe;
90 90  
91   - private boolean taskQueueHandlerRun = false;
92   -
93 91 private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
94 92  
95 93 @Qualifier("taskExecutor")
... ... @@ -111,107 +109,104 @@ public class RedisGbPlayMsgListener implements MessageListener {
111 109  
112 110 @Override
113 111 public void onMessage(Message message, byte[] bytes) {
114   -
  112 + boolean isEmpty = taskQueue.isEmpty();
115 113 taskQueue.offer(message);
116   - if (!taskQueueHandlerRun) {
117   - taskQueueHandlerRun = true;
  114 + if (isEmpty) {
118 115 taskExecutor.execute(() -> {
119 116 while (!taskQueue.isEmpty()) {
120 117 Message msg = taskQueue.poll();
121   - JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class);
122   - WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON);
123   - if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
124   - continue;
125   - }
126   - if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
127   - logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody()));
128   -
129   - switch (wvpRedisMsg.getCmd()){
130   - case WvpRedisMsgCmd.GET_SEND_ITEM:
131   - RequestSendItemMsg content = JSON.to(RequestSendItemMsg.class, wvpRedisMsg.getContent());
132   - requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
133   - break;
134   - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
135   - RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());;
136   - requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
137   - break;
138   - default:
139   - break;
  118 + try {
  119 + JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class);
  120 + WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON);
  121 + if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
  122 + continue;
140 123 }
141   -
142   - }else {
143   - logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody()));
144   - switch (wvpRedisMsg.getCmd()){
145   - case WvpRedisMsgCmd.GET_SEND_ITEM:
146   -
147   - WVPResult content = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
148   -
149   - String key = wvpRedisMsg.getSerial();
150   - switch (content.getCode()) {
151   - case 0:
152   - ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData());
153   - PlayMsgCallback playMsgCallback = callbacks.get(key);
154   - if (playMsgCallback != null) {
155   - callbacksForError.remove(key);
156   - try {
157   - playMsgCallback.handler(responseSendItemMsg);
158   - } catch (ParseException e) {
159   - logger.error("[REDIS消息处理异常] ", e);
  124 + if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
  125 + logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody()));
  126 +
  127 + switch (wvpRedisMsg.getCmd()){
  128 + case WvpRedisMsgCmd.GET_SEND_ITEM:
  129 + RequestSendItemMsg content = JSON.to(RequestSendItemMsg.class, wvpRedisMsg.getContent());
  130 + requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
  131 + break;
  132 + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
  133 + RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
  134 + requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
  135 + break;
  136 + default:
  137 + break;
  138 + }
  139 +
  140 + }else {
  141 + logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody()));
  142 + switch (wvpRedisMsg.getCmd()){
  143 + case WvpRedisMsgCmd.GET_SEND_ITEM:
  144 +
  145 + WVPResult content = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
  146 +
  147 + String key = wvpRedisMsg.getSerial();
  148 + switch (content.getCode()) {
  149 + case 0:
  150 + ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData());
  151 + PlayMsgCallback playMsgCallback = callbacks.get(key);
  152 + if (playMsgCallback != null) {
  153 + callbacksForError.remove(key);
  154 + try {
  155 + playMsgCallback.handler(responseSendItemMsg);
  156 + } catch (ParseException e) {
  157 + logger.error("[REDIS消息处理异常] ", e);
  158 + }
  159 + }
  160 + break;
  161 + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
  162 + case ERROR_CODE_OFFLINE:
  163 + case ERROR_CODE_TIMEOUT:
  164 + PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
  165 + if (errorCallback != null) {
  166 + callbacks.remove(key);
  167 + errorCallback.handler(content);
  168 + }
  169 + break;
  170 + default:
  171 + break;
  172 + }
  173 + break;
  174 + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
  175 + WVPResult wvpResult = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
  176 + String serial = wvpRedisMsg.getSerial();
  177 + switch (wvpResult.getCode()) {
  178 + case 0:
  179 + JSONObject jsonObject = (JSONObject)wvpResult.getData();
  180 + PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
  181 + if (playMsgCallback != null) {
  182 + callbacksForError.remove(serial);
  183 + playMsgCallback.handler(jsonObject);
  184 + }
  185 + break;
  186 + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
  187 + case ERROR_CODE_OFFLINE:
  188 + case ERROR_CODE_TIMEOUT:
  189 + PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
  190 + if (errorCallback != null) {
  191 + callbacks.remove(serial);
  192 + errorCallback.handler(wvpResult);
160 193 }
161   - }
162   - break;
163   - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
164   - case ERROR_CODE_OFFLINE:
165   - case ERROR_CODE_TIMEOUT:
166   - PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
167   - if (errorCallback != null) {
168   - callbacks.remove(key);
169   - errorCallback.handler(content);
170   - }
171   - break;
172   - default:
173   - break;
174   - }
175   - break;
176   - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
177   - WVPResult wvpResult = JSON.to(WVPResult.class, wvpRedisMsg.getContent());
178   - String serial = wvpRedisMsg.getSerial();
179   - switch (wvpResult.getCode()) {
180   - case 0:
181   - JSONObject jsonObject = (JSONObject)wvpResult.getData();
182   - PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
183   - if (playMsgCallback != null) {
184   - callbacksForError.remove(serial);
185   - playMsgCallback.handler(jsonObject);
186   - }
187   - break;
188   - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
189   - case ERROR_CODE_OFFLINE:
190   - case ERROR_CODE_TIMEOUT:
191   - PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
192   - if (errorCallback != null) {
193   - callbacks.remove(serial);
194   - errorCallback.handler(wvpResult);
195   - }
196   - break;
197   - default:
198   - break;
199   - }
200   - break;
201   - default:
202   - break;
  194 + break;
  195 + default:
  196 + break;
  197 + }
  198 + break;
  199 + default:
  200 + break;
  201 + }
  202 +
203 203 }
  204 + }catch (Exception e) {
  205 + logger.warn("[RedisGbPlayMsg] 发现未处理的异常, {}",e.getMessage());
204 206 }
205 207 }
206   - taskQueueHandlerRun = false;
207 208 });
208 209 }
209   -
210   -
211   -
212   -
213   -
214   -
215 210 }
216 211  
217 212 /**
... ...
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
... ... @@ -27,8 +27,6 @@ public class RedisGpsMsgListener implements MessageListener {
27 27  
28 28 private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class);
29 29  
30   - private boolean taskQueueHandlerRun = false;
31   -
32 30 @Autowired
33 31 private IRedisCatchStorage redisCatchStorage;
34 32  
... ... @@ -44,17 +42,20 @@ public class RedisGpsMsgListener implements MessageListener {
44 42  
45 43 @Override
46 44 public void onMessage(@NotNull Message message, byte[] bytes) {
  45 + boolean isEmpty = taskQueue.isEmpty();
47 46 taskQueue.offer(message);
48   - if (!taskQueueHandlerRun) {
49   - taskQueueHandlerRun = true;
  47 + if (isEmpty) {
50 48 taskExecutor.execute(() -> {
51 49 while (!taskQueue.isEmpty()) {
52 50 Message msg = taskQueue.poll();
53   - GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
54   - // 只是放入redis缓存起来
55   - redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
  51 + try {
  52 + GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
  53 + // 只是放入redis缓存起来
  54 + redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
  55 + }catch (Exception e) {
  56 + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
  57 + }
56 58 }
57   - taskQueueHandlerRun = false;
58 59 });
59 60 }
60 61 }
... ...
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
1 1 package com.genersoft.iot.vmp.service.redisMsg;
2 2  
3 3 import com.alibaba.fastjson2.JSON;
4   -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
5 4 import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
6 5 import org.slf4j.Logger;
7 6 import org.slf4j.LoggerFactory;
... ... @@ -26,8 +25,6 @@ public class RedisPushStreamResponseListener implements MessageListener {
26 25  
27 26 private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
28 27  
29   - private boolean taskQueueHandlerRun = false;
30   -
31 28 private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
32 29  
33 30 @Qualifier("taskExecutor")
... ... @@ -43,24 +40,27 @@ public class RedisPushStreamResponseListener implements MessageListener {
43 40  
44 41 @Override
45 42 public void onMessage(Message message, byte[] bytes) {
46   - logger.warn("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
  43 + logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
  44 + boolean isEmpty = taskQueue.isEmpty();
47 45 taskQueue.offer(message);
48   - if (!taskQueueHandlerRun) {
49   - taskQueueHandlerRun = true;
  46 + if (isEmpty) {
50 47 taskExecutor.execute(() -> {
51 48 while (!taskQueue.isEmpty()) {
52 49 Message msg = taskQueue.poll();
53   - MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
54   - if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
55   - logger.info("[REDIS消息-请求推流结果]:参数不全");
56   - continue;
57   - }
58   - // 查看正在等待的invite消息
59   - if (responseEvents.get(response.getApp() + response.getStream()) != null) {
60   - responseEvents.get(response.getApp() + response.getStream()).run(response);
  50 + try {
  51 + MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
  52 + if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
  53 + logger.info("[REDIS消息-请求推流结果]:参数不全");
  54 + continue;
  55 + }
  56 + // 查看正在等待的invite消息
  57 + if (responseEvents.get(response.getApp() + response.getStream()) != null) {
  58 + responseEvents.get(response.getApp() + response.getStream()).run(response);
  59 + }
  60 + }catch (Exception e) {
  61 + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
61 62 }
62 63 }
63   - taskQueueHandlerRun = false;
64 64 });
65 65 }
66 66 }
... ...
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
... ... @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
6 6 import com.genersoft.iot.vmp.service.IGbStreamService;
7 7 import com.genersoft.iot.vmp.service.IMediaServerService;
8 8 import com.genersoft.iot.vmp.service.IStreamPushService;
9   -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
10 9 import com.genersoft.iot.vmp.utils.DateUtil;
11 10 import org.slf4j.Logger;
12 11 import org.slf4j.LoggerFactory;
... ... @@ -18,7 +17,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
18 17 import org.springframework.stereotype.Component;
19 18  
20 19 import javax.annotation.Resource;
21   -import java.util.*;
  20 +import java.util.ArrayList;
  21 +import java.util.List;
22 22 import java.util.concurrent.ConcurrentLinkedQueue;
23 23  
24 24 /**
... ... @@ -38,7 +38,6 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
38 38 @Resource
39 39 private IGbStreamService gbStreamService;
40 40  
41   - private boolean taskQueueHandlerRun = false;
42 41  
43 42 private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
44 43  
... ... @@ -49,54 +48,56 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
49 48 @Override
50 49 public void onMessage(Message message, byte[] bytes) {
51 50 logger.info("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody()));
52   -
  51 + boolean isEmpty = taskQueue.isEmpty();
53 52 taskQueue.offer(message);
54   - if (!taskQueueHandlerRun) {
55   - taskQueueHandlerRun = true;
  53 + if (isEmpty) {
56 54 taskExecutor.execute(() -> {
57 55 while (!taskQueue.isEmpty()) {
58 56 Message msg = taskQueue.poll();
59   - List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
60   - //查询全部的app+stream 用于判断是添加还是修改
61   - List<String> allAppAndStream = streamPushService.getAllAppAndStream();
  57 + try {
  58 + List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
  59 + //查询全部的app+stream 用于判断是添加还是修改
  60 + List<String> allAppAndStream = streamPushService.getAllAppAndStream();
62 61  
63   - /**
64   - * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表
65   - */
66   - List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
67   - List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
68   - for (StreamPushItem streamPushItem : streamPushItems) {
69   - String app = streamPushItem.getApp();
70   - String stream = streamPushItem.getStream();
71   - boolean contains = allAppAndStream.contains(app + stream);
72   - //不存在就添加
73   - if (!contains) {
74   - streamPushItem.setStreamType("push");
75   - streamPushItem.setCreateTime(DateUtil.getNow());
76   - streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
77   - streamPushItem.setOriginType(2);
78   - streamPushItem.setOriginTypeStr("rtsp_push");
79   - streamPushItem.setTotalReaderCount("0");
80   - streamPushItemForSave.add(streamPushItem);
81   - } else {
82   - //存在就只修改 name和gbId
83   - streamPushItemForUpdate.add(streamPushItem);
  62 + /**
  63 + * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表
  64 + */
  65 + List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
  66 + List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
  67 + for (StreamPushItem streamPushItem : streamPushItems) {
  68 + String app = streamPushItem.getApp();
  69 + String stream = streamPushItem.getStream();
  70 + boolean contains = allAppAndStream.contains(app + stream);
  71 + //不存在就添加
  72 + if (!contains) {
  73 + streamPushItem.setStreamType("push");
  74 + streamPushItem.setCreateTime(DateUtil.getNow());
  75 + streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
  76 + streamPushItem.setOriginType(2);
  77 + streamPushItem.setOriginTypeStr("rtsp_push");
  78 + streamPushItem.setTotalReaderCount("0");
  79 + streamPushItemForSave.add(streamPushItem);
  80 + } else {
  81 + //存在就只修改 name和gbId
  82 + streamPushItemForUpdate.add(streamPushItem);
  83 + }
84 84 }
85   - }
86   - if (streamPushItemForSave.size() > 0) {
  85 + if (streamPushItemForSave.size() > 0) {
87 86  
88   - logger.info("添加{}条",streamPushItemForSave.size());
89   - logger.info(JSONObject.toJSONString(streamPushItemForSave));
90   - streamPushService.batchAdd(streamPushItemForSave);
  87 + logger.info("添加{}条",streamPushItemForSave.size());
  88 + logger.info(JSONObject.toJSONString(streamPushItemForSave));
  89 + streamPushService.batchAdd(streamPushItemForSave);
91 90  
92   - }
93   - if(streamPushItemForUpdate.size()>0){
94   - logger.info("修改{}条",streamPushItemForUpdate.size());
95   - logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
96   - gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
  91 + }
  92 + if(streamPushItemForUpdate.size()>0){
  93 + logger.info("修改{}条",streamPushItemForUpdate.size());
  94 + logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
  95 + gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
  96 + }
  97 + }catch (Exception e) {
  98 + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
97 99 }
98 100 }
99   - taskQueueHandlerRun = false;
100 101 });
101 102 }
102 103 }
... ...
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
... ... @@ -29,8 +29,6 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
29 29  
30 30 private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class);
31 31  
32   - private boolean taskQueueHandlerRun = false;
33   -
34 32 @Autowired
35 33 private IRedisCatchStorage redisCatchStorage;
36 34  
... ... @@ -50,37 +48,40 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
50 48  
51 49 @Override
52 50 public void onMessage(Message message, byte[] bytes) {
  51 + boolean isEmpty = taskQueue.isEmpty();
53 52 logger.warn("[REDIS消息-推流设备状态变化]: {}", new String(message.getBody()));
54 53 taskQueue.offer(message);
55 54  
56   - if (!taskQueueHandlerRun) {
57   - taskQueueHandlerRun = true;
  55 + if (isEmpty) {
58 56 taskExecutor.execute(() -> {
59 57 while (!taskQueue.isEmpty()) {
60 58 Message msg = taskQueue.poll();
61   - PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
62   - if (statusChangeFromPushStream == null) {
63   - logger.warn("[REDIS消息]推流设备状态变化消息解析失败");
64   - continue;
65   - }
66   - // 取消定时任务
67   - dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
68   - if (statusChangeFromPushStream.isSetAllOffline()) {
69   - // 所有设备离线
70   - streamPushService.allStreamOffline();
71   - }
72   - if (statusChangeFromPushStream.getOfflineStreams() != null
73   - && statusChangeFromPushStream.getOfflineStreams().size() > 0) {
74   - // 更新部分设备离线
75   - streamPushService.offline(statusChangeFromPushStream.getOfflineStreams());
76   - }
77   - if (statusChangeFromPushStream.getOnlineStreams() != null &&
78   - statusChangeFromPushStream.getOnlineStreams().size() > 0) {
79   - // 更新部分设备上线
80   - streamPushService.online(statusChangeFromPushStream.getOnlineStreams());
  59 + try {
  60 + PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
  61 + if (statusChangeFromPushStream == null) {
  62 + logger.warn("[REDIS消息]推流设备状态变化消息解析失败");
  63 + continue;
  64 + }
  65 + // 取消定时任务
  66 + dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
  67 + if (statusChangeFromPushStream.isSetAllOffline()) {
  68 + // 所有设备离线
  69 + streamPushService.allStreamOffline();
  70 + }
  71 + if (statusChangeFromPushStream.getOfflineStreams() != null
  72 + && statusChangeFromPushStream.getOfflineStreams().size() > 0) {
  73 + // 更新部分设备离线
  74 + streamPushService.offline(statusChangeFromPushStream.getOfflineStreams());
  75 + }
  76 + if (statusChangeFromPushStream.getOnlineStreams() != null &&
  77 + statusChangeFromPushStream.getOnlineStreams().size() > 0) {
  78 + // 更新部分设备上线
  79 + streamPushService.online(statusChangeFromPushStream.getOnlineStreams());
  80 + }
  81 + }catch (Exception e) {
  82 + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
81 83 }
82 84 }
83   - taskQueueHandlerRun = false;
84 85 });
85 86 }
86 87 }
... ...
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
... ... @@ -33,8 +33,6 @@ public class RedisStreamMsgListener implements MessageListener {
33 33 @Autowired
34 34 private ZLMMediaListManager zlmMediaListManager;
35 35  
36   - private boolean taskQueueHandlerRun = false;
37   -
38 36 private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
39 37  
40 38 @Qualifier("taskExecutor")
... ... @@ -43,48 +41,50 @@ public class RedisStreamMsgListener implements MessageListener {
43 41  
44 42 @Override
45 43 public void onMessage(Message message, byte[] bytes) {
46   -
  44 + boolean isEmpty = taskQueue.isEmpty();
47 45 taskQueue.offer(message);
48   - if (!taskQueueHandlerRun) {
49   - taskQueueHandlerRun = true;
  46 + if (isEmpty) {
50 47 taskExecutor.execute(() -> {
51 48 while (!taskQueue.isEmpty()) {
52 49 Message msg = taskQueue.poll();
53   - JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
54   - if (steamMsgJson == null) {
55   - logger.warn("[收到redis 流变化]消息解析失败");
56   - continue;
57   - }
58   - String serverId = steamMsgJson.getString("serverId");
  50 + try {
  51 + JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
  52 + if (steamMsgJson == null) {
  53 + logger.warn("[收到redis 流变化]消息解析失败");
  54 + continue;
  55 + }
  56 + String serverId = steamMsgJson.getString("serverId");
59 57  
60   - if (userSetting.getServerId().equals(serverId)) {
61   - // 自己发送的消息忽略即可
62   - continue;
63   - }
64   - logger.info("[收到redis 流变化]: {}", new String(message.getBody()));
65   - String app = steamMsgJson.getString("app");
66   - String stream = steamMsgJson.getString("stream");
67   - boolean register = steamMsgJson.getBoolean("register");
68   - String mediaServerId = steamMsgJson.getString("mediaServerId");
69   - OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
70   - onStreamChangedHookParam.setSeverId(serverId);
71   - onStreamChangedHookParam.setApp(app);
72   - onStreamChangedHookParam.setStream(stream);
73   - onStreamChangedHookParam.setRegist(register);
74   - onStreamChangedHookParam.setMediaServerId(mediaServerId);
75   - onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
76   - onStreamChangedHookParam.setAliveSecond(0L);
77   - onStreamChangedHookParam.setTotalReaderCount("0");
78   - onStreamChangedHookParam.setOriginType(0);
79   - onStreamChangedHookParam.setOriginTypeStr("0");
80   - onStreamChangedHookParam.setOriginTypeStr("unknown");
81   - if (register) {
82   - zlmMediaListManager.addPush(onStreamChangedHookParam);
83   - }else {
84   - zlmMediaListManager.removeMedia(app, stream);
  58 + if (userSetting.getServerId().equals(serverId)) {
  59 + // 自己发送的消息忽略即可
  60 + continue;
  61 + }
  62 + logger.info("[收到redis 流变化]: {}", new String(message.getBody()));
  63 + String app = steamMsgJson.getString("app");
  64 + String stream = steamMsgJson.getString("stream");
  65 + boolean register = steamMsgJson.getBoolean("register");
  66 + String mediaServerId = steamMsgJson.getString("mediaServerId");
  67 + OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
  68 + onStreamChangedHookParam.setSeverId(serverId);
  69 + onStreamChangedHookParam.setApp(app);
  70 + onStreamChangedHookParam.setStream(stream);
  71 + onStreamChangedHookParam.setRegist(register);
  72 + onStreamChangedHookParam.setMediaServerId(mediaServerId);
  73 + onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
  74 + onStreamChangedHookParam.setAliveSecond(0L);
  75 + onStreamChangedHookParam.setTotalReaderCount("0");
  76 + onStreamChangedHookParam.setOriginType(0);
  77 + onStreamChangedHookParam.setOriginTypeStr("0");
  78 + onStreamChangedHookParam.setOriginTypeStr("unknown");
  79 + if (register) {
  80 + zlmMediaListManager.addPush(onStreamChangedHookParam);
  81 + }else {
  82 + zlmMediaListManager.removeMedia(app, stream);
  83 + }
  84 + }catch (Exception e) {
  85 + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage());
85 86 }
86 87 }
87   - taskQueueHandlerRun = false;
88 88 });
89 89 }
90 90 }
... ...
src/main/resources/all-application.yml
... ... @@ -150,8 +150,6 @@ media:
150 150 enable: true
151 151 # [可选] 在此范围内选择端口用于媒体流传输, 必须提前在zlm上配置该属性,不然自动配置此属性可能不成功
152 152 port-range: 30000,30500 # 端口范围
153   - # [可选] 国标级联在此范围内选择端口发送媒体流
154   - send-port-range: 30000,30500 # 端口范围
155 153 # 录像辅助服务, 部署此服务可以实现zlm录像的管理与下载, 0 表示不使用
156 154 record-assist-port: 0
157 155  
... ...