Commit b2c953fc76de2a9686ee81d5311bd9b06e453912

Authored by 648540858
1 parent 9d37b411

优化ssrc释放逻辑,优化级联点播速度,去除等待流格式的配置项

Showing 37 changed files with 554 additions and 206 deletions
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
... ... @@ -5,6 +5,7 @@ import org.springframework.context.annotation.Bean;
5 5 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
6 6 import org.springframework.stereotype.Component;
7 7  
  8 +import java.util.Date;
8 9 import java.util.Map;
9 10 import java.util.concurrent.ConcurrentHashMap;
10 11 import java.util.concurrent.ScheduledFuture;
... ... @@ -25,15 +26,38 @@ public class DynamicTask {
25 26 return new ThreadPoolTaskScheduler();
26 27 }
27 28  
  29 + /**
  30 + * 循环执行的任务
  31 + * @param key 任务ID
  32 + * @param task 任务
  33 + * @param cycleForCatalog 间隔
  34 + * @return
  35 + */
28 36 public String startCron(String key, Runnable task, int cycleForCatalog) {
29   - stopCron(key);
  37 + stop(key);
30 38 // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
31 39 ScheduledFuture future = threadPoolTaskScheduler.scheduleWithFixedDelay(task, cycleForCatalog * 1000L);
32 40 futureMap.put(key, future);
33 41 return "startCron";
34 42 }
35 43  
36   - public void stopCron(String key) {
  44 + /**
  45 + * 延时任务
  46 + * @param key 任务ID
  47 + * @param task 任务
  48 + * @param delay 延时 /秒
  49 + * @return
  50 + */
  51 + public String startDelay(String key, Runnable task, int delay) {
  52 + stop(key);
  53 + Date starTime = new Date(System.currentTimeMillis() + delay * 1000);
  54 + // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
  55 + ScheduledFuture future = threadPoolTaskScheduler.schedule(task, starTime);
  56 + futureMap.put(key, future);
  57 + return "startCron";
  58 + }
  59 +
  60 + public void stop(String key) {
37 61 if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) {
38 62 futureMap.get(key).cancel(true);
39 63 }
... ...
src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java
... ... @@ -59,8 +59,11 @@ public class SipPlatformRunner implements CommandLineRunner {
59 59 redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
60 60  
61 61 // 取消订阅
62   - sipCommanderForPlatform.unregister(parentPlatform, null, null);
63   - Thread.sleep(500);
  62 + sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{
  63 + ParentPlatform platform = storager.queryParentPlatByServerGBId(parentPlatform.getServerGBId());
  64 + sipCommanderForPlatform.register(platform, null, null);
  65 + });
  66 +
64 67 // 发送平台未注册消息
65 68 publisher.platformNotRegisterEventPublish(parentPlatform.getServerGBId());
66 69 }
... ...
src/main/java/com/genersoft/iot/vmp/conf/UserSetup.java
... ... @@ -19,8 +19,6 @@ public class UserSetup {
19 19  
20 20 private Long playTimeout = 18000L;
21 21  
22   - private Boolean waitTrack = Boolean.FALSE;
23   -
24 22 private Boolean interfaceAuthentication = Boolean.TRUE;
25 23  
26 24 private Boolean recordPushLive = Boolean.TRUE;
... ... @@ -57,10 +55,6 @@ public class UserSetup {
57 55 return playTimeout;
58 56 }
59 57  
60   - public Boolean isWaitTrack() {
61   - return waitTrack;
62   - }
63   -
64 58 public Boolean isInterfaceAuthentication() {
65 59 return interfaceAuthentication;
66 60 }
... ... @@ -89,10 +83,6 @@ public class UserSetup {
89 83 this.playTimeout = playTimeout;
90 84 }
91 85  
92   - public void setWaitTrack(Boolean waitTrack) {
93   - this.waitTrack = waitTrack;
94   - }
95   -
96 86 public void setInterfaceAuthentication(boolean interfaceAuthentication) {
97 87 this.interfaceAuthentication = interfaceAuthentication;
98 88 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java
1 1 package com.genersoft.iot.vmp.gb28181.auth;
2 2  
  3 +import com.genersoft.iot.vmp.storager.impl.VideoManagerStoragerImpl;
3 4 import org.slf4j.Logger;
4 5 import org.slf4j.LoggerFactory;
5 6 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -20,13 +21,24 @@ public class RegisterLogicHandler {
20 21  
21 22 @Autowired
22 23 private SIPCommander cmder;
  24 +
  25 + @Autowired
  26 + private VideoManagerStoragerImpl storager;
23 27  
24 28 public void onRegister(Device device) {
25 29 // 只有第一次注册时调用查询设备信息,如需更新调用更新API接口
  30 + // TODO 此处错误无法获取到通道
  31 + Device device1 = storager.queryVideoDevice(device.getDeviceId());
26 32 if (device.isFirsRegister()) {
27 33 logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
28   - cmder.deviceInfoQuery(device);
29   - cmder.catalogQuery(device, null);
  34 + try {
  35 + Thread.sleep(100);
  36 + cmder.deviceInfoQuery(device);
  37 + Thread.sleep(100);
  38 + cmder.catalogQuery(device, null);
  39 + } catch (InterruptedException e) {
  40 + e.printStackTrace();
  41 + }
30 42 }
31 43 }
32 44 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
... ... @@ -81,6 +81,10 @@ public class SendRtpItem {
81 81 */
82 82 private boolean isPlay;
83 83  
  84 + private byte[] transaction;
  85 +
  86 + private byte[] dialog;
  87 +
84 88 public String getIp() {
85 89 return ip;
86 90 }
... ... @@ -200,4 +204,20 @@ public class SendRtpItem {
200 204 public void setPlay(boolean play) {
201 205 isPlay = play;
202 206 }
  207 +
  208 + public byte[] getTransaction() {
  209 + return transaction;
  210 + }
  211 +
  212 + public void setTransaction(byte[] transaction) {
  213 + this.transaction = transaction;
  214 + }
  215 +
  216 + public byte[] getDialog() {
  217 + return dialog;
  218 + }
  219 +
  220 + public void setDialog(byte[] dialog) {
  221 + this.dialog = dialog;
  222 + }
203 223 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java
... ... @@ -2,7 +2,10 @@ package com.genersoft.iot.vmp.gb28181.event.offline;
2 2  
3 3 import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
4 4 import com.genersoft.iot.vmp.conf.UserSetup;
  5 +import com.genersoft.iot.vmp.gb28181.bean.Device;
  6 +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
5 7 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
  8 +import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
6 9 import org.slf4j.Logger;
7 10 import org.slf4j.LoggerFactory;
8 11 import org.springframework.beans.factory.InitializingBean;
... ... @@ -39,6 +42,9 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent
39 42 @Autowired
40 43 private SipSubscribe sipSubscribe;
41 44  
  45 + @Autowired
  46 + private IVideoManagerStorager storager;
  47 +
42 48 public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) {
43 49 super(listenerContainer, userSetup);
44 50 }
... ... @@ -61,15 +67,22 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent
61 67 String REGISTER_INFO_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetup.getServerId() + "_";
62 68 if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
63 69 String platformGBId = expiredKey.substring(PLATFORM_KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
64   -
65   - publisher.platformKeepaliveExpireEventPublish(platformGBId);
  70 + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGBId);
  71 + if (platform != null) {
  72 + publisher.platformKeepaliveExpireEventPublish(platformGBId);
  73 + }
66 74 }else if (expiredKey.startsWith(PLATFORM_REGISTER_PREFIX)) {
67 75 String platformGBId = expiredKey.substring(PLATFORM_REGISTER_PREFIX.length(),expiredKey.length());
68   -
69   - publisher.platformRegisterCycleEventPublish(platformGBId);
  76 + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGBId);
  77 + if (platform != null) {
  78 + publisher.platformRegisterCycleEventPublish(platformGBId);
  79 + }
70 80 }else if (expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){
71 81 String deviceId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
72   - publisher.outlineEventPublish(deviceId, KEEPLIVEKEY_PREFIX);
  82 + Device device = storager.queryVideoDevice(deviceId);
  83 + if (device != null) {
  84 + publisher.outlineEventPublish(deviceId, KEEPLIVEKEY_PREFIX);
  85 + }
73 86 }else if (expiredKey.startsWith(REGISTER_INFO_PREFIX)) {
74 87 String callid = expiredKey.substring(REGISTER_INFO_PREFIX.length());
75 88 SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java
... ... @@ -2,8 +2,13 @@ package com.genersoft.iot.vmp.gb28181.event.offline;
2 2  
3 3 import com.genersoft.iot.vmp.conf.UserSetup;
4 4 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
  5 +import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
5 6 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
6 7 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  8 +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  9 +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
  10 +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  11 +import com.genersoft.iot.vmp.service.IMediaServerService;
7 12 import org.slf4j.Logger;
8 13 import org.slf4j.LoggerFactory;
9 14 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -32,6 +37,9 @@ public class OfflineEventListener implements ApplicationListener<OfflineEvent> {
32 37  
33 38 @Autowired
34 39 private IVideoManagerStorager storager;
  40 +
  41 + @Autowired
  42 + private VideoStreamSessionManager streamSession;
35 43  
36 44 @Autowired
37 45 private RedisUtil redis;
... ... @@ -42,6 +50,14 @@ public class OfflineEventListener implements ApplicationListener<OfflineEvent> {
42 50 @Autowired
43 51 private EventPublisher eventPublisher;
44 52  
  53 +
  54 + @Autowired
  55 + private IMediaServerService mediaServerService;
  56 +
  57 +
  58 + @Autowired
  59 + private ZLMRTPServerFactory zlmrtpServerFactory;
  60 +
45 61 @Override
46 62 public void onApplicationEvent(OfflineEvent event) {
47 63  
... ... @@ -73,5 +89,15 @@ public class OfflineEventListener implements ApplicationListener<OfflineEvent> {
73 89  
74 90 // TODO 离线取消订阅
75 91  
  92 + // 离线释放所有ssrc
  93 + List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(event.getDeviceId(), null, null, null);
  94 + if (ssrcTransactions.size() > 0) {
  95 + for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
  96 + mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
  97 + mediaServerService.closeRTPServer(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
  98 + streamSession.remove(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
  99 + }
  100 + }
  101 +
76 102 }
77 103 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java
... ... @@ -75,7 +75,7 @@ public class PlatformNotRegisterEventLister implements ApplicationListener&lt;Platf
75 75 stream.append(",");
76 76 }
77 77 stream.append(sendRtpItem.getStreamId());
78   - redisCatchStorage.deleteSendRTPServer(event.getPlatformGbID(), sendRtpItem.getChannelId());
  78 + redisCatchStorage.deleteSendRTPServer(event.getPlatformGbID(), sendRtpItem.getChannelId(), null, null);
79 79 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
80 80 Map<String, Object> param = new HashMap<>();
81 81 param.put("vhost", "__defaultVhost__");
... ... @@ -84,9 +84,7 @@ public class PlatformNotRegisterEventLister implements ApplicationListener&lt;Platf
84 84 zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
85 85 }
86 86  
87   -
88 87 }
89   -
90 88 Timer timer = new Timer();
91 89 SipSubscribe.Event okEvent = (responseEvent)->{
92 90 timer.cancel();
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java
... ... @@ -4,8 +4,6 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
4 4 import com.genersoft.iot.vmp.conf.DynamicTask;
5 5 import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
6 6 import com.genersoft.iot.vmp.conf.UserSetup;
7   -import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
8   -import org.checkerframework.checker.units.qual.A;
9 7 import org.slf4j.Logger;
10 8 import org.slf4j.LoggerFactory;
11 9 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -46,7 +44,7 @@ public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessage
46 44 String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_";
47 45 if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
48 46 // 取消定时任务
49   - dynamicTask.stopCron(expiredKey);
  47 + dynamicTask.stop(expiredKey);
50 48 }
51 49 }
52 50 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java
... ... @@ -86,6 +86,15 @@ public class VideoStreamSessionManager {
86 86 return dialog;
87 87 }
88 88  
  89 + public SIPDialog getDialogByCallId(String deviceId, String channelId, String callID){
  90 + SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, callID, null);
  91 + if (ssrcTransaction == null) return null;
  92 + byte[] dialogByteArray = ssrcTransaction.getDialog();
  93 + if (dialogByteArray == null) return null;
  94 + SIPDialog dialog = (SIPDialog)SerializeUtils.deSerialize(dialogByteArray);
  95 + return dialog;
  96 + }
  97 +
89 98 public SsrcTransaction getSsrcTransaction(String deviceId, String channelId, String callId, String stream){
90 99 if (StringUtils.isEmpty(callId)) callId ="*";
91 100 if (StringUtils.isEmpty(stream)) stream ="*";
... ... @@ -95,6 +104,21 @@ public class VideoStreamSessionManager {
95 104 return (SsrcTransaction)redisUtil.get((String) scanResult.get(0));
96 105 }
97 106  
  107 + public List<SsrcTransaction> getSsrcTransactionForAll(String deviceId, String channelId, String callId, String stream){
  108 + if (StringUtils.isEmpty(deviceId)) deviceId ="*";
  109 + if (StringUtils.isEmpty(channelId)) channelId ="*";
  110 + if (StringUtils.isEmpty(callId)) callId ="*";
  111 + if (StringUtils.isEmpty(stream)) stream ="*";
  112 + String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId() + "_" + deviceId + "_" + channelId + "_" + callId+ "_" + stream;
  113 + List<Object> scanResult = redisUtil.scan(key);
  114 + if (scanResult.size() == 0) return null;
  115 + List<SsrcTransaction> result = new ArrayList<>();
  116 + for (Object keyObj : scanResult) {
  117 + result.add((SsrcTransaction)redisUtil.get((String) keyObj));
  118 + }
  119 + return result;
  120 + }
  121 +
98 122 public String getMediaServerId(String deviceId, String channelId, String stream){
99 123 SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream);
100 124 if (ssrcTransaction == null) return null;
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java
... ... @@ -63,7 +63,5 @@ public class GPSSubscribeTask implements Runnable{
63 63 }
64 64 }
65 65 }
66   -
67   -
68 66 }
69 67 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
... ... @@ -96,4 +96,11 @@ public interface ISIPCommanderForPlatform {
96 96 * @param recordInfo 录像信息
97 97 */
98 98 boolean recordInfo(DeviceChannel deviceChannel, ParentPlatform parentPlatform, String fromTag, RecordInfo recordInfo);
  99 +
  100 + /**
  101 + * 向发起点播的上级回复bye
  102 + * @param platform 平台信息
  103 + * @param callId callId
  104 + */
  105 + void streamByeCmd(ParentPlatform platform, String callId);
99 106 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
... ... @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
2 2  
3 3 import com.alibaba.fastjson.JSONObject;
4 4 import com.genersoft.iot.vmp.common.StreamInfo;
  5 +import com.genersoft.iot.vmp.conf.DynamicTask;
5 6 import com.genersoft.iot.vmp.conf.SipConfig;
6 7 import com.genersoft.iot.vmp.conf.UserSetup;
7 8 import com.genersoft.iot.vmp.gb28181.bean.Device;
... ... @@ -85,6 +86,9 @@ public class SIPCommander implements ISIPCommander {
85 86 @Autowired
86 87 private IMediaServerService mediaServerService;
87 88  
  89 + @Autowired
  90 + private DynamicTask dynamicTask;
  91 +
88 92  
89 93 /**
90 94 * 云台方向放控制,使用配置文件中的默认镜头移动速度
... ... @@ -330,7 +334,8 @@ public class SIPCommander implements ISIPCommander {
330 334 * @param errorEvent sip错误订阅
331 335 */
332 336 @Override
333   - public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) {
  337 + public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
  338 + ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) {
334 339 String streamId = ssrcInfo.getStream();
335 340 try {
336 341 if (device == null) return;
... ... @@ -342,15 +347,13 @@ public class SIPCommander implements ISIPCommander {
342 347 subscribeKey.put("app", "rtp");
343 348 subscribeKey.put("stream", streamId);
344 349 subscribeKey.put("regist", true);
  350 + subscribeKey.put("schema", "rtmp");
345 351 subscribeKey.put("mediaServerId", mediaServerItem.getId());
346 352 subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
347 353 (MediaServerItem mediaServerItemInUse, JSONObject json)->{
348   - if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return;
349 354 if (event != null) {
350 355 event.response(mediaServerItemInUse, json);
351 356 }
352   -
353   -// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
354 357 });
355 358 //
356 359 StringBuffer content = new StringBuffer(200);
... ... @@ -419,7 +422,7 @@ public class SIPCommander implements ISIPCommander {
419 422  
420 423 transmitRequest(device, request, (e -> {
421 424 streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
422   - mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc());
  425 + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
423 426 errorEvent.response(e);
424 427 }), e ->{
425 428 // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
... ... @@ -458,8 +461,6 @@ public class SIPCommander implements ISIPCommander {
458 461 logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
459 462 subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
460 463 (MediaServerItem mediaServerItemInUse, JSONObject json)->{
461   - System.out.println(344444);
462   - if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return;
463 464 if (event != null) {
464 465 event.response(mediaServerItemInUse, json);
465 466 }
... ... @@ -565,7 +566,6 @@ public class SIPCommander implements ISIPCommander {
565 566 logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
566 567 subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
567 568 (MediaServerItem mediaServerItemInUse, JSONObject json)->{
568   - if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return;
569 569 event.response(mediaServerItemInUse, json);
570 570 subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
571 571 });
... ... @@ -662,6 +662,7 @@ public class SIPCommander implements ISIPCommander {
662 662 @Override
663 663 public void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent) {
664 664 try {
  665 + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream);
665 666 ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream);
666 667 if (transaction == null) {
667 668 logger.warn("[ {} -> {}]停止视频流的时候发现事务已丢失", deviceId, channelId);
... ... @@ -715,10 +716,9 @@ public class SIPCommander implements ISIPCommander {
715 716  
716 717 dialog.sendRequest(clientTransaction);
717 718  
718   - SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, callIdHeader.getCallId(), null);
719 719 if (ssrcTransaction != null) {
720 720 MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
721   - mediaServerService.releaseSsrc(mediaServerItem, ssrcTransaction.getSsrc());
  721 + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc());
722 722 mediaServerService.closeRTPServer(deviceId, channelId, ssrcTransaction.getStream());
723 723 streamSession.remove(deviceId, channelId, ssrcTransaction.getStream());
724 724 }
... ... @@ -1169,8 +1169,6 @@ public class SIPCommander implements ISIPCommander {
1169 1169 */
1170 1170 @Override
1171 1171 public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) {
1172   - // 清空通道
1173   -// storager.cleanChannelsForDevice(device.getDeviceId());
1174 1172 try {
1175 1173 StringBuffer catalogXml = new StringBuffer(200);
1176 1174 catalogXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
... ... @@ -5,8 +5,16 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
5 5 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
6 6 import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
7 7 import com.genersoft.iot.vmp.gb28181.utils.DateUtil;
  8 +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
  9 +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  10 +import com.genersoft.iot.vmp.service.IMediaServerService;
8 11 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
9 12 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  13 +import com.genersoft.iot.vmp.utils.SerializeUtils;
  14 +import gov.nist.javax.sip.SipProviderImpl;
  15 +import gov.nist.javax.sip.SipStackImpl;
  16 +import gov.nist.javax.sip.message.SIPRequest;
  17 +import gov.nist.javax.sip.stack.SIPDialog;
10 18 import org.slf4j.Logger;
11 19 import org.slf4j.LoggerFactory;
12 20 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -18,10 +26,14 @@ import org.springframework.stereotype.Component;
18 26 import org.springframework.util.StringUtils;
19 27  
20 28 import javax.sip.*;
  29 +import javax.sip.address.SipURI;
21 30 import javax.sip.header.CallIdHeader;
  31 +import javax.sip.header.ViaHeader;
22 32 import javax.sip.header.WWWAuthenticateHeader;
23 33 import javax.sip.message.Request;
  34 +import java.lang.reflect.Field;
24 35 import java.text.ParseException;
  36 +import java.util.HashSet;
25 37 import java.util.List;
26 38 import java.util.UUID;
27 39  
... ... @@ -38,17 +50,23 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
38 50 private IRedisCatchStorage redisCatchStorage;
39 51  
40 52 @Autowired
  53 + private IMediaServerService mediaServerService;
  54 +
  55 + @Autowired
41 56 private SipSubscribe sipSubscribe;
42 57  
  58 + @Autowired
  59 + private ZLMRTPServerFactory zlmrtpServerFactory;
  60 +
43 61 @Lazy
44 62 @Autowired
45 63 @Qualifier(value="tcpSipProvider")
46   - private SipProvider tcpSipProvider;
  64 + private SipProviderImpl tcpSipProvider;
47 65  
48 66 @Lazy
49 67 @Autowired
50 68 @Qualifier(value="udpSipProvider")
51   - private SipProvider udpSipProvider;
  69 + private SipProviderImpl udpSipProvider;
52 70  
53 71 @Override
54 72 public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
... ... @@ -57,13 +75,12 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
57 75  
58 76 @Override
59 77 public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
60   - parentPlatform.setExpires("0");
61 78 ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
62 79 if (parentPlatformCatch != null) {
63 80 parentPlatformCatch.setParentPlatform(parentPlatform);
64 81 redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
65 82 }
66   -
  83 + parentPlatform.setExpires("0");
67 84 return register(parentPlatform, null, null, errorEvent, okEvent, false);
68 85 }
69 86  
... ... @@ -543,4 +560,59 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
543 560 }
544 561 return true;
545 562 }
  563 +
  564 + @Override
  565 + public void streamByeCmd(ParentPlatform platform, String callId) {
  566 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), null, null, callId);
  567 + if (sendRtpItem != null) {
  568 + String mediaServerId = sendRtpItem.getMediaServerId();
  569 + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  570 + if (mediaServerItem != null) {
  571 + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
  572 + zlmrtpServerFactory.closeRTPServer(mediaServerItem, sendRtpItem.getStreamId());
  573 + }
  574 + byte[] dialogByteArray = sendRtpItem.getDialog();
  575 + if (dialogByteArray != null) {
  576 + SIPDialog dialog = (SIPDialog) SerializeUtils.deSerialize(dialogByteArray);
  577 + SipStack sipStack = udpSipProvider.getSipStack();
  578 + SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog);
  579 + if (dialog != sipDialog) {
  580 + dialog = sipDialog;
  581 + } else {
  582 + try {
  583 + dialog.setSipProvider(udpSipProvider);
  584 + Field sipStackField = SIPDialog.class.getDeclaredField("sipStack");
  585 + sipStackField.setAccessible(true);
  586 + sipStackField.set(dialog, sipStack);
  587 + Field eventListenersField = SIPDialog.class.getDeclaredField("eventListeners");
  588 + eventListenersField.setAccessible(true);
  589 + eventListenersField.set(dialog, new HashSet<>());
  590 +
  591 + byte[] transactionByteArray = sendRtpItem.getTransaction();
  592 + ClientTransaction clientTransaction = (ClientTransaction) SerializeUtils.deSerialize(transactionByteArray);
  593 + Request byeRequest = dialog.createRequest(Request.BYE);
  594 + SipURI byeURI = (SipURI) byeRequest.getRequestURI();
  595 + SIPRequest request = (SIPRequest) clientTransaction.getRequest();
  596 + byeURI.setHost(request.getRemoteAddress().getHostName());
  597 + byeURI.setPort(request.getRemotePort());
  598 + if ("TCP".equals(platform.getTransport())) {
  599 + clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest);
  600 + } else if ("UDP".equals(platform.getTransport())) {
  601 + clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest);
  602 + }
  603 + dialog.sendRequest(clientTransaction);
  604 + } catch (SipException e) {
  605 + e.printStackTrace();
  606 + } catch (ParseException e) {
  607 + e.printStackTrace();
  608 + } catch (NoSuchFieldException e) {
  609 + e.printStackTrace();
  610 + } catch (IllegalAccessException e) {
  611 + e.printStackTrace();
  612 + }
  613 +
  614 + }
  615 + }
  616 + }
  617 + }
546 618 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
... ... @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
3 3 import com.alibaba.fastjson.JSON;
4 4 import com.alibaba.fastjson.JSONObject;
5 5 import com.genersoft.iot.vmp.common.StreamInfo;
  6 +import com.genersoft.iot.vmp.conf.DynamicTask;
6 7 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
7 8 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
8 9 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
... ... @@ -22,6 +23,7 @@ import javax.sip.Dialog;
22 23 import javax.sip.DialogState;
23 24 import javax.sip.RequestEvent;
24 25 import javax.sip.address.SipURI;
  26 +import javax.sip.header.CallIdHeader;
25 27 import javax.sip.header.FromHeader;
26 28 import javax.sip.header.HeaderAddress;
27 29 import javax.sip.header.ToHeader;
... ... @@ -60,6 +62,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
60 62 @Autowired
61 63 private ZLMHttpHookSubscribe subscribe;
62 64  
  65 + @Autowired
  66 + private DynamicTask dynamicTask;
  67 +
63 68  
64 69 /**
65 70 * 处理 ACK请求
... ... @@ -68,13 +73,16 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
68 73 */
69 74 @Override
70 75 public void process(RequestEvent evt) {
71   - logger.info("ACK请求: {}", ((System.currentTimeMillis())));
72 76 Dialog dialog = evt.getDialog();
  77 + CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
73 78 if (dialog == null) return;
74 79 if (dialog.getState()== DialogState.CONFIRMED) {
75 80 String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
  81 + logger.info("ACK请求: platformGbId->{}", platformGbId);
  82 + // 取消设置的超时任务
  83 + dynamicTask.stop(callIdHeader.getCallId());
76 84 String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
77   - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
  85 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
78 86 String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
79 87 String deviceId = sendRtpItem.getDeviceId();
80 88 StreamInfo streamInfo = null;
... ... @@ -83,15 +91,12 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
83 91 }else {
84 92 streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId);
85 93 }
86   - System.out.println(JSON.toJSON(streamInfo));
87 94 if (streamInfo == null) {
88 95 streamInfo = new StreamInfo();
89 96 streamInfo.setApp(sendRtpItem.getApp());
90 97 streamInfo.setStream(sendRtpItem.getStreamId());
91 98 }
92 99 redisCatchStorage.updateSendRTPSever(sendRtpItem);
93   - logger.info(platformGbId);
94   - logger.info(channelId);
95 100 Map<String, Object> param = new HashMap<>();
96 101 param.put("vhost","__defaultVhost__");
97 102 param.put("app",streamInfo.getApp());
... ... @@ -100,42 +105,23 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
100 105 param.put("dst_url",sendRtpItem.getIp());
101 106 param.put("dst_port", sendRtpItem.getPort());
102 107 param.put("is_udp", is_Udp);
103   - // 设备推流查询,成功后才能转推
104 108 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
105   - zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
106   -// if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) {
107   -// logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]",
108   -// streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort());
109   -// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
110   -// } else {
111   -// // 对hook进行订阅
112   -// logger.info("等待设备推流[{}/{}].......",
113   -// streamInfo.getApp(), streamInfo.getStreamId());
114   -// Timer timer = new Timer();
115   -// timer.schedule(new TimerTask() {
116   -// @Override
117   -// public void run() {
118   -// logger.info("设备推流[{}/{}]超时,终止向上级推流",
119   -// finalStreamInfo.getApp() , finalStreamInfo.getStreamId());
120   -//
121   -// }
122   -// }, 30*1000L);
123   -// // 添加订阅
124   -// JSONObject subscribeKey = new JSONObject();
125   -// subscribeKey.put("app", "rtp");
126   -// subscribeKey.put("stream", streamInfo.getStreamId());
127   -// subscribeKey.put("mediaServerId", streamInfo.getMediaServerId());
128   -// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, subscribeKey,
129   -// (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
130   -// logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]",
131   -// finalStreamInfo.getApp(), finalStreamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort());
132   -// timer.cancel();
133   -// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
134   -// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
135   -// });
136   -// }
137   -
138   -
  109 + JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
  110 + if (jsonObject.getInteger("code") != 0) {
  111 + logger.info("监听流以等待流上线{}/{}", streamInfo.getApp(), streamInfo.getStream());
  112 + // 监听流上线
  113 + // 添加订阅
  114 + JSONObject subscribeKey = new JSONObject();
  115 + subscribeKey.put("app", "rtp");
  116 + subscribeKey.put("stream", streamInfo.getStream());
  117 + subscribeKey.put("regist", true);
  118 + subscribeKey.put("schema", "rtmp");
  119 + subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId());
  120 + subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
  121 + (MediaServerItem mediaServerItemInUse, JSONObject json)->{
  122 + zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
  123 + });
  124 + }
