Commit fc90cd7951600ce5173f71c3e28d78e69b4db4ae

Authored by 648540858
1 parent 38a85d43

优化tcp主动方式的语音对讲

src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
... ... @@ -43,6 +43,8 @@ public class UserSetting {
43 43  
44 44 private Boolean syncChannelOnDeviceOnline = Boolean.FALSE;
45 45  
  46 + private Boolean pushStreamAfterAck = Boolean.FALSE;
  47 +
46 48 private String serverId = "000000";
47 49  
48 50 private String thirdPartyGBIdReg = "[\\s\\S]*";
... ... @@ -196,4 +198,12 @@ public class UserSetting {
196 198 public void setSyncChannelOnDeviceOnline(Boolean syncChannelOnDeviceOnline) {
197 199 this.syncChannelOnDeviceOnline = syncChannelOnDeviceOnline;
198 200 }
  201 +
  202 + public Boolean getPushStreamAfterAck() {
  203 + return pushStreamAfterAck;
  204 + }
  205 +
  206 + public void setPushStreamAfterAck(Boolean pushStreamAfterAck) {
  207 + this.pushStreamAfterAck = pushStreamAfterAck;
  208 + }
199 209 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
1 1 package com.genersoft.iot.vmp.gb28181.transmit.event.request;
2 2  
3   -import com.genersoft.iot.vmp.conf.SipConfig;
4 3 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
5   -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
6 4 import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
7 5 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
8   -import gov.nist.javax.sip.SipProviderImpl;
9 6 import gov.nist.javax.sip.message.SIPRequest;
10 7 import gov.nist.javax.sip.message.SIPResponse;
11   -import gov.nist.javax.sip.stack.SIPServerTransactionImpl;
12 8 import org.apache.commons.lang3.ArrayUtils;
13 9 import org.dom4j.Document;
14 10 import org.dom4j.DocumentException;
... ... @@ -17,14 +13,14 @@ import org.dom4j.io.SAXReader;
17 13 import org.slf4j.Logger;
18 14 import org.slf4j.LoggerFactory;
19 15 import org.springframework.beans.factory.annotation.Autowired;
20   -import org.springframework.beans.factory.annotation.Qualifier;
21   -import org.springframework.security.core.parameters.P;
22 16  
23 17 import javax.sip.*;
24 18 import javax.sip.address.Address;
25 19 import javax.sip.address.AddressFactory;
26 20 import javax.sip.address.SipURI;
27   -import javax.sip.header.*;
  21 +import javax.sip.header.ContentTypeHeader;
  22 +import javax.sip.header.ExpiresHeader;
  23 +import javax.sip.header.HeaderFactory;
28 24 import javax.sip.message.MessageFactory;
29 25 import javax.sip.message.Request;
30 26 import javax.sip.message.Response;
... ... @@ -157,7 +153,10 @@ public abstract class SIPRequestProcessorParent {
157 153 responseAckExtraParam.content = sdp;
158 154 responseAckExtraParam.sipURI = sipURI;
159 155  
160   - return responseAck(request, Response.OK, null, responseAckExtraParam);
  156 + SIPResponse sipResponse = responseAck(request, Response.OK, null, responseAckExtraParam);
  157 +
  158 +
  159 + return sipResponse;
161 160 }
162 161  
163 162 /**
... ... @@ -190,7 +189,8 @@ public abstract class SIPRequestProcessorParent {
190 189 reader.setEncoding(charset);
191 190 // 对海康出现的未转义字符做处理。
192 191 String[] destStrArray = new String[]{"<",">","&","'","""};
193   - char despChar = '&'; // 或许可扩展兼容其他字符
  192 + // 或许可扩展兼容其他字符
  193 + char despChar = '&';
194 194 byte destBye = (byte) despChar;
195 195 List<Byte> result = new ArrayList<>();
196 196 byte[] rawContent = request.getRawContent();
... ... @@ -220,4 +220,5 @@ public abstract class SIPRequestProcessorParent {
220 220 return xml.getRootElement();
221 221 }
222 222  
  223 +
223 224 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
1 1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
2 2  
3   -import com.alibaba.fastjson2.JSON;
4 3 import com.alibaba.fastjson2.JSONObject;
5 4 import com.genersoft.iot.vmp.conf.DynamicTask;
6   -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
7   -import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
8   -import com.genersoft.iot.vmp.gb28181.bean.Device;
  5 +import com.genersoft.iot.vmp.conf.UserSetting;
9 6 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
10 7 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
11   -import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
12 8 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
13   -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
14   -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
15 9 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
16 10 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
17 11 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
... ... @@ -19,8 +13,8 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
19 13 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
20 14 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
21 15 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
22   -import com.genersoft.iot.vmp.service.IDeviceService;
23 16 import com.genersoft.iot.vmp.service.IMediaServerService;
  17 +import com.genersoft.iot.vmp.service.IPlayService;
24 18 import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
25 19 import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
26 20 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
... ... @@ -31,15 +25,12 @@ import org.springframework.beans.factory.InitializingBean;
31 25 import org.springframework.beans.factory.annotation.Autowired;
32 26 import org.springframework.stereotype.Component;
33 27  
34   -import javax.sip.InvalidArgumentException;
35 28 import javax.sip.RequestEvent;
36   -import javax.sip.SipException;
37 29 import javax.sip.address.SipURI;
38 30 import javax.sip.header.CallIdHeader;
39 31 import javax.sip.header.FromHeader;
40 32 import javax.sip.header.HeaderAddress;
41 33 import javax.sip.header.ToHeader;
42   -import java.text.ParseException;
43 34 import java.util.HashMap;
44 35 import java.util.Map;
45 36  
... ... @@ -50,7 +41,7 @@ import java.util.Map;
50 41 @Component
51 42 public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
52 43  
53   - private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class);
  44 + private final Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class);
54 45 private final String method = "ACK";
55 46  
56 47 @Autowired
... ... @@ -78,31 +69,20 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
78 69 private IMediaServerService mediaServerService;
79 70  
80 71 @Autowired
81   - private ZlmHttpHookSubscribe subscribe;
82   -
83   - @Autowired
84 72 private DynamicTask dynamicTask;
85 73  
86 74 @Autowired
87   - private ISIPCommander cmder;
88   -
89   - @Autowired
90   - private IDeviceService deviceService;
91   -
92   - @Autowired
93   - private ISIPCommanderForPlatform commanderForPlatform;
  75 + private RedisGbPlayMsgListener redisGbPlayMsgListener;
94 76  
95 77 @Autowired
96   - private AudioBroadcastManager audioBroadcastManager;
  78 + private UserSetting userSetting;
97 79  
98 80 @Autowired
99   - private RedisGbPlayMsgListener redisGbPlayMsgListener;
  81 + private IPlayService playService;
100 82  
101 83  
102 84 /**
103 85 * 处理 ACK请求
104   - *
105   - * @param evt
106 86 */
107 87 @Override
108 88 public void process(RequestEvent evt) {
... ... @@ -110,100 +90,73 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
110 90  
111 91 String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
112 92 logger.info("[收到ACK]: platformGbId->{}", platformGbId);
113   - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId);
114   - // 取消设置的超时任务
115   - dynamicTask.stop(callIdHeader.getCallId());
116   - String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
117   - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
118   - if (sendRtpItem == null) {
119   - logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId);
120   - return;
121   - }
122   - String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
123   - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
124   - logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(),
125   - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
126   - Map<String, Object> param = new HashMap<>(12);
127   - param.put("vhost","__defaultVhost__");
128   - param.put("app",sendRtpItem.getApp());
129   - param.put("stream",sendRtpItem.getStreamId());
130   - param.put("ssrc", sendRtpItem.getSsrc());
131   - param.put("src_port", sendRtpItem.getLocalPort());
132   - param.put("pt", sendRtpItem.getPt());
133   - param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
134   - param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
135   - if (!sendRtpItem.isTcp()) {
136   - // udp模式下开启rtcp保活
137   - param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
138   - }
  93 + if (userSetting.getPushStreamAfterAck()) {
  94 + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId);
  95 + // 取消设置的超时任务
  96 + dynamicTask.stop(callIdHeader.getCallId());
  97 + String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
  98 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
  99 + if (sendRtpItem == null) {
  100 + logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId);
  101 + return;
  102 + }
  103 + String isUdp = sendRtpItem.isTcp() ? "0" : "1";
  104 + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
  105 + logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(),
  106 + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
  107 + Map<String, Object> param = new HashMap<>(12);
  108 + param.put("vhost","__defaultVhost__");
  109 + param.put("app",sendRtpItem.getApp());
  110 + param.put("stream",sendRtpItem.getStreamId());
  111 + param.put("ssrc", sendRtpItem.getSsrc());
  112 + param.put("src_port", sendRtpItem.getLocalPort());
  113 + param.put("pt", sendRtpItem.getPt());
  114 + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
  115 + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
  116 + param.put("is_udp", isUdp);
  117 + if (!sendRtpItem.isTcp()) {
  118 + // udp模式下开启rtcp保活
  119 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
  120 + }
