Commit 470aa479695a7821ef58457db9bd44dabba4acbf

Authored by 648540858
1 parent c041aacc

优化队列的处理逻辑

src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -116,14 +116,17 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @@ -116,14 +116,17 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
116 logger.info("接收到消息:" + cmd); 116 logger.info("接收到消息:" + cmd);
117 } 117 }
118 } catch (DocumentException e) { 118 } catch (DocumentException e) {
119 - throw new RuntimeException(e); 119 + logger.error("处理NOTIFY消息时错误", e);
  120 + } finally {
  121 + taskQueueHandlerRun = false;
120 } 122 }
121 } 123 }
122 - taskQueueHandlerRun = false;  
123 }); 124 });
124 } 125 }
125 } catch (SipException | InvalidArgumentException | ParseException e) { 126 } catch (SipException | InvalidArgumentException | ParseException e) {
126 e.printStackTrace(); 127 e.printStackTrace();
  128 + } finally {
  129 + taskQueueHandlerRun = false;
127 } 130 }
128 } 131 }
129 132
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
@@ -35,6 +35,9 @@ import java.util.Iterator; @@ -35,6 +35,9 @@ import java.util.Iterator;
35 import java.util.List; 35 import java.util.List;
36 import java.util.concurrent.ConcurrentLinkedQueue; 36 import java.util.concurrent.ConcurrentLinkedQueue;
37 37
  38 +/**
  39 + * 目录查询的回复
  40 + */
38 @Component 41 @Component
39 public class CatalogResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { 42 public class CatalogResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
40 43
@@ -85,82 +88,83 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @@ -85,82 +88,83 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
85 // 回复200 OK 88 // 回复200 OK
86 try { 89 try {
87 responseAck(evt, Response.OK); 90 responseAck(evt, Response.OK);
  91 + if (!taskQueueHandlerRun) {
  92 + taskQueueHandlerRun = true;
  93 + taskExecutor.execute(()-> {
  94 + while (!taskQueue.isEmpty()) {
  95 + HandlerCatchData take = taskQueue.poll();
  96 + try {
  97 + Element rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset());
  98 + Element deviceListElement = rootElement.element("DeviceList");
  99 + Element sumNumElement = rootElement.element("SumNum");
  100 + Element snElement = rootElement.element("SN");
  101 + if (snElement == null || sumNumElement == null || deviceListElement == null) {
  102 + responseAck(take.getEvt(), Response.BAD_REQUEST, "xml error");
  103 + return;
  104 + }
  105 + int sumNum = Integer.parseInt(sumNumElement.getText());
  106 +
  107 + if (sumNum == 0) {
  108 + logger.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId());
  109 + // 数据已经完整接收
  110 + storager.cleanChannelsForDevice(take.getDevice().getDeviceId());
  111 + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
  112 + }else {
  113 + Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
  114 + if (deviceListIterator != null) {
  115 + List<DeviceChannel> channelList = new ArrayList<>();
  116 + // 遍历DeviceList
  117 + while (deviceListIterator.hasNext()) {
  118 + Element itemDevice = deviceListIterator.next();
  119 + Element channelDeviceElement = itemDevice.element("DeviceID");
  120 + if (channelDeviceElement == null) {
  121 + continue;
  122 + }
  123 + DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice, device, null);
  124 + deviceChannel.setDeviceId(take.getDevice().getDeviceId());
  125 +
  126 + channelList.add(deviceChannel);
  127 + }
  128 + int sn = Integer.parseInt(snElement.getText());
  129 + catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList);
  130 + logger.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum);
  131 + if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) {
  132 + // 数据已经完整接收
  133 + boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId()));
  134 + if (!resetChannelsResult) {
  135 + String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条";
  136 + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg);
  137 + }else {
  138 + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
  139 + }
  140 + }
  141 + }
  142 +
  143 + }
  144 + } catch (DocumentException e) {
  145 + e.printStackTrace();
  146 + } catch (InvalidArgumentException e) {
  147 + e.printStackTrace();
  148 + } catch (ParseException e) {
  149 + e.printStackTrace();
  150 + } catch (SipException e) {
  151 + e.printStackTrace();
  152 + } finally {
  153 + taskQueueHandlerRun = false;
  154 + }
  155 + }
  156 + });
  157 + }
