Commit 9c555b56b70ed05c1b13f0a26098b702d7364839

Authored by 648540858
1 parent 6fa9dae8

优化语音广播的TCP主动模式

src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -107,29 +107,41 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @@ -107,29 +107,41 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
107 SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, null, null, callIdHeader.getCallId()); 107 SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, null, null, callIdHeader.getCallId());
108 String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; 108 String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
109 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); 109 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
110 - logger.info("收到ACK,开始向上级推流 rtp/{}", sendRtpItem.getStreamId()); 110 + logger.info("[收到ACK],开始使用{}向上级推流 {}/{}->{}:{}({})", sendRtpItem.isTcp() ? "TCP" : "UDP",
  111 + sendRtpItem.getApp(), sendRtpItem.getStreamId(),
  112 + sendRtpItem.getIp() ,sendRtpItem.getPort(),
  113 + sendRtpItem.getSsrc());
111 Map<String, Object> param = new HashMap<>(); 114 Map<String, Object> param = new HashMap<>();
112 param.put("vhost","__defaultVhost__"); 115 param.put("vhost","__defaultVhost__");
113 param.put("app",sendRtpItem.getApp()); 116 param.put("app",sendRtpItem.getApp());
114 param.put("stream",sendRtpItem.getStreamId()); 117 param.put("stream",sendRtpItem.getStreamId());
115 param.put("ssrc", sendRtpItem.getSsrc()); 118 param.put("ssrc", sendRtpItem.getSsrc());
116 - param.put("dst_url",sendRtpItem.getIp());  
117 - param.put("dst_port", sendRtpItem.getPort());  
118 - param.put("is_udp", is_Udp);  
119 param.put("src_port", sendRtpItem.getLocalPort()); 119 param.put("src_port", sendRtpItem.getLocalPort());
120 param.put("pt", sendRtpItem.getPt()); 120 param.put("pt", sendRtpItem.getPt());
121 param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); 121 param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
122 param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); 122 param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
123 - JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); 123 + JSONObject jsonObject;
  124 + if (sendRtpItem.isTcpActive()) {
  125 + jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
  126 + }else {
  127 + param.put("is_udp", is_Udp);
  128 + param.put("dst_url",sendRtpItem.getIp());
  129 + param.put("dst_port", sendRtpItem.getPort());
  130 + jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
  131 + }
  132 +
