Commit 95ed66293532268a26970d3c06795641999e0712

Authored by 64850858
1 parent ea32cd26

修复stream-none-reader-delay-ms为-1时不自动关闭流

src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
@@ -159,6 +159,7 @@ public class SIPProcessorFactory { @@ -159,6 +159,7 @@ public class SIPProcessorFactory {
159 ByeRequestProcessor processor = new ByeRequestProcessor(); 159 ByeRequestProcessor processor = new ByeRequestProcessor();
160 processor.setRequestEvent(evt); 160 processor.setRequestEvent(evt);
161 processor.setRedisCatchStorage(redisCatchStorage); 161 processor.setRedisCatchStorage(redisCatchStorage);
  162 + processor.setStorager(storager);
162 processor.setZlmrtpServerFactory(zlmrtpServerFactory); 163 processor.setZlmrtpServerFactory(zlmrtpServerFactory);
163 processor.setSIPCommander(cmder); 164 processor.setSIPCommander(cmder);
164 processor.setMediaServerService(mediaServerService); 165 processor.setMediaServerService(mediaServerService);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
@@ -11,6 +11,8 @@ import javax.sip.header.HeaderAddress; @@ -11,6 +11,8 @@ import javax.sip.header.HeaderAddress;
11 import javax.sip.header.ToHeader; 11 import javax.sip.header.ToHeader;
12 import javax.sip.message.Response; 12 import javax.sip.message.Response;
13 13
  14 +import com.genersoft.iot.vmp.common.StreamInfo;
  15 +import com.genersoft.iot.vmp.gb28181.bean.Device;
14 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; 16 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
15 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; 17 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
16 import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; 18 import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
@@ -18,6 +20,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; @@ -18,6 +20,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
18 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 20 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
19 import com.genersoft.iot.vmp.service.IMediaServerService; 21 import com.genersoft.iot.vmp.service.IMediaServerService;
20 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 22 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  23 +import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
21 import org.slf4j.Logger; 24 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory; 25 import org.slf4j.LoggerFactory;
23 26
@@ -38,6 +41,8 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { @@ -38,6 +41,8 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
38 41
39 private IRedisCatchStorage redisCatchStorage; 42 private IRedisCatchStorage redisCatchStorage;
40 43
  44 + private IVideoManagerStorager storager;
  45 +
41 private ZLMRTPServerFactory zlmrtpServerFactory; 46 private ZLMRTPServerFactory zlmrtpServerFactory;
42 47
43 private IMediaServerService mediaServerService; 48 private IMediaServerService mediaServerService;
@@ -56,20 +61,32 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { @@ -56,20 +61,32 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
56 String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); 61 String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
57 String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); 62 String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
58 SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); 63 SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
59 - if (sendRtpItem == null) return;  
60 - String streamId = sendRtpItem.getStreamId();  
61 - Map<String, Object> param = new HashMap<>();  
62 - param.put("vhost","__defaultVhost__");  
63 - param.put("app",sendRtpItem.getApp());  
64 - param.put("stream",streamId);  
65 - param.put("ssrc",sendRtpItem.getSsrc());  
66 - logger.info("停止向上级推流:" + streamId);  
67 - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());  
68 - zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);  
69 - redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);  
70 - if (zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId) == 0) {  
71 - logger.info(streamId + "无其它观看者,通知设备停止推流");  
72 - cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId); 64 + logger.info("收到bye, [{}/{}]", platformGbId, channelId);
  65 + if (sendRtpItem != null){
  66 + String streamId = sendRtpItem.getStreamId();
  67 + Map<String, Object> param = new HashMap<>();
  68 + param.put("vhost","__defaultVhost__");
  69 + param.put("app",sendRtpItem.getApp());
  70 + param.put("stream",streamId);
  71 + param.put("ssrc",sendRtpItem.getSsrc());
  72 + logger.info("停止向上级推流:" + streamId);
  73 + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
  74 + zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
  75 + redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
  76 + if (zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId) == 0) {
  77 + logger.info(streamId + "无其它观看者,通知设备停止推流");
  78 + cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId);
  79 + }
  80 + }
  81 + // 可能是设备主动停止
  82 + Device device = storager.queryVideoDeviceByChannelId(platformGbId);
  83 + if (device != null) {
  84 + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
  85 + if (streamInfo != null) {
  86 + redisCatchStorage.stopPlay(streamInfo);
  87 + }
  88 + storager.stopPlay(device.getDeviceId(), channelId);
  89 + mediaServerService.closeRTPServer(device, channelId);
73 } 90 }
74 } 91 }
75 } catch (SipException e) { 92 } catch (SipException e) {
@@ -124,4 +141,12 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { @@ -124,4 +141,12 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
124 public void setMediaServerService(IMediaServerService mediaServerService) { 141 public void setMediaServerService(IMediaServerService mediaServerService) {
125 this.mediaServerService = mediaServerService; 142 this.mediaServerService = mediaServerService;
126 } 143 }
  144 +
  145 + public IVideoManagerStorager getStorager() {
  146 + return storager;
  147 + }
  148 +
  149 + public void setStorager(IVideoManagerStorager storager) {
  150 + this.storager = storager;
  151 + }
127 } 152 }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java
@@ -756,7 +756,6 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { @@ -756,7 +756,6 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
756 deferredResultHolder.invokeResult(msg); 756 deferredResultHolder.invokeResult(msg);
757 } 757 }
758 } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { 758 } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
759 - // } catch (DocumentException e) {  
760 e.printStackTrace(); 759 e.printStackTrace();
761 } 760 }
762 } 761 }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/response/impl/ByeResponseProcessor.java
@@ -27,7 +27,7 @@ public class ByeResponseProcessor implements ISIPResponseProcessor { @@ -27,7 +27,7 @@ public class ByeResponseProcessor implements ISIPResponseProcessor {
27 @Override 27 @Override
28 public void process(ResponseEvent evt, SipLayer layer, SipConfig config) { 28 public void process(ResponseEvent evt, SipLayer layer, SipConfig config) {
29 // TODO Auto-generated method stub 29 // TODO Auto-generated method stub
30 - 30 + System.out.println( );
31 } 31 }
32 32
33 } 33 }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -346,6 +346,10 @@ public class ZLMHttpHookListener { @@ -346,6 +346,10 @@ public class ZLMHttpHookListener {
346 redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch); 346 redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch);
347 } 347 }
348 } 348 }
  349 + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  350 + if (mediaServerItem != null && "-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())) {
  351 + ret.put("close", false);
  352 + }