139 125 }
140 126 }
141 127 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
... ... @@ -4,6 +4,8 @@ import com.genersoft.iot.vmp.common.StreamInfo;
4 4 import com.genersoft.iot.vmp.gb28181.bean.Device;
5 5 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
6 6 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
  7 +import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
  8 +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
7 9 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
8 10 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
9 11 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
... ... @@ -13,6 +15,8 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
13 15 import com.genersoft.iot.vmp.service.IMediaServerService;
14 16 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
15 17 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
  18 +import com.genersoft.iot.vmp.utils.SerializeUtils;
  19 +import gov.nist.javax.sip.stack.SIPDialog;
16 20 import org.slf4j.Logger;
17 21 import org.slf4j.LoggerFactory;
18 22 import org.springframework.beans.factory.InitializingBean;
... ... @@ -21,6 +25,7 @@ import org.springframework.stereotype.Component;
21 25  
22 26 import javax.sip.*;
23 27 import javax.sip.address.SipURI;
  28 +import javax.sip.header.CallIdHeader;
24 29 import javax.sip.header.FromHeader;
25 30 import javax.sip.header.HeaderAddress;
26 31 import javax.sip.header.ToHeader;
... ... @@ -56,6 +61,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
56 61 @Autowired
57 62 private SIPProcessorObserver sipProcessorObserver;
58 63  
  64 + @Autowired
  65 + private VideoStreamSessionManager streamSession;
  66 +
