Commit b7f2a6b25ba8bab01d7e10d5556702be8af37b80

Authored by 648540858
2 parents db3240d9 97b04c4d

Merge branch '2.6.8' into wvp-28181-2.0

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
src/main/java/com/genersoft/iot/vmp/common/GeneralCallback.java 0 → 100644
  1 +package com.genersoft.iot.vmp.common;
  2 +
  3 +public interface GeneralCallback<T>{
  4 + void run(int code, String msg, T data);
  5 +}
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java
... ... @@ -121,16 +121,4 @@ public class SSRCFactory {
121 121 return redisTemplate.opsForSet().members(redisKey) != null;
122 122 }
123 123  
124   - /**
125   - * 查询ssrc是否可用
126   - *
127   - * @param mediaServerId
128   - * @param ssrc
129   - * @return
130   - */
131   - public boolean checkSsrc(String mediaServerId, String ssrc) {
132   - String sn = ssrc.substring(1);
133   - String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId;
134   - return redisTemplate.opsForSet().isMember(redisKey, sn) != null;
135   - }
136 124 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java
... ... @@ -39,6 +39,8 @@ public class DeferredResultHolder {
39 39  
40 40 public static final String CALLBACK_CMD_DOWNLOAD = "CALLBACK_DOWNLOAD";
41 41  
  42 + public static final String CALLBACK_CMD_PROXY = "CALLBACK_PROXY";
  43 +
42 44 public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP";
43 45  
44 46 public static final String UPLOAD_FILE_CHANNEL = "UPLOAD_FILE_CHANNEL";
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
... ... @@ -452,7 +452,7 @@ public class SIPCommander implements ISIPCommander {
452 452 }
453 453 subscribe.removeSubscribe(hookSubscribe);
454 454 });
455   - Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc());
  455 + Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc());
456 456  
457 457 sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
458 458 ResponseEvent responseEvent = (ResponseEvent) event.event;
... ... @@ -568,7 +568,10 @@ public class SIPCommander implements ISIPCommander {
568 568 });
569 569 });
570 570  
571   - Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc());
  571 + Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc());
  572 + if (inviteStreamCallback != null) {
  573 + inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null,callId, "rtp", ssrcInfo.getStream()));
  574 + }
572 575  
573 576 sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
574 577 ResponseEvent responseEvent = (ResponseEvent) event.event;
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
... ... @@ -6,7 +6,9 @@ import com.alibaba.fastjson2.JSONObject;
6 6 import com.genersoft.iot.vmp.common.CommonCallback;
7 7 import com.genersoft.iot.vmp.conf.UserSetting;
8 8 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
9   -import com.genersoft.iot.vmp.media.zlm.dto.*;
  9 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  10 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
  11 +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
10 12 import org.slf4j.Logger;
11 13 import org.slf4j.LoggerFactory;
12 14 import org.springframework.beans.factory.annotation.Autowired;
... ...
src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
1 1 package com.genersoft.iot.vmp.service;
2 2  
3 3 import com.alibaba.fastjson2.JSONObject;
  4 +import com.genersoft.iot.vmp.common.GeneralCallback;
4 5 import com.genersoft.iot.vmp.common.StreamInfo;
5 6 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
6 7 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
... ... @@ -13,7 +14,7 @@ public interface IStreamProxyService {
13 14 * 保存视频代理
14 15 * @param param
15 16 */
16   - StreamInfo save(StreamProxyItem param);
  17 + void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback);