124 if (jsonObject == null) { 133 if (jsonObject == null) {
125 logger.error("RTP推流失败: 请检查ZLM服务"); 134 logger.error("RTP推流失败: 请检查ZLM服务");
126 } else if (jsonObject.getInteger("code") == 0) { 135 } else if (jsonObject.getInteger("code") == 0) {
  136 +
127 if (sendRtpItem.isOnlyAudio()) { 137 if (sendRtpItem.isOnlyAudio()) {
128 AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); 138 AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
129 audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok); 139 audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok);
130 audioBroadcastCatch.setDialog((SIPDialog) evt.getDialog()); 140 audioBroadcastCatch.setDialog((SIPDialog) evt.getDialog());
131 audioBroadcastCatch.setRequest((SIPRequest) evt.getRequest()); 141 audioBroadcastCatch.setRequest((SIPRequest) evt.getRequest());
132 audioBroadcastManager.update(audioBroadcastCatch); 142 audioBroadcastManager.update(audioBroadcastCatch);
  143 + String waiteStreamTimeoutTaskKey = "waite-stream-" + audioBroadcastCatch.getDeviceId() + audioBroadcastCatch.getChannelId();
  144 + dynamicTask.stop(waiteStreamTimeoutTaskKey);
133 } 145 }
134 logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); 146 logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
135 } else { 147 } else {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; 1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
2 2
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.alibaba.fastjson.JSONArray;
3 import com.alibaba.fastjson.JSONObject; 5 import com.alibaba.fastjson.JSONObject;
4 import com.genersoft.iot.vmp.conf.DynamicTask; 6 import com.genersoft.iot.vmp.conf.DynamicTask;
5 import com.genersoft.iot.vmp.conf.SipConfig; 7 import com.genersoft.iot.vmp.conf.SipConfig;
@@ -20,7 +22,9 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils; @@ -20,7 +22,9 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
20 import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; 22 import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
21 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; 23 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
22 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; 24 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
  25 +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
23 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; 26 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
  27 +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
24 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 28 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
25 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItemLite; 29 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItemLite;
26 import com.genersoft.iot.vmp.service.IMediaServerService; 30 import com.genersoft.iot.vmp.service.IMediaServerService;
@@ -91,6 +95,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -91,6 +95,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
91 private ZLMRTPServerFactory zlmrtpServerFactory; 95 private ZLMRTPServerFactory zlmrtpServerFactory;
92 96
93 @Autowired 97 @Autowired
  98 + private ZLMRESTfulUtils zlmresTfulUtils;
  99 +
  100 + @Autowired
94 private IMediaServerService mediaServerService; 101 private IMediaServerService mediaServerService;
95 102
96 @Autowired 103 @Autowired
@@ -674,7 +681,19 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -674,7 +681,19 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
674 subscribeKey.put("mediaServerId", mediaServerItem.getId()); 681 subscribeKey.put("mediaServerId", mediaServerItem.getId());
675 String finalSsrc = ssrc; 682 String finalSsrc = ssrc;
676 // 流已经存在时直接推流 683 // 流已经存在时直接推流
677 - if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) { 684 + JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtsp", stream);
  685 + JSONArray tracks = mediaInfo.getJSONArray("tracks");
  686 + Integer codecId = null;
  687 + if (tracks != null && tracks.size() > 0) {
  688 + for (int i = 0; i < tracks.size(); i++) {
  689 + MediaItem.MediaTrack track = JSON.toJavaObject((JSON)tracks.get(i),MediaItem.MediaTrack.class);
  690 + if (track.getCodecType() == 1) {
  691 + codecId = track.getCodecId();
  692 + break;
  693 + }
  694 + }
  695 + }
  696 + if ((mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"))) {
678 logger.info("发现已经在推流"); 697 logger.info("发现已经在推流");
679 sendRtpItem.setStatus(2); 698 sendRtpItem.setStatus(2);
680 redisCatchStorage.updateSendRTPSever(sendRtpItem); 699 redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -684,9 +703,40 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -684,9 +703,40 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
684 content.append("s=Play\r\n"); 703 content.append("s=Play\r\n");
685 content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); 704 content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
686 content.append("t=0 0\r\n"); 705 content.append("t=0 0\r\n");
687 - content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); 706 + if (codecId == null) {
  707 + if (mediaTransmissionTCP) {
  708 + content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n");
  709 + }else {
  710 + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
  711 + }
  712 +
  713 + content.append("a=rtpmap:8 PCMA/8000\r\n");
  714 + }else {
  715 + if (codecId == 4) {
  716 + if (mediaTransmissionTCP) {
  717 + content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 0\r\n");
  718 + }else {
  719 + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 0\r\n");
  720 + }
  721 + content.append("a=rtpmap:0 PCMU/8000\r\n");
  722 + }else {
  723 + if (mediaTransmissionTCP) {
  724 + content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n");
  725 + }else {
  726 + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
  727 + }
  728 + content.append("a=rtpmap:8 PCMA/8000\r\n");
  729 + }
  730 + }
  731 + if (sendRtpItem.isTcp()) {
  732 + content.append("a=connection:new\r\n");
  733 + if (!sendRtpItem.isTcpActive()) {
  734 + content.append("a=setup:active\r\n");
  735 + }else {
  736 + content.append("a=setup:passive\r\n");
  737 + }
  738 + }
688 content.append("a=sendonly\r\n"); 739 content.append("a=sendonly\r\n");
689 - content.append("a=rtpmap:8 PCMA/8000\r\n");  
690 content.append("y="+ finalSsrc + "\r\n"); 740 content.append("y="+ finalSsrc + "\r\n");
691 content.append("f=v/////a/1/8/1\r\n"); 741 content.append("f=v/////a/1/8/1\r\n");
692 742
@@ -727,9 +777,22 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -727,9 +777,22 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
727 } 777 }
728 }, 20*1000); 778 }, 20*1000);
729 779
  780 + boolean finalMediaTransmissionTCP = mediaTransmissionTCP;
