Commit 646c3e41b7fac40acbdb7bbfec5608daab2a28d3

Authored by 648540858
Committed by GitHub
2 parents fff3d64f 778969f1

Merge branch 'wvp-28181-2.0' into wvp-28181-2.0

src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -10,6 +10,9 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; @@ -10,6 +10,9 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
10 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; 10 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
11 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; 11 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
12 import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; 12 import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
  13 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  14 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
  15 +import com.genersoft.iot.vmp.media.zlm.dto.HookType;
13 import com.genersoft.iot.vmp.utils.DateUtil; 16 import com.genersoft.iot.vmp.utils.DateUtil;
14 import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; 17 import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
15 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; 18 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@@ -348,25 +351,19 @@ public class SIPCommander implements ISIPCommander { @@ -348,25 +351,19 @@ public class SIPCommander implements ISIPCommander {
348 @Override 351 @Override
349 public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, 352 public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
350 ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) { 353 ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
351 - String streamId = ssrcInfo.getStream(); 354 + String stream = ssrcInfo.getStream();
352 try { 355 try {
353 if (device == null) { 356 if (device == null) {
354 return; 357 return;
355 } 358 }
356 String streamMode = device.getStreamMode().toUpperCase(); 359 String streamMode = device.getStreamMode().toUpperCase();
357 360
358 - logger.info("{} 分配的ZLM为: {} [{}:{}]", streamId, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());  
359 - // 添加订阅  
360 - JSONObject subscribeKey = new JSONObject();  
361 - subscribeKey.put("app", "rtp");  
362 - subscribeKey.put("stream", streamId);  
363 - subscribeKey.put("regist", true);  
364 - subscribeKey.put("schema", "rtmp");  
365 - subscribeKey.put("mediaServerId", mediaServerItem.getId());  
366 - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,  
367 - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ 361 + logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
  362 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
  363 + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
368 if (event != null) { 364 if (event != null) {
369 event.response(mediaServerItemInUse, json); 365 event.response(mediaServerItemInUse, json);
  366 + subscribe.removeSubscribe(hookSubscribe);
370 } 367 }
371 }); 368 });
372 // 369 //
@@ -440,7 +437,7 @@ public class SIPCommander implements ISIPCommander { @@ -440,7 +437,7 @@ public class SIPCommander implements ISIPCommander {
440 errorEvent.response(e); 437 errorEvent.response(e);
441 }), e ->{ 438 }), e ->{
442 // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 439 // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
443 - streamSession.put(device.getDeviceId(), channelId ,"play", streamId, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play); 440 + streamSession.put(device.getDeviceId(), channelId ,"play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play);
444 streamSession.put(device.getDeviceId(), channelId ,"play", e.dialog); 441 streamSession.put(device.getDeviceId(), channelId ,"play", e.dialog);
445 okEvent.response(e); 442 okEvent.response(e);
446 }); 443 });
@@ -530,21 +527,14 @@ public class SIPCommander implements ISIPCommander { @@ -530,21 +527,14 @@ public class SIPCommander implements ISIPCommander {
530 527
531 CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() 528 CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
532 : udpSipProvider.getNewCallId(); 529 : udpSipProvider.getNewCallId();
533 - 530 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtmp", mediaServerItem.getId());
534 // 添加订阅 531 // 添加订阅
535 - JSONObject subscribeKey = new JSONObject();  
536 - subscribeKey.put("app", "rtp");  
537 - subscribeKey.put("stream", ssrcInfo.getStream());  
538 - subscribeKey.put("regist", true);  
539 - subscribeKey.put("schema", "rtmp");  
540 - subscribeKey.put("mediaServerId", mediaServerItem.getId());  
541 - logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey);  
542 - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,  
543 - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ 532 + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
544 if (hookEvent != null) { 533 if (hookEvent != null) {
545 InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()); 534 InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream());
546 hookEvent.call(inviteStreamInfo); 535 hookEvent.call(inviteStreamInfo);
547 } 536 }
  537 + subscribe.removeSubscribe(hookSubscribe);
548 }); 538 });
549 Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc()); 539 Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
550 540
@@ -643,21 +633,15 @@ public class SIPCommander implements ISIPCommander { @@ -643,21 +633,15 @@ public class SIPCommander implements ISIPCommander {
643 CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() 633 CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
644 : udpSipProvider.getNewCallId(); 634 : udpSipProvider.getNewCallId();
645 635
  636 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId());
646 // 添加订阅 637 // 添加订阅
647 - JSONObject subscribeKey = new JSONObject();  
648 - subscribeKey.put("app", "rtp");  
649 - subscribeKey.put("stream", ssrcInfo.getStream());  
650 - subscribeKey.put("regist", true);  
651 - subscribeKey.put("mediaServerId", mediaServerItem.getId());  
652 - logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());  
653 - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,  
654 - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ 638 + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
655 hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream())); 639 hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
656 - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);  
657 - subscribeKey.put("regist", false);  
658 - subscribeKey.put("schema", "rtmp"); 640 + subscribe.removeSubscribe(hookSubscribe);
  641 + hookSubscribe.getContent().put("regist", false);
  642 + hookSubscribe.getContent().put("schema", "rtmp");