139 121  
140   - if (mediaInfo == null) {
141   - RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
142   - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(),
143   - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
144   - sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
145   - redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
146   - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, json, param, callIdHeader);
147   - });
148   - } else {
149   - // 如果是非严格模式,需要关闭端口占用
150   - JSONObject startSendRtpStreamResult = null;
151   - if (sendRtpItem.getLocalPort() != 0) {
152   - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId());
153   - hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
154   - if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) {
  122 + if (mediaInfo == null) {
  123 + RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
  124 + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(),
  125 + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
  126 + sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
  127 + redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
  128 + playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader);
  129 + });
  130 + } else {
  131 + // 如果是非严格模式,需要关闭端口占用
  132 + JSONObject startSendRtpStreamResult = null;
  133 + if (sendRtpItem.getLocalPort() != 0) {
  134 + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId());
  135 + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
  136 + if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) {
  137 + if (sendRtpItem.isTcpActive()) {
  138 + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
  139 + }else {
  140 + param.put("dst_url", sendRtpItem.getIp());
  141 + param.put("dst_port", sendRtpItem.getPort());
  142 + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
  143 + }
  144 + }
  145 + }else {
155 146 if (sendRtpItem.isTcpActive()) {
156 147 startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
157 148 }else {
158   - param.put("is_udp", is_Udp);
159 149 param.put("dst_url", sendRtpItem.getIp());
160 150 param.put("dst_port", sendRtpItem.getPort());
161 151 startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
162 152 }
163 153 }
164   - }else {
165   - if (sendRtpItem.isTcpActive()) {
166   - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
167   - }else {
168   - param.put("is_udp", is_Udp);
169   - param.put("dst_url", sendRtpItem.getIp());
170   - param.put("dst_port", sendRtpItem.getPort());
171   - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
172   - }
173   - }
174   - if (startSendRtpStreamResult != null) {
175   - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
176   - }
177   - }
178   - }
179   - private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
180   - JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
181   - if (jsonObject == null) {
182   - logger.error("RTP推流失败: 请检查ZLM服务");
183   - } else if (jsonObject.getInteger("code") == 0) {
184   - logger.info("调用ZLM推流接口, 结果: {}", jsonObject);
185   - logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
186   - } else {
187   - logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
188   - if (sendRtpItem.isOnlyAudio()) {
189   - Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
190   - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
191   - if (audioBroadcastCatch != null) {
192   - try {
193   - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
194   - } catch (SipException | ParseException | InvalidArgumentException |
195   - SsrcTransactionNotFoundException e) {
196   - logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage());
197   - }
198   - }
199   - }else {
200   - // 向上级平台
201   - try {
202   - commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
203   - } catch (SipException | InvalidArgumentException | ParseException e) {
204   - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
  154 + if (startSendRtpStreamResult != null) {
  155 + playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
205 156 }
206 157 }
207 158 }
  159 +