17 18  
18 19 /**
19 20 * 添加视频代理到zlm
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
... ... @@ -2,12 +2,16 @@ package com.genersoft.iot.vmp.service.impl;
2 2  
3 3 import com.alibaba.fastjson2.JSONArray;
4 4 import com.alibaba.fastjson2.JSONObject;
  5 +import com.genersoft.iot.vmp.common.GeneralCallback;
5 6 import com.genersoft.iot.vmp.common.StreamInfo;
6 7 import com.genersoft.iot.vmp.conf.UserSetting;
7 8 import com.genersoft.iot.vmp.conf.exception.ControllerException;
8 9 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
9 10 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
10 11 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  12 +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
  13 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  14 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
11 15 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
12 16 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
13 17 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
... ... @@ -86,6 +90,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
86 90 private IMediaServerService mediaServerService;
87 91  
88 92 @Autowired
  93 + private ZlmHttpHookSubscribe hookSubscribe;
  94 +
  95 + @Autowired
89 96 DataSourceTransactionManager dataSourceTransactionManager;
90 97  
91 98 @Autowired
... ... @@ -93,7 +100,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
93 100  
94 101  
95 102 @Override
96   - public StreamInfo save(StreamProxyItem param) {
  103 + public void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback) {
97 104 MediaServerItem mediaInfo;
98 105 if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){
99 106 mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null);
... ... @@ -107,7 +114,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
107 114 String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
108 115 param.getStream() );
109 116 param.setDstUrl(dstUrl);
110   - StringBuffer resultMsg = new StringBuffer();
111 117 param.setMediaServerId(mediaInfo.getId());
112 118 boolean saveResult;
113 119 // 更新
... ... @@ -117,14 +123,25 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
117 123 saveResult = addStreamProxy(param);
118 124 }
119 125 if (!saveResult) {
120   - throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败");
  126 + callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null);
  127 + return;
121 128 }
122   - StreamInfo resultForStreamInfo = null;
123   - resultMsg.append("保存成功");
  129 +
  130 + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
  131 + hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
  132 + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
  133 + mediaInfo, param.getApp(), param.getStream(), null, null);
  134 + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
  135 + });
  136 +
124 137 if (param.isEnable()) {
125 138 JSONObject jsonObject = addStreamProxyToZlm(param);
126   - if (jsonObject == null || jsonObject.getInteger("code") != 0) {
127   - resultMsg.append(", 但是启用失败,请检查流地址是否可用");
  139 + if (jsonObject != null && jsonObject.getInteger("code") == 0) {
  140 + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
  141 + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
  142 + mediaInfo, param.getApp(), param.getStream(), null, null);
  143 + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
  144 + }else {
128 145 param.setEnable(false);
129 146 // 直接移除
130 147 if (param.isEnableRemoveNoneReader()) {
... ... @@ -132,14 +149,15 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
132 149 }else {
133 150 updateStreamProxy(param);
134 151 }
135   -
136   - }else {
137   - resultForStreamInfo = mediaService.getStreamInfoByAppAndStream(
138   - mediaInfo, param.getApp(), param.getStream(), null, null);
139   -
  152 + if (jsonObject == null){
  153 + callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null);
  154 + return;
  155 + }else {
  156 + callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null);
  157 + return;
  158 + }
140 159 }
141 160 }
142   - return resultForStreamInfo;
143 161 }
144 162  
145 163 /**
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java
1 1 package com.genersoft.iot.vmp.vmanager.streamProxy;
2 2  
3 3 import com.alibaba.fastjson2.JSONObject;
  4 +import com.genersoft.iot.vmp.common.StreamInfo;
  5 +import com.genersoft.iot.vmp.conf.UserSetting;
4 6 import com.genersoft.iot.vmp.conf.exception.ControllerException;
  7 +import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  8 +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
5 9 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
6 10 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
7 11 import com.genersoft.iot.vmp.service.IMediaServerService;
8 12 import com.genersoft.iot.vmp.service.IStreamProxyService;
9 13 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
10 14 import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
  15 +import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
11 16 import com.github.pagehelper.PageInfo;
12 17 import io.swagger.v3.oas.annotations.Operation;
13 18 import io.swagger.v3.oas.annotations.Parameter;
... ... @@ -18,6 +23,9 @@ import org.springframework.beans.factory.annotation.Autowired;
18 23 import org.springframework.stereotype.Controller;
19 24 import org.springframework.util.ObjectUtils;
20 25 import org.springframework.web.bind.annotation.*;
  26 +import org.springframework.web.context.request.async.DeferredResult;
  27 +
  28 +import java.util.UUID;
21 29  
22 30 @SuppressWarnings("rawtypes")
23 31 /**
... ... @@ -37,6 +45,12 @@ public class StreamProxyController {
37 45 @Autowired
38 46 private IStreamProxyService streamProxyService;
39 47  
  48 + @Autowired
  49 + private DeferredResultHolder resultHolder;
  50 +
  51 + @Autowired
  52 + private UserSetting userSetting;
  53 +
40 54  
41 55 @Operation(summary = "分页查询流代理")
42 56 @Parameter(name = "page", description = "当前页")
... ... @@ -58,7 +72,7 @@ public class StreamProxyController {
58 72 })
59 73 @PostMapping(value = "/save")
60 74 @ResponseBody
61   - public StreamContent save(@RequestBody StreamProxyItem param){
  75 + public DeferredResult<Object> save(@RequestBody StreamProxyItem param){
62 76 logger.info("添加代理: " + JSONObject.toJSONString(param));
63 77 if (ObjectUtils.isEmpty(param.getMediaServerId())) {
64 78 param.setMediaServerId("auto");
... ... @@ -69,7 +83,33 @@ public class StreamProxyController {
69 83 if (ObjectUtils.isEmpty(param.getGbId())) {
70 84 param.setGbId(null);
71 85 }
72   - return new StreamContent(streamProxyService.save(param));
  86 +
  87 + RequestMessage requestMessage = new RequestMessage();
  88 + String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream();
  89 + requestMessage.setKey(key);
  90 + String uuid = UUID.randomUUID().toString();
  91 + requestMessage.setId(uuid);
  92 + DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
  93 + // 录像查询以channelId作为deviceId查询
  94 + resultHolder.put(key, uuid, result);
  95 + result.onTimeout(()->{
  96 + WVPResult<StreamInfo> wvpResult = new WVPResult<>();
  97 + wvpResult.setCode(ErrorCode.ERROR100.getCode());
  98 + wvpResult.setMsg("超时");
  99 + requestMessage.setData(wvpResult);
  100 + resultHolder.invokeAllResult(requestMessage);
  101 + });
  102 +
  103 + streamProxyService.save(param, (code, msg, streamInfo) -> {
  104 + logger.info("[拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg);
  105 + if (code == ErrorCode.SUCCESS.getCode()) {
  106 + requestMessage.setData(new StreamContent(streamInfo));
  107 + }else {
  108 + requestMessage.setData(WVPResult.fail(code, msg));
  109 + }
  110 + resultHolder.invokeAllResult(requestMessage);
  111 + });
  112 + return result;
73 113 }
74 114  
75 115 @GetMapping(value = "/ffmpeg_cmd/list")
... ...
src/main/resources/application.yml
... ... @@ -2,4 +2,4 @@ spring:
2 2 application:
3 3 name: wvp
4 4 profiles:
5   - active: local
6 5 \ No newline at end of file
  6 + active: local268
7 7 \ No newline at end of file
... ...