59 67 @Override
60 68 public void afterPropertiesSet() throws Exception {
61 69 // 添加消息处理的订阅
... ... @@ -71,11 +79,12 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
71 79 try {
72 80 responseAck(evt, Response.OK);
73 81 Dialog dialog = evt.getDialog();
  82 + CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
74 83 if (dialog == null) return;
75 84 if (dialog.getState().equals(DialogState.TERMINATED)) {
76 85 String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
77 86 String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
78   - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
  87 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
79 88 logger.info("收到bye, [{}/{}]", platformGbId, channelId);
80 89 if (sendRtpItem != null){
81 90 String streamId = sendRtpItem.getStreamId();
... ... @@ -87,35 +96,44 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
87 96 logger.info("停止向上级推流:" + streamId);
88 97 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
89 98 zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
90   - redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
  99 + redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null);
91 100 int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
92   - if (totalReaderCount == 0) {
  101 + if (totalReaderCount <= 0) {
93 102 logger.info(streamId + "无其它观看者,通知设备停止推流");
94 103 cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId);
95   - }else if (totalReaderCount == -1){
96   - logger.warn(streamId + " 查找其它观看者失败");
97 104 }
98 105 }
99 106 // 可能是设备主动停止
100 107 Device device = storager.queryVideoDeviceByChannelId(platformGbId);
101 108 if (device != null) {
  109 + storager.stopPlay(device.getDeviceId(), channelId);
102 110 StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
103   - if (sendRtpItem != null) {
104   - if (sendRtpItem.isPlay()) {
105   - if (streamInfo != null) {
106   - redisCatchStorage.stopPlay(streamInfo);
107   - }
108   - }else {
109   - if (streamInfo != null) {
110   - redisCatchStorage.stopPlayback(streamInfo);
  111 + if (streamInfo != null) {
  112 + redisCatchStorage.stopPlay(streamInfo);
  113 + mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream());
  114 + }
  115 + SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
  116 + if (ssrcTransactionForPlay != null){
  117 + SIPDialog dialogForPlay = (SIPDialog) SerializeUtils.deSerialize(ssrcTransactionForPlay.getDialog());
  118 + if (dialogForPlay.getCallId().equals(callIdHeader.getCallId())){
  119 + // 释放ssrc
  120 + MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId());
  121 + if (mediaServerItem != null) {
  122 + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlay.getSsrc());
111 123 }
  124 + streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream());
112 125 }
113   -
114   - storager.stopPlay(device.getDeviceId(), channelId);
115   - mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream());
  126 + }
  127 + SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null);
  128 + if (ssrcTransactionForPlayBack != null) {
  129 + // 释放ssrc
  130 + MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId());
  131 + if (mediaServerItem != null) {
  132 + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc());
  133 + }
  134 + streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream());
