Commit 6868577e329dea7b48b33a2e5d662ad855edb341

Authored by lawrencehj
1 parent d3fa1dd2

将录像文件统计改为独立线程进行,实现超时返回已接收结果

src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/CheckForAllRecordsThread.java 0 → 100644
  1 +package com.genersoft.iot.vmp.gb28181.transmit.callback;
  2 +
  3 +import java.util.ArrayList;
  4 +import java.util.Comparator;
  5 +import java.util.List;
  6 +import java.util.concurrent.TimeUnit;
  7 +
  8 +import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
  9 +import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
  10 +import com.genersoft.iot.vmp.gb28181.transmit.request.impl.MessageRequestProcessor;
  11 +import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  12 +
  13 +import org.slf4j.Logger;
  14 +
  15 +@SuppressWarnings("unchecked")
  16 +public class CheckForAllRecordsThread extends Thread {
  17 +
  18 + private String key;
  19 +
  20 + private RecordInfo recordInfo;
  21 +
  22 + private RedisUtil redis;
  23 +
  24 + private Logger logger;
  25 +
  26 + private DeferredResultHolder deferredResultHolder;
  27 +
  28 + public CheckForAllRecordsThread(String key, RecordInfo recordInfo) {
  29 + this.key = key;
  30 + this.recordInfo = recordInfo;
  31 + }
  32 +
  33 + public void run() {
  34 +
  35 + String cacheKey = this.key;
  36 +
  37 + for (long stop = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); stop > System.nanoTime();) {
  38 + List<Object> cacheKeys = redis.scan(cacheKey + "_*");
  39 + List<RecordItem> totalRecordList = new ArrayList<RecordItem>();
  40 + for (int i = 0; i < cacheKeys.size(); i++) {
  41 + totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString()));
  42 + }
  43 + if (totalRecordList.size() < this.recordInfo.getSumNum()) {
  44 + logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + this.recordInfo.getSumNum() + "项");
  45 + } else {
  46 + logger.info("录像数据已全部获取,共" + this.recordInfo.getSumNum() + "项");
  47 + this.recordInfo.setRecordList(totalRecordList);
  48 + for (int i = 0; i < cacheKeys.size(); i++) {
  49 + redis.del(cacheKeys.get(i).toString());
  50 + }
  51 + break;
  52 + }
  53 + }
  54 + // 自然顺序排序, 元素进行升序排列
  55 + this.recordInfo.getRecordList().sort(Comparator.naturalOrder());
  56 + RequestMessage msg = new RequestMessage();
  57 + String deviceId = recordInfo.getDeviceId();
  58 + msg.setDeviceId(deviceId);
  59 + msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);
  60 + msg.setData(recordInfo);
  61 + deferredResultHolder.invokeResult(msg);
  62 + logger.info("处理完成,返回结果");
  63 + MessageRequestProcessor.threadNameList.remove(cacheKey);
  64 + }
  65 +
  66 + public void setRedis(RedisUtil redis) {
  67 + this.redis = redis;
  68 + }
  69 +
  70 + public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) {
  71 + this.deferredResultHolder = deferredResultHolder;
  72 + }
  73 +
  74 + public void setLogger(Logger logger) {
  75 + this.logger = logger;
  76 + }
  77 +
  78 +}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java
@@ -23,6 +23,7 @@ import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; @@ -23,6 +23,7 @@ import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
23 import com.genersoft.iot.vmp.gb28181.bean.RecordItem; 23 import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
24 import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; 24 import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
25 import com.genersoft.iot.vmp.gb28181.event.EventPublisher; 25 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  26 +import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread;
26 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; 27 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
27 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; 28 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
28 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; 29 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
@@ -43,14 +44,17 @@ import org.dom4j.io.SAXReader; @@ -43,14 +44,17 @@ import org.dom4j.io.SAXReader;
43 import org.slf4j.Logger; 44 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory; 45 import org.slf4j.LoggerFactory;
45 import org.springframework.util.StringUtils; 46 import org.springframework.util.StringUtils;
  47 +
