Commit f275daa3f86f0cbcbe3176e7942994c3a9869480

Authored by 648540858
1 parent 6fa5b37b

合并主线

src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -10,6 +10,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor @@ -10,6 +10,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor
10 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; 10 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
11 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; 11 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
12 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; 12 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
  13 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  14 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
13 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 15 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
14 import com.genersoft.iot.vmp.service.IMediaServerService; 16 import com.genersoft.iot.vmp.service.IMediaServerService;
15 import com.genersoft.iot.vmp.service.IPlayService; 17 import com.genersoft.iot.vmp.service.IPlayService;
@@ -29,7 +31,6 @@ import javax.sip.header.CallIdHeader; @@ -29,7 +31,6 @@ import javax.sip.header.CallIdHeader;
29 import javax.sip.header.FromHeader; 31 import javax.sip.header.FromHeader;
30 import javax.sip.header.HeaderAddress; 32 import javax.sip.header.HeaderAddress;
31 import javax.sip.header.ToHeader; 33 import javax.sip.header.ToHeader;
32 -import java.text.ParseException;  
33 import java.util.HashMap; 34 import java.util.HashMap;
34 import java.util.Map; 35 import java.util.Map;
35 36
@@ -99,49 +100,62 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @@ -99,49 +100,62 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
99 logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId); 100 logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId);
100 return; 101 return;
101 } 102 }
102 - String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; 103 + String isUdp = sendRtpItem.isTcp() ? "0" : "1";
103 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); 104 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
104 logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(), 105 logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(),
105 sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); 106 sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
  107 + Map<String, Object> param = new HashMap<>(12);
  108 + param.put("vhost","__defaultVhost__");
  109 + param.put("app",sendRtpItem.getApp());
  110 + param.put("stream",sendRtpItem.getStreamId());
  111 + param.put("ssrc", sendRtpItem.getSsrc());
  112 + param.put("src_port", sendRtpItem.getLocalPort());
  113 + param.put("pt", sendRtpItem.getPt());
  114 + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
  115 + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
  116 + param.put("is_udp", isUdp);
  117 + if (!sendRtpItem.isTcp()) {
  118 + // udp模式下开启rtcp保活
  119 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
  120 + }
  121 +
106 if (mediaInfo == null) { 122 if (mediaInfo == null) {
107 RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( 123 RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
108 sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), 124 sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(),
109 sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), 125 sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
110 sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); 126 sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
111 redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { 127 redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
112 - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, json, callIdHeader); 128 + playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader);
113 }); 129 });
114 - }else {  
115 - JSONObject startSendRtpStreamResult = zlmrtpServerFactory.startSendRtp(mediaInfo, sendRtpItem);  
116 - if (startSendRtpStreamResult != null) {  
117 - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, callIdHeader);  
118 - }  
119 - }  
120 - }  
121 - }  
122 - private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform,  
123 - JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {  
124 - if (jsonObject == null) {  
125 - logger.error("RTP推流失败: 请检查ZLM服务");  
126 - } else if (jsonObject.getInteger("code") == 0) {  
127 - logger.info("调用ZLM推流接口, 结果: {}", jsonObject);  
128 - logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));  
129 - } else {  
130 - logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));  
131 - if (sendRtpItem.isOnlyAudio()) {  
132 - Device device = deviceService.getDevice(sendRtpItem.getDeviceId());  
133 - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());  
134 - if (audioBroadcastCatch != null) {  
135 - try {  
136 - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);  
137 - } catch (SipException | ParseException | InvalidArgumentException |  
138 - SsrcTransactionNotFoundException e) {  
139 - logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage()); 130 + } else {
  131 + // 如果是非严格模式,需要关闭端口占用
  132 + JSONObject startSendRtpStreamResult = null;
  133 + if (sendRtpItem.getLocalPort() != 0) {
  134 + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId());
  135 + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
  136 + if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) {
  137 + if (sendRtpItem.isTcpActive()) {
  138 + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
  139 + }else {
  140 + param.put("dst_url", sendRtpItem.getIp());
  141 + param.put("dst_port", sendRtpItem.getPort());
  142 + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
  143 + }
  144 + }
  145 + }else {
  146 + if (sendRtpItem.isTcpActive()) {
  147 + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
  148 + }else {
  149 + param.put("dst_url", sendRtpItem.getIp());
  150 + param.put("dst_port", sendRtpItem.getPort());
  151 + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
140 } 152 }
141 } 153 }
  154 + if (startSendRtpStreamResult != null) {
  155 + playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
  156 + }
142 } 157 }
143 } 158 }
144 -  
145 } 159 }
146 160
147 } 161 }
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -1035,12 +1035,12 @@ public class PlayServiceImpl implements IPlayService { @@ -1035,12 +1035,12 @@ public class PlayServiceImpl implements IPlayService {
1035 SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); 1035 SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
1036 if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { 1036 if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
1037 // 查询流是否存在,不存在则认为是异常状态 1037 // 查询流是否存在,不存在则认为是异常状态
1038 - MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());  
1039 - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStreamId()); 1038 + MediaServerItem mediaServerItemInUse = mediaServerService.getOne(sendRtpItem.getMediaServerId());
  1039 + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItemInUse, sendRtpItem.getApp(), sendRtpItem.getStreamId());
1040 if (streamReady) { 1040 if (streamReady) {
1041 logger.warn("语音广播已经开启: {}", channelId); 1041 logger.warn("语音广播已经开启: {}", channelId);
1042 event.call("语音广播已经开启"); 1042 event.call("语音广播已经开启");
1043 - return; 1043 + return false;
1044 } else { 1044 } else {
1045 stopAudioBroadcast(device.getDeviceId(), channelId); 1045 stopAudioBroadcast(device.getDeviceId(), channelId);
1046 } 1046 }