116 135 }
117 136 }
118   -
119 137 }
120 138 } catch (SipException e) {
121 139 e.printStackTrace();
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
... ... @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
3 3 import com.alibaba.fastjson.JSON;
4 4 import com.alibaba.fastjson.JSONObject;
5 5 import com.genersoft.iot.vmp.common.StreamInfo;
  6 +import com.genersoft.iot.vmp.conf.DynamicTask;
6 7 import com.genersoft.iot.vmp.gb28181.bean.*;
7 8 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
8 9 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
... ... @@ -21,6 +22,7 @@ import com.genersoft.iot.vmp.service.IPlayService;
21 22 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
22 23 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
23 24 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
  25 +import com.genersoft.iot.vmp.utils.SerializeUtils;
24 26 import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
25 27 import gov.nist.javax.sdp.TimeDescriptionImpl;
26 28 import gov.nist.javax.sdp.fields.TimeField;
... ... @@ -69,6 +71,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
69 71 private IRedisCatchStorage redisCatchStorage;
70 72  
71 73 @Autowired
  74 + private DynamicTask dynamicTask;
  75 +
  76 + @Autowired
72 77 private SIPCommander cmder;
73 78  
74 79 @Autowired
... ... @@ -257,11 +262,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
257 262 }
258 263 sendRtpItem.setCallId(callIdHeader.getCallId());
259 264 sendRtpItem.setPlay("Play".equals(sessionName));
  265 + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
  266 + sendRtpItem.setDialog(dialogByteArray);
  267 + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
  268 + sendRtpItem.setTransaction(transactionByteArray);