730 subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, 781 subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
731 (MediaServerItem mediaServerItemInUse, JSONObject json)->{ 782 (MediaServerItem mediaServerItemInUse, JSONObject json)->{
732 logger.info("收到语音对讲推流"); 783 logger.info("收到语音对讲推流");
  784 + MediaItem mediaItem = JSON.toJavaObject(json, MediaItem.class);
  785 + Integer audioCodecId = null;
  786 + if (mediaItem.getTracks() != null && mediaItem.getTracks().size() > 0) {
  787 + for (int i = 0; i < mediaItem.getTracks().size(); i++) {
  788 + MediaItem.MediaTrack mediaTrack = mediaItem.getTracks().get(i);
  789 + if (mediaTrack.getCodecType() == 1) {
  790 + audioCodecId = mediaTrack.getCodecId();
  791 + break;
  792 + }
  793 + }
  794 + }
  795 +
733 try { 796 try {
734 sendRtpItem.setStatus(2); 797 sendRtpItem.setStatus(2);
735 redisCatchStorage.updateSendRTPSever(sendRtpItem); 798 redisCatchStorage.updateSendRTPSever(sendRtpItem);
@@ -739,9 +802,40 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -739,9 +802,40 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
739 content.append("s=Play\r\n"); 802 content.append("s=Play\r\n");
740 content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); 803 content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
741 content.append("t=0 0\r\n"); 804 content.append("t=0 0\r\n");
742 - content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n"); 805 + if (audioCodecId == null) {
  806 + if (finalMediaTransmissionTCP) {
  807 + content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n");
  808 + }else {
  809 + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
  810 + }
  811 +
  812 + content.append("a=rtpmap:8 PCMA/8000\r\n");
  813 + }else {
  814 + if (audioCodecId == 4) {
  815 + if (finalMediaTransmissionTCP) {
  816 + content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 0\r\n");
  817 + }else {
  818 + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 0\r\n");
  819 + }
  820 + content.append("a=rtpmap:0 PCMU/8000\r\n");
  821 + }else {
  822 + if (finalMediaTransmissionTCP) {
  823 + content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n");
  824 + }else {
  825 + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
  826 + }
  827 + content.append("a=rtpmap:8 PCMA/8000\r\n");
  828 + }
  829 + }
743 content.append("a=sendonly\r\n"); 830 content.append("a=sendonly\r\n");
744 - content.append("a=rtpmap:8 PCMA/8000\r\n"); 831 + if (sendRtpItem.isTcp()) {
  832 + content.append("a=connection:new\r\n");
  833 + if (!sendRtpItem.isTcpActive()) {
  834 + content.append("a=setup:active\r\n");
  835 + }else {
  836 + content.append("a=setup:passive\r\n");
  837 + }
  838 + }
745 content.append("y="+ finalSsrc + "\r\n"); 839 content.append("y="+ finalSsrc + "\r\n");
746 content.append("f=v/////a/1/8/1\r\n"); 840 content.append("f=v/////a/1/8/1\r\n");
747 841
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -253,6 +253,10 @@ public class ZLMRESTfulUtils { @@ -253,6 +253,10 @@ public class ZLMRESTfulUtils {
253 return sendPost(mediaServerItem, "startSendRtp",param, null); 253 return sendPost(mediaServerItem, "startSendRtp",param, null);
254 } 254 }
255 255
  256 + public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Map<String, Object> param) {
  257 + return sendPost(mediaServerItem, "startSendRtpPassive",param, null);
  258 + }
  259 +
256 public JSONObject stopSendRtp(MediaServerItem mediaServerItem, Map<String, Object> param) { 260 public JSONObject stopSendRtp(MediaServerItem mediaServerItem, Map<String, Object> param) {
257 return sendPost(mediaServerItem, "stopSendRtp",param, null); 261 return sendPost(mediaServerItem, "stopSendRtp",param, null);
258 } 262 }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -250,6 +250,13 @@ public class ZLMRTPServerFactory { @@ -250,6 +250,13 @@ public class ZLMRTPServerFactory {
250 } 250 }
251 251
252 /** 252 /**
  253 + * 调用zlm RESTFUL API —— startSendRtpPassive
  254 + */
  255 + public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Map<String, Object>param) {
  256 + return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param);
  257 + }
  258 +
  259 + /**
253 * 查询待转推的流是否就绪 260 * 查询待转推的流是否就绪
254 */ 261 */
255 public Boolean isRtpReady(MediaServerItem mediaServerItem, String streamId) { 262 public Boolean isRtpReady(MediaServerItem mediaServerItem, String streamId) {