Commit c1d7f867c2ffcb1364334a5e013eb8f208819ef5

Authored by 648540858
1 parent 41616f72

优化目录订阅以及国标级联目录订阅回复

Showing 25 changed files with 451 additions and 208 deletions
src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java
... ... @@ -60,8 +60,8 @@ public class MediaConfig{
60 60 @Value("${media.secret}")
61 61 private String secret;
62 62  
63   - @Value("${media.stream-none-reader-delay-ms:18000}")
64   - private int streamNoneReaderDelayMS = 18000;
  63 + @Value("${media.stream-none-reader-delay-ms:10000}")
  64 + private int streamNoneReaderDelayMS = 10000;
65 65  
66 66 @Value("${media.rtp.enable}")
67 67 private boolean rtpEnable;
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
... ... @@ -47,7 +47,7 @@ public class SipLayer{
47 47 Properties properties = new Properties();
48 48 properties.setProperty("javax.sip.STACK_NAME", "GB28181_SIP");
49 49 properties.setProperty("javax.sip.IP_ADDRESS", sipConfig.getMonitorIp());
50   - properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "false");
  50 + properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "true");
51 51 /**
52 52 * sip_server_log.log 和 sip_debug_log.log public static final int TRACE_NONE =
53 53 * 0; public static final int TRACE_MESSAGES = 16; public static final int
... ... @@ -57,6 +57,7 @@ public class SipLayer{
57 57 properties.setProperty("gov.nist.javax.sip.SERVER_LOG", "sip_server_log");
58 58 properties.setProperty("gov.nist.javax.sip.DEBUG_LOG", "sip_debug_log");
59 59 sipStack = (SipStackImpl) sipFactory.createSipStack(properties);
  60 +
60 61 return sipStack;
61 62 }
62 63  
... ... @@ -70,6 +71,7 @@ public class SipLayer{
70 71 tcpSipProvider = (SipProviderImpl)sipStack.createSipProvider(tcpListeningPoint);
71 72 tcpSipProvider.setDialogErrorsAutomaticallyHandled();
72 73 tcpSipProvider.addSipListener(sipProcessorObserver);
  74 +// tcpSipProvider.setAutomaticDialogSupportEnabled(false);
73 75 logger.info("Sip Server TCP 启动成功 port {" + sipConfig.getMonitorIp() + ":" + sipConfig.getPort() + "}");
74 76 } catch (TransportNotSupportedException e) {
75 77 e.printStackTrace();
... ... @@ -93,6 +95,7 @@ public class SipLayer{
93 95 udpListeningPoint = sipStack.createListeningPoint(sipConfig.getMonitorIp(), sipConfig.getPort(), "UDP");
94 96 udpSipProvider = (SipProviderImpl)sipStack.createSipProvider(udpListeningPoint);
95 97 udpSipProvider.addSipListener(sipProcessorObserver);
  98 +// udpSipProvider.setAutomaticDialogSupportEnabled(false);
96 99 } catch (TransportNotSupportedException e) {
97 100 e.printStackTrace();
98 101 } catch (InvalidArgumentException e) {
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java
... ... @@ -27,18 +27,18 @@ public class RegisterLogicHandler {
27 27  
28 28 public void onRegister(Device device) {
29 29 // 只有第一次注册时调用查询设备信息,如需更新调用更新API接口
30   - // TODO 此处错误无法获取到通道
31   - Device device1 = storager.queryVideoDevice(device.getDeviceId());
32   - if (device.isFirsRegister()) {
33   - logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
34   - try {
35   - Thread.sleep(100);
36   - cmder.deviceInfoQuery(device);
37   - Thread.sleep(100);
38   - cmder.catalogQuery(device, null);
39   - } catch (InterruptedException e) {
40   - e.printStackTrace();
41   - }
42   - }
  30 +// // TODO 此处错误无法获取到通道
  31 +// Device device1 = storager.queryVideoDevice(device.getDeviceId());
  32 +// if (device.isFirsRegister()) {
  33 +// logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
  34 +// try {
  35 +// Thread.sleep(100);
  36 +// cmder.deviceInfoQuery(device);
  37 +// Thread.sleep(100);
  38 +// cmder.catalogQuery(device, null);
  39 +// } catch (InterruptedException e) {
  40 +// e.printStackTrace();
  41 +// }
  42 +// }
43 43 }
44 44 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
... ... @@ -220,4 +220,6 @@ public class SendRtpItem {
220 220 public void setDialog(byte[] dialog) {
221 221 this.dialog = dialog;
222 222 }
  223 +
  224 +
223 225 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java 0 → 100644
  1 +package com.genersoft.iot.vmp.gb28181.bean;
  2 +
  3 +import org.springframework.stereotype.Component;
  4 +
  5 +import java.util.concurrent.ConcurrentHashMap;
  6 +
  7 +@Component
  8 +public class SubscribeHolder {
  9 +
  10 + private static ConcurrentHashMap<String, SubscribeInfo> catalogMap = new ConcurrentHashMap<>();
  11 + private static ConcurrentHashMap<String, SubscribeInfo> mobilePositionMap = new ConcurrentHashMap<>();
  12 +
  13 +
  14 + public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) {
  15 + catalogMap.put(platformId, subscribeInfo);
  16 + }
  17 +
  18 + public SubscribeInfo getCatalogSubscribe(String platformId) {
  19 + return catalogMap.get(platformId);
  20 + }
  21 +
  22 + public void removeCatalogSubscribe(String platformId) {
  23 + catalogMap.remove(platformId);
  24 + }
  25 +
  26 + public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo) {
  27 + mobilePositionMap.put(platformId, subscribeInfo);
  28 + }
  29 +
  30 + public SubscribeInfo getMobilePositionSubscribe(String platformId) {
  31 + return mobilePositionMap.get(platformId);
  32 + }
  33 +
  34 + public void removeMobilePositionSubscribe(String platformId) {
  35 + mobilePositionMap.remove(platformId);
  36 + }
  37 +}
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java
1 1 package com.genersoft.iot.vmp.gb28181.bean;
2 2  
  3 +import com.genersoft.iot.vmp.utils.SerializeUtils;
  4 +
  5 +import javax.sip.Dialog;
3 6 import javax.sip.RequestEvent;
  7 +import javax.sip.ServerTransaction;
4 8 import javax.sip.header.*;
5 9 import javax.sip.message.Request;
6 10  
7 11 public class SubscribeInfo {
8 12  
9   - public SubscribeInfo() {
10   - }
11 13  
12 14 public SubscribeInfo(RequestEvent evt, String id) {
13 15 this.id = id;
... ... @@ -23,6 +25,8 @@ public class SubscribeInfo {
23 25 this.eventType = eventHeader.getEventType();
24 26 ViaHeader viaHeader = (ViaHeader)request.getHeader(ViaHeader.NAME);
25 27 this.branch = viaHeader.getBranch();
  28 + this.transaction = evt.getServerTransaction();
  29 + this.dialog = evt.getDialog();
26 30 }
27 31  
28 32 private String id;
... ... @@ -33,6 +37,8 @@ public class SubscribeInfo {
33 37 private String fromTag;
34 38 private String toTag;
35 39 private String branch;
  40 + private ServerTransaction transaction;
  41 + private Dialog dialog;
36 42  
37 43 public String getId() {
38 44 return id;
... ... @@ -97,4 +103,20 @@ public class SubscribeInfo {
97 103 public void setBranch(String branch) {
98 104 this.branch = branch;
99 105 }
  106 +
  107 + public ServerTransaction getTransaction() {
  108 + return transaction;
  109 + }
  110 +
  111 + public void setTransaction(ServerTransaction transaction) {
  112 + this.transaction = transaction;
  113 + }
  114 +
  115 + public Dialog getDialog() {
  116 + return dialog;
  117 + }
  118 +
  119 + public void setDialog(Dialog dialog) {
  120 + this.dialog = dialog;
  121 + }
100 122 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
... ... @@ -33,12 +33,20 @@ public class EventPublisher {
33 33 @Autowired
34 34 private ApplicationEventPublisher applicationEventPublisher;
35 35  
36   - public void onlineEventPublish(Device device, String from) {
  36 + public void onlineEventPublish(Device device, String from, int expires) {
37 37 OnlineEvent onEvent = new OnlineEvent(this);
38 38 onEvent.setDevice(device);
39 39 onEvent.setFrom(from);
  40 + onEvent.setExpires(expires);
40 41 applicationEventPublisher.publishEvent(onEvent);
41 42 }
  43 +
  44 + public void onlineEventPublish(Device device, String from) {
  45 + OnlineEvent onEvent = new OnlineEvent(this);
  46 + onEvent.setDevice(device);
  47 + onEvent.setFrom(from);
  48 + applicationEventPublisher.publishEvent(onEvent);
  49 + }
42 50  
43 51 public void outlineEventPublish(String deviceId, String from){
44 52 OfflineEvent outEvent = new OfflineEvent(this);
... ... @@ -107,6 +115,12 @@ public class EventPublisher {
107 115 }
108 116  
109 117  
  118 + /**
  119 + *
  120 + * @param platformId
  121 + * @param deviceChannels
  122 + * @param type
  123 + */