659 // 添加流注销的订阅,注销了后向设备发送bye 643 // 添加流注销的订阅,注销了后向设备发送bye
660 - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, 644 + subscribe.addSubscribe(hookSubscribe,
661 (MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd)->{ 645 (MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd)->{
662 ClientTransaction transaction = streamSession.getTransaction(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId()); 646 ClientTransaction transaction = streamSession.getTransaction(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId());
663 if (transaction != null) { 647 if (transaction != null) {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -102,7 +102,7 @@ public class ZLMHttpHookListener { @@ -102,7 +102,7 @@ public class ZLMHttpHookListener {
102 logger.debug("[ ZLM HOOK ] on_server_keepalive API调用,参数:" + json.toString()); 102 logger.debug("[ ZLM HOOK ] on_server_keepalive API调用,参数:" + json.toString());
103 } 103 }
104 String mediaServerId = json.getString("mediaServerId"); 104 String mediaServerId = json.getString("mediaServerId");
105 - List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive); 105 + List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
106 if (subscribes != null && subscribes.size() > 0) { 106 if (subscribes != null && subscribes.size() > 0) {
107 for (ZLMHttpHookSubscribe.Event subscribe : subscribes) { 107 for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
108 subscribe.response(null, json); 108 subscribe.response(null, json);
@@ -168,7 +168,7 @@ public class ZLMHttpHookListener { @@ -168,7 +168,7 @@ public class ZLMHttpHookListener {
168 logger.debug("[ ZLM HOOK ]on_play API调用,参数:" + JSON.toJSONString(param)); 168 logger.debug("[ ZLM HOOK ]on_play API调用,参数:" + JSON.toJSONString(param));
169 } 169 }
170 String mediaServerId = param.getMediaServerId(); 170 String mediaServerId = param.getMediaServerId();
171 - ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json); 171 + ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json);
172 if (subscribe != null ) { 172 if (subscribe != null ) {
173 MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); 173 MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
174 if (mediaInfo != null) { 174 if (mediaInfo != null) {
@@ -253,7 +253,7 @@ public class ZLMHttpHookListener { @@ -253,7 +253,7 @@ public class ZLMHttpHookListener {
253 } 253 }
254 254
255 255
256 - ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json); 256 + ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
257 if (subscribe != null) { 257 if (subscribe != null) {
258 if (mediaInfo != null) { 258 if (mediaInfo != null) {
259 subscribe.response(mediaInfo, json); 259 subscribe.response(mediaInfo, json);
@@ -377,7 +377,7 @@ public class ZLMHttpHookListener { @@ -377,7 +377,7 @@ public class ZLMHttpHookListener {
377 logger.debug("[ ZLM HOOK ]on_shell_login API调用,参数:" + json.toString()); 377 logger.debug("[ ZLM HOOK ]on_shell_login API调用,参数:" + json.toString());
378 } 378 }
379 String mediaServerId = json.getString("mediaServerId"); 379 String mediaServerId = json.getString("mediaServerId");
380 - ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_shell_login, json); 380 + ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json);
381 if (subscribe != null ) { 381 if (subscribe != null ) {
382 MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); 382 MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
383 if (mediaInfo != null) { 383 if (mediaInfo != null) {
@@ -403,7 +403,7 @@ public class ZLMHttpHookListener { @@ -403,7 +403,7 @@ public class ZLMHttpHookListener {
403 logger.info("[ ZLM HOOK ]on_stream_changed API调用,参数:" + JSONObject.toJSONString(item)); 403 logger.info("[ ZLM HOOK ]on_stream_changed API调用,参数:" + JSONObject.toJSONString(item));
404 String mediaServerId = item.getMediaServerId(); 404 String mediaServerId = item.getMediaServerId();
405 JSONObject json = (JSONObject) JSON.toJSON(item); 405 JSONObject json = (JSONObject) JSON.toJSON(item);
406 - ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json); 406 + ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
407 if (subscribe != null ) { 407 if (subscribe != null ) {
408 MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); 408 MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
409 if (mediaInfo != null) { 409 if (mediaInfo != null) {
@@ -614,7 +614,7 @@ public class ZLMHttpHookListener { @@ -614,7 +614,7 @@ public class ZLMHttpHookListener {
614 } 614 }
615 String remoteAddr = request.getRemoteAddr(); 615 String remoteAddr = request.getRemoteAddr();
616 jsonObject.put("ip", remoteAddr); 616 jsonObject.put("ip", remoteAddr);
617 - List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_started); 617 + List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
618 if (subscribes != null && subscribes.size() > 0) { 618 if (subscribes != null && subscribes.size() > 0) {
619 for (ZLMHttpHookSubscribe.Event subscribe : subscribes) { 619 for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
620 subscribe.response(null, jsonObject); 620 subscribe.response(null, jsonObject);
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java
1 package com.genersoft.iot.vmp.media.zlm; 1 package com.genersoft.iot.vmp.media.zlm;
2 2
3 import com.alibaba.fastjson.JSONObject; 3 import com.alibaba.fastjson.JSONObject;
  4 +import com.genersoft.iot.vmp.media.zlm.dto.HookType;
  5 +import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
4 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 6 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
5 import org.springframework.stereotype.Component; 7 import org.springframework.stereotype.Component;
6 import org.springframework.util.CollectionUtils; 8 import org.springframework.util.CollectionUtils;
7 9
  10 +import java.time.Instant;
8 import java.util.*; 11 import java.util.*;
9 import java.util.concurrent.ConcurrentHashMap; 12 import java.util.concurrent.ConcurrentHashMap;
  13 +import java.util.concurrent.TimeUnit;
10 14
11 /** 15 /**
12 * @description:针对 ZLMediaServer的hook事件订阅 16 * @description:针对 ZLMediaServer的hook事件订阅
@@ -16,51 +20,39 @@ import java.util.concurrent.ConcurrentHashMap; @@ -16,51 +20,39 @@ import java.util.concurrent.ConcurrentHashMap;
16 @Component 20 @Component
17 public class ZLMHttpHookSubscribe { 21 public class ZLMHttpHookSubscribe {
18 22
19 - public enum HookType{  
20 - on_flow_report,  
21 - on_http_access,  
22 - on_play,  
23 - on_publish,  
24 - on_record_mp4,  
25 - on_rtsp_auth,  
26 - on_rtsp_realm,  
27 - on_shell_login,  
28 - on_stream_changed,  
29 - on_stream_none_reader,  
30 - on_stream_not_found,  
31 - on_server_started,  
32 - on_server_keepalive  
33 - }  
34 -  
35 @FunctionalInterface 23 @FunctionalInterface
36 public interface Event{ 24 public interface Event{
37 void response(MediaServerItem mediaServerItem, JSONObject response); 25 void response(MediaServerItem mediaServerItem, JSONObject response);
38 } 26 }
39 27
40 - private Map<HookType, Map<JSONObject, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>(); 28 + private Map<HookType, Map<IHookSubscribe, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
41 29
42 - public void addSubscribe(HookType type, JSONObject hookResponse, ZLMHttpHookSubscribe.Event event) {  
43 - allSubscribes.computeIfAbsent(type, k -> new ConcurrentHashMap<>()).put(hookResponse, event); 30 + public void addSubscribe(IHookSubscribe hookSubscribe, ZLMHttpHookSubscribe.Event event) {
  31 + if (hookSubscribe.getExpires() == null) {
  32 + // 默认5分钟过期
  33 + Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
  34 + hookSubscribe.setExpires(expiresInstant);
  35 + }
  36 + allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
44 } 37 }
45 38
46 - public ZLMHttpHookSubscribe.Event getSubscribe(HookType type, JSONObject hookResponse) { 39 + public ZLMHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
47 ZLMHttpHookSubscribe.Event event= null; 40 ZLMHttpHookSubscribe.Event event= null;
48 - Map<JSONObject, Event> eventMap = allSubscribes.get(type); 41 + Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
49 if (eventMap == null) { 42 if (eventMap == null) {
50 return null; 43 return null;
51 } 44 }
52 - for (JSONObject key : eventMap.keySet()) { 45 + for (IHookSubscribe key : eventMap.keySet()) {
53 Boolean result = null; 46 Boolean result = null;
54 - for (String s : key.keySet()) { 47 + for (String s : key.getContent().keySet()) {
55 if (result == null) { 48 if (result == null) {
56 - result = key.getString(s).equals(hookResponse.getString(s)); 49 + result = key.getContent().getString(s).equals(hookResponse.getString(s));
57 }else { 50 }else {
58 - if (key.getString(s) == null) { 51 + if (key.getContent().getString(s) == null) {
59 continue; 52 continue;
60 } 53 }
61 - result = result && key.getString(s).equals(hookResponse.getString(s)); 54 + result = result && key.getContent().getString(s).equals(hookResponse.getString(s));
62 } 55 }
63 -  
64 } 56 }
65 if (null != result && result) { 57 if (null != result && result) {
66 event = eventMap.get(key); 58 event = eventMap.get(key);
@@ -69,26 +61,30 @@ public class ZLMHttpHookSubscribe { @@ -69,26 +61,30 @@ public class ZLMHttpHookSubscribe {
69 return event; 61 return event;
70 } 62 }
71 63
72 - public void removeSubscribe(HookType type, JSONObject hookResponse) {  
73 - Map<JSONObject, Event> eventMap = allSubscribes.get(type); 64 + public void removeSubscribe(IHookSubscribe hookSubscribe) {
  65 + Map<IHookSubscribe, Event> eventMap = allSubscribes.get(hookSubscribe.getHookType());
74 if (eventMap == null) { 66 if (eventMap == null) {
75 return; 67 return;
76 } 68 }
77 69
78 - Set<Map.Entry<JSONObject, Event>> entries = eventMap.entrySet(); 70 + Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet();
79 if (entries.size() > 0) { 71 if (entries.size() > 0) {
80 - List<Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();  
81 - for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entries) {  
82 - JSONObject key = entry.getKey(); 72 + List<Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
  73 + for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entries) {
  74 + JSONObject content = entry.getKey().getContent();
  75 + if (content == null || content.size() == 0) {
  76 + entriesToRemove.add(entry);
  77 + continue;
  78 + }
83 Boolean result = null; 79 Boolean result = null;
84 - for (String s : key.keySet()) { 80 + for (String s : content.keySet()) {
85 if (result == null) { 81 if (result == null) {
86 - result = key.getString(s).equals(hookResponse.getString(s)); 82 + result = content.getString(s).equals(hookSubscribe.getContent().getString(s));
87 }else { 83 }else {
88 - if (key.getString(s) == null) { 84 + if (content.getString(s) == null) {
89 continue; 85 continue;
90 } 86 }
91 - result = result && key.getString(s).equals(hookResponse.getString(s)); 87 + result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
92 } 88 }
93 } 89 }
94 if (null != result && result){ 90 if (null != result && result){
@@ -97,7 +93,7 @@ public class ZLMHttpHookSubscribe { @@ -97,7 +93,7 @@ public class ZLMHttpHookSubscribe {
97 } 93 }
98 94
99 if (!CollectionUtils.isEmpty(entriesToRemove)) { 95 if (!CollectionUtils.isEmpty(entriesToRemove)) {
100 - for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) { 96 + for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) {
101 entries.remove(entry); 97 entries.remove(entry);
102 } 98 }
103 } 99 }
@@ -111,17 +107,25 @@ public class ZLMHttpHookSubscribe { @@ -111,17 +107,25 @@ public class ZLMHttpHookSubscribe {
111 * @return 107 * @return
112 */ 108 */
113 public List<ZLMHttpHookSubscribe.Event> getSubscribes(HookType type) { 109 public List<ZLMHttpHookSubscribe.Event> getSubscribes(HookType type) {
114 - // ZLMHttpHookSubscribe.Event event= null;  
115 - Map<JSONObject, Event> eventMap = allSubscribes.get(type); 110 + Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
116 if (eventMap == null) { 111 if (eventMap == null) {
117 return null; 112 return null;
118 } 113 }
119 List<ZLMHttpHookSubscribe.Event> result = new ArrayList<>(); 114 List<ZLMHttpHookSubscribe.Event> result = new ArrayList<>();
120 - for (JSONObject key : eventMap.keySet()) { 115 + for (IHookSubscribe key : eventMap.keySet()) {
121 result.add(eventMap.get(key)); 116 result.add(eventMap.get(key));
122 } 117 }
123 return result; 118 return result;
124 } 119 }
125 120
  121 + public List<IHookSubscribe> getAll(){
  122 + ArrayList<IHookSubscribe> result = new ArrayList<>();
  123 + Collection<Map<IHookSubscribe, Event>> values = allSubscribes.values();
  124 + for (Map<IHookSubscribe, Event> value : values) {
  125 + result.addAll(value.keySet());
  126 + }
  127 + return result;
  128 + }
  129 +
126 130
127 } 131 }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -6,22 +6,22 @@ import com.alibaba.fastjson.JSONObject; @@ -6,22 +6,22 @@ import com.alibaba.fastjson.JSONObject;
6 import com.genersoft.iot.vmp.conf.DynamicTask; 6 import com.genersoft.iot.vmp.conf.DynamicTask;
7 import com.genersoft.iot.vmp.conf.MediaConfig; 7 import com.genersoft.iot.vmp.conf.MediaConfig;
8 import com.genersoft.iot.vmp.gb28181.event.EventPublisher; 8 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  9 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  10 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
  11 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
9 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 12 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
10 import com.genersoft.iot.vmp.service.IMediaServerService; 13 import com.genersoft.iot.vmp.service.IMediaServerService;
11 -import com.genersoft.iot.vmp.service.IStreamProxyService;  
12 -import com.genersoft.iot.vmp.storager.IRedisCatchStorage;  
13 import org.slf4j.Logger; 14 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory; 15 import org.slf4j.LoggerFactory;
15 import org.springframework.beans.factory.annotation.Autowired; 16 import org.springframework.beans.factory.annotation.Autowired;
16 -import org.springframework.beans.factory.annotation.Qualifier;  
17 import org.springframework.boot.CommandLineRunner; 17 import org.springframework.boot.CommandLineRunner;
18 import org.springframework.core.annotation.Order; 18 import org.springframework.core.annotation.Order;
19 import org.springframework.scheduling.annotation.Async; 19 import org.springframework.scheduling.annotation.Async;
20 -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;  
21 import org.springframework.stereotype.Component; 20 import org.springframework.stereotype.Component;
22 -import org.springframework.util.StringUtils;  
23 21
  22 +import java.time.Instant;
24 import java.util.*; 23 import java.util.*;
  24 +import java.util.concurrent.TimeUnit;
25 25
26 @Component 26 @Component
27 @Order(value=1) 27 @Order(value=1)
@@ -38,18 +38,12 @@ public class ZLMRunner implements CommandLineRunner { @@ -38,18 +38,12 @@ public class ZLMRunner implements CommandLineRunner {
38 private ZLMHttpHookSubscribe hookSubscribe; 38 private ZLMHttpHookSubscribe hookSubscribe;
39 39
40 @Autowired 40 @Autowired
41 - private IStreamProxyService streamProxyService;  
42 -  
43 - @Autowired  
44 private EventPublisher publisher; 41 private EventPublisher publisher;
45 42
46 @Autowired 43 @Autowired
47 private IMediaServerService mediaServerService; 44 private IMediaServerService mediaServerService;
48 45
49 @Autowired 46 @Autowired
50 - private IRedisCatchStorage redisCatchStorage;  
51 -  
52 - @Autowired  
53 private MediaConfig mediaConfig; 47 private MediaConfig mediaConfig;
54 48
55 @Autowired 49 @Autowired
@@ -67,17 +61,25 @@ public class ZLMRunner implements CommandLineRunner { @@ -67,17 +61,25 @@ public class ZLMRunner implements CommandLineRunner {
67 mediaServerService.updateToDatabase(mediaSerItem); 61 mediaServerService.updateToDatabase(mediaSerItem);
68 } 62 }
69 mediaServerService.syncCatchFromDatabase(); 63 mediaServerService.syncCatchFromDatabase();
  64 + HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started();
  65 +// Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.SECONDS.toSeconds(60));
  66 +// hookSubscribeForStreamChange.setExpires(expiresInstant);
70 // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 67 // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
71 - hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,new JSONObject(), 68 + hookSubscribe.addSubscribe(hookSubscribeForServerStarted,
72 (MediaServerItem mediaServerItem, JSONObject response)->{ 69 (MediaServerItem mediaServerItem, JSONObject response)->{
73 ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(response, ZLMServerConfig.class); 70 ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(response, ZLMServerConfig.class);
74 if (zlmServerConfig !=null ) { 71 if (zlmServerConfig !=null ) {
75 if (startGetMedia != null) { 72 if (startGetMedia != null) {
76 startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId()); 73 startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());
  74 + if (startGetMedia.size() == 0) {
  75 + hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
  76 + }
77 } 77 }
78 } 78 }
79 }); 79 });
80 80
  81 +
  82 +
81 // 获取zlm信息 83 // 获取zlm信息
82 logger.info("[zlm] 等待默认zlm中..."); 84 logger.info("[zlm] 等待默认zlm中...");
83 85
@@ -103,7 +105,6 @@ public class ZLMRunner implements CommandLineRunner { @@ -103,7 +105,6 @@ public class ZLMRunner implements CommandLineRunner {
103 } 105 }
104 startGetMedia = null; 106 startGetMedia = null;
105 } 107 }
106 - hookSubscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started, new JSONObject());  
107 // TODO 清理数据库中与redis不匹配的zlm 108 // TODO 清理数据库中与redis不匹配的zlm
108 }, 60 * 1000 ); 109 }, 60 * 1000 );
109 } 110 }
@@ -116,6 +117,9 @@ public class ZLMRunner implements CommandLineRunner { @@ -116,6 +117,9 @@ public class ZLMRunner implements CommandLineRunner {
116 zlmServerConfigFirst.setIp(mediaServerItem.getIp()); 117 zlmServerConfigFirst.setIp(mediaServerItem.getIp());
117 zlmServerConfigFirst.setHttpPort(mediaServerItem.getHttpPort()); 118 zlmServerConfigFirst.setHttpPort(mediaServerItem.getHttpPort());
118 startGetMedia.remove(mediaServerItem.getId()); 119 startGetMedia.remove(mediaServerItem.getId());
  120 + if (startGetMedia.size() == 0) {
  121 + hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
  122 + }
119 mediaServerService.zlmServerOnline(zlmServerConfigFirst); 123 mediaServerService.zlmServerOnline(zlmServerConfigFirst);
120 }else { 124 }else {
121 logger.info("[ {} ]-[ {}:{} ]主动连接失败, 清理相关资源, 开始尝试重试连接", 125 logger.info("[ {} ]-[ {}:{} ]主动连接失败, 清理相关资源, 开始尝试重试连接",
@@ -130,6 +134,9 @@ public class ZLMRunner implements CommandLineRunner { @@ -130,6 +134,9 @@ public class ZLMRunner implements CommandLineRunner {
130 zlmServerConfig.setIp(mediaServerItem.getIp()); 134 zlmServerConfig.setIp(mediaServerItem.getIp());
131 zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort()); 135 zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort());
132 startGetMedia.remove(mediaServerItem.getId()); 136 startGetMedia.remove(mediaServerItem.getId());
  137 + if (startGetMedia.size() == 0) {
  138 + hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
  139 + }
133 mediaServerService.zlmServerOnline(zlmServerConfig); 140 mediaServerService.zlmServerOnline(zlmServerConfig);
134 } 141 }
135 }, 2000); 142 }, 2000);
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java 0 → 100644
  1 +package com.genersoft.iot.vmp.media.zlm.dto;
  2 +
  3 +
  4 +import com.alibaba.fastjson.JSONObject;
  5 +
  6 +/**
  7 + * hook 订阅工厂
  8 + * @author lin
  9 + */
  10 +public class HookSubscribeFactory {
  11 +
  12 + public static HookSubscribeForStreamChange on_stream_changed(String app, String stream, boolean regist, String scheam, String mediaServerId) {
  13 + HookSubscribeForStreamChange hookSubscribe = new HookSubscribeForStreamChange();
  14 + JSONObject subscribeKey = new com.alibaba.fastjson.JSONObject();
  15 + subscribeKey.put("app", app);
  16 + subscribeKey.put("stream", stream);
  17 + subscribeKey.put("regist", regist);
  18 + if (scheam != null) {
  19 + subscribeKey.put("schema", scheam);
  20 + }
  21 + subscribeKey.put("mediaServerId", mediaServerId);
  22 + hookSubscribe.setContent(subscribeKey);
  23 +
  24 + return hookSubscribe;
  25 + }
  26 +
  27 + public static HookSubscribeForServerStarted on_server_started() {
  28 + HookSubscribeForServerStarted hookSubscribe = new HookSubscribeForServerStarted();
  29 + hookSubscribe.setContent(new JSONObject());
  30 +
  31 + return hookSubscribe;
  32 + }
  33 +}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java 0 → 100644
  1 +package com.genersoft.iot.vmp.media.zlm.dto;
  2 +
  3 +import com.alibaba.fastjson.JSONObject;
  4 +import com.alibaba.fastjson.annotation.JSONField;
  5 +
  6 +import java.time.Instant;
  7 +
  8 +/**
  9 + * hook订阅-流变化
  10 + * @author lin
  11 + */
  12 +public class HookSubscribeForServerStarted implements IHookSubscribe{
  13 +
  14 + private HookType hookType = HookType.on_server_started;
  15 +
  16 + private JSONObject content;
  17 +
  18 + @JSONField(format="yyyy-MM-dd HH:mm:ss")
  19 + private Instant expires;
  20 +
  21 + @Override
  22 + public HookType getHookType() {
  23 + return hookType;
  24 + }
  25 +
  26 + @Override
  27 + public JSONObject getContent() {
  28 + return content;
  29 + }
  30 +
  31 + public void setContent(JSONObject content) {
  32 + this.content = content;
  33 + }
  34 +
  35 + @Override
  36 + public Instant getExpires() {
  37 + return expires;
  38 + }
  39 +
  40 + @Override
  41 + public void setExpires(Instant expires) {
  42 + this.expires = expires;
  43 + }
  44 +}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java 0 → 100644
  1 +package com.genersoft.iot.vmp.media.zlm.dto;
  2 +
  3 +import com.alibaba.fastjson.JSONObject;
  4 +import com.alibaba.fastjson.annotation.JSONField;
  5 +
  6 +import java.time.Instant;
  7 +
  8 +/**
  9 + * hook订阅-流变化
  10 + * @author lin
  11 + */
  12 +public class HookSubscribeForStreamChange implements IHookSubscribe{
  13 +
  14 + private HookType hookType = HookType.on_stream_changed;
  15 +
  16 + private JSONObject content;
  17 +
  18 + private Instant expires;
  19 +
  20 + @Override
  21 + public HookType getHookType() {
  22 + return hookType;
  23 + }
  24 +
  25 + @Override
  26 + public JSONObject getContent() {
  27 + return content;
  28 + }
  29 +
  30 + public void setContent(JSONObject content) {
  31 + this.content = content;
  32 + }
  33 +
  34 + @Override
  35 + public Instant getExpires() {
  36 + return expires;
  37 + }
  38 +
  39 + @Override
  40 + public void setExpires(Instant expires) {
  41 + this.expires = expires;
  42 + }
  43 +}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java 0 → 100644
  1 +package com.genersoft.iot.vmp.media.zlm.dto;
  2 +
  3 +/**
  4 + * hook类型
  5 + * @author lin
  6 + */
  7 +
  8 +public enum HookType {
  9 +
  10 + on_flow_report,
  11 + on_http_access,
  12 + on_play,
  13 + on_publish,
  14 + on_record_mp4,
  15 + on_rtsp_auth,
  16 + on_rtsp_realm,
  17 + on_shell_login,
  18 + on_stream_changed,
  19 + on_stream_none_reader,
  20 + on_stream_not_found,
  21 + on_server_started,
  22 + on_server_keepalive
  23 +}
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java 0 → 100644
  1 +package com.genersoft.iot.vmp.media.zlm.dto;
  2 +
  3 +import com.alibaba.fastjson.JSONObject;
  4 +
  5 +import java.time.Instant;
  6 +
  7 +/**
  8 + * zlm hook事件的参数
  9 + * @author lin
  10 + */
  11 +public interface IHookSubscribe {
  12 +
  13 + /**
  14 + * 获取hook类型
  15 + * @return hook类型
  16 + */
  17 + HookType getHookType();
  18 +
  19 + /**
  20 + * 获取hook的具体内容
  21 + * @return hook的具体内容
  22 + */
  23 + JSONObject getContent();
  24 +
  25 + /**
  26 + * 设置过期时间
  27 + * @param instant 过期时间
  28 + */
  29 + void setExpires(Instant instant);
  30 +
  31 + /**
  32 + * 获取过期时间
  33 + * @return 过期时间
  34 + */
  35 + Instant getExpires();
  36 +}
src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java deleted 100644 → 0
1 -package com.genersoft.iot.vmp.service;  
2 -  
3 -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;  
4 -import com.genersoft.iot.vmp.storager.IRedisCatchStorage;  
5 -import com.genersoft.iot.vmp.storager.IVideoManagerStorage;  
6 -import org.springframework.beans.factory.annotation.Autowired;  
7 -import org.springframework.scheduling.annotation.Scheduled;  
8 -import org.springframework.stereotype.Component;  
9 -  
10 -import java.util.List;  
11 -  
12 -  
13 -/**  
14 - * 定时查找redis中的GPS推送消息,并保存到对应的流中  
15 - */  
16 -@Component  
17 -public class StreamGPSSubscribeTask {  
18 -  
19 - @Autowired  
20 - private IRedisCatchStorage redisCatchStorage;  
21 -  
22 - @Autowired  
23 - private IVideoManagerStorage storager;  
24 -  
25 -  
26 - @Scheduled(fixedRate = 30 * 1000) //每30秒执行一次  
27 - public void execute(){  
28 - List<GPSMsgInfo> gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo();  
29 - if (gpsMsgInfo.size() > 0) {  
30 - storager.updateStreamGPS(gpsMsgInfo);  
31 - for (GPSMsgInfo msgInfo : gpsMsgInfo) {  
32 - msgInfo.setStored(true);  
33 - redisCatchStorage.updateGpsMsgInfo(msgInfo);  
34 - }  
35 - }  
36 -  
37 - }  
38 -}  
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -35,6 +35,10 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; @@ -35,6 +35,10 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
35 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; 35 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
36 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; 36 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
37 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; 37 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
  38 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  39 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
  40 +import com.genersoft.iot.vmp.media.zlm.dto.HookType;
  41 +import com.genersoft.iot.vmp.utils.DateUtil;
38 import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; 42 import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
39 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; 43 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
40 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; 44 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
@@ -305,16 +309,10 @@ public class PlayServiceImpl implements IPlayService { @@ -305,16 +309,10 @@ public class PlayServiceImpl implements IPlayService {
305 // 单端口模式streamId也有变化,需要重新设置监听 309 // 单端口模式streamId也有变化,需要重新设置监听
306 if (!mediaServerItem.isRtpEnable()) { 310 if (!mediaServerItem.isRtpEnable()) {
307 // 添加订阅 311 // 添加订阅
308 - JSONObject subscribeKey = new JSONObject();  
309 - subscribeKey.put("app", "rtp");  
310 - subscribeKey.put("stream", stream);  
311 - subscribeKey.put("regist", true);  
312 - subscribeKey.put("schema", "rtmp");  
313 - subscribeKey.put("mediaServerId", mediaServerItem.getId());  
314 - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed,subscribeKey);  
315 - subscribeKey.put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());  
316 - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,  
317 - (MediaServerItem mediaServerItemInUse, JSONObject response)->{ 312 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
  313 + subscribe.removeSubscribe(hookSubscribe);
  314 + hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
  315 + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
318 logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); 316 logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
319 dynamicTask.stop(timeOutTaskKey); 317 dynamicTask.stop(timeOutTaskKey);
320 // hook响应 318 // hook响应
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
@@ -8,6 +8,9 @@ import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; @@ -8,6 +8,9 @@ import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
8 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; 8 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
9 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; 9 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
10 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; 10 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
  11 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  12 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
  13 +import com.genersoft.iot.vmp.media.zlm.dto.HookType;
11 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 14 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
12 import com.genersoft.iot.vmp.service.IMediaServerService; 15 import com.genersoft.iot.vmp.service.IMediaServerService;
13 import com.genersoft.iot.vmp.service.bean.*; 16 import com.genersoft.iot.vmp.service.bean.*;
@@ -270,14 +273,9 @@ public class RedisGbPlayMsgListener implements MessageListener { @@ -270,14 +273,9 @@ public class RedisGbPlayMsgListener implements MessageListener {
270 }, userSetting.getPlatformPlayTimeout()); 273 }, userSetting.getPlatformPlayTimeout());
271 274
272 // 添加订阅 275 // 添加订阅
273 - JSONObject subscribeKey = new JSONObject();  
274 - subscribeKey.put("app", content.getApp());  
275 - subscribeKey.put("stream", content.getStream());  
276 - subscribeKey.put("regist", true);  
277 - subscribeKey.put("schema", "rtmp");  
278 - subscribeKey.put("mediaServerId", mediaServerItem.getId());  
279 - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,  
280 - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ 276 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtmp", mediaServerItem.getId());
  277 +
  278 + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
281 dynamicTask.stop(taskKey); 279 dynamicTask.stop(taskKey);
282 responseSendItem(mediaServerItem, content, toId, serial); 280 responseSendItem(mediaServerItem, content, toId, serial);
283 }); 281 });
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON; @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
4 import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; 4 import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
5 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; 5 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
6 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 6 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  7 +import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
7 import org.jetbrains.annotations.NotNull; 8 import org.jetbrains.annotations.NotNull;
8 import org.slf4j.Logger; 9 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory; 10 import org.slf4j.LoggerFactory;
@@ -11,9 +12,11 @@ import org.springframework.beans.factory.annotation.Autowired; @@ -11,9 +12,11 @@ import org.springframework.beans.factory.annotation.Autowired;
11 import org.springframework.beans.factory.annotation.Qualifier; 12 import org.springframework.beans.factory.annotation.Qualifier;
12 import org.springframework.data.redis.connection.Message; 13 import org.springframework.data.redis.connection.Message;
13 import org.springframework.data.redis.connection.MessageListener; 14 import org.springframework.data.redis.connection.MessageListener;
  15 +import org.springframework.scheduling.annotation.Scheduled;
