Commit a71063dd1fc25d99486b36ba65c3081a3c8c7c01

Authored by lawrencehj
1 parent 32fbfd8d

增加上级点播停止后通知设备停止推流功能,并自动与本地播放协同

src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
@@ -156,6 +156,7 @@ public class SIPProcessorFactory { @@ -156,6 +156,7 @@ public class SIPProcessorFactory {
156 processor.setRequestEvent(evt); 156 processor.setRequestEvent(evt);
157 processor.setRedisCatchStorage(redisCatchStorage); 157 processor.setRedisCatchStorage(redisCatchStorage);
158 processor.setZlmrtpServerFactory(zlmrtpServerFactory); 158 processor.setZlmrtpServerFactory(zlmrtpServerFactory);
  159 + processor.setSIPCommander(cmder);
159 return processor; 160 return processor;
160 } else if (Request.CANCEL.equals(method)) { 161 } else if (Request.CANCEL.equals(method)) {
161 CancelRequestProcessor processor = new CancelRequestProcessor(); 162 CancelRequestProcessor processor = new CancelRequestProcessor();
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
1 package com.genersoft.iot.vmp.gb28181.transmit.request.impl; 1 package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
2 2
  3 +import javax.sip.address.SipURI;
3 import javax.sip.Dialog; 4 import javax.sip.Dialog;
4 import javax.sip.DialogState; 5 import javax.sip.DialogState;
5 import javax.sip.InvalidArgumentException; 6 import javax.sip.InvalidArgumentException;
6 import javax.sip.RequestEvent; 7 import javax.sip.RequestEvent;
7 import javax.sip.SipException; 8 import javax.sip.SipException;
  9 +import javax.sip.header.FromHeader;
  10 +import javax.sip.header.HeaderAddress;
  11 +import javax.sip.header.ToHeader;
8 import javax.sip.message.Response; 12 import javax.sip.message.Response;
9 13
10 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; 14 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
  15 +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
11 import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; 16 import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
12 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; 17 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
13 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 18 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
14 19
  20 +import org.apache.log4j.Logger;
  21 +
15 import java.text.ParseException; 22 import java.text.ParseException;
16 import java.util.HashMap; 23 import java.util.HashMap;
17 import java.util.Map; 24 import java.util.Map;
18 25
19 /** 26 /**
20 * @Description: BYE请求处理器 27 * @Description: BYE请求处理器
21 - * @author: swwheihei  
22 - * @date: 2020年5月3日 下午5:32:05 28 + * @author: lawrencehj
  29 + * @date: 2021年3月9日
23 */ 30 */
24 public class ByeRequestProcessor extends SIPRequestAbstractProcessor { 31 public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
25 32
26 - private IRedisCatchStorage redisCatchStorage; 33 + private ISIPCommander cmder;
  34 +
  35 + private IRedisCatchStorage redisCatchStorage;
27 36
28 private ZLMRTPServerFactory zlmrtpServerFactory; 37 private ZLMRTPServerFactory zlmrtpServerFactory;
29 38
@@ -38,10 +47,8 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { @@ -38,10 +47,8 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
38 Dialog dialog = evt.getDialog(); 47 Dialog dialog = evt.getDialog();
39 if (dialog == null) return; 48 if (dialog == null) return;
40 if (dialog.getState().equals(DialogState.TERMINATED)) { 49 if (dialog.getState().equals(DialogState.TERMINATED)) {
41 - String remoteUri = dialog.getRemoteParty().getURI().toString();  
42 - String localUri = dialog.getLocalParty().getURI().toString();  
43 - String platformGbId = remoteUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));  
44 - String channelId = localUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@")); 50 + String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
  51 + String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
45 SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); 52 SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
46 String streamId = sendRtpItem.getStreamId(); 53 String streamId = sendRtpItem.getStreamId();
47 Map<String, Object> param = new HashMap<>(); 54 Map<String, Object> param = new HashMap<>();
@@ -50,6 +57,11 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { @@ -50,6 +57,11 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
50 param.put("stream",streamId); 57 param.put("stream",streamId);
51 System.out.println("停止向上级推流:" + streamId); 58 System.out.println("停止向上级推流:" + streamId);
52 zlmrtpServerFactory.stopSendRtpStream(param); 59 zlmrtpServerFactory.stopSendRtpStream(param);
  60 + redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
  61 + if (zlmrtpServerFactory.totalReaderCount(streamId) == 0) {
  62 + System.out.println(streamId + "无其它观看者,通知设备停止推流");
  63 + cmder.streamByeCmd(streamId);
  64 + }
53 } 65 }
54 } catch (SipException e) { 66 } catch (SipException e) {
55 e.printStackTrace(); 67 e.printStackTrace();
@@ -58,8 +70,6 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { @@ -58,8 +70,6 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
58 } catch (ParseException e) { 70 } catch (ParseException e) {
59 e.printStackTrace(); 71 e.printStackTrace();
60 } 72 }
61 - // TODO 优先级99 Bye Request消息实现,此消息一般为级联消息,上级给下级发送视频停止指令  
62 -  
63 } 73 }
64 74
65 /*** 75 /***
@@ -89,4 +99,13 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { @@ -89,4 +99,13 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
89 public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) { 99 public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) {
90 this.zlmrtpServerFactory = zlmrtpServerFactory; 100 this.zlmrtpServerFactory = zlmrtpServerFactory;
91 } 101 }
  102 +
  103 + public ISIPCommander getSIPCommander() {
  104 + return cmder;
  105 + }
  106 +
  107 + public void setSIPCommander(ISIPCommander cmder) {
  108 + this.cmder = cmder;
  109 + }
  110 +
92 } 111 }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -267,20 +267,25 @@ public class ZLMHttpHookListener { @@ -267,20 +267,25 @@ public class ZLMHttpHookListener {
267 } 267 }
268 268
269 String streamId = json.getString("stream"); 269 String streamId = json.getString("stream");
270 -  
271 - cmder.streamByeCmd(streamId);  
272 StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); 270 StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
273 - if (streamInfo!=null){  
274 - redisCatchStorage.stopPlay(streamInfo);  
275 - storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); 271 +
  272 + JSONObject ret = new JSONObject();
  273 + ret.put("code", 0);
  274 + ret.put("close", true);
  275 +
  276 + if (streamInfo != null) {
  277 + if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) {
  278 + ret.put("close", false);
  279 + } else {
  280 + cmder.streamByeCmd(streamId);
  281 + redisCatchStorage.stopPlay(streamInfo);
  282 + storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
  283 + }
276 }else{ 284 }else{
  285 + cmder.streamByeCmd(streamId);
277 streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); 286 streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
278 redisCatchStorage.stopPlayback(streamInfo); 287 redisCatchStorage.stopPlayback(streamInfo);
279 } 288 }
280 -  
281 - JSONObject ret = new JSONObject();  
282 - ret.put("code", 0);  
283 - ret.put("close", true);  
284 return new ResponseEntity<String>(ret.toString(),HttpStatus.OK); 289 return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
285 } 290 }
286 291
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -153,6 +153,16 @@ public class ZLMRTPServerFactory { @@ -153,6 +153,16 @@ public class ZLMRTPServerFactory {
153 } 153 }
154 154
155 /** 155 /**
  156 + * 查询转推的流是否有其它观看者
  157 + * @param streamId
  158 + * @return
  159 + */
  160 + public int totalReaderCount(String streamId) {
  161 + JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId);
  162 + return mediaInfo.getInteger("totalReaderCount");
  163 + }
  164 +
  165 + /**
156 * 调用zlm RESTful API —— stopSendRtp 166 * 调用zlm RESTful API —— stopSendRtp
157 */ 167 */
158 public Boolean stopSendRtpStream(Map<String, Object>param) { 168 public Boolean stopSendRtpStream(Map<String, Object>param) {
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -89,4 +89,17 @@ public interface IRedisCatchStorage { @@ -89,4 +89,17 @@ public interface IRedisCatchStorage {
89 */ 89 */
90 SendRtpItem querySendRTPServer(String platformGbId, String channelId); 90 SendRtpItem querySendRTPServer(String platformGbId, String channelId);
91 91
  92 + /**
  93 + * 删除RTP推送信息缓存
  94 + * @param platformGbId
  95 + * @param channelId
  96 + */
  97 + void deleteSendRTPServer(String platformGbId, String channelId);
  98 +
  99 + /**
  100 + * 查询某个通道是否存在上级点播(RTP推送)
  101 + * @param channelId
  102 + */
  103 + boolean isChannelSendingRTP(String channelId);
  104 +
92 } 105 }
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -225,4 +225,30 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @@ -225,4 +225,30 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
225 return (SendRtpItem)redis.get(key); 225 return (SendRtpItem)redis.get(key);
226 } 226 }
227 227
  228 + /**
  229 + * 删除RTP推送信息缓存
  230 + * @param platformGbId
  231 + * @param channelId
  232 + */
  233 + @Override
  234 + public void deleteSendRTPServer(String platformGbId, String channelId) {
  235 + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + platformGbId + "_" + channelId;
  236 + redis.del(key);
  237 + }
  238 +
  239 + /**
  240 + * 查询某个通道是否存在上级点播(RTP推送)
  241 + * @param channelId
  242 + */
  243 + @Override
  244 + public boolean isChannelSendingRTP(String channelId) {
  245 + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + "*_" + channelId;
  246 + List<Object> RtpStreams = redis.scan(key);
  247 + if (RtpStreams.size() > 0) {
  248 + return true;
  249 + } else {
  250 + return false;
  251 + }
  252 + }
  253 +
228 } 254 }