208 160 }
  161 +
209 162 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
... ... @@ -439,18 +439,23 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
439 439  
440 440 try {
441 441 // 超时未收到Ack应该回复bye,当前等待时间为10秒
442   - dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
443   - logger.info("Ack 等待超时");
444   - mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc());
445   - // 回复bye
446   - try {
447   - cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
448   - } catch (SipException | InvalidArgumentException | ParseException e) {
449   - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
450   - }
451   - }, 60 * 1000);
452   - responseSdpAck(request, content.toString(), platform);
  442 + if (userSetting.getPushStreamAfterAck()) {
  443 + dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
  444 + logger.info("Ack 等待超时");
  445 + mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc());
  446 + // 回复bye
  447 + try {
  448 + cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
  449 + } catch (SipException | InvalidArgumentException | ParseException e) {
  450 + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
  451 + }
  452 + }, 60 * 1000);
  453 + }
453 454  
  455 + SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform);
  456 + if (!userSetting.getPushStreamAfterAck()) {
  457 + playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader());
  458 + }
454 459 } catch (SipException e) {
455 460 e.printStackTrace();
456 461 } catch (InvalidArgumentException e) {
... ... @@ -878,7 +883,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
878 883 content.append("f=\r\n");
879 884  
880 885 try {
881   - return responseSdpAck(request, content.toString(), platform);
  886 + SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform);
  887 + if (!userSetting.getPushStreamAfterAck()) {
  888 + playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader());
  889 + }
  890 + return sipResponse;