14 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; 16 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
15 import org.springframework.stereotype.Component; 17 import org.springframework.stereotype.Component;
16 18
  19 +import java.util.List;
17 import java.util.concurrent.ConcurrentLinkedQueue; 20 import java.util.concurrent.ConcurrentLinkedQueue;
18 21
19 /** 22 /**
@@ -30,6 +33,9 @@ public class RedisGpsMsgListener implements MessageListener { @@ -30,6 +33,9 @@ public class RedisGpsMsgListener implements MessageListener {
30 @Autowired 33 @Autowired
31 private IRedisCatchStorage redisCatchStorage; 34 private IRedisCatchStorage redisCatchStorage;
32 35
  36 + @Autowired
  37 + private IVideoManagerStorage storager;
  38 +
33 private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); 39 private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
34 40
35 @Qualifier("taskExecutor") 41 @Qualifier("taskExecutor")
@@ -46,10 +52,26 @@ public class RedisGpsMsgListener implements MessageListener { @@ -46,10 +52,26 @@ public class RedisGpsMsgListener implements MessageListener {
46 while (!taskQueue.isEmpty()) { 52 while (!taskQueue.isEmpty()) {
47 Message msg = taskQueue.poll(); 53 Message msg = taskQueue.poll();
48 GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class); 54 GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
  55 + // 只是放入redis缓存起来
49 redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); 56 redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
50 } 57 }
51 taskQueueHandlerRun = false; 58 taskQueueHandlerRun = false;
52 }); 59 });
53 } 60 }
54 } 61 }
  62 +
  63 + /**
  64 + * 定时将经纬度更新到数据库
  65 + */
  66 + @Scheduled(fixedRate = 2 * 1000) //每2秒执行一次
  67 + public void execute(){
  68 + List<GPSMsgInfo> gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo();
  69 + if (gpsMsgInfo.size() > 0) {
  70 + storager.updateStreamGPS(gpsMsgInfo);
  71 + for (GPSMsgInfo msgInfo : gpsMsgInfo) {
  72 + msgInfo.setStored(true);
  73 + redisCatchStorage.updateGpsMsgInfo(msgInfo);
  74 + }
  75 + }
  76 + }
