Commit 47abdde3392f2c5fd88d382ae63c4756b97ed4b0

Authored by 648540858
1 parent d88c95f4

解决设备上线停止线程导致的报错,优化录像的获取以及通道的更新

src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
... ... @@ -103,12 +103,12 @@ public class DynamicTask {
103 103  
104 104 public void stop(String key) {
105 105 if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) {
106   - futureMap.get(key).cancel(true);
107   - Runnable runnable = runnableMap.get(key);
108   - if (runnable instanceof ISubscribeTask) {
109   - ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
110   - subscribeTask.stop();
111   - }
  106 +// Runnable runnable = runnableMap.get(key);
  107 +// if (runnable instanceof ISubscribeTask) {
  108 +// ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
  109 +// subscribeTask.stop();
  110 +// }
  111 + futureMap.get(key).cancel(false);
112 112 }
113 113 }
114 114  
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/bean/HandlerCatchData.java 0 → 100644
  1 +package com.genersoft.iot.vmp.gb28181.bean;
  2 +
  3 +import org.dom4j.Element;
  4 +
  5 +import javax.sip.RequestEvent;
  6 +
  7 +/**
  8 + * @author lin
  9 + */
  10 +public class HandlerCatchData {
  11 + private RequestEvent evt;
  12 + private Device device;
  13 + private Element rootElement;
  14 +
  15 + public HandlerCatchData(RequestEvent evt, Device device, Element rootElement) {
  16 + this.evt = evt;
  17 + this.device = device;
  18 + this.rootElement = rootElement;
  19 + }
  20 +
  21 + public RequestEvent getEvt() {
  22 + return evt;
  23 + }
  24 +
  25 + public void setEvt(RequestEvent evt) {
  26 + this.evt = evt;
  27 + }
  28 +
  29 + public Device getDevice() {
  30 + return device;
  31 + }
  32 +
  33 + public void setDevice(Device device) {
  34 + this.device = device;
  35 + }
  36 +
  37 + public Element getRootElement() {
  38 + return rootElement;
  39 + }
  40 +
  41 + public void setRootElement(Element rootElement) {
  42 + this.rootElement = rootElement;
  43 + }
  44 +}
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
... ... @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.bean;
2 2  
3 3 import com.genersoft.iot.vmp.common.VideoManagerConstants;
4 4 import com.genersoft.iot.vmp.conf.DynamicTask;
  5 +import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
