Commit a426b9bc53d0029c1dcb341a6dd30308ca079243

Authored by 648540858
Committed by GitHub
2 parents 44aef5d3 1b81080f

Merge pull request #1036 from daniel10917/fix-channel-catalog

修复通道目录同步异常问题
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
@@ -30,6 +30,7 @@ import java.util.ArrayList; @@ -30,6 +30,7 @@ import java.util.ArrayList;
30 import java.util.Iterator; 30 import java.util.Iterator;
31 import java.util.List; 31 import java.util.List;
32 import java.util.concurrent.ConcurrentLinkedQueue; 32 import java.util.concurrent.ConcurrentLinkedQueue;
  33 +import java.util.concurrent.atomic.AtomicBoolean;
33 34
34 /** 35 /**
35 * 目录查询的回复 36 * 目录查询的回复
@@ -60,6 +61,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @@ -60,6 +61,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
60 61
61 @Autowired 62 @Autowired
62 private SipConfig sipConfig; 63 private SipConfig sipConfig;
  64 + private AtomicBoolean processing = new AtomicBoolean(false);
63 65
64 @Override 66 @Override
65 public void afterPropertiesSet() throws Exception { 67 public void afterPropertiesSet() throws Exception {
@@ -68,7 +70,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @@ -68,7 +70,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
68 70
69 @Override 71 @Override
70 public void handForDevice(RequestEvent evt, Device device, Element element) { 72 public void handForDevice(RequestEvent evt, Device device, Element element) {
71 - boolean isEmpty = taskQueue.isEmpty();  
72 taskQueue.offer(new HandlerCatchData(evt, device, element)); 73 taskQueue.offer(new HandlerCatchData(evt, device, element));
73 // 回复200 OK 74 // 回复200 OK
74 try { 75 try {
@@ -76,8 +77,8 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @@ -76,8 +77,8 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
76 } catch (SipException | InvalidArgumentException | ParseException e) { 77 } catch (SipException | InvalidArgumentException | ParseException e) {
77 logger.error("[命令发送失败] 目录查询回复: {}", e.getMessage()); 78 logger.error("[命令发送失败] 目录查询回复: {}", e.getMessage());
78 } 79 }
79 - // 如果不为空则说明已经开启消息处理  
80 - if (isEmpty) { 80 + // 已经开启消息处理则跳过
  81 + if (processing.compareAndSet(false, true)) {
81 taskExecutor.execute(() -> { 82 taskExecutor.execute(() -> {
82 while (!taskQueue.isEmpty()) { 83 while (!taskQueue.isEmpty()) {
83 // 全局异常捕获,保证下一条可以得到处理 84 // 全局异常捕获,保证下一条可以得到处理
@@ -146,11 +147,12 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @@ -146,11 +147,12 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
146 } 147 }
147 148
148 } 149 }
149 - }catch (Exception e) { 150 + } catch (Exception e) {
150 logger.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest()); 151 logger.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest());
151 logger.error("[收到通道] 异常内容: ", e); 152 logger.error("[收到通道] 异常内容: ", e);
152 } 153 }
153 } 154 }
  155 + processing.set(false);
154 }); 156 });
155 } 157 }
156 158