Commit ba8fffd907b5c3a7acfd3348c80e557b816cec98

Authored by 648540858
1 parent 0592fd67

增加与第三方对接的接口

src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -153,6 +153,7 @@ public class VideoManagerConstants { @@ -153,6 +153,7 @@ public class VideoManagerConstants {
153 153
154 public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_"; 154 public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
155 public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_"; 155 public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";
  156 + public static final String WVP_OTHER_SEND_RTP_INFO = "VMP_OTHER_SEND_RTP_INFO_";
156 157
157 /** 158 /**
158 * Redis Const 159 * Redis Const
src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java 0 → 100644
  1 +package com.genersoft.iot.vmp.vmanager.bean;
  2 +
  3 +public class OtherRtpSendInfo {
  4 +
  5 + /**
  6 + * 发流IP
  7 + */
  8 + private String ip;
  9 +
  10 + /**
  11 + * 发流端口
  12 + */
  13 + private int port;
  14 +
  15 + /**
  16 + * 收流IP
  17 + */
  18 + private String receiveIp;
  19 +
  20 + /**
  21 + * 收流端口
  22 + */
  23 + private int receivePort;
  24 +
  25 + /**
  26 + * 会话ID
  27 + */
  28 + private String callId;
  29 +
  30 + /**
  31 + * 流ID
  32 + */
  33 + private String stream;
  34 +
  35 + /**
  36 + * 推流应用名
  37 + */
  38 + private String pushApp;
  39 +
  40 + /**
  41 + * 推流流ID
  42 + */
  43 + private String pushStream;
  44 +
  45 + /**
  46 + * 推流SSRC
  47 + */
  48 + private String pushSSRC;
  49 +
  50 +
  51 +
  52 + public String getIp() {
  53 + return ip;
  54 + }
  55 +
  56 + public void setIp(String ip) {
  57 + this.ip = ip;
  58 + }
  59 +
  60 + public int getPort() {
  61 + return port;
  62 + }
  63 +
  64 + public void setPort(int port) {
  65 + this.port = port;
  66 + }
  67 +
  68 + public String getReceiveIp() {
  69 + return receiveIp;
  70 + }
  71 +
  72 + public void setReceiveIp(String receiveIp) {
  73 + this.receiveIp = receiveIp;
  74 + }
  75 +
  76 + public int getReceivePort() {
  77 + return receivePort;
  78 + }
  79 +
  80 + public void setReceivePort(int receivePort) {
  81 + this.receivePort = receivePort;
  82 + }
  83 +
  84 + public String getCallId() {
  85 + return callId;
  86 + }
  87 +
  88 + public void setCallId(String callId) {
  89 + this.callId = callId;
  90 + }
  91 +
  92 + public String getStream() {
  93 + return stream;
  94 + }
  95 +
  96 + public void setStream(String stream) {
  97 + this.stream = stream;
  98 + }
  99 +
  100 + public String getPushApp() {
  101 + return pushApp;
  102 + }
  103 +
  104 + public void setPushApp(String pushApp) {
  105 + this.pushApp = pushApp;
  106 + }
  107 +
  108 + public String getPushStream() {
  109 + return pushStream;
  110 + }
  111 +
  112 + public void setPushStream(String pushStream) {
  113 + this.pushStream = pushStream;
  114 + }
  115 +
  116 + public String getPushSSRC() {
  117 + return pushSSRC;
  118 + }
  119 +
  120 + public void setPushSSRC(String pushSSRC) {
  121 + this.pushSSRC = pushSSRC;
  122 + }
  123 +
  124 + @Override
  125 + public String toString() {
  126 + return "OtherRtpSendInfo{" +
  127 + "ip='" + ip + '\'' +
  128 + ", port=" + port +
  129 + ", receiveIp='" + receiveIp + '\'' +
  130 + ", receivePort=" + receivePort +
  131 + ", callId='" + callId + '\'' +
  132 + ", stream='" + stream + '\'' +
  133 + '}';
  134 + }
  135 +}
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
1 package com.genersoft.iot.vmp.vmanager.rtp; 1 package com.genersoft.iot.vmp.vmanager.rtp;
2 2
  3 +import com.alibaba.fastjson2.JSONObject;
  4 +import com.genersoft.iot.vmp.common.VideoManagerConstants;
  5 +import com.genersoft.iot.vmp.conf.DynamicTask;
3 import com.genersoft.iot.vmp.conf.SipConfig; 6 import com.genersoft.iot.vmp.conf.SipConfig;
4 import com.genersoft.iot.vmp.conf.UserSetting; 7 import com.genersoft.iot.vmp.conf.UserSetting;
5 import com.genersoft.iot.vmp.conf.VersionInfo; 8 import com.genersoft.iot.vmp.conf.VersionInfo;
6 import com.genersoft.iot.vmp.conf.exception.ControllerException; 9 import com.genersoft.iot.vmp.conf.exception.ControllerException;
7 -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; 10 +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
8 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; 11 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
  12 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  13 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
9 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 14 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
10 -import com.genersoft.iot.vmp.service.*; 15 +import com.genersoft.iot.vmp.service.IDeviceChannelService;
  16 +import com.genersoft.iot.vmp.service.IDeviceService;
  17 +import com.genersoft.iot.vmp.service.IMediaServerService;
11 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 18 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
12 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; 19 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
  20 +import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
13 import io.swagger.v3.oas.annotations.Operation; 21 import io.swagger.v3.oas.annotations.Operation;
14 import io.swagger.v3.oas.annotations.Parameter; 22 import io.swagger.v3.oas.annotations.Parameter;
15 import io.swagger.v3.oas.annotations.tags.Tag; 23 import io.swagger.v3.oas.annotations.tags.Tag;
  24 +import okhttp3.OkHttpClient;
  25 +import okhttp3.Request;
  26 +import org.slf4j.Logger;
  27 +import org.slf4j.LoggerFactory;
16 import org.springframework.beans.factory.annotation.Autowired; 28 import org.springframework.beans.factory.annotation.Autowired;
17 import org.springframework.beans.factory.annotation.Value; 29 import org.springframework.beans.factory.annotation.Value;
  30 +import org.springframework.data.redis.core.RedisTemplate;
18 import org.springframework.web.bind.annotation.GetMapping; 31 import org.springframework.web.bind.annotation.GetMapping;
19 import org.springframework.web.bind.annotation.RequestMapping; 32 import org.springframework.web.bind.annotation.RequestMapping;
20 import org.springframework.web.bind.annotation.ResponseBody; 33 import org.springframework.web.bind.annotation.ResponseBody;
21 import org.springframework.web.bind.annotation.RestController; 34 import org.springframework.web.bind.annotation.RestController;
22 35
  36 +import java.io.IOException;
  37 +import java.util.HashMap;
  38 +import java.util.Map;
  39 +
23 @SuppressWarnings("rawtypes") 40 @SuppressWarnings("rawtypes")
24 @Tag(name = "第三方服务对接") 41 @Tag(name = "第三方服务对接")
25 42
@@ -27,8 +44,13 @@ import org.springframework.web.bind.annotation.RestController; @@ -27,8 +44,13 @@ import org.springframework.web.bind.annotation.RestController;
27 @RequestMapping("/api/rtp") 44 @RequestMapping("/api/rtp")
28 public class RtpController { 45 public class RtpController {
29 46
  47 + private final static Logger logger = LoggerFactory.getLogger(RtpController.class);
  48 +
  49 + @Autowired
  50 + private ZLMRTPServerFactory zlmServerFactory;
  51 +
30 @Autowired 52 @Autowired
31 - private ZlmHttpHookSubscribe zlmHttpHookSubscribe; 53 + private ZlmHttpHookSubscribe hookSubscribe;
32 54
33 @Autowired 55 @Autowired
34 private IMediaServerService mediaServerService; 56 private IMediaServerService mediaServerService;
@@ -49,11 +71,11 @@ public class RtpController { @@ -49,11 +71,11 @@ public class RtpController {
49 private IDeviceChannelService channelService; 71 private IDeviceChannelService channelService;
50 72
51 @Autowired 73 @Autowired
52 - private IStreamPushService pushService; 74 + private DynamicTask dynamicTask;
53 75
54 76
55 @Autowired 77 @Autowired
56 - private IStreamProxyService proxyService; 78 + private RedisTemplate<Object, Object> redisTemplate;
57 79
58 80
59 @Value("${server.port}") 81 @Value("${server.port}")
@@ -73,12 +95,76 @@ public class RtpController { @@ -73,12 +95,76 @@ public class RtpController {
73 @Parameter(name = "stream", description = "形成的流的ID", required = true) 95 @Parameter(name = "stream", description = "形成的流的ID", required = true)
74 @Parameter(name = "tcpMode", description = "收流模式, 0为UDP, 1为TCP被动", required = true) 96 @Parameter(name = "tcpMode", description = "收流模式, 0为UDP, 1为TCP被动", required = true)
75 @Parameter(name = "callBack", description = "回调地址,如果收流超时会通道回调通知,回调为get请求,参数为callId", required = true) 97 @Parameter(name = "callBack", description = "回调地址,如果收流超时会通道回调通知,回调为get请求,参数为callId", required = true)
76 - public SendRtpItem openRtpServer(Boolean isSend, String ssrc, String callId, String stream, Integer tcpMode, String callBack) {  
77 - MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); 98 + public OtherRtpSendInfo openRtpServer(Boolean isSend, String ssrc, String callId, String stream, Integer tcpMode, String callBack) {
  99 +
  100 + logger.info("[第三方服务对接->开启收流和获取发流信息] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}",
  101 + isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP被动", callBack);
  102 +
  103 + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
78 if (mediaServerItem == null) { 104 if (mediaServerItem == null) {
79 throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer"); 105 throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer");
80 } 106 }
81 - return null; 107 + if (stream == null) {
  108 + throw new ControllerException(ErrorCode.ERROR100.getCode(),"stream参数不可为空");
  109 + }
  110 + if (isSend != null && isSend && callId == null) {
  111 + throw new ControllerException(ErrorCode.ERROR100.getCode(),"isSend为true时,CallID不能为空");
  112 + }
  113 + int ssrcInt = 0;
  114 + if (ssrc != null) {
  115 + try {
  116 + ssrcInt = Integer.parseInt(ssrc);
  117 + }catch (NumberFormatException e) {
  118 + throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc格式错误");
  119 + }
  120 +
  121 + }
  122 + int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode);
  123 + // 注册回调如果rtp收流超时则通过回调发送通知
  124 + if (callBack != null) {
  125 + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, mediaServerItem.getId());
  126 + // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
  127 + hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
  128 + (mediaServerItemInUse, response)->{
  129 + if (stream.equals(response.getString("stream_id"))) {
  130 + logger.info("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
  131 + OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
  132 + OkHttpClient client = httpClientBuilder.build();
  133 + String url = callBack + "?callId=" + callId;
  134 + Request request = new Request.Builder().get().url(url).build();
  135 + try {
  136 + client.newCall(request).execute();
  137 + } catch (IOException e) {
  138 + logger.error("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
  139 + }
  140 + }
  141 + });
  142 + }
  143 + OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo();
  144 + otherRtpSendInfo.setReceiveIp(mediaServerItem.getSdpIp());
  145 + otherRtpSendInfo.setReceivePort(localPort);
  146 + otherRtpSendInfo.setCallId(callId);
  147 + otherRtpSendInfo.setStream(stream);
  148 + if (isSend != null && isSend) {
  149 + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
  150 + // 预创建发流信息
  151 + int port = zlmServerFactory.keepPort(mediaServerItem, callId, 0, ssrc1 -> {
  152 + return redisTemplate.opsForValue().get(key) != null;
  153 + });
  154 +
  155 + // 将信息写入redis中,以备后用
  156 + redisTemplate.opsForValue().set(key, otherRtpSendInfo);
  157 + // 设置超时任务,超时未使用,则自动移除,并关闭端口保持, 默认五分钟
  158 + dynamicTask.startDelay(key, ()->{
  159 + logger.info("[第三方服务对接->开启收流和获取发流信息] 端口保持超时 callId->{}", callId);
  160 + redisTemplate.delete(key);
  161 + zlmServerFactory.releasePort(mediaServerItem, callId);
  162 + }, 300000);
  163 + otherRtpSendInfo.setIp(mediaServerItem.getSdpIp());
  164 + otherRtpSendInfo.setPort(port);
  165 + logger.info("[开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo);
  166 + }
  167 + return otherRtpSendInfo;
82 } 168 }
83 169
84 @GetMapping(value = "/receive/close") 170 @GetMapping(value = "/receive/close")
@@ -86,7 +172,9 @@ public class RtpController { @@ -86,7 +172,9 @@ public class RtpController {
86 @Operation(summary = "关闭收流") 172 @Operation(summary = "关闭收流")
87 @Parameter(name = "stream", description = "流的ID", required = true) 173 @Parameter(name = "stream", description = "流的ID", required = true)
88 public void closeRtpServer(String stream) { 174 public void closeRtpServer(String stream) {
89 - 175 + logger.info("[第三方服务对接->关闭收流] stream->{}", stream);
  176 + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
  177 + zlmServerFactory.closeRtpServer(mediaServerItem,stream);
90 } 178 }
91 179
92 @GetMapping(value = "/send/start") 180 @GetMapping(value = "/send/start")
@@ -99,9 +187,46 @@ public class RtpController { @@ -99,9 +187,46 @@ public class RtpController {
99 @Parameter(name = "stream", description = "待发送流Id", required = true) 187 @Parameter(name = "stream", description = "待发送流Id", required = true)
100 @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true) 188 @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
101 @Parameter(name = "onlyAudio", description = "是否只有音频", required = true) 189 @Parameter(name = "onlyAudio", description = "是否只有音频", required = true)
  190 + @Parameter(name = "isUdp", description = "是否为UDP", required = true)
102 @Parameter(name = "streamType", description = "流类型,1为es流,2为ps流, 默认es流", required = false) 191 @Parameter(name = "streamType", description = "流类型,1为es流,2为ps流, 默认es流", required = false)
103 - public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Integer streamType) { 192 + public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, Integer streamType) {
  193 + logger.info("[第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}",
  194 + ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS");
  195 + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
  196 + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
  197 + OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
  198 + if (sendInfo != null) {
  199 + zlmServerFactory.releasePort(mediaServerItem, sendInfo.getCallId());
  200 + }else {
  201 + sendInfo = new OtherRtpSendInfo();
  202 + }
  203 + sendInfo.setPushApp(app);
  204 + sendInfo.setPushStream(stream);
  205 + sendInfo.setPushSSRC(ssrc);
  206 +
  207 + Map<String, Object> param = new HashMap<>(12);
  208 + param.put("vhost","__defaultVhost__");
  209 + param.put("app",app);
  210 + param.put("stream",stream);
  211 + param.put("ssrc", ssrc);
104 212
  213 + param.put("dst_url",ip);
  214 + param.put("dst_port", port);
  215 + String is_Udp = isUdp ? "1" : "0";
  216 + param.put("is_udp", is_Udp);
  217 + param.put("src_port", sendInfo.getPort());
  218 + param.put("use_ps", streamType==2 ? "1" : "0");
  219 + param.put("only_audio", onlyAudio ? "1" : "0");
  220 +
  221 + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
  222 + if (jsonObject.getInteger("code") == 0) {
  223 + logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId);
  224 + redisTemplate.opsForValue().set(key, sendInfo);
  225 + }else {
  226 + redisTemplate.delete(key);
  227 + logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg"));
  228 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg"));
  229 + }
105 } 230 }
106 231
107 232
@@ -111,7 +236,25 @@ public class RtpController { @@ -111,7 +236,25 @@ public class RtpController {
111 @Operation(summary = "关闭发送流") 236 @Operation(summary = "关闭发送流")
112 @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true) 237 @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
113 public void closeSendRTP(String callId) { 238 public void closeSendRTP(String callId) {
114 - 239 + logger.info("[第三方服务对接->关闭发送流] callId->{}", callId);
  240 + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
  241 + OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
  242 + if (sendInfo == null){
  243 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未开启发流");
  244 + }
  245 + Map<String, Object> param = new HashMap<>();
  246 + param.put("vhost","__defaultVhost__");
  247 + param.put("app",sendInfo.getPushApp());
  248 + param.put("stream",sendInfo.getPushStream());
  249 + param.put("ssrc",sendInfo.getPushSSRC());
  250 + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
  251 + Boolean result = zlmServerFactory.stopSendRtpStream(mediaServerItem, param);
  252 + if (!result) {
  253 + logger.info("[第三方服务对接->关闭发送流] 失败 callId->{}", callId);
  254 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "停止发流失败");
  255 + }else {
  256 + logger.info("[第三方服务对接->关闭发送流] 成功 callId->{}", callId);
  257 + }
115 } 258 }
116 259
117 } 260 }