882 891 } catch (SipException e) {
883 892 e.printStackTrace();
884 893 } catch (InvalidArgumentException e) {
... ... @@ -968,7 +977,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
968 977 return;
969 978 }
970 979 String addressStr = sdp.getOrigin().getAddress();
971   - logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", requesterId, addressStr, port, ssrc);
  980 + logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}, {}", requesterId, addressStr, port, ssrc,
  981 + mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP");
972 982  
973 983 MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device);
974 984 if (mediaServerItem == null) {
... ... @@ -993,10 +1003,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
993 1003 }
994 1004 return;
995 1005 }
996   - sendRtpItem.setTcp(mediaTransmissionTCP);
997   - if (tcpActive != null) {
998   - sendRtpItem.setTcpActive(tcpActive);
999   - }
  1006 +
1000 1007 String app = "broadcast";
1001 1008 String stream = device.getDeviceId() + "_" + audioBroadcastCatch.getChannelId();
1002 1009  
... ... @@ -1011,6 +1018,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
1011 1018 sendRtpItem.setUsePs(false);
1012 1019 sendRtpItem.setRtcp(false);
1013 1020 sendRtpItem.setOnlyAudio(true);
  1021 + sendRtpItem.setTcp(mediaTransmissionTCP);
  1022 + if (tcpActive != null) {
  1023 + sendRtpItem.setTcpActive(tcpActive);
  1024 + }
  1025 +
1014 1026 redisCatchStorage.updateSendRTPSever(sendRtpItem);
1015 1027  
1016 1028 Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream);
... ... @@ -1083,6 +1095,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
1083 1095 audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse);
1084 1096 audioBroadcastManager.update(audioBroadcastCatch);
1085 1097  
  1098 + // 开启发流,大华在收到200OK后就会开始建立连接
  1099 + if (!userSetting.getPushStreamAfterAck()) {
  1100 + playService.startPushStream(sendRtpItem, sipResponse, parentPlatform, request.getCallIdHeader());
  1101 + }
  1102 +
1086 1103 } catch (SipException | InvalidArgumentException | ParseException | SdpParseException e) {
1087 1104 logger.error("[命令发送失败] 语音对讲 回复200OK(SDP): {}", e.getMessage());
1088 1105 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
... ... @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
35 35 public class CatalogResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
36 36  
37 37 private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class);
  38 +
