Commit 885842249fb6b264b0abf78668872d04bdc179ce

Authored by 648540858
1 parent 0a3d25fb

优化第三方对接接口

src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
... ... @@ -154,6 +154,7 @@ public class VideoManagerConstants {
154 154 public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
155 155 public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";
156 156 public static final String WVP_OTHER_SEND_RTP_INFO = "VMP_OTHER_SEND_RTP_INFO_";
  157 + public static final String WVP_OTHER_RECEIVE_RTP_INFO = "VMP_OTHER_RECEIVE_RTP_INFO_";
157 158  
158 159 /**
159 160 * Redis Const
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
... ... @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.media.zlm;
3 3 import com.alibaba.fastjson2.JSON;
4 4 import com.alibaba.fastjson2.JSONObject;
5 5 import com.genersoft.iot.vmp.common.StreamInfo;
  6 +import com.genersoft.iot.vmp.common.VideoManagerConstants;
6 7 import com.genersoft.iot.vmp.conf.UserSetting;
7 8 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
8 9 import com.genersoft.iot.vmp.gb28181.bean.*;
... ... @@ -22,14 +23,13 @@ import com.genersoft.iot.vmp.service.*;
22 23 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
23 24 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
24 25 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
25   -import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
26   -import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
27   -import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
28   -import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  26 +import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  27 +import com.genersoft.iot.vmp.vmanager.bean.*;
29 28 import org.slf4j.Logger;
30 29 import org.slf4j.LoggerFactory;
31 30 import org.springframework.beans.factory.annotation.Autowired;
32 31 import org.springframework.beans.factory.annotation.Qualifier;
  32 +import org.springframework.data.redis.core.RedisTemplate;
33 33 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
34 34 import org.springframework.util.ObjectUtils;
35 35 import org.springframework.web.bind.annotation.*;
... ... @@ -106,6 +106,9 @@ public class ZLMHttpHookListener {
106 106 @Autowired
107 107 private AssistRESTfulUtils assistRESTfulUtils;
108 108  
  109 + @Autowired
  110 + private RedisTemplate<Object, Object> redisTemplate;
  111 +
109 112 @Qualifier("taskExecutor")
110 113 @Autowired
111 114 private ThreadPoolTaskExecutor taskExecutor;
... ... @@ -255,6 +258,21 @@ public class ZLMHttpHookListener {
255 258 result.setEnable_mp4(true);
256 259 }
257 260 }
  261 +
  262 + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "*";
  263 + // 将信息写入redis中,以备后用
  264 + List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
  265 + if (scan.size()>0) {
  266 + for (Object o : scan) {
  267 + String key = (String) o;
  268 + OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
  269 + if (otherRtpSendInfo != null && otherRtpSendInfo.getStream().equalsIgnoreCase(param.getStream())) {
  270 + result.setEnable_audio(true);
  271 + result.setEnable_mp4(true);
  272 + }
  273 + }
  274 + }
  275 +
258 276 if (mediaInfo.getRecordAssistPort() > 0 && userSetting.getRecordPath() == null) {
259 277 logger.info("推流时发现尚未设置录像路径,从assist服务中读取");
260 278 JSONObject info = assistRESTfulUtils.getInfo(mediaInfo, null);
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
... ... @@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
11 11 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
12 12 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
13 13 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
  14 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
14 15 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
15 16 import com.genersoft.iot.vmp.service.IDeviceChannelService;
16 17 import com.genersoft.iot.vmp.service.IDeviceService;
... ... @@ -34,6 +35,7 @@ import org.springframework.web.bind.annotation.*;
34 35 import java.io.IOException;
35 36 import java.util.HashMap;
36 37 import java.util.Map;
  38 +import java.util.UUID;
37 39  
38 40 @SuppressWarnings("rawtypes")
39 41 @Tag(name = "第三方服务对接")
... ... @@ -120,12 +122,12 @@ public class RtpController {
120 122 int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode);
121 123 // 注册回调如果rtp收流超时则通过回调发送通知
122 124 if (callBack != null) {
123   - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, mediaServerItem.getId());
  125 + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId());
124 126 // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
125 127 hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
126 128 (mediaServerItemInUse, response)->{
127 129 if (stream.equals(response.getString("stream_id"))) {
128   - logger.info("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
  130 + logger.info("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
129 131 OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
130 132 OkHttpClient client = httpClientBuilder.build();
131 133 String url = callBack + "?callId=" + callId;
... ... @@ -133,7 +135,7 @@ public class RtpController {
133 135 try {
134 136 client.newCall(request).execute();
135 137 } catch (IOException e) {
136   - logger.error("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
  138 + logger.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
137 139 }
138 140 }
139 141 });
... ... @@ -143,6 +145,9 @@ public class RtpController {
143 145 otherRtpSendInfo.setReceivePort(localPort);
144 146 otherRtpSendInfo.setCallId(callId);
145 147 otherRtpSendInfo.setStream(stream);
  148 + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + stream;
  149 + // 将信息写入redis中,以备后用
  150 + redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo);
146 151 if (isSend != null && isSend) {
147 152 String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
148 153 // 预创建发流信息
... ... @@ -160,7 +165,7 @@ public class RtpController {
160 165 }, 15000);
161 166 otherRtpSendInfo.setIp(mediaServerItem.getSdpIp());
162 167 otherRtpSendInfo.setPort(port);
163   - logger.info("[开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo);
  168 + logger.info("[第三方服务对接->开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo);
164 169 }
165 170 return otherRtpSendInfo;
166 171 }
... ... @@ -173,6 +178,9 @@ public class RtpController {
173 178 logger.info("[第三方服务对接->关闭收流] stream->{}", stream);
174 179 MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
175 180 zlmServerFactory.closeRtpServer(mediaServerItem,stream);
  181 + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + stream;
  182 + // 将信息写入redis中,以备后用
  183 + redisTemplate.delete(receiveKey);
176 184 }
177 185  
178 186 @GetMapping(value = "/send/start")
... ... @@ -187,9 +195,10 @@ public class RtpController {
187 195 @Parameter(name = "onlyAudio", description = "是否只有音频", required = true)
188 196 @Parameter(name = "isUdp", description = "是否为UDP", required = true)
189 197 @Parameter(name = "streamType", description = "流类型,1为es流,2为ps流, 默认es流", required = false)
190   - public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType) {
191   - logger.info("[第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}",
192   - ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS");
  198 + @Parameter(name = "pt", description = "rtp的pt", required = true)
  199 + public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType, Integer pt) {
  200 + logger.info("[第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}, pt->{}",
  201 + ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS", pt);
193 202 if (ObjectUtils.isEmpty(streamType)) {
194 203 streamType = 1;
195 204 }
... ... @@ -197,7 +206,7 @@ public class RtpController {
197 206 String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
198 207 OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
199 208 if (sendInfo != null) {
200   - zlmServerFactory.releasePort(mediaServerItem, sendInfo.getCallId());
  209 + zlmServerFactory.releasePort(mediaServerItem, callId);
201 210 }else {
202 211 sendInfo = new OtherRtpSendInfo();
203 212 }
... ... @@ -218,20 +227,52 @@ public class RtpController {
218 227 param.put("src_port", sendInfo.getPort());
219 228 param.put("use_ps", streamType==2 ? "1" : "0");
220 229 param.put("only_audio", onlyAudio ? "1" : "0");
221   -
222   - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
223   - if (jsonObject.getInteger("code") == 0) {
224   - logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId);
225   - redisTemplate.opsForValue().set(key, sendInfo);
  230 + param.put("pt", pt);
  231 +
  232 + dynamicTask.stop(key);
  233 + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream);
  234 + if (streamReady) {
  235 + logger.info("[第三方服务对接->发送流] 流存在,开始发流,callId->{}", callId);
  236 + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
  237 + if (jsonObject.getInteger("code") == 0) {
  238 + logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId);
  239 + redisTemplate.opsForValue().set(key, sendInfo);
  240 + }else {
  241 + redisTemplate.delete(key);
  242 + logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg"));
  243 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg"));
  244 + }
226 245 }else {
227   - redisTemplate.delete(key);
228   - logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg"));
229   - throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg"));
  246 + logger.info("[第三方服务对接->发送流] 流不存在,等待流上线,callId->{}", callId);
  247 + String uuid = UUID.randomUUID().toString();
  248 + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId());
  249 + dynamicTask.startDelay(uuid, ()->{
  250 + logger.info("[第三方服务对接->发送流] 等待流上线超时 callId->{}", callId);
  251 + redisTemplate.delete(key);
  252 + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
  253 + }, 10000);
  254 +
  255 + // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
  256 + OtherRtpSendInfo finalSendInfo = sendInfo;
  257 + hookSubscribe.addSubscribe(hookSubscribeForStreamChange,
  258 + (mediaServerItemInUse, response)->{
  259 + dynamicTask.stop(uuid);
  260 + logger.info("[第三方服务对接->发送流] 流上线,开始发流 callId->{}", callId);
  261 + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
  262 + System.out.println("========发流结果==========");
  263 + System.out.println(jsonObject);
  264 + if (jsonObject.getInteger("code") == 0) {
  265 + logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId);
  266 + redisTemplate.opsForValue().set(key, finalSendInfo);
  267 + }else {
  268 + redisTemplate.delete(key);
  269 + logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg"));
  270 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg"));
  271 + }
  272 + });
230 273 }
231 274 }
232 275  
233   -
234   -
235 276 @GetMapping(value = "/send/stop")
236 277 @ResponseBody
237 278 @Operation(summary = "关闭发送流")
... ...