110 124 public void catalogEventPublish(String platformId, List<DeviceChannel> deviceChannels, String type) {
111 125 CatalogEvent outEvent = new CatalogEvent(this);
112 126 List<DeviceChannel> channels = new ArrayList<>();
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java
... ... @@ -91,7 +91,7 @@ public class OfflineEventListener implements ApplicationListener&lt;OfflineEvent&gt; {
91 91  
92 92 // 离线释放所有ssrc
93 93 List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(event.getDeviceId(), null, null, null);
94   - if (ssrcTransactions.size() > 0) {
  94 + if (ssrcTransactions != null && ssrcTransactions.size() > 0) {
95 95 for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
96 96 mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
97 97 mediaServerService.closeRTPServer(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEvent.java
... ... @@ -23,6 +23,8 @@ public class OnlineEvent extends ApplicationEvent {
23 23  
24 24 private String from;
25 25  
  26 + private int expires;
  27 +
26 28 public Device getDevice() {
27 29 return device;
28 30 }
... ... @@ -38,5 +40,12 @@ public class OnlineEvent extends ApplicationEvent {
38 40 public void setFrom(String from) {
39 41 this.from = from;
40 42 }
41   -
  43 +
  44 + public int getExpires() {
  45 + return expires;
  46 + }
  47 +
  48 + public void setExpires(int expires) {
  49 + this.expires = expires;
  50 + }
42 51 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java
... ... @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
6 6 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
7 7 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
8 8 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  9 +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
9 10 import com.genersoft.iot.vmp.service.IDeviceService;
10 11 import com.genersoft.iot.vmp.storager.dao.dto.User;
11 12 import org.slf4j.Logger;
... ... @@ -51,6 +52,9 @@ public class OnlineEventListener implements ApplicationListener&lt;OnlineEvent&gt; {
51 52 @Autowired
52 53 private EventPublisher eventPublisher;
53 54  
  55 + @Autowired
  56 + private SIPCommander cmder;
  57 +
54 58 private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
55 59  
56 60 @Override
... ... @@ -62,13 +66,21 @@ public class OnlineEventListener implements ApplicationListener&lt;OnlineEvent&gt; {
62 66 Device device = event.getDevice();
63 67 if (device == null) return;
64 68 String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + userSetup.getServerId() + "_" + event.getDevice().getDeviceId();
65   -
  69 + Device deviceInStore = storager.queryVideoDevice(device.getDeviceId());
  70 + device.setOnline(1);
  71 + // 处理上线监听
  72 + storager.updateDevice(device);
66 73 switch (event.getFrom()) {
67 74 // 注册时触发的在线事件,先在redis中增加超时超时监听
68 75 case VideoManagerConstants.EVENT_ONLINE_REGISTER:
69 76 // 超时时间
70 77 redis.set(key, event.getDevice().getDeviceId(), sipConfig.getKeepaliveTimeOut());
71 78 device.setRegisterTime(format.format(System.currentTimeMillis()));
  79 + if (deviceInStore == null) { //第一次上线
  80 + logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
  81 + cmder.deviceInfoQuery(device);
  82 + cmder.catalogQuery(device, null);
  83 + }
72 84 break;
73 85 // 设备主动发送心跳触发的在线事件
74 86 case VideoManagerConstants.EVENT_ONLINE_KEEPLIVE:
... ... @@ -87,19 +99,11 @@ public class OnlineEventListener implements ApplicationListener&lt;OnlineEvent&gt; {
87 99 break;
88 100 }
89 101  
90   - device.setOnline(1);
91   - Device deviceInStore = storager.queryVideoDevice(device.getDeviceId());
92   - if (deviceInStore != null && deviceInStore.getOnline() == 0) {
93   - List<DeviceChannel> deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId());
94   - eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON);
95   - }
96   - // 处理上线监听
97   - storager.updateDevice(device);
98   -
  102 + List<DeviceChannel> deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId());
  103 + eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON);
99 104 // 上线添加订阅
100 105 if (device.getSubscribeCycleForCatalog() > 0) {
101 106 deviceService.addCatalogSubscribe(device);
102 107 }
103   -
104 108 }
105 109 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
... ... @@ -52,6 +52,9 @@ public class CatalogEventLister implements ApplicationListener&lt;CatalogEvent&gt; {
52 52 @Autowired
53 53 private IGbStreamService gbStreamService;
54 54  
  55 + @Autowired
  56 + private SubscribeHolder subscribeHolder;
  57 +
55 58 @Override
56 59 public void onApplicationEvent(CatalogEvent event) {
57 60 SubscribeInfo subscribe = null;
... ... @@ -62,7 +65,8 @@ public class CatalogEventLister implements ApplicationListener&lt;CatalogEvent&gt; {
62 65 parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId());
63 66 if (parentPlatform != null && !parentPlatform.isStatus())return;
64 67 String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + event.getPlatformId();
65   - subscribe = redisCatchStorage.getSubscribe(key);
  68 +// subscribe = redisCatchStorage.getSubscribe(key);
  69 + subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
66 70  
67 71 if (subscribe == null) {
68 72 logger.debug("发送订阅消息时发现订阅信息已经不存在");
... ... @@ -114,7 +118,8 @@ public class CatalogEventLister implements ApplicationListener&lt;CatalogEvent&gt; {
114 118 if (parentPlatforms != null && parentPlatforms.size() > 0) {
115 119 for (ParentPlatform platform : parentPlatforms) {
116 120 String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId();
117   - SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
  121 +// SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
  122 + SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
118 123 if (subscribeInfo == null) continue;
119 124 logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
120 125 List<DeviceChannel> deviceChannelList = new ArrayList<>();
... ... @@ -153,8 +158,9 @@ public class CatalogEventLister implements ApplicationListener&lt;CatalogEvent&gt; {
153 158 List<ParentPlatform> parentPlatforms = parentPlatformMap.get(gbId);
154 159 if (parentPlatforms != null && parentPlatforms.size() > 0) {
155 160 for (ParentPlatform platform : parentPlatforms) {
156   - String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId();
157   - SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
  161 +// String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId();
  162 +// SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
  163 + SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
158 164 if (subscribeInfo == null) continue;
159 165 logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
160 166 List<DeviceChannel> deviceChannelList = new ArrayList<>();
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java
... ... @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.task;
2 2  
3 3 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
4 4 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
  5 +import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
5 6 import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
6 7 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
7 8 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
... ... @@ -16,25 +17,28 @@ public class GPSSubscribeTask implements Runnable{
16 17 private IRedisCatchStorage redisCatchStorage;
17 18 private IVideoManagerStorager storager;
18 19 private ISIPCommanderForPlatform sipCommanderForPlatform;
  20 + private SubscribeHolder subscribeHolder;
19 21 private String platformId;
20 22 private String sn;
21 23 private String key;
22 24  
23 25 private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
24 26  
25   - public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key) {
  27 + public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) {
26 28 this.redisCatchStorage = redisCatchStorage;
27 29 this.storager = storager;
28 30 this.platformId = platformId;
29 31 this.sn = sn;
30 32 this.key = key;
31 33 this.sipCommanderForPlatform = sipCommanderForPlatform;
  34 + this.subscribeHolder = subscribeInfo;
32 35 }
33 36  
34 37 @Override
35 38 public void run() {
36 39  
37   - SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key);
  40 + SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platformId);
  41 +
38 42 if (subscribe != null) {
39 43 ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
40 44 if (parentPlatform == null || parentPlatform.isStatus()) {
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
... ... @@ -94,6 +94,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
94 94 Response response = responseEvent.getResponse();
95 95 logger.debug("\n收到响应:\n{}", responseEvent.getResponse());
96 96 int status = response.getStatusCode();
  97 +
97 98 if (((status >= 200) && (status < 300)) || status == 401) { // Success!
98 99 CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
99 100 String method = cseqHeader.getMethod();
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
... ... @@ -236,57 +236,57 @@ public class SIPRequestHeaderPlarformProvider {
236 236 return request;
237 237 }
238 238  
239   - public Request createNotifyRequest(ParentPlatform parentPlatform, String content, CallIdHeader callIdHeader, String viaTag, SubscribeInfo subscribeInfo) throws PeerUnavailableException, ParseException, InvalidArgumentException {
240   - Request request = null;
241   - // sipuri
242   - SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort());
243   - // via
244   - ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
245   - ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()),
246   - parentPlatform.getTransport(), subscribeInfo.getBranch());
247   - viaHeader.setRPort();
248   - viaHeaders.add(viaHeader);
249   - // from
250   - SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(),
251   - parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort());
252   - Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
253   - FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, subscribeInfo.getToTag());
254   - // to
255   - SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain());
256   - Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
257   - ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, subscribeInfo.getFromTag());
258   -
259   - // Forwards
260   - MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
261   - // ceq
262   - CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY);
263   - MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
264   - // 设置编码, 防止中文乱码
265   - messageFactory.setDefaultContentEncodingCharset("gb2312");
266   - request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader,
267   - toHeader, viaHeaders, maxForwards);
268   - List<String> agentParam = new ArrayList<>();
269   - agentParam.add("wvp-pro");
270   - UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
271   - request.addHeader(userAgentHeader);
272   -
273   - EventHeader event = sipFactory.createHeaderFactory().createEventHeader(subscribeInfo.getEventType());
274   - if (subscribeInfo.getEventId() != null) {
275   - event.setEventId(subscribeInfo.getEventId());
276   - }
277   -
278   - request.addHeader(event);
279   -
280   - SubscriptionStateHeader active = sipFactory.createHeaderFactory().createSubscriptionStateHeader("active");
281   - request.setHeader(active);
282   -
283   - String sipAddress = sipConfig.getIp() + ":" + sipConfig.getPort();
284   - Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory()
285   - .createSipURI(parentPlatform.getDeviceGBId(), sipAddress));
286   - request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
287   -
288   - ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
289   - request.setContent(content, contentTypeHeader);
290   - return request;
291   - }
  239 +// public Request createNotifyRequest(ParentPlatform parentPlatform, String content, CallIdHeader callIdHeader, String viaTag, String fromTag, SubscribeInfo subscribeInfo) throws PeerUnavailableException, ParseException, InvalidArgumentException {
  240 +// Request request = null;
  241 +// // sipuri
  242 +// SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort());
  243 +// // via
  244 +// ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
  245 +// ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()),
  246 +// parentPlatform.getTransport(), viaTag);
  247 +// viaHeader.setRPort();
  248 +// viaHeaders.add(viaHeader);
  249 +// // from
  250 +// SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(),
  251 +// parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort());
  252 +// Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
  253 +// FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag);
  254 +// // to
  255 +// SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain());
  256 +// Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
  257 +// ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, subscribeInfo.getFromTag());
  258 +//
  259 +// // Forwards
  260 +// MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
  261 +// // ceq
  262 +// CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY);
  263 +// MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
  264 +// // 设置编码, 防止中文乱码
  265 +// messageFactory.setDefaultContentEncodingCharset("gb2312");
  266 +// request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader,
  267 +// toHeader, viaHeaders, maxForwards);
  268 +// List<String> agentParam = new ArrayList<>();
  269 +// agentParam.add("wvp-pro");
  270 +// UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
  271 +// request.addHeader(userAgentHeader);
  272 +//
  273 +// EventHeader event = sipFactory.createHeaderFactory().createEventHeader(subscribeInfo.getEventType());
  274 +// if (subscribeInfo.getEventId() != null) {
  275 +// event.setEventId(subscribeInfo.getEventId());
  276 +// }
  277 +//
  278 +// request.addHeader(event);
  279 +//
  280 +// SubscriptionStateHeader active = sipFactory.createHeaderFactory().createSubscriptionStateHeader("active");
  281 +// request.setHeader(active);
  282 +//
  283 +// String sipAddress = sipConfig.getIp() + ":" + sipConfig.getPort();
  284 +// Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory()
  285 +// .createSipURI(parentPlatform.getDeviceGBId(), sipAddress));
  286 +// request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
  287 +//
  288 +// ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
  289 +// request.setContent(content, contentTypeHeader);
  290 +// return request;
  291 +// }
292 292 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
... ... @@ -215,6 +215,9 @@ public class SIPRequestHeaderProvider {
215 215  
216 216 // Event
217 217 EventHeader eventHeader = sipFactory.createHeaderFactory().createEventHeader(event);
  218 +
  219 + int random = (int)Math.random() * 1000000000;
  220 + eventHeader.setEventId(random + "");
218 221 request.addHeader(eventHeader);
219 222  
220 223 ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
... ... @@ -1518,7 +1518,7 @@ public class SIPCommander implements ISIPCommander {
1518 1518  
1519 1519 // 有效时间默认为60秒以上
1520 1520 Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm,
1521   - "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog() + 60, "Catalog" ,
  1521 + "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" ,
1522 1522 callIdHeader);
1523 1523 transmitRequest(device, request, errorEvent, okEvent);
1524 1524  
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
... ... @@ -27,9 +27,7 @@ import org.springframework.util.StringUtils;
27 27  
28 28 import javax.sip.*;
29 29 import javax.sip.address.SipURI;
30   -import javax.sip.header.CallIdHeader;
31   -import javax.sip.header.ViaHeader;
32   -import javax.sip.header.WWWAuthenticateHeader;
  30 +import javax.sip.header.*;
33 31 import javax.sip.message.Request;
34 32 import java.lang.reflect.Field;
35 33 import java.text.ParseException;
... ... @@ -68,6 +66,9 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
68 66 @Qualifier(value="udpSipProvider")
69 67 private SipProviderImpl udpSipProvider;
70 68  
  69 + @Autowired
  70 + private SipFactory sipFactory;
  71 +
71 72 @Override
72 73 public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
73 74 return register(parentPlatform, null, null, errorEvent, okEvent, false);
... ... @@ -88,7 +89,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
88 89 public boolean register(ParentPlatform parentPlatform, @Nullable String callId, @Nullable WWWAuthenticateHeader www,
89 90 SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain) {
90 91 try {
91   - Request request = null;
  92 + Request request;
92 93 String tm = Long.toString(System.currentTimeMillis());
93 94 if (!registerAgain ) {
94 95 // //callid
... ... @@ -364,16 +365,18 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
364 365 : udpSipProvider.getNewCallId();
365 366 callIdHeader.setCallId(subscribeInfo.getCallId());
366 367  
367   - String tm = Long.toString(System.currentTimeMillis());
368   -
369   - Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform,
370   - deviceStatusXml.toString(),callIdHeader,
371   - "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), subscribeInfo);
372   - transmitRequest(parentPlatform, request);
  368 +//
  369 + sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> {
  370 + logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
  371 + }, null);
373 372  
374   - } catch (SipException | ParseException | InvalidArgumentException e) {
  373 + } catch (SipException | ParseException e) {
375 374 e.printStackTrace();
376 375 return false;
  376 + } catch (NoSuchFieldException e) {
  377 + e.printStackTrace();
  378 + } catch (IllegalAccessException e) {
  379 + e.printStackTrace();
377 380 }
378 381 return true;
379 382 }
... ... @@ -386,37 +389,89 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
386 389 if (index == null) {
387 390 index = 0;
388 391 }
389   -
  392 + if (index >= deviceChannels.size()) {
  393 + return true;
  394 + }
390 395 try {
391   - if (index > deviceChannels.size() - 1) {
392   - return true;
393   - }
394   - Request request = getCatalogNotifyRequestForCatalogAddOrUpdate(parentPlatform, deviceChannels.get(index), deviceChannels.size(), type, subscribeInfo);
395   - index += 1;
396 396 Integer finalIndex = index;
397   - transmitRequest(parentPlatform, request, null, (eventResult -> {
398   - sendNotifyForCatalogAddOrUpdate(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex);
  397 + String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, deviceChannels.get(index ), deviceChannels.size(), type, subscribeInfo);
  398 + sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
  399 + logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
  400 + }, (eventResult -> {
  401 + sendNotifyForCatalogAddOrUpdate(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex + 1);
399 402 }));
400   - } catch (SipException | ParseException | InvalidArgumentException e) {
  403 + } catch (SipException | ParseException e) {
401 404 e.printStackTrace();
402 405 return false;
  406 + } catch (NoSuchFieldException e) {
  407 + e.printStackTrace();
  408 + } catch (IllegalAccessException e) {
  409 + e.printStackTrace();
403 410 }
404 411 return true;
405 412 }
406 413  
407   - private Request getCatalogNotifyRequestForCatalogAddOrUpdate(ParentPlatform parentPlatform, DeviceChannel channel, int size, String type,
408   - SubscribeInfo subscribeInfo) throws ParseException, InvalidArgumentException,
409   - PeerUnavailableException {
410   - String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channel, size, type, subscribeInfo);
411   -
412   - CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
413   - : udpSipProvider.getNewCallId();
414   - callIdHeader.setCallId(subscribeInfo.getCallId());
415   - Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXmlContent,
416   - callIdHeader, "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), subscribeInfo);
417   - return request;
  414 + private void sendNotify(ParentPlatform parentPlatform, String catalogXmlContent,
  415 + SubscribeInfo subscribeInfo, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent )
  416 + throws NoSuchFieldException, IllegalAccessException, SipException, ParseException {
  417 + Dialog dialog = subscribeInfo.getDialog();
  418 + Request notifyRequest = dialog.createRequest(Request.NOTIFY);
  419 +
  420 + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
  421 +
  422 + notifyRequest.setContent(catalogXmlContent, contentTypeHeader);
  423 +
  424 + SubscriptionStateHeader subscriptionState = sipFactory.createHeaderFactory()
  425 + .createSubscriptionStateHeader(SubscriptionStateHeader.ACTIVE);
  426 + notifyRequest.addHeader(subscriptionState);
  427 +
  428 + EventHeader event = sipFactory.createHeaderFactory().createEventHeader(subscribeInfo.getEventType());
  429 + if (subscribeInfo.getEventId() != null) {
  430 + event.setEventId(subscribeInfo.getEventId());
  431 + }
  432 + notifyRequest.addHeader(event);
  433 +
  434 + SipURI sipURI = (SipURI) notifyRequest.getRequestURI();
  435 + SIPRequest request = (SIPRequest) subscribeInfo.getTransaction().getRequest();
  436 + sipURI.setHost(request.getRemoteAddress().getHostName());
  437 + sipURI.setPort(request.getRemotePort());
  438 + ClientTransaction transaction = null;
  439 + if ("TCP".equals(parentPlatform.getTransport())) {
  440 + transaction = tcpSipProvider.getNewClientTransaction(notifyRequest);
  441 + } else if ("UDP".equals(parentPlatform.getTransport())) {
  442 + transaction = udpSipProvider.getNewClientTransaction(notifyRequest);
  443 + }
  444 + // 添加错误订阅
  445 + if (errorEvent != null) {
  446 + sipSubscribe.addErrorSubscribe(subscribeInfo.getCallId(), errorEvent);
  447 + }
  448 + // 添加订阅
  449 + if (okEvent != null) {
  450 + sipSubscribe.addOkSubscribe(subscribeInfo.getCallId(), okEvent);
  451 + }
  452 + if (transaction == null) {
  453 + logger.error("平台{}的Transport错误:{}",parentPlatform.getServerGBId(), parentPlatform.getTransport());
  454 + return;
  455 + }
  456 + dialog.sendRequest(transaction);
  457 +
418 458 }
419 459  
  460 +// private Request getCatalogNotifyRequestForCatalogAddOrUpdate(ParentPlatform parentPlatform, DeviceChannel channel, int size, String type,
  461 +// SubscribeInfo subscribeInfo) throws ParseException, InvalidArgumentException,
  462 +// PeerUnavailableException, NoSuchFieldException, IllegalAccessException {
  463 +// String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channel, size, type, subscribeInfo);
  464 +//
  465 +// CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
  466 +// : udpSipProvider.getNewCallId();
  467 +// callIdHeader.setCallId(subscribeInfo.getCallId());
  468 +// String tm = Long.toString(System.currentTimeMillis());
  469 +//
  470 +// Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXmlContent,
  471 +// callIdHeader, "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""),"FromRegister" + tm, subscribeInfo);
  472 +// return request;
  473 +// }
  474 +
420 475 private String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, DeviceChannel channel, int sumNum, String type, SubscribeInfo subscribeInfo) {
421 476 StringBuffer catalogXml = new StringBuffer(600);
422 477 if (parentPlatform.getServerGBId().equals(channel.getParentId())) {
... ... @@ -465,34 +520,31 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
465 520 if (index == null) {
466 521 index = 0;
467 522 }
468   -
469   - if (index > deviceChannels.size() - 1) {
  523 + if (index >= deviceChannels.size()) {
470 524 return true;
471 525 }
472 526 try {
473   - String catalogXml = getCatalogXmlContentForCatalogOther(deviceChannels.get(index), type, parentPlatform);
474   - CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
475   - : udpSipProvider.getNewCallId();
476   - Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXml,
477   - callIdHeader,
478   - "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), subscribeInfo);
479   - index += 1;
480 527 Integer finalIndex = index;
481   - transmitRequest(parentPlatform, request, null, eventResult -> {
482   - sendNotifyForCatalogOther(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex);
483   - });
  528 + String catalogXmlContent = getCatalogXmlContentForCatalogOther(parentPlatform, deviceChannels.get(index), type);
  529 + sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
  530 + logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
  531 + }, (eventResult -> {
  532 + sendNotifyForCatalogOther(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex + 1);
  533 + }));
484 534 } catch (SipException e) {
485 535 e.printStackTrace();
486   - } catch (InvalidArgumentException e) {
487   - e.printStackTrace();
488 536 } catch (ParseException e) {
489 537 e.printStackTrace();
  538 + } catch (NoSuchFieldException e) {
  539 + e.printStackTrace();
  540 + } catch (IllegalAccessException e) {
  541 + e.printStackTrace();
490 542 }
491 543  
492 544 return true;
493 545 }
494 546  
495   - private String getCatalogXmlContentForCatalogOther(DeviceChannel channel, String type, ParentPlatform parentPlatform) {
  547 + private String getCatalogXmlContentForCatalogOther(ParentPlatform parentPlatform, DeviceChannel channel, String type) {
496 548 if (parentPlatform.getServerGBId().equals(channel.getParentId())) {
497 549 channel.setParentId(parentPlatform.getDeviceGBId());
498 550 }
... ... @@ -594,6 +646,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
594 646 byte[] transactionByteArray = sendRtpItem.getTransaction();
595 647 ClientTransaction clientTransaction = (ClientTransaction) SerializeUtils.deSerialize(transactionByteArray);
596 648 Request byeRequest = dialog.createRequest(Request.BYE);
  649 +
597 650 SipURI byeURI = (SipURI) byeRequest.getRequestURI();
598 651 SIPRequest request = (SIPRequest) clientTransaction.getRequest();
599 652 byeURI.setHost(request.getRemoteAddress().getHostName());
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
... ... @@ -233,6 +233,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
233 233 */
234 234 private void processNotifyCatalogList(RequestEvent evt) {
235 235 try {
  236 + System.out.println(343434);
236 237 FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
237 238 String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
238 239  
... ... @@ -309,12 +310,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
309 310  
310 311 }
311 312  
312   - // RequestMessage msg = new RequestMessage();
313   - // msg.setDeviceId(deviceId);
314   - // msg.setType(DeferredResultHolder.CALLBACK_CMD_CATALOG);
315   - // msg.setData(device);
316   - // deferredResultHolder.invokeResult(msg);
317   -
318 313 if (offLineDetector.isOnline(deviceId)) {
319 314 publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
320 315 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
... ... @@ -81,7 +81,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
81 81 String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort();
82 82 logger.info("[{}] 收到注册请求,开始处理", requestAddress);
83 83 Request request = evt.getRequest();
84   -
  84 + ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME);
85 85 Response response = null;
86 86 boolean passwordCorrect = false;
87 87 // 注册标志 0:未携带授权头或者密码错误 1:注册成功 2:注销成功
... ... @@ -128,7 +128,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
128 128 dateHeader.setDate(wvpSipDate);
129 129 response.addHeader(dateHeader);
130 130  
131   - ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME);
  131 +
132 132 if (expiresHeader == null) {
133 133 response = getMessageFactory().createResponse(Response.BAD_REQUEST, request);
134 134 ServerTransaction serverTransaction = getServerTransaction(evt);
... ... @@ -193,9 +193,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
193 193 // 保存到redis
194 194 if (registerFlag == 1 ) {
195 195 logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress);
196   - publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER);
197   - // 重新注册更新设备和通道,以免设备替换或更新后信息无法更新
198   - handler.onRegister(device);
  196 + publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER, expiresHeader.getExpires());
199 197 } else if (registerFlag == 2) {
200 198 logger.info("[{}] 注销成功! deviceId:" + device.getDeviceId(), requestAddress);
201 199 publisher.outlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_OUTLINE_UNREGISTER);
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
... ... @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
5 5 import com.genersoft.iot.vmp.conf.UserSetup;
6 6 import com.genersoft.iot.vmp.gb28181.bean.CmdType;
7 7 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
  8 +import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
8 9 import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
9 10 import com.genersoft.iot.vmp.gb28181.task.GPSSubscribeTask;
10 11 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
... ... @@ -15,18 +16,19 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
15 16 import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
16 17 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
17 18 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
  19 +import com.genersoft.iot.vmp.utils.SerializeUtils;
  20 +import gov.nist.javax.sip.SipProviderImpl;
18 21 import org.dom4j.DocumentException;
19 22 import org.dom4j.Element;
20 23 import org.slf4j.Logger;
21 24 import org.slf4j.LoggerFactory;
22 25 import org.springframework.beans.factory.InitializingBean;
23 26 import org.springframework.beans.factory.annotation.Autowired;
  27 +import org.springframework.beans.factory.annotation.Qualifier;
  28 +import org.springframework.context.annotation.Lazy;
24 29 import org.springframework.stereotype.Component;
25 30  
26   -import javax.sip.InvalidArgumentException;
27   -import javax.sip.RequestEvent;
28   -import javax.sip.ServerTransaction;
29   -import javax.sip.SipException;
  31 +import javax.sip.*;
30 32 import javax.sip.header.ExpiresHeader;
31 33 import javax.sip.header.ToHeader;
32 34 import javax.sip.message.Request;
... ... @@ -54,12 +56,26 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
54 56 @Autowired
55 57 private IVideoManagerStorager storager;
56 58  
  59 + @Lazy
  60 + @Autowired
  61 + @Qualifier(value="tcpSipProvider")
  62 + private SipProviderImpl tcpSipProvider;
  63 +
  64 + @Lazy
  65 + @Autowired
  66 + @Qualifier(value="udpSipProvider")
  67 + private SipProviderImpl udpSipProvider;
  68 +
57 69 @Autowired
58 70 private DynamicTask dynamicTask;
59 71  
60 72 @Autowired
61 73 private UserSetup userSetup;
62 74  
  75 +
  76 + @Autowired
  77 + private SubscribeHolder subscribeHolder;
  78 +
63 79 @Override
64 80 public void afterPropertiesSet() throws Exception {
65 81 // 添加消息处理的订阅
... ... @@ -136,16 +152,17 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
136 152 .append("</Response>\r\n");
137 153  
138 154 if (subscribeInfo.getExpires() > 0) {
139   - if (redisCatchStorage.getSubscribe(key) != null) {
  155 + if (subscribeHolder.getMobilePositionSubscribe(platformId) != null) {
140 156 dynamicTask.stop(key);
141 157 }
142 158 String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
143   - dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key), Integer.parseInt(interval));
144   -
145   - redisCatchStorage.updateSubscribe(key, subscribeInfo);
  159 + dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval));
  160 + subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
  161 +// redisCatchStorage.updateSubscribe(key, subscribeInfo);
146 162 }else if (subscribeInfo.getExpires() == 0) {
147 163 dynamicTask.stop(key);
148   - redisCatchStorage.delSubscribe(key);
  164 +// redisCatchStorage.delSubscribe(key);
  165 + subscribeHolder.removeMobilePositionSubscribe(platformId);
149 166 }
150 167  
151 168 try {
... ... @@ -168,10 +185,19 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
168 185  
169 186 }
170 187  
171   - private void processNotifyCatalogList(RequestEvent evt, Element rootElement) {
  188 + private void processNotifyCatalogList(RequestEvent evt, Element rootElement) throws SipException {
172 189 String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
173 190 String deviceID = XmlUtil.getText(rootElement, "DeviceID");
  191 + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
174 192 SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
  193 + if (evt.getServerTransaction() == null) {
  194 + ServerTransaction serverTransaction = platform.getTransport().equals("TCP") ? tcpSipProvider.getNewServerTransaction(evt.getRequest())
  195 + : udpSipProvider.getNewServerTransaction(evt.getRequest());
  196 + subscribeInfo.setTransaction(serverTransaction);
  197 + Dialog dialog = serverTransaction.getDialog();
  198 + dialog.terminateOnBye(false);
  199 + subscribeInfo.setDialog(dialog);
  200 + }
175 201 String sn = XmlUtil.getText(rootElement, "SN");
176 202 String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platformId;
177 203 logger.info("接收到{}的Catalog订阅", platformId);
... ... @@ -185,9 +211,11 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
185 211 .append("</Response>\r\n");
186 212  
187 213 if (subscribeInfo.getExpires() > 0) {
188   - redisCatchStorage.updateSubscribe(key, subscribeInfo);
  214 +// redisCatchStorage.updateSubscribe(key, subscribeInfo);
  215 + subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo);
189 216 }else if (subscribeInfo.getExpires() == 0) {
190   - redisCatchStorage.delSubscribe(key);
  217 +// redisCatchStorage.delSubscribe(key);
  218 + subscribeHolder.removeCatalogSubscribe(platformId);
191 219 }
192 220  
193 221 try {
... ... @@ -195,7 +223,8 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
195 223 Response response = responseXmlAck(evt, resultXml.toString(), parentPlatform);
196 224 ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
197 225 subscribeInfo.setToTag(toHeader.getTag());
198   - redisCatchStorage.updateSubscribe(key, subscribeInfo);
  226 +// redisCatchStorage.updateSubscribe(key, subscribeInfo);
  227 + subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo);
199 228  
200 229 } catch (SipException e) {
201 230 e.printStackTrace();
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java
... ... @@ -21,6 +21,7 @@ import org.springframework.stereotype.Component;
21 21 import javax.sip.InvalidArgumentException;
22 22 import javax.sip.RequestEvent;
23 23 import javax.sip.SipException;
  24 +import javax.sip.header.CSeqHeader;
24 25 import javax.sip.header.CallIdHeader;
25 26 import javax.sip.message.Response;
26 27 import java.text.ParseException;
... ... @@ -64,6 +65,11 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
64 65 String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
65 66 CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
66 67 // 查询设备是否存在
  68 + CSeqHeader cseqHeader = (CSeqHeader) evt.getRequest().getHeader(CSeqHeader.NAME);
  69 + String method = cseqHeader.getMethod();
  70 + if (method.equals("MESSAGE")) {
  71 + System.out.println();
  72 + }
67 73 Device device = redisCatchStorage.getDevice(deviceId);
68 74 // 查询上级平台是否存在
69 75 ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId);
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
... ... @@ -85,41 +85,54 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
85 85 return;
86 86 }
87 87 int sumNum = Integer.parseInt(sumNumElement.getText());
88   - Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
89   - if (deviceListIterator != null) {
90   - List<DeviceChannel> channelList = new ArrayList<>();
91   - // 遍历DeviceList
92   - while (deviceListIterator.hasNext()) {
93   - Element itemDevice = deviceListIterator.next();
94   - Element channelDeviceElement = itemDevice.element("DeviceID");
95   - if (channelDeviceElement == null) {
96   - continue;
  88 + if (sumNum == 0) {
  89 + // 数据已经完整接收
  90 + storager.cleanChannelsForDevice(device.getDeviceId());
  91 + RequestMessage msg = new RequestMessage();
  92 + msg.setKey(key);
  93 + WVPResult<Object> result = new WVPResult<>();
  94 + result.setCode(0);
  95 + result.setData(device);
  96 + msg.setData(result);
  97 + result.setMsg("更新成功,共0条");
  98 + deferredResultHolder.invokeAllResult(msg);
  99 + catalogDataCatch.del(key);
  100 + }else {
  101 + Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
  102 + if (deviceListIterator != null) {
  103 + List<DeviceChannel> channelList = new ArrayList<>();
  104 + // 遍历DeviceList
  105 + while (deviceListIterator.hasNext()) {
  106 + Element itemDevice = deviceListIterator.next();
  107 + Element channelDeviceElement = itemDevice.element("DeviceID");
  108 + if (channelDeviceElement == null) {
  109 + continue;
  110 + }
  111 + DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice);
  112 + deviceChannel.setDeviceId(device.getDeviceId());
  113 + logger.debug("收到来自设备【{}】的通道: {}【{}】", device.getDeviceId(), deviceChannel.getName(), deviceChannel.getChannelId());
  114 + channelList.add(deviceChannel);
97 115 }
98   - DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice);
99   - deviceChannel.setDeviceId(device.getDeviceId());
100   - logger.debug("收到来自设备【{}】的通道: {}【{}】", device.getDeviceId(), deviceChannel.getName(), deviceChannel.getChannelId());
101   - channelList.add(deviceChannel);
102   - }
103 116  
104   - catalogDataCatch.put(key, sumNum, device, channelList);
105   - if (catalogDataCatch.get(key).size() == sumNum) {
106   - // 数据已经完整接收
107   - boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key));
108   - RequestMessage msg = new RequestMessage();
109   - msg.setKey(key);
110   - WVPResult<Object> result = new WVPResult<>();
111   - result.setCode(0);
112   - result.setData(device);
113   - if (resetChannelsResult) {
114   - result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条");
115   - }else {
116   - result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条");
  117 + catalogDataCatch.put(key, sumNum, device, channelList);
  118 + if (catalogDataCatch.get(key).size() == sumNum) {
  119 + // 数据已经完整接收
  120 + boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key));
  121 + RequestMessage msg = new RequestMessage();
  122 + msg.setKey(key);
  123 + WVPResult<Object> result = new WVPResult<>();
  124 + result.setCode(0);
  125 + result.setData(device);
  126 + if (resetChannelsResult || sumNum ==0) {
  127 + result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条");
  128 + }else {
  129 + result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条");
  130 + }
  131 + msg.setData(result);
  132 + deferredResultHolder.invokeAllResult(msg);
  133 + catalogDataCatch.del(key);
117 134 }
118   - msg.setData(result);
119   - deferredResultHolder.invokeAllResult(msg);
120   - catalogDataCatch.del(key);
121 135 }
122   -
123 136 // 回复200 OK
124 137 responseAck(evt, Response.OK);
125 138 if (offLineDetector.isOnline(device.getDeviceId())) {
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
... ... @@ -31,8 +31,8 @@ public class DeviceServiceImpl implements IDeviceService {
31 31 return false;
32 32 }
33 33 if (dynamicTask.contains(device.getDeviceId())) {
34   - logger.info("[添加目录订阅] 设备{}的目录订阅以存在", device.getDeviceId());
35   - return false;
  34 + // 存在则停止现有的,开启新的
  35 + dynamicTask.stop(device.getDeviceId());
36 36 }
37 37 logger.info("[添加目录订阅] 设备{}", device.getDeviceId());
38 38 // 添加目录订阅
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
... ... @@ -513,6 +513,14 @@ public class MediaServerServiceImpl implements IMediaServerService {
513 513 param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex));
514 514 param.put("hook.timeoutSec","20");
515 515 param.put("general.streamNoneReaderDelayMS","-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() );
  516 + // 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。
  517 + // 置0关闭此特性(推流断开会导致立即断开播放器)
  518 + // 此参数不应大于播放器超时时间
  519 + // 优化此消息以更快的收到流注销事件
  520 + param.put("general.continue_push_ms", "3000" );
  521 + // 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流,
  522 + // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项
  523 + param.put("general.wait_track_ready_ms", "3000" );
516 524  
517 525 JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param);
518 526  
... ... @@ -620,6 +628,8 @@ public class MediaServerServiceImpl implements IMediaServerService {
620 628 public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) {
621 629 MediaServerItem mediaServerItem = getOne(mediaServerId);
622 630 if (mediaServerItem == null) {
  631 + // zlm连接重试
  632 +
623 633 logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息");
624 634 return;
625 635 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
1 1 package com.genersoft.iot.vmp.service.impl;
2 2  
3 3 import com.alibaba.fastjson.JSON;
  4 +import com.alibaba.fastjson.JSONArray;
4 5 import com.alibaba.fastjson.JSONObject;
5 6 import com.alibaba.fastjson.TypeReference;
6 7 import com.genersoft.iot.vmp.common.StreamInfo;
7 8 import com.genersoft.iot.vmp.conf.UserSetup;
8   -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
9   -import com.genersoft.iot.vmp.gb28181.bean.GbStream;
10   -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
11   -import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
  9 +import com.genersoft.iot.vmp.gb28181.bean.*;
12 10 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
13 11 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
14 12 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
... ... @@ -23,6 +21,8 @@ import com.genersoft.iot.vmp.storager.dao.*;
23 21 import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
24 22 import com.github.pagehelper.PageHelper;
25 23 import com.github.pagehelper.PageInfo;
  24 +import org.slf4j.Logger;
  25 +import org.slf4j.LoggerFactory;
26 26 import org.springframework.beans.factory.annotation.Autowired;
27 27 import org.springframework.stereotype.Service;
28 28 import org.springframework.util.StringUtils;
... ... @@ -33,6 +33,8 @@ import java.util.stream.Collectors;
33 33 @Service
34 34 public class StreamPushServiceImpl implements IStreamPushService {
35 35  
  36 + private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
  37 +
36 38 @Autowired
37 39 private GbStreamMapper gbStreamMapper;
38 40  
... ... @@ -158,12 +160,17 @@ public class StreamPushServiceImpl implements IStreamPushService {
158 160 public boolean removeFromGB(GbStream stream) {
159 161 // 判断是否需要发送事件
160 162 gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
161   - int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
162 163 platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
  164 + int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
163 165 MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
164 166 JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
165   - if (mediaList == null) {
166   - streamPushMapper.del(stream.getApp(), stream.getStream());
  167 + if (mediaList != null) {
  168 + if (mediaList.getInteger("code") == 0) {
  169 + JSONArray data = mediaList.getJSONArray("data");
  170 + if (data == null) {
  171 + streamPushMapper.del(stream.getApp(), stream.getStream());
  172 + }
  173 + }
167 174 }
168 175 return del > 0;
169 176 }
... ... @@ -180,9 +187,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
180 187 StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
181 188 gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
182 189  
183   - int delStream = streamPushMapper.del(app, streamId);
184   - gbStreamMapper.del(app, streamId);
185 190 platformGbStreamMapper.delByAppAndStream(app, streamId);
  191 + gbStreamMapper.del(app, streamId);
  192 + int delStream = streamPushMapper.del(app, streamId);
186 193 if (delStream > 0) {
187 194 MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
188 195 zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
... ... @@ -376,6 +383,29 @@ public class StreamPushServiceImpl implements IStreamPushService {
376 383 .collect(Collectors.toList());
377 384  
378 385 if (streamPushItemsForPlatform.size() > 0) {
  386 + // 获取所有平台,平台和目录信息一般不会特别大量。
  387 + List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
  388 + Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
  389 + if (parentPlatformList.size() == 0) {
  390 + return;
  391 + }
  392 + for (ParentPlatform platform : parentPlatformList) {
  393 + Map<String, PlatformCatalog> catalogMap = new HashMap<>();
  394 +
  395 + // 创建根节点
  396 + PlatformCatalog platformCatalog = new PlatformCatalog();
  397 + platformCatalog.setId(platform.getServerGBId());
  398 + catalogMap.put(platform.getServerGBId(), platformCatalog);
  399 +
  400 + // 查询所有节点信息
  401 + List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
  402 + if (platformCatalogs.size() > 0) {
  403 + for (PlatformCatalog catalog : platformCatalogs) {
  404 + catalogMap.put(catalog.getId(), catalog);
  405 + }
  406 + }
  407 + platformInfoMap.put(platform.getServerGBId(), catalogMap);
  408 + }
379 409 List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
380 410 Map<String, List<GbStream>> platformForEvent = new HashMap<>();
381 411 // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
... ... @@ -388,6 +418,12 @@ public class StreamPushServiceImpl implements IStreamPushService {
388 418 streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
389 419 if (platFormInfoArray.length > 0) {
390 420 // 数组 platFormInfoArray 0 为平台ID。 1为目录ID
  421 + // 不存在这个平台,则忽略导入此关联关系
  422 + if (platformInfoMap.get(platFormInfoArray[0]) == null
  423 + || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
  424 + logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
  425 + continue;
  426 + }
391 427 streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
392 428  
393 429 List<GbStream> gbStreamList = platformForEvent.get(streamPushItem.getPlatformId());
... ... @@ -406,8 +442,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
406 442 streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
407 443 }
408 444 streamPushItemListFroPlatform.add(streamPushItemForPlatform);
409   -
410   -
411 445 }
412 446 }
413 447  
... ... @@ -432,9 +466,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
432 466 }
433 467 gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
434 468  
435   - int delStream = streamPushMapper.delAllForGbStream(gbStreams);
436   - gbStreamMapper.batchDelForGbStream(gbStreams);
437 469 platformGbStreamMapper.delByGbStreams(gbStreams);
  470 + gbStreamMapper.batchDelForGbStream(gbStreams);
  471 + int delStream = streamPushMapper.delAllForGbStream(gbStreams);
438 472 if (delStream > 0) {
439 473 for (GbStream gbStream : gbStreams) {
440 474 MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
... ...