260 269 // 写入redis, 超时时回复
261 270 redisCatchStorage.updateSendRTPSever(sendRtpItem);
262 271  
263   - Device finalDevice = device;
264   - MediaServerItem finalMediaServerItem = mediaServerItem;
265 272 Long finalStartTime = startTime;
266 273 Long finalStopTime = stopTime;
267 274 ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{
... ... @@ -289,7 +296,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
289 296 content.append("f=\r\n");
290 297  
291 298 try {
  299 + // 超时未收到Ack应该回复bye,当前等待时间为10秒
  300 + dynamicTask.startDelay(callIdHeader.getCallId(), ()->{
  301 + logger.info("Ack 等待超时");
  302 + mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), ssrc);
  303 + // 回复bye
  304 + cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
  305 + }, 60);
292 306 responseSdpAck(evt, content.toString(), platform);
  307 +
293 308 } catch (SipException e) {
294 309 e.printStackTrace();
295 310 } catch (InvalidArgumentException e) {
... ... @@ -320,6 +335,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
320 335 if (result.getEvent() != null) {
321 336 errorEvent.response(result.getEvent());
322 337 }
  338 + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
323 339 try {
324 340 responseAck(evt, Response.REQUEST_TIMEOUT);
325 341 } catch (SipException e) {
... ... @@ -343,7 +359,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
343 359 sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId));
344 360 }
345 361 sendRtpItem.setPlay(false);
346   - playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent,errorEvent);
  362 + playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent, errorEvent, ()->{
  363 + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
  364 + });
347 365 }else {
348 366 sendRtpItem.setStreamId(streamInfo.getStream());
349 367 hookEvent.response(mediaServerItem, null);
... ... @@ -365,6 +383,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
365 383  
366 384 // 写入redis, 超时时回复
367 385 sendRtpItem.setStatus(1);
  386 + sendRtpItem.setCallId(callIdHeader.getCallId());
  387 + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
  388 + sendRtpItem.setDialog(dialogByteArray);
  389 + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
  390 + sendRtpItem.setTransaction(transactionByteArray);
368 391 redisCatchStorage.updateSendRTPSever(sendRtpItem);
369 392 StringBuffer content = new StringBuffer(200);
370 393 content.append("v=0\r\n");
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
... ... @@ -158,6 +158,10 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
158 158 device.setCharset("gb2312");
159 159 device.setDeviceId(deviceId);
160 160 device.setFirsRegister(true);
  161 + }else {
  162 + if (device.getOnline() == 0) {
  163 + device.setFirsRegister(true);
  164 + }
161 165 }
162 166 device.setIp(received);
163 167 device.setPort(rPort);
... ... @@ -187,7 +191,6 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
187 191 if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
188 192 // 注册成功
189 193 // 保存到redis
190   - // 下发catelog查询目录
191 194 if (registerFlag == 1 ) {
192 195 logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress);
193 196 publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER);
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
... ... @@ -27,9 +27,7 @@ import javax.sip.InvalidArgumentException;
27 27 import javax.sip.RequestEvent;
28 28 import javax.sip.ServerTransaction;
29 29 import javax.sip.SipException;
30   -import javax.sip.header.CallIdHeader;
31 30 import javax.sip.header.ExpiresHeader;
32   -import javax.sip.header.Header;
33 31 import javax.sip.header.ToHeader;
34 32 import javax.sip.message.Request;
35 33 import javax.sip.message.Response;
... ... @@ -139,19 +137,17 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
139 137  
140 138 if (subscribeInfo.getExpires() > 0) {
141 139 if (redisCatchStorage.getSubscribe(key) != null) {
142   - dynamicTask.stopCron(key);
  140 + dynamicTask.stop(key);
143 141 }
144 142 String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
145 143 dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key), Integer.parseInt(interval));
146 144  
147 145 redisCatchStorage.updateSubscribe(key, subscribeInfo);
148 146 }else if (subscribeInfo.getExpires() == 0) {
149   - dynamicTask.stopCron(key);
  147 + dynamicTask.stop(key);
150 148 redisCatchStorage.delSubscribe(key);
151 149 }
152 150  
153   -
154   -
155 151 try {
156 152 ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
157 153 Response response = responseXmlAck(evt, resultXml.toString(), parentPlatform);
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
... ... @@ -85,19 +85,18 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
85 85 redisCatchStorage.delPlatformRegisterInfo(callId);
86 86 parentPlatform.setStatus("注册".equals(action));
87 87 // 取回Expires设置,避免注销过程中被置为0
88   - ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
89   - String expires = parentPlatformTmp.getExpires();
90   - parentPlatform.setExpires(expires);
91   - parentPlatform.setId(parentPlatformTmp.getId());
  88 + if (!parentPlatformCatch.getParentPlatform().getExpires().equals("0")) {
  89 + ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
  90 + String expires = parentPlatformTmp.getExpires();
  91 + parentPlatform.setExpires(expires);
  92 + parentPlatform.setId(parentPlatformTmp.getId());
  93 + redisCatchStorage.updatePlatformRegister(parentPlatform);
  94 + redisCatchStorage.updatePlatformKeepalive(parentPlatform);
  95 + parentPlatformCatch.setParentPlatform(parentPlatform);
  96 + redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
  97 + }
92 98 storager.updateParentPlatformStatus(platformGBId, "注册".equals(action));
93 99  
94   - redisCatchStorage.updatePlatformRegister(parentPlatform);
95   -
96   - redisCatchStorage.updatePlatformKeepalive(parentPlatform);
97   -
98   - parentPlatformCatch.setParentPlatform(parentPlatform);
99   -
100   - redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
101 100 }
102 101 }
103 102  
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
... ... @@ -489,7 +489,7 @@ public class ZLMHttpHookListener {
489 489 }
490 490 String mediaServerId = json.getString("mediaServerId");
491 491 MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
492   - if (userSetup.isAutoApplyPlay() && mediaInfo != null) {
  492 + if (userSetup.isAutoApplyPlay() && mediaInfo != null && mediaInfo.isRtpEnable()) {
493 493 String app = json.getString("app");
494 494 String streamId = json.getString("stream");
495 495 if ("rtp".equals(app)) {
... ... @@ -499,28 +499,16 @@ public class ZLMHttpHookListener {
499 499 String channelId = s[1];
500 500 Device device = redisCatchStorage.getDevice(deviceId);
501 501 if (device != null) {
502   - UUID uuid = UUID.randomUUID();
503   - SSRCInfo ssrcInfo;
504   - String streamId2 = null;
505   - if (mediaInfo.isRtpEnable()) {
506   - streamId2 = String.format("%s_%s", device.getDeviceId(), channelId);
507   - }
508   - ssrcInfo = mediaServerService.openRTPServer(mediaInfo, streamId2);
509   - cmder.playStreamCmd(mediaInfo, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
510   - logger.info("收到订阅消息: " + response.toJSONString());
511   - playService.onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid.toString());
512   - }, null);
  502 + playService.play(mediaInfo,deviceId, channelId, null, null, null);
513 503 }
514   -
515 504 }
516 505 }
517   -
518 506 }
519 507  
520 508 JSONObject ret = new JSONObject();
521 509 ret.put("code", 0);
522 510 ret.put("msg", "success");
523   - return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
  511 + return new ResponseEntity<>(ret.toString(),HttpStatus.OK);
