Commit 42d8fff57403500b880886e13093e1841bf47e54

Authored by 648540858
1 parent 963a74d2

兼容流注销时流产生类型错误的问题

src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -56,5 +56,5 @@ public class VideoManagerConstants { @@ -56,5 +56,5 @@ public class VideoManagerConstants {
56 public static final String MEDIA_TRANSACTION_USED_PREFIX = "VMP_media_transaction_"; 56 public static final String MEDIA_TRANSACTION_USED_PREFIX = "VMP_media_transaction_";
57 57
58 //************************** redis 消息********************************* 58 //************************** redis 消息*********************************
59 - public static final String WVP_MSG_STREAM_PUSH_CHANGE_PREFIX = "WVP_MSG_STREAM_PUSH_CHANGE"; 59 + public static final String WVP_MSG_STREAM_PUSH_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_";
60 } 60 }
src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java
@@ -38,7 +38,7 @@ public class ThreadPoolTaskConfig { @@ -38,7 +38,7 @@ public class ThreadPoolTaskConfig {
38 /** 38 /**
39 * 线程池名前缀 39 * 线程池名前缀
40 */ 40 */
41 - private static final String threadNamePrefix = "wvp-sip-"; 41 + private static final String threadNamePrefix = "wvp-";
42 42
43 @Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名 43 @Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
44 public ThreadPoolTaskExecutor taskExecutor() { 44 public ThreadPoolTaskExecutor taskExecutor() {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -9,17 +9,11 @@ import com.genersoft.iot.vmp.common.StreamInfo; @@ -9,17 +9,11 @@ import com.genersoft.iot.vmp.common.StreamInfo;
9 import com.genersoft.iot.vmp.conf.MediaConfig; 9 import com.genersoft.iot.vmp.conf.MediaConfig;
10 import com.genersoft.iot.vmp.conf.UserSetup; 10 import com.genersoft.iot.vmp.conf.UserSetup;
11 import com.genersoft.iot.vmp.gb28181.bean.Device; 11 import com.genersoft.iot.vmp.gb28181.bean.Device;
12 -import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;  
13 -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;  
14 -import com.genersoft.iot.vmp.media.zlm.dto.OriginType;  
15 -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;  
16 -import com.genersoft.iot.vmp.service.IMediaServerService;  
17 -import com.genersoft.iot.vmp.service.IMediaService;  
18 -import com.genersoft.iot.vmp.service.IStreamProxyService; 12 +import com.genersoft.iot.vmp.media.zlm.dto.*;
  13 +import com.genersoft.iot.vmp.service.*;
19 import com.genersoft.iot.vmp.service.bean.SSRCInfo; 14 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
20 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 15 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
21 import com.genersoft.iot.vmp.storager.IVideoManagerStorager; 16 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
22 -import com.genersoft.iot.vmp.service.IPlayService;  
23 import org.slf4j.Logger; 17 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory; 18 import org.slf4j.LoggerFactory;
25 import org.springframework.beans.factory.annotation.Autowired; 19 import org.springframework.beans.factory.annotation.Autowired;
@@ -66,6 +60,9 @@ public class ZLMHttpHookListener { @@ -66,6 +60,9 @@ public class ZLMHttpHookListener {
66 private IStreamProxyService streamProxyService; 60 private IStreamProxyService streamProxyService;
67 61
68 @Autowired 62 @Autowired
  63 + private IStreamPushService streamPushService;
  64 +
  65 + @Autowired
69 private IMediaService mediaService; 66 private IMediaService mediaService;
70 67
71 @Autowired 68 @Autowired
@@ -84,6 +81,32 @@ public class ZLMHttpHookListener { @@ -84,6 +81,32 @@ public class ZLMHttpHookListener {
84 private MediaConfig mediaConfig; 81 private MediaConfig mediaConfig;
85 82
86 /** 83 /**
  84 + * 服务器定时上报时间,上报间隔可配置,默认10s上报一次
  85 + *
  86 + */
  87 + @ResponseBody
  88 + @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8")
  89 + public ResponseEntity<String> onServerKeepalive(@RequestBody JSONObject json){
  90 +
  91 + if (logger.isDebugEnabled()) {
  92 + logger.debug("[ ZLM HOOK ]on_server_keepalive API调用,参数:" + json.toString());
  93 + }
  94 + String mediaServerId = json.getString("mediaServerId");
  95 +
  96 + List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive);
  97 + if (subscribes != null && subscribes.size() > 0) {
  98 + for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
  99 + subscribe.response(null, json);
  100 + }
  101 + }
  102 +
  103 + JSONObject ret = new JSONObject();
  104 + ret.put("code", 0);
  105 + ret.put("msg", "success");
  106 + return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
  107 + }
  108 +
  109 + /**
87 * 流量统计事件,播放器或推流器断开时并且耗用流量超过特定阈值时会触发此事件,阈值通过配置文件general.flowThreshold配置;此事件对回复不敏感。 110 * 流量统计事件,播放器或推流器断开时并且耗用流量超过特定阈值时会触发此事件,阈值通过配置文件general.flowThreshold配置;此事件对回复不敏感。
88 * 111 *
89 */ 112 */
@@ -92,7 +115,7 @@ public class ZLMHttpHookListener { @@ -92,7 +115,7 @@ public class ZLMHttpHookListener {
92 public ResponseEntity<String> onFlowReport(@RequestBody JSONObject json){ 115 public ResponseEntity<String> onFlowReport(@RequestBody JSONObject json){
93 116
94 if (logger.isDebugEnabled()) { 117 if (logger.isDebugEnabled()) {
95 - logger.debug("ZLM HOOK on_flow_report API调用,参数:" + json.toString()); 118 + logger.debug("[ ZLM HOOK ]on_flow_report API调用,参数:" + json.toString());
96 } 119 }
97 String mediaServerId = json.getString("mediaServerId"); 120 String mediaServerId = json.getString("mediaServerId");
98 JSONObject ret = new JSONObject(); 121 JSONObject ret = new JSONObject();
@@ -110,7 +133,7 @@ public class ZLMHttpHookListener { @@ -110,7 +133,7 @@ public class ZLMHttpHookListener {
110 public ResponseEntity<String> onHttpAccess(@RequestBody JSONObject json){ 133 public ResponseEntity<String> onHttpAccess(@RequestBody JSONObject json){
111 134
112 if (logger.isDebugEnabled()) { 135 if (logger.isDebugEnabled()) {
113 - logger.debug("ZLM HOOK on_http_access API 调用,参数:" + json.toString()); 136 + logger.debug("[ ZLM HOOK ]on_http_access API 调用,参数:" + json.toString());
114 } 137 }
115 String mediaServerId = json.getString("mediaServerId"); 138 String mediaServerId = json.getString("mediaServerId");
116 JSONObject ret = new JSONObject(); 139 JSONObject ret = new JSONObject();
@@ -130,7 +153,7 @@ public class ZLMHttpHookListener { @@ -130,7 +153,7 @@ public class ZLMHttpHookListener {
130 public ResponseEntity<String> onPlay(@RequestBody JSONObject json){ 153 public ResponseEntity<String> onPlay(@RequestBody JSONObject json){
131 154
132 if (logger.isDebugEnabled()) { 155 if (logger.isDebugEnabled()) {
133 - logger.debug("ZLM HOOK on_play API调用,参数:" + json.toString()); 156 + logger.debug("[ ZLM HOOK ]on_play API调用,参数:" + json.toString());
134 } 157 }
135 String mediaServerId = json.getString("mediaServerId"); 158 String mediaServerId = json.getString("mediaServerId");
136 ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json); 159 ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json);
@@ -155,7 +178,7 @@ public class ZLMHttpHookListener { @@ -155,7 +178,7 @@ public class ZLMHttpHookListener {
155 @PostMapping(value = "/on_publish", produces = "application/json;charset=UTF-8") 178 @PostMapping(value = "/on_publish", produces = "application/json;charset=UTF-8")
156 public ResponseEntity<String> onPublish(@RequestBody JSONObject json) { 179 public ResponseEntity<String> onPublish(@RequestBody JSONObject json) {
157 180
158 - logger.debug("ZLM HOOK on_publish API调用,参数:" + json.toString()); 181 + logger.debug("[ ZLM HOOK ]on_publish API调用,参数:" + json.toString());
159 182
160 String mediaServerId = json.getString("mediaServerId"); 183 String mediaServerId = json.getString("mediaServerId");
161 ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json); 184 ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json);
@@ -191,7 +214,7 @@ public class ZLMHttpHookListener { @@ -191,7 +214,7 @@ public class ZLMHttpHookListener {
191 public ResponseEntity<String> onRecordMp4(@RequestBody JSONObject json){ 214 public ResponseEntity<String> onRecordMp4(@RequestBody JSONObject json){
192 215
193 if (logger.isDebugEnabled()) { 216 if (logger.isDebugEnabled()) {
194 - logger.debug("ZLM HOOK on_record_mp4 API调用,参数:" + json.toString()); 217 + logger.debug("[ ZLM HOOK ]on_record_mp4 API调用,参数:" + json.toString());
195 } 218 }
196 String mediaServerId = json.getString("mediaServerId"); 219 String mediaServerId = json.getString("mediaServerId");
197 JSONObject ret = new JSONObject(); 220 JSONObject ret = new JSONObject();
@@ -209,7 +232,7 @@ public class ZLMHttpHookListener { @@ -209,7 +232,7 @@ public class ZLMHttpHookListener {
209 public ResponseEntity<String> onRtspRealm(@RequestBody JSONObject json){ 232 public ResponseEntity<String> onRtspRealm(@RequestBody JSONObject json){
210 233
211 if (logger.isDebugEnabled()) { 234 if (logger.isDebugEnabled()) {
212 - logger.debug("ZLM HOOK on_rtsp_realm API调用,参数:" + json.toString()); 235 + logger.debug("[ ZLM HOOK ]on_rtsp_realm API调用,参数:" + json.toString());
213 } 236 }
214 String mediaServerId = json.getString("mediaServerId"); 237 String mediaServerId = json.getString("mediaServerId");
215 JSONObject ret = new JSONObject(); 238 JSONObject ret = new JSONObject();
@@ -228,7 +251,7 @@ public class ZLMHttpHookListener { @@ -228,7 +251,7 @@ public class ZLMHttpHookListener {
228 public ResponseEntity<String> onRtspAuth(@RequestBody JSONObject json){ 251 public ResponseEntity<String> onRtspAuth(@RequestBody JSONObject json){
229 252
230 if (logger.isDebugEnabled()) { 253 if (logger.isDebugEnabled()) {
231 - logger.debug("ZLM HOOK on_rtsp_auth API调用,参数:" + json.toString()); 254 + logger.debug("[ ZLM HOOK ]on_rtsp_auth API调用,参数:" + json.toString());
232 } 255 }
233 String mediaServerId = json.getString("mediaServerId"); 256 String mediaServerId = json.getString("mediaServerId");
234 JSONObject ret = new JSONObject(); 257 JSONObject ret = new JSONObject();
@@ -247,7 +270,7 @@ public class ZLMHttpHookListener { @@ -247,7 +270,7 @@ public class ZLMHttpHookListener {
247 public ResponseEntity<String> onShellLogin(@RequestBody JSONObject json){ 270 public ResponseEntity<String> onShellLogin(@RequestBody JSONObject json){
248 271
249 if (logger.isDebugEnabled()) { 272 if (logger.isDebugEnabled()) {
250 - logger.debug("ZLM HOOK on_shell_login API调用,参数:" + json.toString()); 273 + logger.debug("[ ZLM HOOK ]on_shell_login API调用,参数:" + json.toString());
251 } 274 }
252 // TODO 如果是带有rtpstream则开启按需拉流 275 // TODO 如果是带有rtpstream则开启按需拉流
253 // String app = json.getString("app"); 276 // String app = json.getString("app");
@@ -277,7 +300,7 @@ public class ZLMHttpHookListener { @@ -277,7 +300,7 @@ public class ZLMHttpHookListener {
277 public ResponseEntity<String> onStreamChanged(@RequestBody MediaItem item){ 300 public ResponseEntity<String> onStreamChanged(@RequestBody MediaItem item){
278 301
279 if (logger.isDebugEnabled()) { 302 if (logger.isDebugEnabled()) {
280 - logger.debug("ZLM HOOK on_stream_changed API调用,参数:" + JSONObject.toJSONString(item)); 303 + logger.debug("[ ZLM HOOK ]on_stream_changed API调用,参数:" + JSONObject.toJSONString(item));
281 } 304 }
282 String mediaServerId = item.getMediaServerId(); 305 String mediaServerId = item.getMediaServerId();
283 JSONObject json = (JSONObject) JSON.toJSON(item); 306 JSONObject json = (JSONObject) JSON.toJSON(item);
@@ -315,33 +338,39 @@ public class ZLMHttpHookListener { @@ -315,33 +338,39 @@ public class ZLMHttpHookListener {
315 } 338 }
316 }else { 339 }else {
317 if (!"rtp".equals(app)){ 340 if (!"rtp".equals(app)){
318 - 341 + String type = OriginType.values()[item.getOriginType()].getType();
319 MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); 342 MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
320 if (regist) { 343 if (regist) {
321 StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks); 344 StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
322 - redisCatchStorage.addStream(mediaServerItem, OriginType.values()[item.getOriginType()].getType(), app, streamId, streamInfo); 345 + redisCatchStorage.addStream(mediaServerItem, type, app, streamId, streamInfo);
323 if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() 346 if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
324 || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() 347 || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
325 || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { 348 || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
326 zlmMediaListManager.addMedia(item); 349 zlmMediaListManager.addMedia(item);
327 } 350 }
328 }else { 351 }else {
  352 + // 兼容流注销时类型错误的问题,等zlm更新后删除
  353 + StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
  354 + if (streamPushItem != null) {
  355 + type = "PUSH";
  356 + }else {
  357 + StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
  358 + if (streamProxyByAppAndStream != null) {
  359 + type = "PULL";
  360 + }
  361 + }
329 zlmMediaListManager.removeMedia( app, streamId); 362 zlmMediaListManager.removeMedia( app, streamId);
330 redisCatchStorage.removeStream(mediaServerItem, OriginType.values()[item.getOriginType()].getType(), app, streamId); 363 redisCatchStorage.removeStream(mediaServerItem, OriginType.values()[item.getOriginType()].getType(), app, streamId);
331 -  
332 - }  
333 - if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()  
334 - || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()  
335 - || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {  
336 - // 发送流变化redis消息  
337 - JSONObject jsonObject = new JSONObject();  
338 - jsonObject.put("serverId", userSetup.getServerId());  
339 - jsonObject.put("app", app);  
340 - jsonObject.put("stream", streamId);  
341 - jsonObject.put("register", regist);  
342 - jsonObject.put("mediaServerId", mediaServerId);  
343 - redisCatchStorage.sendStreamChangeMsg(jsonObject);  
344 } 364 }
  365 +
  366 + // 发送流变化redis消息
  367 + JSONObject jsonObject = new JSONObject();
  368 + jsonObject.put("serverId", userSetup.getServerId());
  369 + jsonObject.put("app", app);
  370 + jsonObject.put("stream", streamId);
  371 + jsonObject.put("register", regist);
  372 + jsonObject.put("mediaServerId", mediaServerId);
  373 + redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
345 } 374 }
346 } 375 }
347 } 376 }
@@ -361,7 +390,7 @@ public class ZLMHttpHookListener { @@ -361,7 +390,7 @@ public class ZLMHttpHookListener {
361 public ResponseEntity<String> onStreamNoneReader(@RequestBody JSONObject json){ 390 public ResponseEntity<String> onStreamNoneReader(@RequestBody JSONObject json){
362 391
363 if (logger.isDebugEnabled()) { 392 if (logger.isDebugEnabled()) {
364 - logger.debug("ZLM HOOK on_stream_none_reader API调用,参数:" + json.toString()); 393 + logger.debug("[ ZLM HOOK ]on_stream_none_reader API调用,参数:" + json.toString());
365 } 394 }
366 String mediaServerId = json.getString("mediaServerId"); 395 String mediaServerId = json.getString("mediaServerId");
367 String streamId = json.getString("stream"); 396 String streamId = json.getString("stream");
@@ -421,7 +450,7 @@ public class ZLMHttpHookListener { @@ -421,7 +450,7 @@ public class ZLMHttpHookListener {
421 @PostMapping(value = "/on_stream_not_found", produces = "application/json;charset=UTF-8") 450 @PostMapping(value = "/on_stream_not_found", produces = "application/json;charset=UTF-8")
422 public ResponseEntity<String> onStreamNotFound(@RequestBody JSONObject json){ 451 public ResponseEntity<String> onStreamNotFound(@RequestBody JSONObject json){
423 if (logger.isDebugEnabled()) { 452 if (logger.isDebugEnabled()) {
424 - logger.debug("ZLM HOOK on_stream_not_found API调用,参数:" + json.toString()); 453 + logger.debug("[ ZLM HOOK ]on_stream_not_found API调用,参数:" + json.toString());
425 } 454 }
426 String mediaServerId = json.getString("mediaServerId"); 455 String mediaServerId = json.getString("mediaServerId");
427 MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); 456 MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
@@ -468,7 +497,7 @@ public class ZLMHttpHookListener { @@ -468,7 +497,7 @@ public class ZLMHttpHookListener {
468 public ResponseEntity<String> onServerStarted(HttpServletRequest request, @RequestBody JSONObject jsonObject){ 497 public ResponseEntity<String> onServerStarted(HttpServletRequest request, @RequestBody JSONObject jsonObject){
469 498
470 if (logger.isDebugEnabled()) { 499 if (logger.isDebugEnabled()) {
471 - logger.debug("ZLM HOOK on_server_started API调用,参数:" + jsonObject.toString()); 500 + logger.debug("[ ZLM HOOK ]on_server_started API调用,参数:" + jsonObject.toString());
472 } 501 }
473 String remoteAddr = request.getRemoteAddr(); 502 String remoteAddr = request.getRemoteAddr();
474 jsonObject.put("ip", remoteAddr); 503 jsonObject.put("ip", remoteAddr);
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java
@@ -27,7 +27,8 @@ public class ZLMHttpHookSubscribe { @@ -27,7 +27,8 @@ public class ZLMHttpHookSubscribe {
27 on_stream_changed, 27 on_stream_changed,
28 on_stream_none_reader, 28 on_stream_none_reader,
29 on_stream_not_found, 29 on_stream_not_found,
30 - on_server_started 30 + on_server_started,
  31 + on_server_keepalive
31 } 32 }
32 33
33 public interface Event{ 34 public interface Event{
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.media.zlm; @@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.media.zlm;
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 import com.alibaba.fastjson.JSONArray; 4 import com.alibaba.fastjson.JSONArray;
5 import com.alibaba.fastjson.JSONObject; 5 import com.alibaba.fastjson.JSONObject;
6 -import com.genersoft.iot.vmp.common.VideoManagerConstants;  
7 import com.genersoft.iot.vmp.conf.MediaConfig; 6 import com.genersoft.iot.vmp.conf.MediaConfig;
8 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 7 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
9 import com.genersoft.iot.vmp.service.IMediaServerService; 8 import com.genersoft.iot.vmp.service.IMediaServerService;
@@ -11,9 +10,11 @@ import com.genersoft.iot.vmp.service.IStreamProxyService; @@ -11,9 +10,11 @@ import com.genersoft.iot.vmp.service.IStreamProxyService;
11 import org.slf4j.Logger; 10 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory; 11 import org.slf4j.LoggerFactory;
13 import org.springframework.beans.factory.annotation.Autowired; 12 import org.springframework.beans.factory.annotation.Autowired;
  13 +import org.springframework.beans.factory.annotation.Qualifier;
14 import org.springframework.boot.CommandLineRunner; 14 import org.springframework.boot.CommandLineRunner;
15 import org.springframework.core.annotation.Order; 15 import org.springframework.core.annotation.Order;
16 import org.springframework.scheduling.annotation.Async; 16 import org.springframework.scheduling.annotation.Async;
  17 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
17 import org.springframework.stereotype.Component; 18 import org.springframework.stereotype.Component;
18 19
19 import java.util.*; 20 import java.util.*;
@@ -41,8 +42,13 @@ public class ZLMRunner implements CommandLineRunner { @@ -41,8 +42,13 @@ public class ZLMRunner implements CommandLineRunner {
41 @Autowired 42 @Autowired
42 private MediaConfig mediaConfig; 43 private MediaConfig mediaConfig;
43 44
  45 + @Qualifier("taskExecutor")
  46 + @Autowired
  47 + private ThreadPoolTaskExecutor taskExecutor;
  48 +
44 @Override 49 @Override
45 public void run(String... strings) throws Exception { 50 public void run(String... strings) throws Exception {
  51 + mediaServerService.clearMediaServerForOnline();
46 if (mediaServerService.getDefaultMediaServer() == null) { 52 if (mediaServerService.getDefaultMediaServer() == null) {
47 mediaServerService.addToDatabase(mediaConfig.getMediaSerItem()); 53 mediaServerService.addToDatabase(mediaConfig.getMediaSerItem());
48 } 54 }
@@ -59,6 +65,9 @@ public class ZLMRunner implements CommandLineRunner { @@ -59,6 +65,9 @@ public class ZLMRunner implements CommandLineRunner {
59 } 65 }
60 }); 66 });
61 67
  68 + // TODO 订阅 zlm保活事件, 当zlm离线时做业务的处理
  69 +
  70 +
62 // 获取zlm信息 71 // 获取zlm信息
63 logger.info("等待默认zlm接入..."); 72 logger.info("等待默认zlm接入...");
64 73
@@ -70,7 +79,9 @@ public class ZLMRunner implements CommandLineRunner { @@ -70,7 +79,9 @@ public class ZLMRunner implements CommandLineRunner {
70 for (MediaServerItem mediaServerItem : all) { 79 for (MediaServerItem mediaServerItem : all) {
71 if (startGetMedia == null) startGetMedia = new HashMap<>(); 80 if (startGetMedia == null) startGetMedia = new HashMap<>();
72 startGetMedia.put(mediaServerItem.getId(), true); 81 startGetMedia.put(mediaServerItem.getId(), true);
73 - connectZlmServer(mediaServerItem); 82 + taskExecutor.execute(()->{
  83 + connectZlmServer(mediaServerItem);
  84 + });
74 } 85 }
75 Timer timer = new Timer(); 86 Timer timer = new Timer();
76 // 2分钟后未连接到则不再去主动连接, TODO 并对重启前使用此在zlm的通道发送bye 87 // 2分钟后未连接到则不再去主动连接, TODO 并对重启前使用此在zlm的通道发送bye
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/OriginType.java
1 package com.genersoft.iot.vmp.media.zlm.dto; 1 package com.genersoft.iot.vmp.media.zlm.dto;
2 2
3 public enum OriginType { 3 public enum OriginType {
  4 + // 不可调整顺序
4 UNKNOWN("UNKNOWN"), 5 UNKNOWN("UNKNOWN"),
5 RTMP_PUSH("PUSH"), 6 RTMP_PUSH("PUSH"),
6 RTSP_PUSH("PUSH"), 7 RTSP_PUSH("PUSH"),
7 RTP_PUSH("RTP"), 8 RTP_PUSH("RTP"),
8 - RTC_PUSH("PUSH"),  
9 PULL("PULL"), 9 PULL("PULL"),
10 FFMPEG_PULL("PULL"), 10 FFMPEG_PULL("PULL"),
11 MP4_VOD("MP4_VOD"), 11 MP4_VOD("MP4_VOD"),
12 - DEVICE_CHN("DEVICE_CHN"); 12 + DEVICE_CHN("DEVICE_CHN"),
  13 + RTC_PUSH("PUSH");
13 14
14 private final String type; 15 private final String type;
15 OriginType(String type) { 16 OriginType(String type) {
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
@@ -35,4 +35,6 @@ public interface IStreamPushService { @@ -35,4 +35,6 @@ public interface IStreamPushService {
35 PageInfo<StreamPushItem> getPushList(Integer page, Integer count); 35 PageInfo<StreamPushItem> getPushList(Integer page, Integer count);
36 36
37 StreamPushItem transform(MediaItem item); 37 StreamPushItem transform(MediaItem item);
  38 +
  39 + StreamPushItem getPush(String app, String streamId);
38 } 40 }
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -310,7 +310,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR @@ -310,7 +310,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
310 */ 310 */
311 @Override 311 @Override
312 public void handLeZLMServerConfig(ZLMServerConfig zlmServerConfig) { 312 public void handLeZLMServerConfig(ZLMServerConfig zlmServerConfig) {
313 - logger.info("[ {} ]-[ {}:{} ]已连接", 313 + logger.info("[ ZLM:{} ]-[ {}:{} ]已连接",
314 zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort()); 314 zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
315 315
316 MediaServerItem serverItem = mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId()); 316 MediaServerItem serverItem = mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId());
@@ -469,7 +469,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR @@ -469,7 +469,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
469 */ 469 */
470 @Override 470 @Override
471 public void setZLMConfig(MediaServerItem mediaServerItem) { 471 public void setZLMConfig(MediaServerItem mediaServerItem) {
472 - logger.info("[ {} ]-[ {}:{} ]设置zlm", 472 + logger.info("[ ZLM:{} ]-[ {}:{} ]设置zlm",
473 mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); 473 mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
474 String protocol = sslEnabled ? "https" : "http"; 474 String protocol = sslEnabled ? "https" : "http";
475 String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort); 475 String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
@@ -494,16 +494,17 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR @@ -494,16 +494,17 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
494 param.put("hook.on_stream_changed",String.format("%s/on_stream_changed", hookPrex)); 494 param.put("hook.on_stream_changed",String.format("%s/on_stream_changed", hookPrex));
495 param.put("hook.on_stream_none_reader",String.format("%s/on_stream_none_reader", hookPrex)); 495 param.put("hook.on_stream_none_reader",String.format("%s/on_stream_none_reader", hookPrex));
496 param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex)); 496 param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex));
  497 + param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex));
497 param.put("hook.timeoutSec","20"); 498 param.put("hook.timeoutSec","20");
498 param.put("general.streamNoneReaderDelayMS","-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() ); 499 param.put("general.streamNoneReaderDelayMS","-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() );
499 500
500 JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param); 501 JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param);
501 502
502 if (responseJSON != null && responseJSON.getInteger("code") == 0) { 503 if (responseJSON != null && responseJSON.getInteger("code") == 0) {
503 - logger.info("[ {} ]-[ {}:{} ]设置zlm成功", 504 + logger.info("[ ZLM:{} ]-[ {}:{} ]设置zlm成功",
504 mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); 505 mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
505 }else { 506 }else {
506 - logger.info("[ {} ]-[ {}:{} ]设置zlm失败", 507 + logger.info("[ ZLM:{} ]-[ {}:{} ]设置zlm失败",
507 mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); 508 mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
508 } 509 }
509 } 510 }
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -109,4 +109,11 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -109,4 +109,11 @@ public class StreamPushServiceImpl implements IStreamPushService {
109 } 109 }
110 return del > 0; 110 return del > 0;
111 } 111 }
  112 +
  113 +
  114 + @Override
  115 + public StreamPushItem getPush(String app, String streamId) {
  116 +
  117 + return streamPushMapper.selectOne(app, streamId);
  118 + }
112 } 119 }
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -127,7 +127,7 @@ public interface IRedisCatchStorage { @@ -127,7 +127,7 @@ public interface IRedisCatchStorage {
127 * 发送推流生成与推流消失消息 127 * 发送推流生成与推流消失消息
128 * @param jsonObject 消息内容 128 * @param jsonObject 消息内容
129 */ 129 */
130 - void sendStreamChangeMsg(JSONObject jsonObject); 130 + void sendStreamChangeMsg(String type, JSONObject jsonObject);
131 131
132 /** 132 /**
133 * 添加流信息到redis 133 * 添加流信息到redis
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -9,6 +9,8 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -9,6 +9,8 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
9 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 9 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
10 import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; 10 import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
11 import com.genersoft.iot.vmp.utils.redis.RedisUtil; 11 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  12 +import org.slf4j.Logger;
  13 +import org.slf4j.LoggerFactory;
12 import org.springframework.beans.factory.annotation.Autowired; 14 import org.springframework.beans.factory.annotation.Autowired;
13 import org.springframework.stereotype.Component; 15 import org.springframework.stereotype.Component;
14 16
@@ -19,6 +21,8 @@ import java.util.*; @@ -19,6 +21,8 @@ import java.util.*;
19 @Component 21 @Component
20 public class RedisCatchStorageImpl implements IRedisCatchStorage { 22 public class RedisCatchStorageImpl implements IRedisCatchStorage {
21 23
  24 + private Logger logger = LoggerFactory.getLogger(RedisCatchStorageImpl.class);
  25 +
22 @Autowired 26 @Autowired
23 private RedisUtil redis; 27 private RedisUtil redis;
24 28
@@ -311,8 +315,9 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @@ -311,8 +315,9 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
311 } 315 }
312 316
313 @Override 317 @Override
314 - public void sendStreamChangeMsg(JSONObject jsonObject) {  
315 - String key = VideoManagerConstants.WVP_MSG_STREAM_PUSH_CHANGE_PREFIX; 318 + public void sendStreamChangeMsg(String type, JSONObject jsonObject) {
  319 + String key = VideoManagerConstants.WVP_MSG_STREAM_PUSH_CHANGE_PREFIX + type;
  320 + logger.debug("[redis 流变化事件] {}: {}", key, jsonObject.toString());
316 redis.convertAndSend(key, jsonObject); 321 redis.convertAndSend(key, jsonObject);
317 } 322 }
318 323