Commit 51634b1b96fab233c6735c82c747b7de5423bf86

Authored by 648540858
Committed by GitHub
2 parents a4f6102a 7fd34cfd

Merge pull request #965 from wanghui0961/wvp-28181-2.0

feat:支持级联tcp主动
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -130,7 +130,10 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @@ -130,7 +130,10 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
130 // 开启rtcp保活 130 // 开启rtcp保活
131 param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); 131 param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
132 } 132 }
133 - 133 + // tcp主动时,此时是级联下级平台,在回复200ok时,本地已经请求zlm开启监听,跳过下面步骤
  134 + if (sendRtpItem.isTcpActive()) {
  135 + return;
  136 + }
134 if (mediaInfo == null) { 137 if (mediaInfo == null) {
135 RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( 138 RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
136 sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), 139 sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(),
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; 1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
2 2
  3 +import com.alibaba.fastjson2.JSON;
  4 +import com.alibaba.fastjson2.JSONObject;
3 import com.genersoft.iot.vmp.common.StreamInfo; 5 import com.genersoft.iot.vmp.common.StreamInfo;
4 import com.genersoft.iot.vmp.conf.DynamicTask; 6 import com.genersoft.iot.vmp.conf.DynamicTask;
5 import com.genersoft.iot.vmp.conf.UserSetting; 7 import com.genersoft.iot.vmp.conf.UserSetting;
@@ -47,6 +49,8 @@ import javax.sip.header.CallIdHeader; @@ -47,6 +49,8 @@ import javax.sip.header.CallIdHeader;
47 import javax.sip.message.Response; 49 import javax.sip.message.Response;
48 import java.text.ParseException; 50 import java.text.ParseException;
49 import java.time.Instant; 51 import java.time.Instant;
  52 +import java.util.HashMap;
  53 +import java.util.Map;
50 import java.util.Random; 54 import java.util.Random;
51 import java.util.Vector; 55 import java.util.Vector;
52 56
@@ -396,6 +400,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -396,6 +400,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
396 content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); 400 content.append("y=" + sendRtpItem.getSsrc() + "\r\n");
397 content.append("f=\r\n"); 401 content.append("f=\r\n");
398 402
  403 +
399 try { 404 try {
400 // 超时未收到Ack应该回复bye,当前等待时间为10秒 405 // 超时未收到Ack应该回复bye,当前等待时间为10秒
401 dynamicTask.startDelay(callIdHeader.getCallId(), () -> { 406 dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
@@ -409,7 +414,33 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -409,7 +414,33 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
409 } 414 }
410 }, 60 * 1000); 415 }, 60 * 1000);
411 responseSdpAck(request, content.toString(), platform); 416 responseSdpAck(request, content.toString(), platform);
412 - 417 + // tcp主动模式,回复sdp后开启监听
  418 + if (sendRtpItem.isTcpActive()) {
  419 + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
  420 + Map<String, Object> param = new HashMap<>(12);
  421 + param.put("vhost","__defaultVhost__");
  422 + param.put("app",sendRtpItem.getApp());
  423 + param.put("stream",sendRtpItem.getStreamId());
  424 + param.put("ssrc", sendRtpItem.getSsrc());
  425 + if (!sendRtpItem.isTcpActive()) {
  426 + param.put("dst_url",sendRtpItem.getIp());
  427 + param.put("dst_port", sendRtpItem.getPort());
  428 + }
  429 + String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
  430 + param.put("is_udp", is_Udp);
  431 + param.put("src_port", localPort);
  432 + param.put("pt", sendRtpItem.getPt());
  433 + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
  434 + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
  435 + if (!sendRtpItem.isTcp()) {
  436 + // 开启rtcp保活
  437 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
  438 + }
  439 + JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param);
  440 + if (startSendRtpStreamResult != null) {
  441 + startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader);
  442 + }
  443 + }
413 } catch (SipException | InvalidArgumentException | ParseException e) { 444 } catch (SipException | InvalidArgumentException | ParseException e) {
414 logger.error("[命令发送失败] 国标级联 回复SdpAck", e); 445 logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
415 } 446 }
@@ -543,6 +574,18 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -543,6 +574,18 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
543 } 574 }
544 } 575 }
545 576
  577 + private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
  578 + JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
  579 + if (jsonObject == null) {
  580 + logger.error("下级TCP被动启动监听失败: 请检查ZLM服务");
  581 + } else if (jsonObject.getInteger("code") == 0) {
  582 + logger.info("调用ZLM-TCP被动推流接口, 结果: {}", jsonObject);
  583 + logger.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
  584 + } else {
  585 + logger.error("启动监听TCP被动推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
  586 + }
  587 + }
  588 +
546 /** 589 /**
547 * 安排推流 590 * 安排推流
548 */ 591 */
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -289,6 +289,10 @@ public class ZLMRESTfulUtils { @@ -289,6 +289,10 @@ public class ZLMRESTfulUtils {
289 return sendPost(mediaServerItem, "startSendRtp",param, null); 289 return sendPost(mediaServerItem, "startSendRtp",param, null);
290 } 290 }
291 291
  292 + public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Map<String, Object> param) {
  293 + return sendPost(mediaServerItem, "startSendRtpPassive",param, null);
  294 + }
  295 +
292 public JSONObject stopSendRtp(MediaServerItem mediaServerItem, Map<String, Object> param) { 296 public JSONObject stopSendRtp(MediaServerItem mediaServerItem, Map<String, Object> param) {
293 return sendPost(mediaServerItem, "stopSendRtp",param, null); 297 return sendPost(mediaServerItem, "stopSendRtp",param, null);
294 } 298 }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
@@ -232,6 +232,13 @@ public class ZLMServerFactory { @@ -232,6 +232,13 @@ public class ZLMServerFactory {
232 } 232 }
233 233
234 /** 234 /**
  235 + * 调用zlm RESTFUL API —— startSendRtpPassive
  236 + */
  237 + public JSONObject startSendRtpStreamForPassive(MediaServerItem mediaServerItem, Map<String, Object>param) {
  238 + return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param);
  239 + }
  240 +
  241 + /**
235 * 查询待转推的流是否就绪 242 * 查询待转推的流是否就绪
236 */ 243 */
237 public Boolean isRtpReady(MediaServerItem mediaServerItem, String streamId) { 244 public Boolean isRtpReady(MediaServerItem mediaServerItem, String streamId) {