524 512 }
525 513  
526 514 /**
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
... ... @@ -205,7 +205,7 @@ public class ZLMRTPServerFactory {
205 205 /**
206 206 * 调用zlm RESTful API —— startSendRtp
207 207 */
208   - public Boolean startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) {
  208 + public JSONObject startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) {
209 209 Boolean result = false;
210 210 JSONObject jsonObject = zlmresTfulUtils.startSendRtp(mediaServerItem, param);
211 211 if (jsonObject == null) {
... ... @@ -216,7 +216,7 @@ public class ZLMRTPServerFactory {
216 216 } else {
217 217 logger.error("RTP推流失败: " + jsonObject.getString("msg"));
218 218 }
219   - return result;
  219 + return jsonObject;
220 220 }
221 221  
222 222 /**
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMStatusEventListener.java
1 1 package com.genersoft.iot.vmp.media.zlm.event;
2 2  
3 3 import com.genersoft.iot.vmp.service.IMediaServerService;
  4 +import com.genersoft.iot.vmp.service.IPlayService;
4 5 import com.genersoft.iot.vmp.service.IStreamProxyService;
5 6 import com.genersoft.iot.vmp.service.IStreamPushService;
6 7 import org.slf4j.Logger;
... ... @@ -34,6 +35,9 @@ public class ZLMStatusEventListener {
34 35 @Autowired
35 36 private IMediaServerService mediaServerService;
36 37  
  38 + @Autowired
  39 + private IPlayService playService;
  40 +
37 41 private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
38 42  
39 43 @Async
... ... @@ -55,6 +59,6 @@ public class ZLMStatusEventListener {
55 59 mediaServerService.zlmServerOffline(event.getMediaServerId());
56 60 streamProxyService.zlmServerOffline(event.getMediaServerId());
57 61 streamPushService.zlmServerOffline(event.getMediaServerId());
58   - // TODO 处理对国标的影响
  62 + playService.zlmServerOffline(event.getMediaServerId());
59 63 }
60 64 }
... ...
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
... ... @@ -58,7 +58,7 @@ public interface IMediaServerService {
58 58  
59 59 void removeCount(String mediaServerId);
60 60  
61   - void releaseSsrc(MediaServerItem mediaServerItem, String ssrc);
  61 + void releaseSsrc(String mediaServerItemId, String ssrc);
62 62  
63 63 void clearMediaServerForOnline();
64 64  
... ...
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
... ... @@ -17,11 +17,13 @@ public interface IPlayService {
17 17  
18 18 void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid);
19 19  
20   - PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent);
  20 + PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback);
21 21  
22 22 MediaServerItem getNewMediaServerItem(Device device);
23 23  
24 24 void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String toString);
25 25  
26 26 DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback errorCallBack);
  27 +
  28 + void zlmServerOffline(String mediaServerId);
27 29 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
... ... @@ -52,11 +52,9 @@ public class DeviceServiceImpl implements IDeviceService {
52 52 return false;
53 53 }
54 54 logger.info("移除目录订阅: {}", device.getDeviceId());
55   - dynamicTask.stopCron(device.getDeviceId());
  55 + dynamicTask.stop(device.getDeviceId());
56 56 device.setSubscribeCycleForCatalog(0);
57 57 sipCommander.catalogSubscribe(device, null, null);
58   - // 清空cseq计数
59   -
60 58 return true;
61 59 }
62 60 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
... ... @@ -167,13 +167,14 @@ public class MediaServerServiceImpl implements IMediaServerService {
167 167 if (mediaServerItem != null) {
168 168 String streamId = String.format("%s_%s", deviceId, channelId);
169 169 zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
170   - releaseSsrc(mediaServerItem, ssrc);
  170 + releaseSsrc(mediaServerItem.getId(), ssrc);
171 171 }
172 172 streamSession.remove(deviceId, channelId, stream);
173 173 }
174 174  
175 175 @Override
176   - public void releaseSsrc(MediaServerItem mediaServerItem, String ssrc) {
  176 + public void releaseSsrc(String mediaServerItemId, String ssrc) {
  177 + MediaServerItem mediaServerItem = getOne(mediaServerItemId);
177 178 if (mediaServerItem == null || ssrc == null) {
178 179 return;
179 180 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
... ... @@ -5,13 +5,13 @@ import com.alibaba.fastjson.JSONArray;
5 5 import com.alibaba.fastjson.JSONObject;
6 6 import com.genersoft.iot.vmp.common.StreamInfo;
7 7 import com.genersoft.iot.vmp.conf.UserSetup;
8   -import com.genersoft.iot.vmp.gb28181.bean.Device;
9   -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
  8 +import com.genersoft.iot.vmp.gb28181.bean.*;
10 9 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
11 10 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
12 11 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
13 12 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
14 13 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
  14 +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
15 15 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
16 16 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
17 17 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
... ... @@ -37,8 +37,7 @@ import org.springframework.util.ResourceUtils;
37 37 import org.springframework.web.context.request.async.DeferredResult;
38 38  
39 39 import java.io.FileNotFoundException;
40   -import java.util.Objects;
41   -import java.util.UUID;
  40 +import java.util.*;
42 41  
43 42 @SuppressWarnings(value = {"rawtypes", "unchecked"})
44 43 @Service
... ... @@ -53,6 +52,9 @@ public class PlayServiceImpl implements IPlayService {
53 52 private SIPCommander cmder;
54 53  
55 54 @Autowired
  55 + private SIPCommanderFroPlatform sipCommanderFroPlatform;
  56 +
  57 + @Autowired
56 58 private IRedisCatchStorage redisCatchStorage;
57 59  
58 60 @Autowired
... ... @@ -78,7 +80,9 @@ public class PlayServiceImpl implements IPlayService {
78 80  
79 81  
80 82 @Override
81   - public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) {
  83 + public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
  84 + ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
  85 + Runnable timeoutCallback) {
82 86 PlayResult playResult = new PlayResult();
83 87 RequestMessage msg = new RequestMessage();
84 88 String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
... ... @@ -101,29 +105,10 @@ public class PlayServiceImpl implements IPlayService {
101 105 Device device = redisCatchStorage.getDevice(deviceId);
102 106 StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
103 107 playResult.setDevice(device);
104   - // 超时处理
105   - result.onTimeout(()->{
106   - logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
107   - WVPResult wvpResult = new WVPResult();
108   - wvpResult.setCode(-1);
109   - SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, streamInfo.getStream());
110   - if (dialog != null) {
111   - wvpResult.setMsg("收流超时,请稍候重试");
112   - }else {
113   - wvpResult.setMsg("点播超时,请稍候重试");
114   - }
115 108  
116   - msg.setData(wvpResult);
117   - // 点播超时回复BYE
118   - cmder.streamByeCmd(device.getDeviceId(), channelId, streamInfo.getStream());
119   - // 释放rtpserver
120   - mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, streamInfo.getStream());
121   - // 回复之前所有的点播请求
122   - resultHolder.invokeAllResult(msg);
123   - // TODO 释放ssrc
124   - });
125 109 result.onCompletion(()->{
126 110 // 点播结束时调用截图接口
  111 + // TODO 应该在上流时调用更好,结束也可能是错误结束
127 112 try {
128 113 String classPath = ResourceUtils.getURL("classpath:").getPath();
129 114 // 兼容打包为jar的class路径
... ... @@ -161,31 +146,60 @@ public class PlayServiceImpl implements IPlayService {
161 146 if (mediaServerItem.isRtpEnable()) {
162 147 streamId = String.format("%s_%s", device.getDeviceId(), channelId);
163 148 }
164   -
165 149 SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId);
  150 + // 超时处理
  151 + Timer timer = new Timer();
  152 + timer.schedule(new TimerTask() {
  153 + @Override
  154 + public void run() {
  155 + logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  156 + if (timeoutCallback != null) {
  157 + timeoutCallback.run();
  158 + }
  159 + WVPResult wvpResult = new WVPResult();
  160 + wvpResult.setCode(-1);
  161 + SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
  162 + if (dialog != null) {
  163 + wvpResult.setMsg("收流超时,请稍候重试");
  164 + // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  165 + cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
  166 + }else {
  167 + wvpResult.setMsg("点播超时,请稍候重试");
  168 + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  169 + mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
  170 + streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  171 + }
  172 +
  173 + msg.setData(wvpResult);
  174 +
  175 + // 回复之前所有的点播请求
  176 + resultHolder.invokeAllResult(msg);
  177 + }
  178 + }, userSetup.getPlayTimeout());
166 179 // 发送点播消息
167 180 cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
168 181 logger.info("收到订阅消息: " + response.toJSONString());
  182 + timer.cancel();
169 183 onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid);
170 184 if (hookEvent != null) {
171 185 hookEvent.response(mediaServerItem, response);
172 186 }
173 187 }, (event) -> {
  188 + timer.cancel();
174 189 WVPResult wvpResult = new WVPResult();
175 190 wvpResult.setCode(-1);
176 191 // 点播返回sip错误
177 192 mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream());
178 193 // 释放ssrc
179   - mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc());
  194 + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
180 195 streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  196 +
181 197 wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
182 198 msg.setData(wvpResult);
183 199 resultHolder.invokeAllResult(msg);
184 200 if (errorEvent != null) {
185 201 errorEvent.response(event);
186 202 }
187   -
188   -
189 203 });
190 204 } else {
191 205 String streamId = streamInfo.getStream();
... ... @@ -222,13 +236,41 @@ public class PlayServiceImpl implements IPlayService {
222 236 streamId2 = String.format("%s_%s", device.getDeviceId(), channelId);
223 237 }
224 238 SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2);
  239 + // 超时处理
  240 + Timer timer = new Timer();
  241 + timer.schedule(new TimerTask() {
  242 + @Override
  243 + public void run() {
  244 + logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  245 + if (timeoutCallback != null) {
  246 + timeoutCallback.run();
  247 + }
  248 + WVPResult wvpResult = new WVPResult();
  249 + wvpResult.setCode(-1);
  250 + SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
  251 + if (dialog != null) {
  252 + wvpResult.setMsg("收流超时,请稍候重试");
  253 + // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  254 + cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
  255 + }else {
  256 + wvpResult.setMsg("点播超时,请稍候重试");
  257 + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  258 + mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
  259 + streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  260 + }
  261 +
  262 + msg.setData(wvpResult);
  263 + // 回复之前所有的点播请求
  264 + resultHolder.invokeAllResult(msg);
  265 + }
  266 + }, userSetup.getPlayTimeout());
225 267 cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
226 268 logger.info("收到订阅消息: " + response.toJSONString());
227 269 onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid);
228 270 }, (event) -> {
229 271 mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream());
230 272 // 释放ssrc
231   - mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc());
  273 + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
232 274 streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
233 275 WVPResult wvpResult = new WVPResult();
234 276 wvpResult.setCode(-1);
... ... @@ -306,14 +348,23 @@ public class PlayServiceImpl implements IPlayService {
306 348 msg.setId(uuid);
307 349 msg.setKey(key);
308 350 PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
309   - result.onTimeout(()->{
310   - msg.setData("回放超时");
311   - playBackResult.setCode(-1);
312   - playBackResult.setData(msg);
313   - callback.call(playBackResult);
314   - });
  351 + Timer timer = new Timer();
  352 + timer.schedule(new TimerTask() {
  353 + @Override
  354 + public void run() {
  355 + logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  356 + playBackResult.setCode(-1);
  357 + playBackResult.setData(msg);
  358 + callback.call(playBackResult);
  359 + // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  360 + cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
  361 + // 回复之前所有的点播请求
  362 + callback.call(playBackResult);
  363 + }
  364 + }, userSetup.getPlayTimeout());
315 365 cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> {
316 366 logger.info("收到订阅消息: " + response.toJSONString());
  367 + timer.cancel();
317 368 StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
318 369 if (streamInfo == null) {
319 370 logger.warn("设备回放API调用失败!");
... ... @@ -331,6 +382,7 @@ public class PlayServiceImpl implements IPlayService {
331 382 playBackResult.setResponse(response);
332 383 callback.call(playBackResult);
333 384 }, event -> {
  385 + timer.cancel();
334 386 msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
335 387 playBackResult.setCode(-1);
336 388 playBackResult.setData(msg);
... ... @@ -370,4 +422,26 @@ public class PlayServiceImpl implements IPlayService {
370 422 return streamInfo;
371 423 }
372 424  
  425 + @Override
  426 + public void zlmServerOffline(String mediaServerId) {
  427 + // 处理正在向上推流的上级平台
  428 + List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
  429 + if (sendRtpItems.size() > 0) {
  430 + for (SendRtpItem sendRtpItem : sendRtpItems) {
  431 + if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
  432 + ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
  433 + sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
  434 + }
  435 + }
  436 + }
  437 + // 处理正在观看的国标设备
  438 + List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
  439 + if (allSsrc.size() > 0) {
  440 + for (SsrcTransaction ssrcTransaction : allSsrc) {
  441 + if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
  442 + cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
  443 + }
  444 + }
  445 + }
  446 + }
373 447 }
... ...
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
... ... @@ -89,7 +89,7 @@ public interface IRedisCatchStorage {
89 89 * @param channelId
90 90 * @return sendRtpItem
91 91 */
92   - SendRtpItem querySendRTPServer(String platformGbId, String channelId);
  92 + SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId);