55 } 77 }
src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java
@@ -8,6 +8,8 @@ import com.genersoft.iot.vmp.conf.DynamicTask; @@ -8,6 +8,8 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
8 import com.genersoft.iot.vmp.conf.SipConfig; 8 import com.genersoft.iot.vmp.conf.SipConfig;
9 import com.genersoft.iot.vmp.conf.UserSetting; 9 import com.genersoft.iot.vmp.conf.UserSetting;
10 import com.genersoft.iot.vmp.conf.VersionInfo; 10 import com.genersoft.iot.vmp.conf.VersionInfo;
  11 +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
  12 +import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
11 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 13 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
12 import com.genersoft.iot.vmp.service.IMediaServerService; 14 import com.genersoft.iot.vmp.service.IMediaServerService;
13 import com.genersoft.iot.vmp.utils.SpringBeanFactory; 15 import com.genersoft.iot.vmp.utils.SpringBeanFactory;
@@ -38,7 +40,7 @@ import java.util.Set; @@ -38,7 +40,7 @@ import java.util.Set;
38 public class ServerController { 40 public class ServerController {
39 41
40 @Autowired 42 @Autowired
41 - private ConfigurableApplicationContext context; 43 + private ZLMHttpHookSubscribe zlmHttpHookSubscribe;
42 44
43 @Autowired 45 @Autowired
44 private IMediaServerService mediaServerService; 46 private IMediaServerService mediaServerService;
@@ -254,6 +256,18 @@ public class ServerController { @@ -254,6 +256,18 @@ public class ServerController {
254 return result; 256 return result;
255 } 257 }
256 258
  259 + @ApiOperation("获取当前所有hook")
  260 + @GetMapping(value = "/hooks")
  261 + @ResponseBody
  262 + public WVPResult<List<IHookSubscribe>> getHooks(){
  263 + WVPResult<List<IHookSubscribe>> result = new WVPResult<>();
  264 + result.setCode(0);
  265 + result.setMsg("success");
  266 + List<IHookSubscribe> all = zlmHttpHookSubscribe.getAll();
  267 + result.setData(all);
  268 + return result;
  269 + }
  270 +
257 // @ApiOperation("当前进行中的动态任务") 271 // @ApiOperation("当前进行中的动态任务")
258 // @GetMapping(value = "/dynamicTask") 272 // @GetMapping(value = "/dynamicTask")
259 // @ResponseBody 273 // @ResponseBody