88 } catch (SipException e) { 158 } catch (SipException e) {
89 throw new RuntimeException(e); 159 throw new RuntimeException(e);
90 } catch (InvalidArgumentException e) { 160 } catch (InvalidArgumentException e) {
91 throw new RuntimeException(e); 161 throw new RuntimeException(e);
92 } catch (ParseException e) { 162 } catch (ParseException e) {
93 throw new RuntimeException(e); 163 throw new RuntimeException(e);
  164 + } finally {
  165 + taskQueueHandlerRun = false;
94 } 166 }
95 - if (!taskQueueHandlerRun) {  
96 - taskQueueHandlerRun = true;  
97 - taskExecutor.execute(()-> {  
98 - while (!taskQueue.isEmpty()) {  
99 - HandlerCatchData take = taskQueue.poll();  
100 - String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + take.getDevice().getDeviceId();  
101 - Element rootElement = null;  
102 - try {  
103 - rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset());  
104 - Element deviceListElement = rootElement.element("DeviceList");  
105 - Element sumNumElement = rootElement.element("SumNum");  
106 - Element snElement = rootElement.element("SN");  
107 - if (snElement == null || sumNumElement == null || deviceListElement == null) {  
108 - responseAck(take.getEvt(), Response.BAD_REQUEST, "xml error");  
109 - return;  
110 - }  
111 - int sumNum = Integer.parseInt(sumNumElement.getText());  
112 -  
113 - if (sumNum == 0) {  
114 - logger.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId());  
115 - // 数据已经完整接收  
116 - storager.cleanChannelsForDevice(take.getDevice().getDeviceId());  
117 - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);  
118 - }else {  
119 - Iterator<Element> deviceListIterator = deviceListElement.elementIterator();  
120 - if (deviceListIterator != null) {  
121 - List<DeviceChannel> channelList = new ArrayList<>();  
122 - // 遍历DeviceList  
123 - while (deviceListIterator.hasNext()) {  
124 - Element itemDevice = deviceListIterator.next();  
125 - Element channelDeviceElement = itemDevice.element("DeviceID");  
126 - if (channelDeviceElement == null) {  
127 - continue;  
128 - }  
129 - DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice, device, null);  
130 - deviceChannel.setDeviceId(take.getDevice().getDeviceId());  
131 167
132 - channelList.add(deviceChannel);  
133 - }  
134 - int sn = Integer.parseInt(snElement.getText());  
135 - catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList);  
136 - logger.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum);  
137 - if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) {  
138 - // 数据已经完整接收  
139 - boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId()));  
140 - if (!resetChannelsResult) {  
141 - String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条";  
142 - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg);  
143 - }else {  
144 - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);  
145 - }  
146 - }  
147 - }  
148 -  
149 - }  
150 - } catch (DocumentException e) {  
151 - e.printStackTrace();  
152 - } catch (InvalidArgumentException e) {  
153 - e.printStackTrace();  
154 - } catch (ParseException e) {  
155 - e.printStackTrace();  
156 - } catch (SipException e) {  
157 - e.printStackTrace();  
158 - }  
159 - }  
160 - taskQueueHandlerRun = false;  
161 - });  
162 -  
163 - }  
164 } 168 }
165 169
166 @Override 170 @Override
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
@@ -76,8 +76,8 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent @@ -76,8 +76,8 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
76 if (!taskQueueHandlerRun) { 76 if (!taskQueueHandlerRun) {
77 taskQueueHandlerRun = true; 77 taskQueueHandlerRun = true;
78 taskExecutor.execute(()->{ 78 taskExecutor.execute(()->{
79 - try {  
80 - while (!taskQueue.isEmpty()) { 79 + while (!taskQueue.isEmpty()) {
  80 + try {
81 HandlerCatchData take = taskQueue.poll(); 81 HandlerCatchData take = taskQueue.poll();
82 Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset()); 82 Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset());
83 String sn = getText(rootElementForCharset, "SN"); 83 String sn = getText(rootElementForCharset, "SN");
@@ -141,10 +141,11 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent @@ -141,10 +141,11 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
141 releaseRequest(take.getDevice().getDeviceId(), sn); 141 releaseRequest(take.getDevice().getDeviceId(), sn);
142 } 142 }
143 } 143 }
  144 + } catch (DocumentException e) {
  145 + throw new RuntimeException(e);
  146 + } finally {
  147 + taskQueueHandlerRun = false;
144 } 148 }
145 - taskQueueHandlerRun = false;  
146 - }catch (DocumentException e) {  
147 - throw new RuntimeException(e);  
148 } 149 }
149 }); 150 });
150 } 151 }
@@ -155,6 +156,8 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent @@ -155,6 +156,8 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
155 e.printStackTrace(); 156 e.printStackTrace();
156 } catch (ParseException e) { 157 } catch (ParseException e) {
157 e.printStackTrace(); 158 e.printStackTrace();
  159 + }finally {
  160 + taskQueueHandlerRun = false;
158 } 161 }
159 } 162 }
160 163
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -105,6 +105,7 @@ public class DeviceServiceImpl implements IDeviceService { @@ -105,6 +105,7 @@ public class DeviceServiceImpl implements IDeviceService {
105 redisCatchStorage.updateDevice(device); 105 redisCatchStorage.updateDevice(device);
106 commander.deviceInfoQuery(device); 106 commander.deviceInfoQuery(device);
107 sync(device); 107 sync(device);
  108 + // TODO 如果设备下的通道级联到了其他平台,那么需要发送事件或者notify给上级平台
108 }else { 109 }else {
109 deviceMapper.update(device); 110 deviceMapper.update(device);
110 redisCatchStorage.updateDevice(device); 111 redisCatchStorage.updateDevice(device);