93 93  
94 94 List<SendRtpItem> querySendRTPServer(String platformGbId);
95 95  
... ... @@ -98,7 +98,7 @@ public interface IRedisCatchStorage {
98 98 * @param platformGbId
99 99 * @param channelId
100 100 */
101   - void deleteSendRTPServer(String platformGbId, String channelId);
  101 + void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId);
102 102  
103 103 /**
104 104 * 查询某个通道是否存在上级点播(RTP推送)
... ...
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
... ... @@ -135,6 +135,32 @@ public interface DeviceChannelMapper {
135 135 "'${item.ipAddress}', ${item.port}, '${item.password}', ${item.PTZType}, ${item.status}, " +
136 136 "'${item.streamId}', ${item.longitude}, ${item.latitude},'${item.createTime}', '${item.updateTime}')" +
137 137 "</foreach> " +
  138 + "ON DUPLICATE KEY UPDATE " +
  139 + "updateTime=VALUES(updateTime), " +
  140 + "name=VALUES(name), " +
  141 + "manufacture=VALUES(manufacture), " +
  142 + "model=VALUES(model), " +
  143 + "owner=VALUES(owner), " +
  144 + "civilCode=VALUES(civilCode), " +
  145 + "block=VALUES(block), " +
  146 + "subCount=VALUES(subCount), " +
  147 + "address=VALUES(address), " +
  148 + "parental=VALUES(parental), " +
  149 + "parentId=VALUES(parentId), " +
  150 + "safetyWay=VALUES(safetyWay), " +
  151 + "registerWay=VALUES(registerWay), " +
  152 + "certNum=VALUES(certNum), " +
  153 + "certifiable=VALUES(certifiable), " +
  154 + "errCode=VALUES(errCode), " +
  155 + "secrecy=VALUES(secrecy), " +
  156 + "ipAddress=VALUES(ipAddress), " +
  157 + "port=VALUES(port), " +
  158 + "password=VALUES(password), " +
  159 + "PTZType=VALUES(PTZType), " +
  160 + "status=VALUES(status), " +
  161 + "streamId=VALUES(streamId), " +
  162 + "longitude=VALUES(longitude), " +
  163 + "latitude=VALUES(latitude)" +
138 164 "</script>")
139 165 int batchAdd(List<DeviceChannel> addChannels);
140 166  
... ... @@ -211,4 +237,15 @@ public interface DeviceChannelMapper {
211 237 " from device_channel\n" +
212 238 " where deviceId = #{deviceId}")
213 239 List<DeviceChannelTree> tree(String deviceId);
  240 +
  241 + @Delete(value = {" <script>" +
  242 + "DELETE " +
  243 + "from " +
  244 + "device_channel " +
  245 + "WHERE " +
  246 + "deviceId = #{deviceId} " +
  247 + " AND channelId NOT IN " +
  248 + "<foreach collection='channels' item='item' open='(' separator=',' close=')' > #{item.channelId}</foreach>" +
  249 + " </script>"})
  250 + int cleanChannelsNotInList(String deviceId, List<DeviceChannel> channels);
214 251 }
... ...
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
... ... @@ -18,6 +18,7 @@ import com.genersoft.iot.vmp.utils.redis.RedisUtil;
18 18 import org.slf4j.Logger;
19 19 import org.slf4j.LoggerFactory;
20 20 import org.springframework.beans.factory.annotation.Autowired;
  21 +import org.springframework.security.core.parameters.P;
21 22 import org.springframework.stereotype.Component;
22 23 import org.springframework.util.StringUtils;
23 24  
... ... @@ -276,19 +277,32 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
276 277  
277 278 @Override
278 279 public void updateSendRTPSever(SendRtpItem sendRtpItem) {
279   - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + sendRtpItem.getPlatformId() + "_" + sendRtpItem.getChannelId();
  280 + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_"
  281 + + sendRtpItem.getPlatformId() + "_" + sendRtpItem.getChannelId() + "_"
  282 + + sendRtpItem.getStreamId() + "_" + sendRtpItem.getCallId();
280 283 redis.set(key, sendRtpItem);
281 284 }
282 285  
283 286 @Override
284   - public SendRtpItem querySendRTPServer(String platformGbId, String channelId) {
285   - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_" + channelId;
286   - return (SendRtpItem)redis.get(key);
  287 + public SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
  288 + if (platformGbId == null) platformGbId = "*";
  289 + if (channelId == null) channelId = "*";
  290 + if (streamId == null) streamId = "*";
  291 + if (callId == null) callId = "*";
  292 + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId
  293 + + "_" + channelId + "_" + streamId + "_" + callId;
  294 + List<Object> scan = redis.scan(key);
  295 + if (scan.size() > 0) {
  296 + return (SendRtpItem)redis.get((String)scan.get(0));
  297 + }else {
  298 + return null;
  299 + }
287 300 }
288 301  
289 302 @Override
290 303 public List<SendRtpItem> querySendRTPServer(String platformGbId) {
291   - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_*";
  304 + if (platformGbId == null) platformGbId = "*";
  305 + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_*" + "_*" + "_*";
292 306 List<Object> queryResult = redis.scan(key);
293 307 List<SendRtpItem> result= new ArrayList<>();
294 308  
... ... @@ -306,18 +320,28 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
306 320 * @param channelId
307 321 */
308 322 @Override
309   - public void deleteSendRTPServer(String platformGbId, String channelId) {
310   - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_" + channelId;
311   - redis.del(key);
  323 + public void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId) {
  324 + if (streamId == null) streamId = "*";
  325 + if (callId == null) callId = "*";
  326 + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId
  327 + + "_" + channelId + "_" + streamId + "_" + callId;
  328 + List<Object> scan = redis.scan(key);
  329 + if (scan.size() > 0) {
  330 + for (Object keyStr : scan) {
  331 + redis.del((String)keyStr);
  332 + }
  333 + }
312 334 }
313 335  
  336 +
  337 +