349 return new ResponseEntity<String>(ret.toString(),HttpStatus.OK); 353 return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
350 }else { 354 }else {
351 JSONObject ret = new JSONObject(); 355 JSONObject ret = new JSONObject();
@@ -371,7 +375,7 @@ public class ZLMHttpHookListener { @@ -371,7 +375,7 @@ public class ZLMHttpHookListener {
371 if (userSetup.isAutoApplyPlay() && mediaInfo != null) { 375 if (userSetup.isAutoApplyPlay() && mediaInfo != null) {
372 String app = json.getString("app"); 376 String app = json.getString("app");
373 String streamId = json.getString("stream"); 377 String streamId = json.getString("stream");
374 - if ("rtp".equals(app) && streamId.contains("gb_play") ) { 378 + if ("rtp".equals(app)) {
375 String[] s = streamId.split("_"); 379 String[] s = streamId.split("_");
376 if (s.length == 4) { 380 if (s.length == 4) {
377 String deviceId = s[2]; 381 String deviceId = s[2];
@@ -382,7 +386,7 @@ public class ZLMHttpHookListener { @@ -382,7 +386,7 @@ public class ZLMHttpHookListener {
382 SSRCInfo ssrcInfo; 386 SSRCInfo ssrcInfo;
383 String streamId2 = null; 387 String streamId2 = null;
384 if (mediaInfo.isRtpEnable()) { 388 if (mediaInfo.isRtpEnable()) {
385 - streamId2 = String.format("gb_play_%s_%s", device.getDeviceId(), channelId); 389 + streamId2 = String.format("%s/%s", device.getDeviceId(), channelId);
386 } 390 }
387 ssrcInfo = mediaServerService.openRTPServer(mediaInfo, streamId2); 391 ssrcInfo = mediaServerService.openRTPServer(mediaInfo, streamId2);
388 cmder.playStreamCmd(mediaInfo, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { 392 cmder.playStreamCmd(mediaInfo, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -123,7 +123,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR @@ -123,7 +123,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
123 String mediaServerId = streamSession.getMediaServerId(device.getDeviceId(), channelId); 123 String mediaServerId = streamSession.getMediaServerId(device.getDeviceId(), channelId);
124 MediaServerItem mediaServerItem = this.getOne(mediaServerId); 124 MediaServerItem mediaServerItem = this.getOne(mediaServerId);
125 if (mediaServerItem != null) { 125 if (mediaServerItem != null) {
126 - String streamId = String.format("gb_play_%s_%s", device.getDeviceId(), channelId); 126 + String streamId = String.format("%s/%s", device.getDeviceId(), channelId);
127 zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId); 127 zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
128 releaseSsrc(mediaServerItem, streamSession.getSSRC(device.getDeviceId(), channelId)); 128 releaseSsrc(mediaServerItem, streamSession.getSSRC(device.getDeviceId(), channelId));
129 } 129 }
@@ -380,7 +380,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR @@ -380,7 +380,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
380 param.put("hook.on_stream_none_reader",String.format("%s/on_stream_none_reader", hookPrex)); 380 param.put("hook.on_stream_none_reader",String.format("%s/on_stream_none_reader", hookPrex));
381 param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex)); 381 param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex));
382 param.put("hook.timeoutSec","20"); 382 param.put("hook.timeoutSec","20");
383 - param.put("general.streamNoneReaderDelayMS",mediaServerItem.getStreamNoneReaderDelayMS()); 383 + param.put("general.streamNoneReaderDelayMS","-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() );
384 384
385 JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param); 385 JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param);
386 386
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -156,7 +156,7 @@ public class PlayServiceImpl implements IPlayService { @@ -156,7 +156,7 @@ public class PlayServiceImpl implements IPlayService {
156 SSRCInfo ssrcInfo; 156 SSRCInfo ssrcInfo;
157 String streamId = null; 157 String streamId = null;
158 if (mediaServerItem.isRtpEnable()) { 158 if (mediaServerItem.isRtpEnable()) {
159 - streamId = String.format("gb_play_%s_%s", device.getDeviceId(), channelId); 159 + streamId = String.format("%s/%s", device.getDeviceId(), channelId);
160 } 160 }
161 161
162 ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId); 162 ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId);
@@ -221,7 +221,7 @@ public class PlayServiceImpl implements IPlayService { @@ -221,7 +221,7 @@ public class PlayServiceImpl implements IPlayService {
221 SSRCInfo ssrcInfo; 221 SSRCInfo ssrcInfo;
222 String streamId2 = null; 222 String streamId2 = null;
223 if (mediaServerItem.isRtpEnable()) { 223 if (mediaServerItem.isRtpEnable()) {
224 - streamId2 = String.format("gb_play_%s_%s", device.getDeviceId(), channelId); 224 + streamId2 = String.format("%s/%s", device.getDeviceId(), channelId);
225 } 225 }
226 ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2); 226 ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2);
227 227
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java
@@ -374,4 +374,6 @@ public interface IVideoManagerStorager { @@ -374,4 +374,6 @@ public interface IVideoManagerStorager {
374 void updateMediaServer(MediaServerItem mediaServerItem); 374 void updateMediaServer(MediaServerItem mediaServerItem);
375 375
376 List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean b); 376 List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean b);
  377 +
  378 + Device queryVideoDeviceByChannelId(String platformGbId);
377 } 379 }
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -102,4 +102,7 @@ public interface DeviceChannelMapper { @@ -102,4 +102,7 @@ public interface DeviceChannelMapper {
102 " </script>"}) 102 " </script>"})
103 103
104 List<ChannelReduce> queryChannelListInAll(String query, Boolean online, Boolean hasSubChannel, String platformId, Boolean inPlatform); 104 List<ChannelReduce> queryChannelListInAll(String query, Boolean online, Boolean hasSubChannel, String platformId, Boolean inPlatform);
  105 +
  106 + @Select("SELECT * FROM device_channel WHERE channelId=#{channelId}")
  107 + List<DeviceChannel> queryChannelByChannelId(String channelId);
105 } 108 }
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
@@ -623,4 +623,14 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -623,4 +623,14 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
623 public List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean enable) { 623 public List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean enable) {
624 return streamProxyMapper.selectForEnableInMediaServer(id, enable); 624 return streamProxyMapper.selectForEnableInMediaServer(id, enable);
625 } 625 }
  626 +
  627 + @Override
  628 + public Device queryVideoDeviceByChannelId(String channelId) {
  629 + Device result = null;
  630 + List<DeviceChannel> channelList = deviceChannelMapper.queryChannelByChannelId(channelId);
  631 + if (channelList.size() == 1) {
  632 + result = deviceMapper.getDeviceByDeviceId(channelList.get(0).getDeviceId());
  633 + }
  634 + return result;
  635 + }
626 } 636 }
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -168,7 +168,10 @@ public class PlayController { @@ -168,7 +168,10 @@ public class PlayController {
168 }) 168 })
169 @PostMapping("/convert/{streamId}") 169 @PostMapping("/convert/{streamId}")
170 public ResponseEntity<String> playConvert(@PathVariable String streamId) { 170 public ResponseEntity<String> playConvert(@PathVariable String streamId) {
171 - StreamInfo streamInfo = streamId.startsWith("gb_play_") ? redisCatchStorage.queryPlayByStreamId(streamId) : redisCatchStorage.queryPlaybackByStreamId(streamId); 171 + StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
  172 + if (streamInfo == null) {
  173 + streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
  174 + }
172 if (streamInfo == null) { 175 if (streamInfo == null) {
173 logger.warn("视频转码API调用失败!, 视频流已经停止!"); 176 logger.warn("视频转码API调用失败!, 视频流已经停止!");
174 return new ResponseEntity<String>("未找到视频流信息, 视频流可能已经停止", HttpStatus.OK); 177 return new ResponseEntity<String>("未找到视频流信息, 视频流可能已经停止", HttpStatus.OK);