38 39 private final String cmdType = "Catalog";
39 40  
40 41 @Autowired
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
... ... @@ -319,7 +319,7 @@ public class ZLMHttpHookListener {
319 319 });
320 320  
321 321 if ("rtsp".equals(param.getSchema())){
322   - logger.info("on_stream_changed:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream());
  322 + logger.info("流变化:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream());
323 323 if (param.isRegist()) {
324 324 mediaServerService.addCount(param.getMediaServerId());
325 325 }else {
... ... @@ -399,7 +399,11 @@ public class ZLMHttpHookListener {
399 399 }
400 400 }
401 401  
  402 + }else {
  403 + logger.info("[语音对讲] 未找到通道:{}", channelId);
402 404 }
  405 + }else{
  406 + logger.info("[语音对讲] 未找到设备:{}", deviceId);
403 407 }
404 408 }
405 409 }
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
... ... @@ -36,7 +36,7 @@ public class ZLMRESTfulUtils {
36 36 // 设置连接超时时间
37 37 httpClientBuilder.connectTimeout(5,TimeUnit.SECONDS);
38 38 // 设置读取超时时间
39   - httpClientBuilder.readTimeout(5,TimeUnit.SECONDS);
  39 + httpClientBuilder.readTimeout(15,TimeUnit.SECONDS);
40 40 // 设置连接池
41 41 httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES));
42 42 if (logger.isDebugEnabled()) {
... ...
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
... ... @@ -3,9 +3,7 @@ package com.genersoft.iot.vmp.service;
3 3 import com.alibaba.fastjson2.JSONObject;
4 4 import com.genersoft.iot.vmp.common.StreamInfo;
5 5 import com.genersoft.iot.vmp.conf.exception.ServiceException;
6   -import com.genersoft.iot.vmp.gb28181.bean.Device;
7   -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
8   -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;
  6 +import com.genersoft.iot.vmp.gb28181.bean.*;
9 7 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
10 8 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
11 9 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
... ... @@ -15,11 +13,14 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo;
15 13 import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
16 14 import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
17 15 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  16 +import gov.nist.javax.sip.message.SIPResponse;
18 17 import org.springframework.web.context.request.async.DeferredResult;
19 18  
20 19 import javax.sip.InvalidArgumentException;
21 20 import javax.sip.SipException;
  21 +import javax.sip.header.CallIdHeader;
22 22 import java.text.ParseException;
  23 +import java.util.Map;
23 24  
24 25 /**
25 26 * 点播处理
... ... @@ -61,4 +62,9 @@ public interface IPlayService {
61 62 void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException;
62 63  
63 64 void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException;
  65 +
  66 + void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader);
  67 +
  68 + void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
  69 + JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader);
64 70 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
... ... @@ -24,16 +24,15 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
24 24 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
25 25 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
26 26 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  27 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
27 28 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
28 29 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
29 30 import com.genersoft.iot.vmp.service.IDeviceService;
30 31 import com.genersoft.iot.vmp.service.IMediaServerService;
31 32 import com.genersoft.iot.vmp.service.IMediaService;
32 33 import com.genersoft.iot.vmp.service.IPlayService;
33   -import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
34   -import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
35   -import com.genersoft.iot.vmp.service.bean.PlayBackResult;
36   -import com.genersoft.iot.vmp.service.bean.SSRCInfo;
  34 +import com.genersoft.iot.vmp.service.bean.*;
  35 +import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
37 36 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
38 37 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
39 38 import com.genersoft.iot.vmp.utils.DateUtil;
... ... @@ -42,6 +41,7 @@ import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
42 41 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
43 42 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
44 43 import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
  44 +import gov.nist.javax.sip.message.SIPResponse;
45 45 import org.slf4j.Logger;
46 46 import org.slf4j.LoggerFactory;
47 47 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -54,6 +54,7 @@ import org.springframework.web.context.request.async.DeferredResult;
54 54 import javax.sip.InvalidArgumentException;
55 55 import javax.sip.ResponseEvent;
56 56 import javax.sip.SipException;
  57 +import javax.sip.header.CallIdHeader;
57 58 import java.math.BigDecimal;
58 59 import java.math.RoundingMode;
59 60 import java.text.ParseException;
... ... @@ -119,11 +120,20 @@ public class PlayServiceImpl implements IPlayService {
119 120 @Autowired
120 121 private ZlmHttpHookSubscribe subscribe;
121 122  
  123 + @Autowired
  124 + private ISIPCommanderForPlatform commanderForPlatform;
  125 +
122 126  
123 127 @Qualifier("taskExecutor")
124 128 @Autowired
125 129 private ThreadPoolTaskExecutor taskExecutor;
126 130  
  131 + @Autowired
  132 + private RedisGbPlayMsgListener redisGbPlayMsgListener;
  133 +
  134 + @Autowired
  135 + private ZlmHttpHookSubscribe hookSubscribe;
  136 +
127 137  
128 138 @Override
129 139 public void play(MediaServerItem mediaServerItem, String deviceId, String channelId,
... ... @@ -1179,4 +1189,100 @@ public class PlayServiceImpl implements IPlayService {
1179 1189 Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
1180 1190 cmder.playResumeCmd(device, streamInfo);
1181 1191 }
  1192 +
  1193 + @Override
  1194 + public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) {
  1195 +
  1196 + // 开始发流
  1197 + // 取消设置的超时任务
  1198 +// String channelId = request.getCallIdHeader().getCallId();
  1199 +
  1200 + String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
  1201 + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
  1202 + logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(),
  1203 + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
  1204 + Map<String, Object> param = new HashMap<>(12);
  1205 + param.put("vhost","__defaultVhost__");
  1206 + param.put("app",sendRtpItem.getApp());
  1207 + param.put("stream",sendRtpItem.getStreamId());
  1208 + param.put("ssrc", sendRtpItem.getSsrc());
  1209 + param.put("src_port", sendRtpItem.getLocalPort());
  1210 + param.put("pt", sendRtpItem.getPt());
  1211 + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
  1212 + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
  1213 + param.put("is_udp", is_Udp);
  1214 + if (!sendRtpItem.isTcp()) {
  1215 + // udp模式下开启rtcp保活
  1216 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
  1217 + }
  1218 +
  1219 + if (mediaInfo == null) {
  1220 + RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
  1221 + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(),
  1222 + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
  1223 + sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
  1224 + redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
  1225 + startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader);
  1226 + });
  1227 + } else {
  1228 + // 如果是非严格模式,需要关闭端口占用
  1229 + JSONObject startSendRtpStreamResult = null;
  1230 + if (sendRtpItem.getLocalPort() != 0) {
  1231 + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId());
  1232 + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
  1233 + if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) {
  1234 + if (sendRtpItem.isTcpActive()) {
  1235 + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
  1236 + }else {
  1237 + param.put("dst_url", sendRtpItem.getIp());
  1238 + param.put("dst_port", sendRtpItem.getPort());
  1239 + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
  1240 + }
  1241 + }
  1242 + }else {
  1243 + if (sendRtpItem.isTcpActive()) {
  1244 + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
  1245 + }else {
  1246 + param.put("dst_url", sendRtpItem.getIp());
  1247 + param.put("dst_port", sendRtpItem.getPort());
  1248 + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
  1249 + }
  1250 + }
  1251 + if (startSendRtpStreamResult != null) {
  1252 + startSendRtpStreamHand(sendRtpItem, platform, startSendRtpStreamResult, param, callIdHeader);
  1253 + }
  1254 + }
  1255 + }
  1256 +
  1257 + @Override
  1258 + public void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
  1259 + JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
  1260 + if (jsonObject == null) {
  1261 + logger.error("RTP推流失败: 请检查ZLM服务");
  1262 + } else if (jsonObject.getInteger("code") == 0) {
  1263 + logger.info("调用ZLM推流接口, 结果: {}", jsonObject);
  1264 + logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
  1265 + } else {
  1266 + logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
  1267 + if (sendRtpItem.isOnlyAudio()) {
  1268 + Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
  1269 + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
  1270 + if (audioBroadcastCatch != null) {
  1271 + try {
  1272 + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
  1273 + } catch (SipException | ParseException | InvalidArgumentException |
  1274 + SsrcTransactionNotFoundException e) {
  1275 + logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage());
  1276 + }
  1277 + }
  1278 + }else {
  1279 + // 向上级平台
  1280 + try {
  1281 + commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
  1282 + } catch (SipException | InvalidArgumentException | ParseException e) {
  1283 + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
  1284 + }
  1285 + }
  1286 + }
  1287 + }
1182 1288 }
... ...
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
... ... @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
6 6 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
7 7 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
8 8 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
9   -import com.genersoft.iot.vmp.service.IGbStreamService;
10 9 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
11 10 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
12 11 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
... ... @@ -90,12 +89,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
90 89 @Autowired
91 90 private PlatformGbStreamMapper platformGbStreamMapper;
92 91  
93   - @Autowired
94   - private IGbStreamService gbStreamService;
95   -
96   - @Autowired
97   - private ParentPlatformMapper parentPlatformMapper;
98   -
99 92 /**
100 93 * 根据设备ID判断设备是否存在
101 94 *
... ...
src/main/resources/all-application.yml
... ... @@ -195,6 +195,8 @@ user-settings:
195 195 gb-send-stream-strict: false
196 196 # 设备上线时是否自动同步通道
197 197 sync-channel-on-device-online: false
  198 + # 收到ack消息后开始发流,默认false, 回复200ok后直接开始发流
  199 + push-stream-after-ack: false
198 200  
199 201 # 关闭在线文档(生产环境建议关闭)
200 202 springdoc:
... ...