Commit 1b81080f6f7da35d2af7d194a5aa7be720d1eaa4
1 parent
dc37f667
修复通道目录同步异常问题
Showing
1 changed file
with
6 additions
and
4 deletions
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
| @@ -30,6 +30,7 @@ import java.util.ArrayList; | @@ -30,6 +30,7 @@ import java.util.ArrayList; | ||
| 30 | import java.util.Iterator; | 30 | import java.util.Iterator; |
| 31 | import java.util.List; | 31 | import java.util.List; |
| 32 | import java.util.concurrent.ConcurrentLinkedQueue; | 32 | import java.util.concurrent.ConcurrentLinkedQueue; |
| 33 | +import java.util.concurrent.atomic.AtomicBoolean; | ||
| 33 | 34 | ||
| 34 | /** | 35 | /** |
| 35 | * 目录查询的回复 | 36 | * 目录查询的回复 |
| @@ -60,6 +61,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp | @@ -60,6 +61,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp | ||
| 60 | 61 | ||
| 61 | @Autowired | 62 | @Autowired |
| 62 | private SipConfig sipConfig; | 63 | private SipConfig sipConfig; |
| 64 | + private AtomicBoolean processing = new AtomicBoolean(false); | ||
| 63 | 65 | ||
| 64 | @Override | 66 | @Override |
| 65 | public void afterPropertiesSet() throws Exception { | 67 | public void afterPropertiesSet() throws Exception { |
| @@ -68,7 +70,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp | @@ -68,7 +70,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp | ||
| 68 | 70 | ||
| 69 | @Override | 71 | @Override |
| 70 | public void handForDevice(RequestEvent evt, Device device, Element element) { | 72 | public void handForDevice(RequestEvent evt, Device device, Element element) { |
| 71 | - boolean isEmpty = taskQueue.isEmpty(); | ||
| 72 | taskQueue.offer(new HandlerCatchData(evt, device, element)); | 73 | taskQueue.offer(new HandlerCatchData(evt, device, element)); |
| 73 | // 回复200 OK | 74 | // 回复200 OK |
| 74 | try { | 75 | try { |
| @@ -76,8 +77,8 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp | @@ -76,8 +77,8 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp | ||
| 76 | } catch (SipException | InvalidArgumentException | ParseException e) { | 77 | } catch (SipException | InvalidArgumentException | ParseException e) { |
| 77 | logger.error("[命令发送失败] 目录查询回复: {}", e.getMessage()); | 78 | logger.error("[命令发送失败] 目录查询回复: {}", e.getMessage()); |
| 78 | } | 79 | } |
| 79 | - // 如果不为空则说明已经开启消息处理 | ||
| 80 | - if (isEmpty) { | 80 | + // 已经开启消息处理则跳过 |
| 81 | + if (processing.compareAndSet(false, true)) { | ||
| 81 | taskExecutor.execute(() -> { | 82 | taskExecutor.execute(() -> { |
| 82 | while (!taskQueue.isEmpty()) { | 83 | while (!taskQueue.isEmpty()) { |
| 83 | // 全局异常捕获,保证下一条可以得到处理 | 84 | // 全局异常捕获,保证下一条可以得到处理 |
| @@ -146,11 +147,12 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp | @@ -146,11 +147,12 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp | ||
| 146 | } | 147 | } |
| 147 | 148 | ||
| 148 | } | 149 | } |
| 149 | - }catch (Exception e) { | 150 | + } catch (Exception e) { |
| 150 | logger.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest()); | 151 | logger.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest()); |
| 151 | logger.error("[收到通道] 异常内容: ", e); | 152 | logger.error("[收到通道] 异常内容: ", e); |
| 152 | } | 153 | } |
| 153 | } | 154 | } |
| 155 | + processing.set(false); | ||
| 154 | }); | 156 | }); |
| 155 | } | 157 | } |
| 156 | 158 |