Commit a9f88be8c592664332de2587f9304e371ab34192

Authored by 648540858
1 parent 4162706e

优化拉流代理的添加

src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
... ... @@ -4,14 +4,13 @@ import com.alibaba.fastjson2.JSONArray;
4 4 import com.alibaba.fastjson2.JSONObject;
5 5 import com.genersoft.iot.vmp.common.GeneralCallback;
6 6 import com.genersoft.iot.vmp.common.StreamInfo;
  7 +import com.genersoft.iot.vmp.conf.DynamicTask;
7 8 import com.genersoft.iot.vmp.conf.UserSetting;
8 9 import com.genersoft.iot.vmp.conf.exception.ControllerException;
9 10 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
10 11 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
11 12 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
12 13 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
13   -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
14   -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
15 14 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
16 15 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
17 16 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
... ... @@ -41,6 +40,7 @@ import org.springframework.util.ObjectUtils;
41 40 import java.util.HashMap;
42 41 import java.util.List;
43 42 import java.util.Map;
  43 +import java.util.UUID;
44 44  
45 45 /**
46 46 * 视频代理业务
... ... @@ -93,6 +93,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
93 93 private ZlmHttpHookSubscribe hookSubscribe;
94 94  
95 95 @Autowired
  96 + private DynamicTask dynamicTask;
  97 +
  98 + @Autowired
96 99 DataSourceTransactionManager dataSourceTransactionManager;
97 100  
98 101 @Autowired
... ... @@ -158,17 +161,28 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
158 161 return;
159 162 }
160 163  
161   - HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
162   - hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
163   - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
164   - mediaInfo, param.getApp(), param.getStream(), null, null);
165   - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
166   - });
  164 + String talkKey = UUID.randomUUID().toString();
  165 + dynamicTask.startCron(talkKey, ()->{
  166 + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
  167 + if (streamInfo != null) {
  168 + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
  169 + }
  170 + }, 1000);
  171 + String delayTalkKey = UUID.randomUUID().toString();
  172 + dynamicTask.startDelay(delayTalkKey, ()->{
  173 + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
  174 + if (streamInfo != null) {
  175 + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
  176 + }else {
  177 + dynamicTask.stop(talkKey);
  178 + callback.run(ErrorCode.ERROR100.getCode(), "超时", null);
  179 + }
  180 + }, 5000);
167 181  
168 182 if (param.isEnable()) {
169 183 JSONObject jsonObject = addStreamProxyToZlm(param);
170 184 if (jsonObject != null && jsonObject.getInteger("code") == 0) {
171   - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
  185 + dynamicTask.stop(talkKey);
172 186 StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
173 187 mediaInfo, param.getApp(), param.getStream(), null, null);
174 188 callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
... ... @@ -295,10 +309,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
295 309 return null;
296 310 }
297 311 if ("default".equals(param.getType())){
298   - result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl(),
  312 + result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
299 313 param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
300 314 }else if ("ffmpeg".equals(param.getType())) {
301   - result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl(), param.getDstUrl(),
  315 + result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(),
302 316 param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(),
303 317 param.getFfmpegCmdKey());
304 318 }
... ...