314 338 /**
315 339 * 查询某个通道是否存在上级点播(RTP推送)
316 340 * @param channelId
317 341 */
318 342 @Override
319 343 public boolean isChannelSendingRTP(String channelId) {
320   - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + "*_" + channelId;
  344 + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + "*_" + channelId + "*_" + "*_";
321 345 List<Object> RtpStreams = redis.scan(key);
322 346 if (RtpStreams.size() > 0) {
323 347 return true;
... ...
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
... ... @@ -284,7 +284,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
284 284 logger.debug("[目录查询]收到的数据存在重复: {}" , stringBuilder);
285 285 }
286 286 try {
287   - int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
  287 +// int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
  288 + int cleanChannelsResult = deviceChannelMapper.cleanChannelsNotInList(deviceId, channels);
288 289 int limitCount = 300;
289 290 boolean result = cleanChannelsResult < 0;
290 291 if (!result && channels.size() > 0) {
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
1 1 package com.genersoft.iot.vmp.vmanager.gb28181.device;
2 2  
3 3 import com.alibaba.fastjson.JSONObject;
  4 +import com.genersoft.iot.vmp.conf.DynamicTask;
4 5 import com.genersoft.iot.vmp.gb28181.bean.Device;
5 6 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
6 7 import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
... ... @@ -13,7 +14,6 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
13 14 import com.genersoft.iot.vmp.vmanager.bean.DeviceChannelTree;
14 15 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
15 16 import com.github.pagehelper.PageInfo;
16   -import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;
17 17 import io.swagger.annotations.Api;
18 18 import io.swagger.annotations.ApiImplicitParam;
19 19 import io.swagger.annotations.ApiImplicitParams;
... ... @@ -57,6 +57,9 @@ public class DeviceQuery {
57 57 @Autowired
58 58 private IDeviceService deviceService;
59 59  
  60 + @Autowired
  61 + private DynamicTask dynamicTask;
  62 +
60 63 /**
61 64 * 使用ID查询国标设备
62 65 * @param deviceId 国标ID
... ... @@ -209,6 +212,8 @@ public class DeviceQuery {
209 212 boolean isSuccess = storager.delete(deviceId);
210 213 if (isSuccess) {
211 214 redisCatchStorage.clearCatchByDeviceId(deviceId);
  215 + // 停止此设备的订阅更新
  216 + dynamicTask.stop(deviceId);
212 217 JSONObject json = new JSONObject();
213 218 json.put("deviceId", deviceId);
214 219 return new ResponseEntity<>(json.toString(),HttpStatus.OK);
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java
... ... @@ -2,8 +2,9 @@ package com.genersoft.iot.vmp.vmanager.gb28181.platform;
2 2  
3 3 import com.alibaba.fastjson.JSON;
4 4 import com.alibaba.fastjson.JSONObject;
5   -import com.genersoft.iot.vmp.gb28181.bean.CatalogData;
6   -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
  5 +import com.genersoft.iot.vmp.common.VideoManagerConstants;
  6 +import com.genersoft.iot.vmp.conf.DynamicTask;
  7 +import com.genersoft.iot.vmp.conf.UserSetup;
7 8 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
8 9 import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
9 10 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
... ... @@ -40,6 +41,9 @@ public class PlatformController {
40 41 private final static Logger logger = LoggerFactory.getLogger(PlatformController.class);
41 42  
42 43 @Autowired
  44 + private UserSetup userSetup;
  45 +
  46 + @Autowired
43 47 private IVideoManagerStorager storager;
44 48  
45 49 @Autowired
... ... @@ -51,6 +55,9 @@ public class PlatformController {
51 55 @Autowired
52 56 private SipConfig sipConfig;
53 57  
  58 + @Autowired
  59 + private DynamicTask dynamicTask;
  60 +
54 61 /**
55 62 * 获取国标服务的配置
56 63 *
... ... @@ -222,7 +229,7 @@ public class PlatformController {
222 229 if (updateResult) {
223 230 // 保存时启用就发送注册
224 231 if (parentPlatform.isEnable()) {
225   - if (parentPlatformOld.isStatus()) {
  232 + if (parentPlatformOld != null && parentPlatformOld.isStatus()) {
226 233 commanderForPlatform.unregister(parentPlatformOld, null, null);
227 234 try {
228 235 Thread.sleep(500);
... ... @@ -287,8 +294,9 @@ public class PlatformController {
287 294 boolean deleteResult = storager.deleteParentPlatform(parentPlatform);
288 295 storager.delCatalogByPlatformId(parentPlatform.getServerGBId());
289 296 storager.delRelationByPlatformId(parentPlatform.getServerGBId());
290   -
291   -
  297 + // 停止发送位置订阅定时任务
  298 + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_MobilePosition_" + parentPlatform.getServerGBId();
  299 + dynamicTask.stop(key);
292 300 if (deleteResult) {
293 301 return new ResponseEntity<>("success", HttpStatus.OK);
294 302 } else {
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
... ... @@ -88,7 +88,7 @@ public class PlayController {
88 88 // 获取可用的zlm
89 89 Device device = storager.queryVideoDevice(deviceId);
90 90 MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
91   - PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null);
  91 + PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null, null);
92 92  
93 93 return playResult.getResult();
94 94 }
... ...
src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java
... ... @@ -150,7 +150,7 @@ public class ApiStreamController {
150 150 JSONObject result = new JSONObject();
151 151 result.put("error", "channel[ " + code + " ] " + eventResult.msg);
152 152 resultDeferredResult.setResult(result);
153   - });
  153 + }, null);
154 154 return resultDeferredResult;
155 155 }
156 156  
... ...
src/main/resources/all-application.yml
... ... @@ -170,8 +170,6 @@ user-settings:
170 170 save-position-history: false
171 171 # 点播等待超时时间,单位:毫秒
172 172 play-timeout: 3000
173   - # 等待音视频编码信息再返回, true: 可以根据编码选择合适的播放器,false: 可以更快点播
174   - wait-track: false
175 173 # 是否开启接口鉴权
176 174 interface-authentication: true
177 175 # 自动配置redis 可以过期事件
... ...