5 6 import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask;
6 7 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
7 8 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
... ... @@ -38,7 +39,6 @@ public class SubscribeHolder {
38 39 catalogMap.put(platformId, subscribeInfo);
39 40 // 添加订阅到期
40 41 String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
41   - dynamicTask.stop(taskOverdueKey);
42 42 // 添加任务处理订阅过期
43 43 dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()),
44 44 subscribeInfo.getExpires() * 1000);
... ... @@ -49,10 +49,17 @@ public class SubscribeHolder {
49 49 }
50 50  
51 51 public void removeCatalogSubscribe(String platformId) {
  52 +
52 53 catalogMap.remove(platformId);
53 54 String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
  55 + Runnable runnable = dynamicTask.get(taskOverdueKey);
  56 + if (runnable instanceof ISubscribeTask) {
  57 + ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
  58 + subscribeTask.stop();
  59 + }
54 60 // 添加任务处理订阅过期
55 61 dynamicTask.stop(taskOverdueKey);
  62 +
56 63 }
57 64  
58 65 public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo) {
... ... @@ -63,7 +70,6 @@ public class SubscribeHolder {
63 70 storager, platformId, subscribeInfo.getSn(), key, this, dynamicTask),
64 71 subscribeInfo.getGpsInterval() * 1000);
65 72 String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId;
66   - dynamicTask.stop(taskOverdueKey);
67 73 // 添加任务处理订阅过期
68 74 dynamicTask.startDelay(taskOverdueKey, () -> {
69 75 removeMobilePositionSubscribe(subscribeInfo.getId());
... ... @@ -81,6 +87,11 @@ public class SubscribeHolder {
81 87 // 结束任务处理GPS定时推送
82 88 dynamicTask.stop(key);
83 89 String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId;
  90 + Runnable runnable = dynamicTask.get(taskOverdueKey);
  91 + if (runnable instanceof ISubscribeTask) {
  92 + ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
  93 + subscribeTask.stop();
  94 + }
84 95 // 添加任务处理订阅过期
85 96 dynamicTask.stop(taskOverdueKey);
86 97 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
... ... @@ -66,7 +66,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
66 66 subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
67 67  
68 68 if (subscribe == null) {
69   - logger.info("发送订阅消息时发现订阅信息已经不存在");
  69 + logger.info("发送订阅消息时发现订阅信息已经不存在: {}", event.getPlatformId());
70 70 return;
71 71 }
72 72 }else {
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
... ... @@ -150,30 +150,24 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
150 150 public void processTimeout(TimeoutEvent timeoutEvent) {
151 151 logger.info("[消息发送超时]");
152 152 ClientTransaction clientTransaction = timeoutEvent.getClientTransaction();
153   - eventPublisher.requestTimeOut(timeoutEvent);
  153 +
154 154 if (clientTransaction != null) {
  155 + logger.info("[发送错误订阅] clientTransaction != null");
155 156 Request request = clientTransaction.getRequest();
156 157 if (request != null) {
  158 + logger.info("[发送错误订阅] request != null");
157 159 CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
158 160 if (callIdHeader != null) {
  161 + logger.info("[发送错误订阅]");
159 162 SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
160 163 SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(timeoutEvent);
161 164 subscribe.response(eventResult);
  165 + sipSubscribe.removeOkSubscribe(callIdHeader.getCallId());
162 166 sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId());
163 167 }
164 168 }
165 169 }
166   -
167   -// Timeout timeout = timeoutEvent.getTimeout();
168   -// ServerTransaction serverTransaction = timeoutEvent.getServerTransaction();
169   -// if (serverTransaction != null) {
170   -// Request request = serverTransaction.getRequest();
171   -// URI requestURI = request.getRequestURI();
172   -// Header header = request.getHeader(FromHeader.NAME);
173   -// }
174   -// if(timeoutProcessor != null) {
175   -// timeoutProcessor.process(timeoutEvent);
176   -// }
  170 + eventPublisher.requestTimeOut(timeoutEvent);
177 171 }
178 172  
179 173 @Override
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
... ... @@ -1487,7 +1487,6 @@ public class SIPCommander implements ISIPCommander {
1487 1487  
1488 1488 Request request;
1489 1489 if (dialog != null) {
1490   - logger.info("发送移动位置订阅消息时 dialog的状态为: {}", dialog.getState());
1491 1490 request = dialog.createRequest(Request.SUBSCRIBE);
1492 1491 ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
1493 1492 request.setContent(subscribePostitionXml.toString(), contentTypeHeader);
... ... @@ -1583,12 +1582,12 @@ public class SIPCommander implements ISIPCommander {
1583 1582  
1584 1583 Request request;
1585 1584 if (dialog != null) {
1586   - logger.info("发送目录订阅消息时 dialog的状态为: {}", dialog.getState());
1587 1585 request = dialog.createRequest(Request.SUBSCRIBE);
  1586 + ExpiresHeader expiresHeader = sipFactory.createHeaderFactory().createExpiresHeader(device.getSubscribeCycleForCatalog());
  1587 + request.setExpires(expiresHeader);
  1588 +
1588 1589 ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
1589 1590 request.setContent(cmdXml.toString(), contentTypeHeader);
1590   - ExpiresHeader expireHeader = sipFactory.createHeaderFactory().createExpiresHeader(device.getSubscribeCycleForMobilePosition());
1591   - request.addHeader(expireHeader);
1592 1591 }else {
1593 1592 String tm = Long.toString(System.currentTimeMillis());
1594 1593  
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
1 1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
2 2  
3 3 import com.alibaba.fastjson.JSONObject;
4   -import com.genersoft.iot.vmp.common.VideoManagerConstants;
5 4 import com.genersoft.iot.vmp.conf.SipConfig;
6 5 import com.genersoft.iot.vmp.conf.UserSetting;
7 6 import com.genersoft.iot.vmp.gb28181.bean.*;
... ... @@ -26,6 +25,8 @@ import org.slf4j.Logger;
26 25 import org.slf4j.LoggerFactory;
27 26 import org.springframework.beans.factory.InitializingBean;
28 27 import org.springframework.beans.factory.annotation.Autowired;
  28 +import org.springframework.beans.factory.annotation.Qualifier;
  29 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
29 30 import org.springframework.stereotype.Component;
30 31 import org.springframework.util.StringUtils;
31 32  
... ... @@ -36,6 +37,7 @@ import javax.sip.header.FromHeader;
36 37 import javax.sip.message.Response;
37 38 import java.text.ParseException;
38 39 import java.util.Iterator;
  40 +import java.util.concurrent.ConcurrentLinkedQueue;
39 41  
40 42 /**
41 43 * SIP命令类型: NOTIFY请求
... ... @@ -64,11 +66,19 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
64 66 @Autowired
65 67 private EventPublisher publisher;
66 68  
67   - private String method = "NOTIFY";
  69 + private final String method = "NOTIFY";
68 70  
69 71 @Autowired
70 72 private SIPProcessorObserver sipProcessorObserver;
71 73  
  74 + private boolean taskQueueHandlerRun = false;
  75 +
  76 + private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
  77 +
  78 + @Qualifier("taskExecutor")
  79 + @Autowired
  80 + private ThreadPoolTaskExecutor taskExecutor;
  81 +
72 82 @Override
73 83 public void afterPropertiesSet() throws Exception {
74 84 // 添加消息处理的订阅
... ... @@ -78,23 +88,40 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
78 88 @Override
79 89 public void process(RequestEvent evt) {
80 90 try {
81   - Element rootElement = getRootElement(evt);
82   - String cmd = XmlUtil.getText(rootElement, "CmdType");
83   -
84   - if (CmdType.CATALOG.equals(cmd)) {
85   - logger.info("接收到Catalog通知");
86   - processNotifyCatalogList(evt);
87   - } else if (CmdType.ALARM.equals(cmd)) {
88   - logger.info("接收到Alarm通知");
89   - processNotifyAlarm(evt);
90   - } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
91   - logger.info("接收到MobilePosition通知");
92   - processNotifyMobilePosition(evt);
93   - } else {
94   - logger.info("接收到消息:" + cmd);
95   - responseAck(evt, Response.OK);
  91 +
  92 + taskQueue.offer(new HandlerCatchData(evt, null, null));
  93 + responseAck(evt, Response.OK);
  94 + if (!taskQueueHandlerRun) {
  95 + taskQueueHandlerRun = true;
  96 + taskExecutor.execute(()-> {
  97 + while (!taskQueue.isEmpty()) {
  98 + try {
  99 + HandlerCatchData take = taskQueue.poll();
  100 + Element rootElement = getRootElement(take.getEvt());
  101 + String cmd = XmlUtil.getText(rootElement, "CmdType");
  102 +
  103 + if (CmdType.CATALOG.equals(cmd)) {
  104 + logger.info("接收到Catalog通知");
  105 + processNotifyCatalogList(take.getEvt());
  106 + } else if (CmdType.ALARM.equals(cmd)) {
  107 + logger.info("接收到Alarm通知");
  108 + processNotifyAlarm(take.getEvt());
  109 + } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
  110 + logger.info("接收到MobilePosition通知");
  111 + processNotifyMobilePosition(take.getEvt());
  112 + } else {
  113 + logger.info("接收到消息:" + cmd);
  114 + }
  115 + } catch (DocumentException e) {
  116 + throw new RuntimeException(e);
  117 + }
  118 + }
  119 + taskQueueHandlerRun = false;
  120 + });
96 121 }
97   - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
  122 +
  123 +
  124 + } catch (SipException | InvalidArgumentException | ParseException e) {
98 125 e.printStackTrace();
99 126 }
100 127 }
... ... @@ -167,8 +194,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
167 194 jsonObject.put("direction", mobilePosition.getDirection());
168 195 jsonObject.put("speed", mobilePosition.getSpeed());
169 196 redisCatchStorage.sendMobilePositionMsg(jsonObject);
170   - responseAck(evt, Response.OK);
171   - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
  197 + } catch (DocumentException e) {
172 198 e.printStackTrace();
173 199 }
174 200 }
... ... @@ -189,7 +215,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
189 215  
190 216 Device device = redisCatchStorage.getDevice(deviceId);
191 217 if (device == null) {
192   - responseAck(evt, Response.NOT_FOUND, "device is not found");
  218 + logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId);
193 219 return;
194 220 }
195 221 rootElement = getRootElement(evt, device.getCharset());
... ... @@ -199,7 +225,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
199 225 deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
200 226 String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
201 227 if (alarmTime == null) {
202   - responseAck(evt, Response.BAD_REQUEST, "AlarmTime cannot be null");
  228 + logger.warn("[ NotifyAlarm ] AlarmTime cannot be null");
203 229 return;
204 230 }
205 231 deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
... ... @@ -219,7 +245,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
219 245 deviceAlarm.setLatitude(0.00);
220 246 }
221 247 logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId());
222   - if (deviceAlarm.getAlarmMethod().equals("4")) {
  248 + if ("4".equals(deviceAlarm.getAlarmMethod())) {
223 249 MobilePosition mobilePosition = new MobilePosition();
224 250 mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
225 251 mobilePosition.setTime(deviceAlarm.getAlarmTime());
... ... @@ -240,11 +266,10 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
240 266 // TODO: 需要实现存储报警信息、报警分类
241 267  
242 268 // 回复200 OK
243   - responseAck(evt, Response.OK);
244 269 if (redisCatchStorage.deviceIsOnline(deviceId)) {
245 270 publisher.deviceAlarmEventPublish(deviceAlarm);
246 271 }
247   - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
  272 + } catch (DocumentException e) {
248 273 e.printStackTrace();
249 274 }
250 275 }
... ... @@ -280,64 +305,60 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
280 305 continue;
281 306 }
282 307 Element eventElement = itemDevice.element("Event");
  308 + String event;
  309 + if (eventElement == null) {
  310 + logger.warn("[收到 目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));
  311 + event = CatalogEvent.ADD;
  312 + }else {
  313 + event = eventElement.getText().toUpperCase();
  314 + }
283 315 DeviceChannel channel = XmlUtil.channelContentHander(itemDevice);
284 316 channel.setDeviceId(device.getDeviceId());
285 317 logger.info("[收到 目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
286   - switch (eventElement.getText().toUpperCase()) {
  318 + switch (event) {
287 319 case CatalogEvent.ON:
288 320 // 上线
289 321 logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId());
290 322 storager.deviceChannelOnline(deviceId, channel.getChannelId());
291   - // 回复200 OK
292   - responseAck(evt, Response.OK);
293 323 break;
294 324 case CatalogEvent.OFF :
295 325 // 离线
296 326 logger.info("收到来自设备【{}】的通道【{}】离线通知", device.getDeviceId(), channel.getChannelId());
297 327 storager.deviceChannelOffline(deviceId, channel.getChannelId());
298   - // 回复200 OK
299   - responseAck(evt, Response.OK);
300 328 break;
301 329 case CatalogEvent.VLOST:
302 330 // 视频丢失
303 331 logger.info("收到来自设备【{}】的通道【{}】视频丢失通知", device.getDeviceId(), channel.getChannelId());
304 332 storager.deviceChannelOffline(deviceId, channel.getChannelId());
305   - // 回复200 OK
306   - responseAck(evt, Response.OK);
307 333 break;
308 334 case CatalogEvent.DEFECT:
309 335 // 故障
310   - // 回复200 OK
311   - responseAck(evt, Response.OK);
312 336 break;
313 337 case CatalogEvent.ADD:
314 338 // 增加
315 339 logger.info("收到来自设备【{}】的增加通道【{}】通知", device.getDeviceId(), channel.getChannelId());
316 340 storager.updateChannel(deviceId, channel);
317   - responseAck(evt, Response.OK);
318 341 break;
319 342 case CatalogEvent.DEL:
320 343 // 删除
321 344 logger.info("收到来自设备【{}】的删除通道【{}】通知", device.getDeviceId(), channel.getChannelId());
322 345 storager.delChannel(deviceId, channel.getChannelId());
323   - responseAck(evt, Response.OK);
324 346 break;
325 347 case CatalogEvent.UPDATE:
326 348 // 更新
327 349 logger.info("收到来自设备【{}】的更新通道【{}】通知", device.getDeviceId(), channel.getChannelId());
328 350 storager.updateChannel(deviceId, channel);
329   - responseAck(evt, Response.OK);
330 351 break;
331 352 default:
332   - responseAck(evt, Response.BAD_REQUEST, "event not found");
  353 + logger.warn("[ NotifyCatalog ] event not found : {}", event );
333 354  
334 355 }
335 356 // 转发变化信息
336   - eventPublisher.catalogEventPublish(null, channel, eventElement.getText().toUpperCase());
  357 + eventPublisher.catalogEventPublish(null, channel, event);
337 358  
338 359 }
339 360 }
340   - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
  361 + } catch (DocumentException e) {
341 362 e.printStackTrace();
342 363 }
343 364 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
... ... @@ -20,6 +20,8 @@ import org.slf4j.Logger;
20 20 import org.slf4j.LoggerFactory;
21 21 import org.springframework.beans.factory.InitializingBean;
22 22 import org.springframework.beans.factory.annotation.Autowired;
  23 +import org.springframework.beans.factory.annotation.Qualifier;
  24 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
23 25 import org.springframework.stereotype.Component;
24 26 import org.springframework.util.StringUtils;
25 27  
... ... @@ -31,6 +33,7 @@ import java.text.ParseException;
31 33 import java.util.ArrayList;
32 34 import java.util.Iterator;
33 35 import java.util.List;
  36 +import java.util.concurrent.ConcurrentLinkedQueue;
34 37  
35 38 @Component
36 39 public class CatalogResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
... ... @@ -38,9 +41,13 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
38 41 private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class);
39 42 private final String cmdType = "Catalog";
40 43  
  44 + private boolean taskQueueHandlerRun = false;
  45 +
41 46 @Autowired
42 47 private ResponseMessageHandler responseMessageHandler;
43 48  
  49 + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
  50 +
44 51 @Autowired
45 52 private IVideoManagerStorage storager;
46 53  
... ... @@ -63,6 +70,10 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
63 70 @Autowired
64 71 private IRedisCatchStorage redisCatchStorage;
65 72  
  73 + @Qualifier("taskExecutor")
  74 + @Autowired
  75 + private ThreadPoolTaskExecutor taskExecutor;
  76 +
66 77 @Override
67 78 public void afterPropertiesSet() throws Exception {
68 79 responseMessageHandler.addHandler(cmdType, this);
... ... @@ -70,68 +81,88 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
70 81  
71 82 @Override
72 83 public void handForDevice(RequestEvent evt, Device device, Element element) {
73   - String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + device.getDeviceId();
74   - Element rootElement = null;
  84 + taskQueue.offer(new HandlerCatchData(evt, device, element));
  85 + // 回复200 OK
75 86 try {
76   - rootElement = getRootElement(evt, device.getCharset());
77   - Element deviceListElement = rootElement.element("DeviceList");
78   - Element sumNumElement = rootElement.element("SumNum");
79   - Element snElement = rootElement.element("SN");
80   - if (snElement == null || sumNumElement == null || deviceListElement == null) {
81   - responseAck(evt, Response.BAD_REQUEST, "xml error");
82   - return;
83   - }
84   - int sumNum = Integer.parseInt(sumNumElement.getText());
85   -
86   - if (sumNum == 0) {
87   - // 数据已经完整接收
88   - storager.cleanChannelsForDevice(device.getDeviceId());
89   - catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null);
90   - }else {
91   - Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
92   - if (deviceListIterator != null) {
93   - List<DeviceChannel> channelList = new ArrayList<>();
94   - // 遍历DeviceList
95   - while (deviceListIterator.hasNext()) {
96   - Element itemDevice = deviceListIterator.next();
97   - Element channelDeviceElement = itemDevice.element("DeviceID");
98   - if (channelDeviceElement == null) {
99   - continue;
  87 + responseAck(evt, Response.OK);
  88 + } catch (SipException e) {
  89 + throw new RuntimeException(e);
  90 + } catch (InvalidArgumentException e) {
  91 + throw new RuntimeException(e);
  92 + } catch (ParseException e) {
  93 + throw new RuntimeException(e);
  94 + }
  95 + if (!taskQueueHandlerRun) {
  96 + taskQueueHandlerRun = true;
  97 + taskExecutor.execute(()-> {
  98 + while (!taskQueue.isEmpty()) {
  99 + HandlerCatchData take = taskQueue.poll();
  100 + String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + take.getDevice().getDeviceId();
  101 + Element rootElement = null;
  102 + try {
  103 + rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset());
  104 + Element deviceListElement = rootElement.element("DeviceList");
  105 + Element sumNumElement = rootElement.element("SumNum");
  106 + Element snElement = rootElement.element("SN");
  107 + if (snElement == null || sumNumElement == null || deviceListElement == null) {
  108 + responseAck(take.getEvt(), Response.BAD_REQUEST, "xml error");
  109 + return;
100 110 }
101   - //by brewswang
102   -// if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Longitude"))) {//如果包含位置信息,就更新一下位置
103   -// processNotifyMobilePosition(evt, itemDevice);
104   -// }
105   - DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice);
106   - deviceChannel.setDeviceId(device.getDeviceId());
107   -
108   - channelList.add(deviceChannel);
109   - }
110   - int sn = Integer.parseInt(snElement.getText());
111   - catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, channelList);
112   - logger.info("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(device.getDeviceId()) == null ? 0 :catalogDataCatch.get(device.getDeviceId()).size(), sumNum);
113   - if (catalogDataCatch.get(device.getDeviceId()).size() == sumNum) {
114   - // 数据已经完整接收
115   - boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(device.getDeviceId()));
116   - if (!resetChannelsResult) {
117   - String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(device.getDeviceId()).size() + "条";
118   - catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), errorMsg);
  111 + int sumNum = Integer.parseInt(sumNumElement.getText());
  112 +
  113 + if (sumNum == 0) {
  114 + // 数据已经完整接收
  115 + storager.cleanChannelsForDevice(take.getDevice().getDeviceId());
  116 + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
119 117 }else {
120   - catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null);
  118 + Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
  119 + if (deviceListIterator != null) {
  120 + List<DeviceChannel> channelList = new ArrayList<>();
  121 + // 遍历DeviceList
  122 + while (deviceListIterator.hasNext()) {
  123 + Element itemDevice = deviceListIterator.next();
  124 + Element channelDeviceElement = itemDevice.element("DeviceID");
  125 + if (channelDeviceElement == null) {
  126 + continue;
  127 + }
  128 + //by brewswang
  129 + // if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Longitude"))) {//如果包含位置信息,就更新一下位置
  130 + // processNotifyMobilePosition(evt, itemDevice);
  131 + // }
  132 + DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice);
  133 + deviceChannel.setDeviceId(take.getDevice().getDeviceId());
  134 +
  135 + channelList.add(deviceChannel);
  136 + }
  137 + int sn = Integer.parseInt(snElement.getText());
  138 + catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList);
  139 + logger.info("收到来自设备【{}】的通道: {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum);
  140 + if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) {
  141 + // 数据已经完整接收
  142 + boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId()));
  143 + if (!resetChannelsResult) {
  144 + String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条";
  145 + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg);
  146 + }else {
  147 + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
  148 + }
  149 + }
  150 + }
  151 +
121 152 }
  153 + } catch (DocumentException e) {
  154 + e.printStackTrace();
  155 + } catch (InvalidArgumentException e) {
  156 + e.printStackTrace();
  157 + } catch (ParseException e) {
  158 + e.printStackTrace();
  159 + } catch (SipException e) {
  160 + e.printStackTrace();
122 161 }
123 162 }
124   - // 回复200 OK
125   - responseAck(evt, Response.OK);
126   - }
127   - } catch (DocumentException e) {
128   - e.printStackTrace();
129   - } catch (InvalidArgumentException e) {
130   - e.printStackTrace();
131   - } catch (ParseException e) {
132   - e.printStackTrace();
133   - } catch (SipException e) {
134   - e.printStackTrace();
  163 + taskQueueHandlerRun = false;
  164 + });
  165 +
135 166 }
136 167 }
137 168  
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
1 1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
2 2  
3   -import com.genersoft.iot.vmp.gb28181.bean.Device;
4   -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
5   -import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
6   -import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
  3 +import com.genersoft.iot.vmp.gb28181.bean.*;
7 4 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
8 5 import com.genersoft.iot.vmp.gb28181.session.RecordDataCatch;
9 6 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
... ... @@ -19,6 +16,8 @@ import org.slf4j.Logger;
19 16 import org.slf4j.LoggerFactory;
20 17 import org.springframework.beans.factory.InitializingBean;
21 18 import org.springframework.beans.factory.annotation.Autowired;
  19 +import org.springframework.beans.factory.annotation.Qualifier;
  20 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
22 21 import org.springframework.stereotype.Component;
23 22 import org.springframework.util.StringUtils;
24 23  
... ... @@ -28,6 +27,9 @@ import javax.sip.SipException;
28 27 import javax.sip.message.Response;
29 28 import java.text.ParseException;
30 29 import java.util.*;
  30 +import java.util.concurrent.BlockingQueue;
  31 +import java.util.concurrent.ConcurrentLinkedQueue;
  32 +import java.util.concurrent.LinkedBlockingQueue;
31 33  
32 34 import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
33 35  
... ... @@ -42,6 +44,9 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
42 44 private final String cmdType = "RecordInfo";
43 45 private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_";
44 46  
  47 + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
  48 +
  49 + private boolean taskQueueHandlerRun = false;
45 50 @Autowired
46 51 private ResponseMessageHandler responseMessageHandler;
47 52  
... ... @@ -51,11 +56,13 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
51 56 @Autowired
52 57 private DeferredResultHolder deferredResultHolder;
53 58  
54   -
55   -
56 59 @Autowired
57 60 private EventPublisher eventPublisher;
58 61  
  62 + @Qualifier("taskExecutor")
  63 + @Autowired
  64 + private ThreadPoolTaskExecutor taskExecutor;
  65 +
59 66 @Override
60 67 public void afterPropertiesSet() throws Exception {
61 68 responseMessageHandler.addHandler(cmdType, this);
... ... @@ -67,75 +74,89 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
67 74 // 回复200 OK
68 75 try {
69 76 responseAck(evt, Response.OK);
70   -
71   - rootElement = getRootElement(evt, device.getCharset());
72   - String sn = getText(rootElement, "SN");
73   - RecordInfo recordInfo = new RecordInfo();
74   - recordInfo.setDeviceId(device.getDeviceId());
75   - recordInfo.setSn(sn);
76   - recordInfo.setName(getText(rootElement, "Name"));
77   - String sumNumStr = getText(rootElement, "SumNum");
78   - int sumNum = 0;
79   - if (!StringUtils.isEmpty(sumNumStr)) {
80   - sumNum = Integer.parseInt(sumNumStr);
81   - }
82   - recordInfo.setSumNum(sumNum);
83   - Element recordListElement = rootElement.element("RecordList");
84   - if (recordListElement == null || sumNum == 0) {
85   - logger.info("无录像数据");
86   - eventPublisher.recordEndEventPush(recordInfo);
87   - recordDataCatch.put(device.getDeviceId(), sn, sumNum, new ArrayList<>());
88   - releaseRequest(device.getDeviceId(), sn);
89   - } else {
90   - Iterator<Element> recordListIterator = recordListElement.elementIterator();
91   - if (recordListIterator != null) {
92   - List<RecordItem> recordList = new ArrayList<>();
93   - // 遍历DeviceList
94   - while (recordListIterator.hasNext()) {
95   - Element itemRecord = recordListIterator.next();
96   - Element recordElement = itemRecord.element("DeviceID");
97   - if (recordElement == null) {
98   - logger.info("记录为空,下一个...");
99   - continue;
  77 + taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
  78 + if (!taskQueueHandlerRun) {
  79 + taskQueueHandlerRun = true;
  80 + taskExecutor.execute(()->{
  81 + try {
  82 + while (!taskQueue.isEmpty()) {
  83 + HandlerCatchData take = taskQueue.poll();
  84 + Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset());
  85 + String sn = getText(rootElementForCharset, "SN");
  86 + String channelId = getText(rootElementForCharset, "DeviceID");
  87 + RecordInfo recordInfo = new RecordInfo();
  88 + recordInfo.setChannelId(channelId);
  89 + recordInfo.setDeviceId(take.getDevice().getDeviceId());
  90 + recordInfo.setSn(sn);
  91 + recordInfo.setName(getText(rootElementForCharset, "Name"));
  92 + String sumNumStr = getText(rootElementForCharset, "SumNum");
  93 + int sumNum = 0;
  94 + if (!StringUtils.isEmpty(sumNumStr)) {
  95 + sumNum = Integer.parseInt(sumNumStr);
  96 + }
  97 + recordInfo.setSumNum(sumNum);
  98 + Element recordListElement = rootElementForCharset.element("RecordList");
  99 + if (recordListElement == null || sumNum == 0) {
  100 + logger.info("无录像数据");
  101 + eventPublisher.recordEndEventPush(recordInfo);
  102 + recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, new ArrayList<>());
  103 + releaseRequest(take.getDevice().getDeviceId(), sn);
  104 + } else {
  105 + Iterator<Element> recordListIterator = recordListElement.elementIterator();
  106 + if (recordListIterator != null) {
  107 + List<RecordItem> recordList = new ArrayList<>();
  108 + // 遍历DeviceList
  109 + while (recordListIterator.hasNext()) {
  110 + Element itemRecord = recordListIterator.next();
  111 + Element recordElement = itemRecord.element("DeviceID");
  112 + if (recordElement == null) {
  113 + logger.info("记录为空,下一个...");
  114 + continue;
  115 + }
  116 + RecordItem record = new RecordItem();
  117 + record.setDeviceId(getText(itemRecord, "DeviceID"));
  118 + record.setName(getText(itemRecord, "Name"));
  119 + record.setFilePath(getText(itemRecord, "FilePath"));
  120 + record.setFileSize(getText(itemRecord, "FileSize"));
  121 + record.setAddress(getText(itemRecord, "Address"));
  122 +
  123 + String startTimeStr = getText(itemRecord, "StartTime");
  124 + record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
  125 +
  126 + String endTimeStr = getText(itemRecord, "EndTime");
  127 + record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
  128 +
  129 + record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
  130 + : Integer.parseInt(getText(itemRecord, "Secrecy")));
  131 + record.setType(getText(itemRecord, "Type"));
  132 + record.setRecorderId(getText(itemRecord, "RecorderID"));
  133 + recordList.add(record);
  134 + }
  135 + recordInfo.setRecordList(recordList);
  136 + // 发送消息,如果是上级查询此录像,则会通过这里通知给上级
  137 + eventPublisher.recordEndEventPush(recordInfo);
  138 + int count = recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, recordList);
  139 + logger.info("[国标录像], {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum);
  140 + }
  141 +
  142 + if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){
  143 + releaseRequest(take.getDevice().getDeviceId(), sn);
  144 + }
  145 + }
100 146 }
101   - RecordItem record = new RecordItem();
102   - record.setDeviceId(getText(itemRecord, "DeviceID"));
103   - record.setName(getText(itemRecord, "Name"));
104   - record.setFilePath(getText(itemRecord, "FilePath"));
105   - record.setFileSize(getText(itemRecord, "FileSize"));
106   - record.setAddress(getText(itemRecord, "Address"));
107   -
108   - String startTimeStr = getText(itemRecord, "StartTime");
109   - record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
110   -
111   - String endTimeStr = getText(itemRecord, "EndTime");
112   - record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
113   -
114   - record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
115   - : Integer.parseInt(getText(itemRecord, "Secrecy")));
116   - record.setType(getText(itemRecord, "Type"));
117   - record.setRecorderId(getText(itemRecord, "RecorderID"));
118   - recordList.add(record);
  147 + taskQueueHandlerRun = false;
  148 + }catch (DocumentException e) {
  149 + throw new RuntimeException(e);
119 150 }
120   - recordInfo.setRecordList(recordList);
121   - // 发送消息,如果是上级查询此录像,则会通过这里通知给上级
122   - eventPublisher.recordEndEventPush(recordInfo);
123   - int count = recordDataCatch.put(device.getDeviceId(), sn, sumNum, recordList);
124   - logger.info("[国标录像], {}->{}: {}/{}", device.getDeviceId(), sn, count, sumNum);
125   - }
126   -
127   - if (recordDataCatch.isComplete(device.getDeviceId(), sn)){
128   - releaseRequest(device.getDeviceId(), sn);
129   - }
  151 + });
130 152 }
  153 +
131 154 } catch (SipException e) {
132 155 e.printStackTrace();
133 156 } catch (InvalidArgumentException e) {
134 157 e.printStackTrace();
135 158 } catch (ParseException e) {
136 159 e.printStackTrace();
137   - } catch (DocumentException e) {
138   - e.printStackTrace();
139 160 }
140 161 }
141 162  
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
... ... @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
4 4 import com.genersoft.iot.vmp.gb28181.bean.Device;
5 5 import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
6 6 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  7 +import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
7 8 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
8 9 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
9 10 import com.genersoft.iot.vmp.service.IDeviceService;
... ... @@ -95,7 +96,6 @@ public class DeviceServiceImpl implements IDeviceService {
95 96 }
96 97 // 刷新过期任务
97 98 String registerExpireTaskKey = registerExpireTaskKeyPrefix + device.getDeviceId();
98   - dynamicTask.stop(registerExpireTaskKey);
99 99 dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId()), device.getExpires() * 1000);
100 100 }
101 101  
... ... @@ -144,8 +144,16 @@ public class DeviceServiceImpl implements IDeviceService {
144 144 if (device == null || device.getSubscribeCycleForCatalog() < 0) {
145 145 return false;
146 146 }
147   - logger.info("移除目录订阅: {}", device.getDeviceId());
148   - dynamicTask.stop(device.getDeviceId() + "catalog");
  147 + logger.info("[移除目录订阅]: {}", device.getDeviceId());
  148 + String taskKey = device.getDeviceId() + "catalog";
  149 + if (device.getOnline() == 1) {
  150 + Runnable runnable = dynamicTask.get(taskKey);
  151 + if (runnable instanceof ISubscribeTask) {
  152 + ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
  153 + subscribeTask.stop();
  154 + }
  155 + }
  156 + dynamicTask.stop(taskKey);
149 157 return true;
150 158 }
151 159  
... ... @@ -169,8 +177,16 @@ public class DeviceServiceImpl implements IDeviceService {
169 177 if (device == null || device.getSubscribeCycleForCatalog() < 0) {
170 178 return false;
171 179 }
172   - logger.info("移除移动位置订阅: {}", device.getDeviceId());
173   - dynamicTask.stop(device.getDeviceId() + "mobile_position");
  180 + logger.info("[移除移动位置订阅]: {}", device.getDeviceId());
  181 + String taskKey = device.getDeviceId() + "mobile_position";
  182 + if (device.getOnline() == 1) {
  183 + Runnable runnable = dynamicTask.get(taskKey);
  184 + if (runnable instanceof ISubscribeTask) {
  185 + ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
  186 + subscribeTask.stop();
  187 + }
  188 + }
  189 + dynamicTask.stop(taskKey);
174 190 return true;
175 191 }
176 192  
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
... ... @@ -206,6 +206,11 @@ public class DeviceQuery {
206 206 Set<String> allKeys = dynamicTask.getAllKeys();
207 207 for (String key : allKeys) {
208 208 if (key.startsWith(deviceId)) {
  209 + Runnable runnable = dynamicTask.get(key);
  210 + if (runnable instanceof ISubscribeTask) {
  211 + ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
  212 + subscribeTask.stop();
  213 + }
209 214 dynamicTask.stop(key);
210 215 }
211 216 }
... ...