46 /** 48 /**
47 * @Description:MESSAGE请求处理器 49 * @Description:MESSAGE请求处理器
48 * @author: swwheihei 50 * @author: swwheihei
49 * @date: 2020年5月3日 下午5:32:41 51 * @date: 2020年5月3日 下午5:32:41
50 */ 52 */
51 - 53 +@SuppressWarnings(value={"unchecked", "rawtypes"})
52 public class MessageRequestProcessor extends SIPRequestAbstractProcessor { 54 public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
53 55
  56 + public static volatile List<String> threadNameList = new ArrayList();
  57 +
54 private UserSetup userSetup = (UserSetup) SpringBeanFactory.getBean("userSetup"); 58 private UserSetup userSetup = (UserSetup) SpringBeanFactory.getBean("userSetup");
55 59
56 private final static Logger logger = LoggerFactory.getLogger(MessageRequestProcessor.class); 60 private final static Logger logger = LoggerFactory.getLogger(MessageRequestProcessor.class);
@@ -240,10 +244,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { @@ -240,10 +244,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
240 try { 244 try {
241 Element rootElement = getRootElement(evt); 245 Element rootElement = getRootElement(evt);
242 String deviceId = XmlUtil.getText(rootElement, "DeviceID"); 246 String deviceId = XmlUtil.getText(rootElement, "DeviceID");
243 - String result = XmlUtil.getText(rootElement, "Result"); 247 + //String result = XmlUtil.getText(rootElement, "Result");
244 // 回复200 OK 248 // 回复200 OK
245 responseAck(evt); 249 responseAck(evt);
246 - if (!XmlUtil.isEmpty(result)) { 250 + if (rootElement.getName().equals("Response")) {//} !XmlUtil.isEmpty(result)) {
247 // 此处是对本平台发出DeviceControl指令的应答 251 // 此处是对本平台发出DeviceControl指令的应答
248 JSONObject json = new JSONObject(); 252 JSONObject json = new JSONObject();
249 XmlUtil.node2Json(rootElement, json); 253 XmlUtil.node2Json(rootElement, json);
@@ -272,11 +276,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { @@ -272,11 +276,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
272 try { 276 try {
273 Element rootElement = getRootElement(evt); 277 Element rootElement = getRootElement(evt);
274 String deviceId = XmlUtil.getText(rootElement, "DeviceID"); 278 String deviceId = XmlUtil.getText(rootElement, "DeviceID");
275 - String result = XmlUtil.getText(rootElement, "Result");  
276 // 回复200 OK 279 // 回复200 OK
277 responseAck(evt); 280 responseAck(evt);
278 - //if (!XmlUtil.isEmpty(result)) {  
279 - // 此处是对本平台发出DeviceControl指令的应答 281 + if (rootElement.getName().equals("Response")) {
  282 + // 此处是对本平台发出DeviceControl指令的应答
280 JSONObject json = new JSONObject(); 283 JSONObject json = new JSONObject();
281 XmlUtil.node2Json(rootElement, json); 284 XmlUtil.node2Json(rootElement, json);
282 if (logger.isDebugEnabled()) { 285 if (logger.isDebugEnabled()) {
@@ -287,9 +290,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { @@ -287,9 +290,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
287 msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG); 290 msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG);
288 msg.setData(json); 291 msg.setData(json);
289 deferredResultHolder.invokeResult(msg); 292 deferredResultHolder.invokeResult(msg);
290 - // } else {  
291 - // // 此处是上级发出的DeviceConfig指令  
292 - //} 293 + } else {
  294 + // 此处是上级发出的DeviceConfig指令
  295 + }
293 } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { 296 } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
294 e.printStackTrace(); 297 e.printStackTrace();
295 } 298 }
@@ -304,11 +307,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { @@ -304,11 +307,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
304 try { 307 try {
305 Element rootElement = getRootElement(evt); 308 Element rootElement = getRootElement(evt);
306 String deviceId = XmlUtil.getText(rootElement, "DeviceID"); 309 String deviceId = XmlUtil.getText(rootElement, "DeviceID");
307 - String result = XmlUtil.getText(rootElement, "Result");  
308 // 回复200 OK 310 // 回复200 OK
309 responseAck(evt); 311 responseAck(evt);
310 - //if (!XmlUtil.isEmpty(result)) {  
311 - // 此处是对本平台发出DeviceControl指令的应答 312 + if (rootElement.getName().equals("Response")) {
  313 + // 此处是对本平台发出DeviceControl指令的应答
312 JSONObject json = new JSONObject(); 314 JSONObject json = new JSONObject();
313 XmlUtil.node2Json(rootElement, json); 315 XmlUtil.node2Json(rootElement, json);
314 if (logger.isDebugEnabled()) { 316 if (logger.isDebugEnabled()) {
@@ -319,9 +321,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { @@ -319,9 +321,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
319 msg.setType(DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD); 321 msg.setType(DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD);
320 msg.setData(json); 322 msg.setData(json);
321 deferredResultHolder.invokeResult(msg); 323 deferredResultHolder.invokeResult(msg);
322 - // } else {  
323 - // // 此处是上级发出的DeviceConfig指令  
324 - //} 324 + } else {
  325 + // 此处是上级发出的DeviceConfig指令
  326 + }
325 } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { 327 } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
326 e.printStackTrace(); 328 e.printStackTrace();
327 } 329 }
@@ -336,7 +338,6 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { @@ -336,7 +338,6 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
336 try { 338 try {
337 Element rootElement = getRootElement(evt); 339 Element rootElement = getRootElement(evt);
338 String deviceId = XmlUtil.getText(rootElement, "DeviceID"); 340 String deviceId = XmlUtil.getText(rootElement, "DeviceID");
339 - String result = XmlUtil.getText(rootElement, "Result");  
340 // 回复200 OK 341 // 回复200 OK
341 responseAck(evt); 342 responseAck(evt);
342 if (rootElement.getName().equals("Response")) {// !XmlUtil.isEmpty(result)) { 343 if (rootElement.getName().equals("Response")) {// !XmlUtil.isEmpty(result)) {
@@ -648,8 +649,11 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { @@ -648,8 +649,11 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
648 Element recordListElement = rootElement.element("RecordList"); 649 Element recordListElement = rootElement.element("RecordList");
649 if (recordListElement == null || recordInfo.getSumNum() == 0) { 650 if (recordListElement == null || recordInfo.getSumNum() == 0) {
650 logger.info("无录像数据"); 651 logger.info("无录像数据");
651 - // responseAck(evt);  
652 - // return; 652 + RequestMessage msg = new RequestMessage();
  653 + msg.setDeviceId(deviceId);
  654 + msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);
  655 + msg.setData(recordInfo);
  656 + deferredResultHolder.invokeResult(msg);
653 } else { 657 } else {
654 Iterator<Element> recordListIterator = recordListElement.elementIterator(); 658 Iterator<Element> recordListIterator = recordListElement.elementIterator();
655 List<RecordItem> recordList = new ArrayList<RecordItem>(); 659 List<RecordItem> recordList = new ArrayList<RecordItem>();
@@ -679,44 +683,63 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { @@ -679,44 +683,63 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
679 record.setRecorderId(XmlUtil.getText(itemRecord, "RecorderID")); 683 record.setRecorderId(XmlUtil.getText(itemRecord, "RecorderID"));
680 recordList.add(record); 684 recordList.add(record);
681 } 685 }
682 - // recordList.sort(Comparator.naturalOrder());  
683 recordInfo.setRecordList(recordList); 686 recordInfo.setRecordList(recordList);
684 } 687 }
685 688
686 - // 存在录像且如果当前录像明细个数小于总条数,说明拆包返回,需要组装,暂不返回  
687 - if (recordInfo.getSumNum() > 0 && recordList.size() > 0 && recordList.size() < recordInfo.getSumNum()) {  
688 - // 为防止连续请求该设备的录像数据,返回数据错乱,特增加sn进行区分  
689 - String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn;  
690 -  
691 - redis.set(cacheKey + "_" + uuid, recordList, 90);  
692 - List<Object> cacheKeys = redis.scan(cacheKey + "_*");  
693 - List<RecordItem> totalRecordList = new ArrayList<RecordItem>();  
694 - for (int i = 0; i < cacheKeys.size(); i++) {  
695 - totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString())); 689 + // 改用单独线程统计已获取录像文件数量,避免多包并行分别统计不完整的问题
  690 + String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn;
  691 + redis.set(cacheKey + "_" + uuid, recordList, 90);
  692 + if (!threadNameList.contains(cacheKey)) {
  693 + threadNameList.add(cacheKey);
  694 + CheckForAllRecordsThread chk = new CheckForAllRecordsThread(cacheKey, recordInfo);
  695 + chk.setName(cacheKey);
  696 + chk.setDeferredResultHolder(deferredResultHolder);
  697 + chk.setRedis(redis);
  698 + chk.setLogger(logger);
  699 + chk.start();
  700 + if (logger.isDebugEnabled()) {
  701 + logger.debug("Start Thread " + cacheKey + ".");
696 } 702 }
697 - if (totalRecordList.size() < recordInfo.getSumNum()) {  
698 - logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + recordInfo.getSumNum() + "项");  
699 - return;  
700 - }  
701 - logger.info("录像数据已全部获取,共" + recordInfo.getSumNum() + "项");  
702 - recordInfo.setRecordList(totalRecordList);  
703 - for (int i = 0; i < cacheKeys.size(); i++) {  
704 - redis.del(cacheKeys.get(i).toString()); 703 + } else {
  704 + if (logger.isDebugEnabled()) {
  705 + logger.debug("Thread " + cacheKey + " already started.");
705 } 706 }
706 } 707 }
707 - // 自然顺序排序, 元素进行升序排列  
708 - recordInfo.getRecordList().sort(Comparator.naturalOrder()); 708 +
  709 + // 存在录像且如果当前录像明细个数小于总条数,说明拆包返回,需要组装,暂不返回
  710 + // if (recordInfo.getSumNum() > 0 && recordList.size() > 0 && recordList.size() < recordInfo.getSumNum()) {
  711 + // // 为防止连续请求该设备的录像数据,返回数据错乱,特增加sn进行区分
  712 + // String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn;
  713 +
  714 + // redis.set(cacheKey + "_" + uuid, recordList, 90);
  715 + // List<Object> cacheKeys = redis.scan(cacheKey + "_*");
  716 + // List<RecordItem> totalRecordList = new ArrayList<RecordItem>();
  717 + // for (int i = 0; i < cacheKeys.size(); i++) {
  718 + // totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString()));
  719 + // }
  720 + // if (totalRecordList.size() < recordInfo.getSumNum()) {
  721 + // logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + recordInfo.getSumNum() + "项");
  722 + // return;
  723 + // }
  724 + // logger.info("录像数据已全部获取,共" + recordInfo.getSumNum() + "项");
  725 + // recordInfo.setRecordList(totalRecordList);
  726 + // for (int i = 0; i < cacheKeys.size(); i++) {
  727 + // redis.del(cacheKeys.get(i).toString());
  728 + // }
  729 + // }
  730 + // // 自然顺序排序, 元素进行升序排列
  731 + // recordInfo.getRecordList().sort(Comparator.naturalOrder());
709 } 732 }
710 // 走到这里,有以下可能:1、没有录像信息,第一次收到recordinfo的消息即返回响应数据,无redis操作 733 // 走到这里,有以下可能:1、没有录像信息,第一次收到recordinfo的消息即返回响应数据,无redis操作
711 // 2、有录像数据,且第一次即收到完整数据,返回响应数据,无redis操作 734 // 2、有录像数据,且第一次即收到完整数据,返回响应数据,无redis操作
712 // 3、有录像数据,在超时时间内收到多次包组装后数量足够,返回数据 735 // 3、有录像数据,在超时时间内收到多次包组装后数量足够,返回数据
713 736
714 - RequestMessage msg = new RequestMessage();  
715 - msg.setDeviceId(deviceId);  
716 - msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);  
717 - msg.setData(recordInfo);  
718 - deferredResultHolder.invokeResult(msg);  
719 - logger.info("处理完成,返回结果"); 737 + // RequestMessage msg = new RequestMessage();
  738 + // msg.setDeviceId(deviceId);
  739 + // msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);
  740 + // msg.setData(recordInfo);
  741 + // deferredResultHolder.invokeResult(msg);
  742 + // logger.info("处理完成,返回结果");
720 } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { 743 } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
721 e.printStackTrace(); 744 e.printStackTrace();
722 } 745 }
@@ -799,4 +822,4 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { @@ -799,4 +822,4 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
799 public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { 822 public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
800 this.redisCatchStorage = redisCatchStorage; 823 this.redisCatchStorage = redisCatchStorage;
801 } 824 }
802 -} 825 +}
803 \ No newline at end of file 826 \ No newline at end of file