Commit 3e2486d0abd2063afa9f8fab93dc60db774298af

Authored by 648540858
2 parents 0aada74b 874a5738

Merge branch '2.6.8' into wvp-28181-2.0

src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
... ... @@ -158,7 +158,9 @@ public class VideoManagerConstants {
158 158 public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
159 159 public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";
160 160 public static final String WVP_OTHER_SEND_RTP_INFO = "VMP_OTHER_SEND_RTP_INFO_";
  161 + public static final String WVP_OTHER_SEND_PS_INFO = "VMP_OTHER_SEND_PS_INFO_";
161 162 public static final String WVP_OTHER_RECEIVE_RTP_INFO = "VMP_OTHER_RECEIVE_RTP_INFO_";
  163 + public static final String WVP_OTHER_RECEIVE_PS_INFO = "VMP_OTHER_RECEIVE_PS_INFO_";
162 164  
163 165 /**
164 166 * Redis Const
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
... ... @@ -298,7 +298,10 @@ public class ZLMHttpHookListener {
298 298 if (param.getApp().equalsIgnoreCase("rtp")) {
299 299 String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + param.getStream();
300 300 OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(receiveKey);
301   - if (otherRtpSendInfo != null) {
  301 +
  302 + String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + param.getStream();
  303 + OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(receiveKeyForPS);
  304 + if (otherRtpSendInfo != null || otherPsSendInfo != null) {
302 305 result.setEnable_mp4(true);
303 306 }
304 307 }
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherPsSendInfo.java 0 → 100644
  1 +package com.genersoft.iot.vmp.vmanager.bean;
  2 +
  3 +public class OtherPsSendInfo {
  4 +
  5 + /**
  6 + * 发流IP
  7 + */
  8 + private String sendLocalIp;
  9 +
  10 + /**
  11 + * 发流端口
  12 + */
  13 + private int sendLocalPort;
  14 +
  15 + /**
  16 + * 收流IP
  17 + */
  18 + private String receiveIp;
  19 +
  20 + /**
  21 + * 收流端口
  22 + */
  23 + private int receivePort;
  24 +
  25 +
  26 + /**
  27 + * 会话ID
  28 + */
  29 + private String callId;
  30 +
  31 + /**
  32 + * 流ID
  33 + */
  34 + private String stream;
  35 +
  36 + /**
  37 + * 推流应用名
  38 + */
  39 + private String pushApp;
  40 +
  41 + /**
  42 + * 推流流ID
  43 + */
  44 + private String pushStream;
  45 +
  46 + /**
  47 + * 推流SSRC
  48 + */
  49 + private String pushSSRC;
  50 +
  51 + public String getSendLocalIp() {
  52 + return sendLocalIp;
  53 + }
  54 +
  55 + public void setSendLocalIp(String sendLocalIp) {
  56 + this.sendLocalIp = sendLocalIp;
  57 + }
  58 +
  59 + public int getSendLocalPort() {
  60 + return sendLocalPort;
  61 + }
  62 +
  63 + public void setSendLocalPort(int sendLocalPort) {
  64 + this.sendLocalPort = sendLocalPort;
  65 + }
  66 +
  67 + public String getReceiveIp() {
  68 + return receiveIp;
  69 + }
  70 +
  71 + public void setReceiveIp(String receiveIp) {
  72 + this.receiveIp = receiveIp;
  73 + }
  74 +
  75 + public int getReceivePort() {
  76 + return receivePort;
  77 + }
  78 +
  79 + public void setReceivePort(int receivePort) {
  80 + this.receivePort = receivePort;
  81 + }
  82 +
  83 + public String getCallId() {
  84 + return callId;
  85 + }
  86 +
  87 + public void setCallId(String callId) {
  88 + this.callId = callId;
  89 + }
  90 +
  91 + public String getStream() {
  92 + return stream;
  93 + }
  94 +
  95 + public void setStream(String stream) {
  96 + this.stream = stream;
  97 + }
  98 +
  99 + public String getPushApp() {
  100 + return pushApp;
  101 + }
  102 +
  103 + public void setPushApp(String pushApp) {
  104 + this.pushApp = pushApp;
  105 + }
  106 +
  107 + public String getPushStream() {
  108 + return pushStream;
  109 + }
  110 +
  111 + public void setPushStream(String pushStream) {
  112 + this.pushStream = pushStream;
  113 + }
  114 +
  115 + public String getPushSSRC() {
  116 + return pushSSRC;
  117 + }
  118 +
  119 + public void setPushSSRC(String pushSSRC) {
  120 + this.pushSSRC = pushSSRC;
  121 + }
  122 +
  123 + @Override
  124 + public String toString() {
  125 + return "OtherPsSendInfo{" +
  126 + "sendLocalIp='" + sendLocalIp + '\'' +
  127 + ", sendLocalPort=" + sendLocalPort +
  128 + ", receiveIp='" + receiveIp + '\'' +
  129 + ", receivePort=" + receivePort +
  130 + ", callId='" + callId + '\'' +
  131 + ", stream='" + stream + '\'' +
  132 + ", pushApp='" + pushApp + '\'' +
  133 + ", pushStream='" + pushStream + '\'' +
  134 + ", pushSSRC='" + pushSSRC + '\'' +
  135 + '}';
  136 + }
  137 +}
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java 0 → 100644
  1 +package com.genersoft.iot.vmp.vmanager.ps;
  2 +
  3 +import com.alibaba.fastjson2.JSONObject;
  4 +import com.genersoft.iot.vmp.common.VideoManagerConstants;
  5 +import com.genersoft.iot.vmp.conf.DynamicTask;
  6 +import com.genersoft.iot.vmp.conf.UserSetting;
  7 +import com.genersoft.iot.vmp.conf.exception.ControllerException;
  8 +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
  9 +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
  10 +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
  11 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  12 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
  13 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
  14 +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  15 +import com.genersoft.iot.vmp.service.IMediaServerService;
  16 +import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  17 +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
  18 +import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
  19 +import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
  20 +import io.swagger.v3.oas.annotations.Operation;
  21 +import io.swagger.v3.oas.annotations.Parameter;
  22 +import io.swagger.v3.oas.annotations.tags.Tag;
  23 +import okhttp3.OkHttpClient;
  24 +import okhttp3.Request;
  25 +import org.slf4j.Logger;
  26 +import org.slf4j.LoggerFactory;
  27 +import org.springframework.beans.factory.annotation.Autowired;
  28 +import org.springframework.data.redis.core.RedisTemplate;
  29 +import org.springframework.web.bind.annotation.*;
  30 +
  31 +import java.io.IOException;
  32 +import java.util.HashMap;
  33 +import java.util.List;
  34 +import java.util.Map;
  35 +import java.util.UUID;
  36 +import java.util.concurrent.TimeUnit;
  37 +
  38 +@SuppressWarnings("rawtypes")
  39 +@Tag(name = "第三方PS服务对接")
  40 +
  41 +@RestController
  42 +@RequestMapping("/api/ps")
  43 +public class PsController {
  44 +
  45 + private final static Logger logger = LoggerFactory.getLogger(PsController.class);
  46 +
  47 + @Autowired
  48 + private ZLMRTPServerFactory zlmServerFactory;
  49 +
  50 + @Autowired
  51 + private ZlmHttpHookSubscribe hookSubscribe;
  52 +
  53 + @Autowired
  54 + private IMediaServerService mediaServerService;
  55 +
  56 + @Autowired
  57 + private SendRtpPortManager sendRtpPortManager;
  58 +
  59 + @Autowired
  60 + private UserSetting userSetting;
  61 +
  62 + @Autowired
  63 + private DynamicTask dynamicTask;
  64 +
  65 +
  66 + @Autowired
  67 + private RedisTemplate<Object, Object> redisTemplate;
  68 +
  69 +
  70 + @GetMapping(value = "/receive/open")
  71 + @ResponseBody
  72 + @Operation(summary = "开启收流和获取发流信息")
  73 + @Parameter(name = "isSend", description = "是否发送,false时只开启收流, true同时返回推流信息", required = true)
  74 + @Parameter(name = "callId", description = "整个过程的唯一标识,为了与后续接口关联", required = true)
  75 + @Parameter(name = "ssrc", description = "来源流的SSRC,不传则不校验来源ssrc", required = false)
  76 + @Parameter(name = "stream", description = "形成的流的ID", required = true)
  77 + @Parameter(name = "tcpMode", description = "收流模式, 0为UDP, 1为TCP被动", required = true)
  78 + @Parameter(name = "callBack", description = "回调地址,如果收流超时会通道回调通知,回调为get请求,参数为callId", required = true)
  79 + public OtherPsSendInfo openRtpServer(Boolean isSend, @RequestParam(required = false)String ssrc, String callId, String stream, Integer tcpMode, String callBack) {
  80 +
  81 + logger.info("[第三方PS服务对接->开启收流和获取发流信息] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}",
  82 + isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP被动", callBack);
  83 +
  84 + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
  85 + if (mediaServerItem == null) {
  86 + throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer");
  87 + }
  88 + if (stream == null) {
  89 + throw new ControllerException(ErrorCode.ERROR100.getCode(),"stream参数不可为空");
  90 + }
  91 + if (isSend != null && isSend && callId == null) {
  92 + throw new ControllerException(ErrorCode.ERROR100.getCode(),"isSend为true时,CallID不能为空");
  93 + }
  94 + int ssrcInt = 0;
  95 + if (ssrc != null) {
  96 + try {
  97 + ssrcInt = Integer.parseInt(ssrc);
  98 + }catch (NumberFormatException e) {
  99 + throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc格式错误");
  100 + }
  101 + }
  102 + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + callId + "_" + stream;
  103 + int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode);
  104 + if (localPort == 0) {
  105 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败");
  106 + }
  107 + // 注册回调如果rtp收流超时则通过回调发送通知
  108 + if (callBack != null) {
  109 + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId());
  110 + // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
  111 + hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
  112 + (mediaServerItemInUse, response)->{
  113 + if (stream.equals(response.getString("stream_id"))) {
  114 + logger.info("[第三方PS服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
  115 + // 将信息写入redis中,以备后用
  116 + redisTemplate.delete(receiveKey);
  117 + OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
  118 + OkHttpClient client = httpClientBuilder.build();
  119 + String url = callBack + "?callId=" + callId;
  120 + Request request = new Request.Builder().get().url(url).build();
  121 + try {
  122 + client.newCall(request).execute();
  123 + } catch (IOException e) {
  124 + logger.error("[第三方PS服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
  125 + }
  126 + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
  127 + }
  128 + });
  129 + }
  130 + OtherPsSendInfo otherPsSendInfo = new OtherPsSendInfo();
  131 + otherPsSendInfo.setReceiveIp(mediaServerItem.getSdpIp());
  132 + otherPsSendInfo.setReceivePort(localPort);
  133 + otherPsSendInfo.setCallId(callId);
  134 + otherPsSendInfo.setStream(stream);
  135 +
  136 + // 将信息写入redis中,以备后用
  137 + redisTemplate.opsForValue().set(receiveKey, otherPsSendInfo);
  138 + if (isSend != null && isSend) {
  139 + String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId;
  140 + // 预创建发流信息
  141 + int port = sendRtpPortManager.getNextPort(mediaServerItem);
  142 +
  143 + otherPsSendInfo.setSendLocalIp(mediaServerItem.getSdpIp());
  144 + otherPsSendInfo.setSendLocalPort(port);
  145 + // 将信息写入redis中,以备后用
  146 + redisTemplate.opsForValue().set(key, otherPsSendInfo, 300, TimeUnit.SECONDS);
  147 + logger.info("[第三方PS服务对接->开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherPsSendInfo);
  148 + }
  149 + return otherPsSendInfo;
  150 + }
  151 +
  152 + @GetMapping(value = "/receive/close")
  153 + @ResponseBody
  154 + @Operation(summary = "关闭收流")
  155 + @Parameter(name = "stream", description = "流的ID", required = true)
  156 + public void closeRtpServer(String stream) {
  157 + logger.info("[第三方PS服务对接->关闭收流] stream->{}", stream);
  158 + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
  159 + zlmServerFactory.closeRtpServer(mediaServerItem,stream);
  160 + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_*_" + stream;
  161 + List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
  162 + if (scan.size() > 0) {
  163 + for (Object key : scan) {
  164 + // 将信息写入redis中,以备后用
  165 + redisTemplate.delete(key);
  166 + }
  167 + }
  168 + }
  169 +
  170 + @GetMapping(value = "/send/start")
  171 + @ResponseBody
  172 + @Operation(summary = "发送流")
  173 + @Parameter(name = "ssrc", description = "发送流的SSRC", required = true)
  174 + @Parameter(name = "dstIp", description = "目标收流IP", required = true)
  175 + @Parameter(name = "dstPort", description = "目标收流端口", required = true)
  176 + @Parameter(name = "app", description = "待发送应用名", required = true)
  177 + @Parameter(name = "stream", description = "待发送流Id", required = true)
  178 + @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
  179 + @Parameter(name = "isUdp", description = "是否为UDP", required = true)
  180 + public void sendRTP(String ssrc,
  181 + String dstIp,
  182 + Integer dstPort,
  183 + String app,
  184 + String stream,
  185 + String callId,
  186 + Boolean isUdp
  187 + ) {
  188 + logger.info("[第三方PS服务对接->发送流] " +
  189 + "ssrc->{}, \r\n" +
  190 + "dstIp->{}, \n" +
  191 + "dstPort->{}, \n" +
  192 + "app->{}, \n" +
  193 + "stream->{}, \n" +
  194 + "callId->{} \n",
  195 + ssrc,
  196 + dstIp,
  197 + dstPort,
  198 + app,
  199 + stream,
  200 + callId);
  201 + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
  202 + String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId;
  203 + OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
  204 + if (sendInfo == null) {
  205 + sendInfo = new OtherRtpSendInfo();
  206 + }
  207 + sendInfo.setPushApp(app);
  208 + sendInfo.setPushStream(stream);
  209 + sendInfo.setPushSSRC(ssrc);
  210 +
  211 + Map<String, Object> param;
  212 +
  213 +
  214 + param = new HashMap<>();
  215 + param.put("vhost","__defaultVhost__");
  216 + param.put("app",app);
  217 + param.put("stream",stream);
  218 + param.put("ssrc", ssrc);
  219 +
  220 + param.put("dst_url", dstIp);
  221 + param.put("dst_port", dstPort);
  222 + String is_Udp = isUdp ? "1" : "0";
  223 + param.put("is_udp", is_Udp);
  224 + param.put("src_port", sendInfo.getSendLocalPortForAudio());
  225 + param.put("use_ps", "0");
  226 + param.put("only_audio", "1");
  227 +
  228 +
  229 + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream);
  230 + if (streamReady) {
  231 + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
  232 + if (jsonObject.getInteger("code") == 0) {
  233 + logger.info("[第三方PS服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, param);
  234 + redisTemplate.opsForValue().set(key, sendInfo);
  235 + }else {
  236 + redisTemplate.delete(key);
  237 + logger.info("[第三方PS服务对接->发送流] 视频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg"));
  238 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[视频流发流失败] " + jsonObject.getString("msg"));
  239 + }
  240 + }else {
  241 + logger.info("[第三方PS服务对接->发送流] 流不存在,等待流上线,callId->{}", callId);
  242 + String uuid = UUID.randomUUID().toString();
  243 + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId());
  244 + dynamicTask.startDelay(uuid, ()->{
  245 + logger.info("[第三方PS服务对接->发送流] 等待流上线超时 callId->{}", callId);
  246 + redisTemplate.delete(key);
  247 + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
  248 + }, 10000);
  249 +
  250 + // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
  251 + OtherRtpSendInfo finalSendInfo = sendInfo;
  252 + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
  253 + hookSubscribe.addSubscribe(hookSubscribeForStreamChange,
  254 + (mediaServerItemInUse, response)->{
  255 + dynamicTask.stop(uuid);
  256 + logger.info("[第三方PS服务对接->发送流] 流上线,开始发流 callId->{}", callId);
  257 + try {
  258 + Thread.sleep(400);
  259 + } catch (InterruptedException e) {
  260 + throw new RuntimeException(e);
  261 + }
  262 + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
  263 + if (jsonObject.getInteger("code") == 0) {
  264 + logger.info("[第三方PS服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, param);
  265 + redisTemplate.opsForValue().set(key, finalSendInfo);
  266 + }else {
  267 + redisTemplate.delete(key);
  268 + logger.info("[第三方PS服务对接->发送流] 视频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg"));
  269 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[视频流发流失败] " + jsonObject.getString("msg"));
  270 + }
  271 + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
  272 + });
  273 + }
  274 + }
  275 +
  276 + @GetMapping(value = "/send/stop")
  277 + @ResponseBody
  278 + @Operation(summary = "关闭发送流")
  279 + @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
  280 + public void closeSendRTP(String callId) {
  281 + logger.info("[第三方PS服务对接->关闭发送流] callId->{}", callId);
  282 + String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId;
  283 + OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
  284 + if (sendInfo == null){
  285 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未开启发流");
  286 + }
  287 + Map<String, Object> param = new HashMap<>();
  288 + param.put("vhost","__defaultVhost__");
  289 + param.put("app",sendInfo.getPushApp());
  290 + param.put("stream",sendInfo.getPushStream());
  291 + param.put("ssrc",sendInfo.getPushSSRC());
  292 + MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
  293 + Boolean result = zlmServerFactory.stopSendRtpStream(mediaServerItem, param);
  294 + if (!result) {
  295 + logger.info("[第三方PS服务对接->关闭发送流] 失败 callId->{}", callId);
  296 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "停止发流失败");
  297 + }else {
  298 + logger.info("[第三方PS服务对接->关闭发送流] 成功 callId->{}", callId);
  299 + }
  300 + redisTemplate.delete(key);
  301 + }
  302 +
  303 +}
... ...