Commit a9ab5c28e9fd52c1d936a245ac46c9e556f6bc3e

Authored by 648540858
1 parent b079039f

优化订阅机制,需要重新订阅时,取消命令发送后再发送订阅命令 #1273

src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
... ... @@ -52,7 +52,7 @@ public class SubscribeHolder {
52 52 Runnable runnable = dynamicTask.get(taskOverdueKey);
53 53 if (runnable instanceof ISubscribeTask) {
54 54 ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
55   - subscribeTask.stop();
  55 + subscribeTask.stop(null);
56 56 }
57 57 // 添加任务处理订阅过期
58 58 dynamicTask.stop(taskOverdueKey);
... ... @@ -87,7 +87,7 @@ public class SubscribeHolder {
87 87 Runnable runnable = dynamicTask.get(taskOverdueKey);
88 88 if (runnable instanceof ISubscribeTask) {
89 89 ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
90   - subscribeTask.stop();
  90 + subscribeTask.stop(null);
91 91 }
92 92 // 添加任务处理订阅过期
93 93 dynamicTask.stop(taskOverdueKey);
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java
1 1 package com.genersoft.iot.vmp.gb28181.task;
2 2  
3   -import javax.sip.DialogState;
  3 +import com.genersoft.iot.vmp.common.CommonCallback;
4 4  
5 5 /**
6 6 * @author lin
7 7 */
8 8 public interface ISubscribeTask extends Runnable{
9   - void stop();
  9 + void stop(CommonCallback<Boolean> callback);
10 10 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
1 1 package com.genersoft.iot.vmp.gb28181.task.impl;
2 2  
  3 +import com.genersoft.iot.vmp.common.CommonCallback;
3 4 import com.genersoft.iot.vmp.conf.DynamicTask;
4 5 import com.genersoft.iot.vmp.gb28181.bean.Device;
5 6 import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
... ... @@ -7,14 +8,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
7 8 import gov.nist.javax.sip.message.SIPRequest;
8 9 import org.slf4j.Logger;
9 10 import org.slf4j.LoggerFactory;
10   -import org.springframework.scheduling.annotation.Async;
11   -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
12 11  
13   -import javax.sip.*;
  12 +import javax.sip.DialogState;
  13 +import javax.sip.InvalidArgumentException;
  14 +import javax.sip.ResponseEvent;
  15 +import javax.sip.SipException;
14 16 import javax.sip.header.ToHeader;
15 17 import java.text.ParseException;
16   -import java.util.Timer;
17   -import java.util.TimerTask;
18 18  
19 19 /**
20 20 * 目录订阅任务
... ... @@ -71,7 +71,7 @@ public class CatalogSubscribeTask implements ISubscribeTask {
71 71 }
72 72  
73 73 @Override
74   - public void stop() {
  74 + public void stop(CommonCallback<Boolean> callback) {
75 75 /**
76 76 * dialog 的各个状态
77 77 * EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息
... ... @@ -94,6 +94,9 @@ public class CatalogSubscribeTask implements ISubscribeTask {
94 94 // 成功
95 95 logger.info("[取消目录订阅]成功: {}", device.getDeviceId());
96 96 }
  97 + if (callback != null) {
  98 + callback.run(event.getResponse().getRawContent() != null);
  99 + }
97 100 },eventResult -> {
98 101 // 失败
99 102 logger.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
1 1 package com.genersoft.iot.vmp.gb28181.task.impl;
2 2  
3   -import com.genersoft.iot.vmp.conf.DynamicTask;
4   -import com.genersoft.iot.vmp.gb28181.bean.*;
  3 +import com.genersoft.iot.vmp.common.CommonCallback;
5 4 import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
6   -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
7 5 import com.genersoft.iot.vmp.service.IPlatformService;
8   -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
9   -import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
10   -import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
11 6 import com.genersoft.iot.vmp.utils.SpringBeanFactory;
12   -import org.slf4j.Logger;
13   -import org.slf4j.LoggerFactory;
14   -import org.springframework.scheduling.annotation.Async;
15   -
16   -import javax.sip.DialogState;
17   -import java.util.List;
18 7  
19 8 /**
20 9 * 向已经订阅(移动位置)的上级发送MobilePosition消息
... ... @@ -38,7 +27,7 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
38 27 }
39 28  
40 29 @Override
41   - public void stop() {
  30 + public void stop(CommonCallback<Boolean> callback) {
42 31  
43 32 }
44 33 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java
1 1 package com.genersoft.iot.vmp.gb28181.task.impl;
2 2  
  3 +import com.genersoft.iot.vmp.common.CommonCallback;
3 4 import com.genersoft.iot.vmp.conf.DynamicTask;
4 5 import com.genersoft.iot.vmp.gb28181.bean.Device;
5 6 import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
6 7 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
7 8 import gov.nist.javax.sip.message.SIPRequest;
8   -import gov.nist.javax.sip.message.SIPResponse;
9   -import org.dom4j.Element;
10 9 import org.slf4j.Logger;
11 10 import org.slf4j.LoggerFactory;
12   -import org.springframework.scheduling.annotation.Async;
13 11  
14   -import javax.sip.*;
  12 +import javax.sip.InvalidArgumentException;
  13 +import javax.sip.ResponseEvent;
  14 +import javax.sip.SipException;
15 15 import javax.sip.header.ToHeader;
16 16 import java.text.ParseException;
17   -import java.util.Timer;
18   -import java.util.TimerTask;
19 17  
20 18 /**
21 19 * 移动位置订阅的定时更新
... ... @@ -70,7 +68,7 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
70 68 }
71 69  
72 70 @Override
73   - public void stop() {
  71 + public void stop(CommonCallback<Boolean> callback) {
74 72 /**
75 73 * dialog 的各个状态
76 74 * EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息
... ... @@ -92,6 +90,9 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
92 90 // 成功
93 91 logger.info("[取消移动位置订阅]成功: {}", device.getDeviceId());
94 92 }
  93 + if (callback != null) {
  94 + callback.run(event.getResponse().getRawContent() != null);
  95 + }
95 96 },eventResult -> {
96 97 // 失败
97 98 logger.warn("[取消移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java
... ... @@ -82,8 +82,9 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
82 82 device.setIp(remoteAddressInfo.getIp());
83 83 // 设备地址变化会引起目录订阅任务失效,需要重新添加
84 84 if (device.getSubscribeCycleForCatalog() > 0) {
85   - deviceService.removeCatalogSubscribe(device);
86   - deviceService.addCatalogSubscribe(device);
  85 + deviceService.removeCatalogSubscribe(device, result->{
  86 + deviceService.addCatalogSubscribe(device);
  87 + });
87 88 }
88 89 }
89 90 if (device.getKeepaliveTime() == null) {
... ...
src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
1 1 package com.genersoft.iot.vmp.service;
2 2  
  3 +import com.genersoft.iot.vmp.common.CommonCallback;
3 4 import com.genersoft.iot.vmp.gb28181.bean.Device;
4 5 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
5 6 import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
... ... @@ -39,7 +40,7 @@ public interface IDeviceService {
39 40 * @param device 设备信息
40 41 * @return 布尔
41 42 */
42   - boolean removeCatalogSubscribe(Device device);
  43 + boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback);
43 44  
44 45 /**
45 46 * 添加移动位置订阅
... ... @@ -53,7 +54,7 @@ public interface IDeviceService {
53 54 * @param device 设备信息
54 55 * @return 布尔
55 56 */
56   - boolean removeMobilePositionSubscribe(Device device);
  57 + boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback);
57 58  
58 59 /**
59 60 * 移除移动位置订阅
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
1 1 package com.genersoft.iot.vmp.service.impl;
2 2  
3 3 import com.baomidou.dynamic.datasource.annotation.DS;
  4 +import com.genersoft.iot.vmp.common.CommonCallback;
4 5 import com.genersoft.iot.vmp.common.VideoManagerConstants;
5 6 import com.genersoft.iot.vmp.conf.DynamicTask;
6 7 import com.genersoft.iot.vmp.conf.UserSetting;
... ... @@ -231,8 +232,8 @@ public class DeviceServiceImpl implements IDeviceService {
231 232 }
232 233 }
233 234 // 移除订阅
234   - removeCatalogSubscribe(device);
235   - removeMobilePositionSubscribe(device);
  235 + removeCatalogSubscribe(device, null);
  236 + removeMobilePositionSubscribe(device, null);
236 237 }
237 238  
238 239 @Override
... ... @@ -251,7 +252,7 @@ public class DeviceServiceImpl implements IDeviceService {
251 252 }
252 253  
253 254 @Override
254   - public boolean removeCatalogSubscribe(Device device) {
  255 + public boolean removeCatalogSubscribe(Device device, CommonCallback<Boolean> callback) {
255 256 if (device == null || device.getSubscribeCycleForCatalog() < 0) {
256 257 return false;
257 258 }
... ... @@ -261,7 +262,7 @@ public class DeviceServiceImpl implements IDeviceService {
261 262 Runnable runnable = dynamicTask.get(taskKey);
262 263 if (runnable instanceof ISubscribeTask) {
263 264 ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
264   - subscribeTask.stop();
  265 + subscribeTask.stop(callback);
265 266 }
266 267 }
267 268 dynamicTask.stop(taskKey);
... ... @@ -284,7 +285,7 @@ public class DeviceServiceImpl implements IDeviceService {
284 285 }
285 286  
286 287 @Override
287   - public boolean removeMobilePositionSubscribe(Device device) {
  288 + public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) {
288 289 if (device == null || device.getSubscribeCycleForCatalog() < 0) {
289 290 return false;
290 291 }
... ... @@ -294,7 +295,7 @@ public class DeviceServiceImpl implements IDeviceService {
294 295 Runnable runnable = dynamicTask.get(taskKey);
295 296 if (runnable instanceof ISubscribeTask) {
296 297 ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
297   - subscribeTask.stop();
  298 + subscribeTask.stop(callback);
298 299 }
299 300 }
300 301 dynamicTask.stop(taskKey);
... ... @@ -522,39 +523,54 @@ public class DeviceServiceImpl implements IDeviceService {
522 523 if (!ObjectUtils.isEmpty(device.getStreamMode())) {
523 524 deviceInStore.setStreamMode(device.getStreamMode());
524 525 }
525   -
526   -
527 526 // 目录订阅相关的信息
528 527 if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
529 528 if (device.getSubscribeCycleForCatalog() > 0) {
530 529 // 若已开启订阅,但订阅周期不同,则先取消
531 530 if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
532   - removeCatalogSubscribe(deviceInStore);
  531 + removeCatalogSubscribe(deviceInStore, result->{
  532 + // 开启订阅
  533 + deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
  534 + addCatalogSubscribe(deviceInStore);
  535 + // 因为是异步执行,需要在这里更新下数据
  536 + deviceMapper.updateCustom(deviceInStore);
  537 + redisCatchStorage.updateDevice(deviceInStore);
  538 + });
  539 + }else {
  540 + // 开启订阅
  541 + deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
  542 + addCatalogSubscribe(deviceInStore);
533 543 }
534   - // 开启订阅
535   - deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
536   - addCatalogSubscribe(deviceInStore);
  544 +
537 545 }else if (device.getSubscribeCycleForCatalog() == 0) {
538 546 // 取消订阅
539   - deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
540   - removeCatalogSubscribe(deviceInStore);
  547 + deviceInStore.setSubscribeCycleForCatalog(0);
  548 + removeCatalogSubscribe(deviceInStore, null);
541 549 }
542 550 }
543   -
544 551 // 移动位置订阅相关的信息
545   - if (device.getSubscribeCycleForMobilePosition() > 0) {
546   - if (deviceInStore.getSubscribeCycleForMobilePosition() == 0 || deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) {
547   - deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
548   - deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
549   - // 开启订阅
550   - addMobilePositionSubscribe(deviceInStore);
551   - }
552   - }else if (device.getSubscribeCycleForMobilePosition() == 0) {
553   - if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
554   - deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
555   - deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
  552 + if (deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) {
  553 + if (device.getSubscribeCycleForMobilePosition() > 0) {
  554 + // 若已开启订阅,但订阅周期不同,则先取消
  555 + if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
  556 + removeMobilePositionSubscribe(deviceInStore, result->{
  557 + // 开启订阅
  558 + deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
  559 + addMobilePositionSubscribe(deviceInStore);
  560 + // 因为是异步执行,需要在这里更新下数据
  561 + deviceMapper.updateCustom(deviceInStore);
  562 + redisCatchStorage.updateDevice(deviceInStore);
  563 + });
  564 + }else {
  565 + // 开启订阅
  566 + deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
  567 + addMobilePositionSubscribe(deviceInStore);
  568 + }
  569 +
  570 + }else if (device.getSubscribeCycleForMobilePosition() == 0) {
556 571 // 取消订阅
557   - removeMobilePositionSubscribe(deviceInStore);
  572 + deviceInStore.setSubscribeCycleForCatalog(0);
  573 + removeCatalogSubscribe(deviceInStore, null);
558 574 }
559 575 }
560 576 if (deviceInStore.getGeoCoordSys() != null) {
... ... @@ -574,9 +590,8 @@ public class DeviceServiceImpl implements IDeviceService {
574 590 //作为消息通道
575 591 deviceInStore.setAsMessageChannel(device.isAsMessageChannel());
576 592  
577   - // 更新redis
578 593 deviceMapper.updateCustom(deviceInStore);
579   - redisCatchStorage.removeDevice(deviceInStore.getDeviceId());
  594 + redisCatchStorage.updateDevice(deviceInStore);
580 595 }
581 596  
582 597 @Override
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java
... ... @@ -153,10 +153,7 @@ public class MobilePositionController {
153 153 Device device = storager.queryVideoDevice(deviceId);
154 154 device.setSubscribeCycleForMobilePosition(Integer.parseInt(expires));
155 155 device.setMobilePositionSubmissionInterval(Integer.parseInt(interval));
156   - deviceService.updateDevice(device);
157   - if (!deviceService.removeMobilePositionSubscribe(device)) {
158   - throw new ControllerException(ErrorCode.ERROR100);
159   - }
  156 + deviceService.updateCustomDevice(device);
160 157 }
161 158  
162 159 /**
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
... ... @@ -199,7 +199,7 @@ public class DeviceQuery {
199 199 Runnable runnable = dynamicTask.get(key);
200 200 if (runnable instanceof ISubscribeTask) {
201 201 ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
202   - subscribeTask.stop();
  202 + subscribeTask.stop(null);
203 203 }
204 204 dynamicTask.stop(key);
205 205 }
... ...