Commit ac1a4a027a7bd88efb32e9da666bdba4b5fa166f

Authored by 648540858
1 parent 36fda97e

支持国标级联的目录订阅功能

Showing 31 changed files with 948 additions and 91 deletions
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java
@@ -18,13 +18,13 @@ public class SubscribeInfo { @@ -18,13 +18,13 @@ public class SubscribeInfo {
18 this.fromTag = fromHeader.getTag(); 18 this.fromTag = fromHeader.getTag();
19 ExpiresHeader expiresHeader = (ExpiresHeader)request.getHeader(ExpiresHeader.NAME); 19 ExpiresHeader expiresHeader = (ExpiresHeader)request.getHeader(ExpiresHeader.NAME);
20 this.expires = expiresHeader.getExpires(); 20 this.expires = expiresHeader.getExpires();
21 - this.event = (EventHeader)request.getHeader(EventHeader.NAME); 21 + this.event = ((EventHeader)request.getHeader(EventHeader.NAME)).getName();
22 } 22 }
23 23
24 private String id; 24 private String id;
25 private int expires; 25 private int expires;
26 private String callId; 26 private String callId;
27 - private EventHeader event; 27 + private String event;
28 private String fromTag; 28 private String fromTag;
29 private String toTag; 29 private String toTag;
30 30
@@ -40,10 +40,6 @@ public class SubscribeInfo { @@ -40,10 +40,6 @@ public class SubscribeInfo {
40 return callId; 40 return callId;
41 } 41 }
42 42
43 - public EventHeader getEvent() {  
44 - return event;  
45 - }  
46 -  
47 public String getFromTag() { 43 public String getFromTag() {
48 return fromTag; 44 return fromTag;
49 } 45 }
@@ -68,11 +64,15 @@ public class SubscribeInfo { @@ -68,11 +64,15 @@ public class SubscribeInfo {
68 this.callId = callId; 64 this.callId = callId;
69 } 65 }
70 66
71 - public void setEvent(EventHeader event) {  
72 - this.event = event;  
73 - }  
74 -  
75 public void setFromTag(String fromTag) { 67 public void setFromTag(String fromTag) {
76 this.fromTag = fromTag; 68 this.fromTag = fromTag;
77 } 69 }
  70 +
  71 + public String getEvent() {
  72 + return event;
  73 + }
  74 +
  75 + public void setEvent(String event) {
  76 + this.event = event;
  77 + }
78 } 78 }
src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
1 package com.genersoft.iot.vmp.gb28181.event; 1 package com.genersoft.iot.vmp.gb28181.event;
2 2
3 import com.genersoft.iot.vmp.gb28181.bean.Device; 3 import com.genersoft.iot.vmp.gb28181.bean.Device;
  4 +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
  5 +import com.genersoft.iot.vmp.gb28181.bean.GbStream;
4 import com.genersoft.iot.vmp.gb28181.event.offline.OfflineEvent; 6 import com.genersoft.iot.vmp.gb28181.event.offline.OfflineEvent;
5 import com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire.PlatformKeepaliveExpireEvent; 7 import com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire.PlatformKeepaliveExpireEvent;
6 import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent; 8 import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent;
  9 +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
7 import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent; 10 import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent;
8 import com.genersoft.iot.vmp.media.zlm.event.ZLMOnlineEvent; 11 import com.genersoft.iot.vmp.media.zlm.event.ZLMOnlineEvent;
9 import org.springframework.beans.factory.annotation.Autowired; 12 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.context.ApplicationEventPublisher; 13 import org.springframework.context.ApplicationEventPublisher;
  14 +import org.springframework.scheduling.annotation.Async;
11 import org.springframework.stereotype.Component; 15 import org.springframework.stereotype.Component;
12 16
13 import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; 17 import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
14 import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent; 18 import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent;
15 import com.genersoft.iot.vmp.gb28181.event.online.OnlineEvent; 19 import com.genersoft.iot.vmp.gb28181.event.online.OnlineEvent;
16 20
  21 +import java.util.ArrayList;
  22 +import java.util.HashSet;
  23 +import java.util.List;
  24 +import java.util.Set;
  25 +
17 /** 26 /**
18 * @description:Event事件通知推送器,支持推送在线事件、离线事件 27 * @description:Event事件通知推送器,支持推送在线事件、离线事件
19 * @author: swwheihei 28 * @author: swwheihei
@@ -80,4 +89,49 @@ public class EventPublisher { @@ -80,4 +89,49 @@ public class EventPublisher {
80 outEvent.setMediaServerId(mediaServerId); 89 outEvent.setMediaServerId(mediaServerId);
81 applicationEventPublisher.publishEvent(outEvent); 90 applicationEventPublisher.publishEvent(outEvent);
82 } 91 }
  92 +
  93 + @Async
  94 + public void catalogEventPublish(String platformId, DeviceChannel deviceChannel, String type) {
  95 + List<DeviceChannel> deviceChannelList = new ArrayList<>();
  96 + deviceChannelList.add(deviceChannel);
  97 + catalogEventPublish(platformId, deviceChannelList, type);
  98 + }
  99 +
  100 + @Async
  101 + public void catalogEventPublish(String platformId, List<DeviceChannel> deviceChannels, String type) {
  102 + CatalogEvent outEvent = new CatalogEvent(this);
  103 + List<DeviceChannel> channels = new ArrayList<>();
  104 + if (deviceChannels.size() > 1) {
  105 + // 数据去重
  106 + Set<String> gbIdSet = new HashSet<>();
  107 + for (DeviceChannel deviceChannel : deviceChannels) {
  108 + if (!gbIdSet.contains(deviceChannel.getChannelId())) {
  109 + gbIdSet.add(deviceChannel.getChannelId());
  110 + channels.add(deviceChannel);
  111 + }
  112 + }
  113 + }else {
  114 + channels = deviceChannels;
  115 + }
  116 + outEvent.setDeviceChannels(channels);
  117 + outEvent.setType(type);
  118 + outEvent.setPlatformId(platformId);
  119 + applicationEventPublisher.publishEvent(outEvent);
  120 + }
  121 +
  122 + @Async
  123 + public void catalogEventPublishForStream(String platformId, List<GbStream> gbStreams, String type) {
  124 + CatalogEvent outEvent = new CatalogEvent(this);
  125 + outEvent.setGbStreams(gbStreams);
  126 + outEvent.setType(type);
  127 + outEvent.setPlatformId(platformId);
  128 + applicationEventPublisher.publishEvent(outEvent);
  129 + }
  130 +
  131 + @Async
  132 + public void catalogEventPublishForStream(String platformId, GbStream gbStream, String type) {
  133 + List<GbStream> gbStreamList = new ArrayList<>();
  134 + gbStreamList.add(gbStream);
  135 + catalogEventPublishForStream(platformId, gbStreamList, type);
  136 + }
83 } 137 }
src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java
@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.event.offline; @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.event.offline;
2 2
3 import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener; 3 import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
4 import com.genersoft.iot.vmp.conf.UserSetup; 4 import com.genersoft.iot.vmp.conf.UserSetup;
  5 +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
5 import org.slf4j.Logger; 6 import org.slf4j.Logger;
6 import org.slf4j.LoggerFactory; 7 import org.slf4j.LoggerFactory;
7 import org.springframework.beans.factory.InitializingBean; 8 import org.springframework.beans.factory.InitializingBean;
@@ -35,6 +36,9 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent @@ -35,6 +36,9 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent
35 @Autowired 36 @Autowired
36 private UserSetup userSetup; 37 private UserSetup userSetup;
37 38
  39 + @Autowired
  40 + private SipSubscribe sipSubscribe;
  41 +
38 public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) { 42 public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) {
39 super(listenerContainer, userSetup); 43 super(listenerContainer, userSetup);
40 } 44 }
@@ -54,6 +58,7 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent @@ -54,6 +58,7 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent
54 String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.PLATFORM_KEEPALIVE_PREFIX + userSetup.getServerId() + "_"; 58 String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.PLATFORM_KEEPALIVE_PREFIX + userSetup.getServerId() + "_";
55 String PLATFORM_REGISTER_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_PREFIX + userSetup.getServerId() + "_"; 59 String PLATFORM_REGISTER_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_PREFIX + userSetup.getServerId() + "_";
56 String KEEPLIVEKEY_PREFIX = VideoManagerConstants.KEEPLIVEKEY_PREFIX + userSetup.getServerId() + "_"; 60 String KEEPLIVEKEY_PREFIX = VideoManagerConstants.KEEPLIVEKEY_PREFIX + userSetup.getServerId() + "_";
  61 + String REGISTER_INFO_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetup.getServerId() + "_";
57 if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) { 62 if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
58 String platformGBId = expiredKey.substring(PLATFORM_KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); 63 String platformGBId = expiredKey.substring(PLATFORM_KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
59 64
@@ -65,6 +70,13 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent @@ -65,6 +70,13 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent
65 }else if (expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){ 70 }else if (expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){
66 String deviceId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); 71 String deviceId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
67 publisher.outlineEventPublish(deviceId, KEEPLIVEKEY_PREFIX); 72 publisher.outlineEventPublish(deviceId, KEEPLIVEKEY_PREFIX);
  73 + }else if (expiredKey.startsWith(REGISTER_INFO_PREFIX)) {
  74 + String callid = expiredKey.substring(REGISTER_INFO_PREFIX.length());
  75 + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
  76 + eventResult.callId = callid;
  77 + eventResult.msg = "注册超时";
  78 + eventResult.type = "register timeout";
  79 + sipSubscribe.getErrorSubscribe(callid).response(eventResult);
68 } 80 }
69 81
70 } 82 }
src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java
1 package com.genersoft.iot.vmp.gb28181.event.offline; 1 package com.genersoft.iot.vmp.gb28181.event.offline;
2 2
3 import com.genersoft.iot.vmp.conf.UserSetup; 3 import com.genersoft.iot.vmp.conf.UserSetup;
  4 +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
  5 +import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  6 +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
4 import org.slf4j.Logger; 7 import org.slf4j.Logger;
5 import org.slf4j.LoggerFactory; 8 import org.slf4j.LoggerFactory;
6 import org.springframework.beans.factory.annotation.Autowired; 9 import org.springframework.beans.factory.annotation.Autowired;
@@ -13,6 +16,8 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; @@ -13,6 +16,8 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
13 import com.genersoft.iot.vmp.storager.IVideoManagerStorager; 16 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
14 import com.genersoft.iot.vmp.utils.redis.RedisUtil; 17 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
15 18
  19 +import java.util.List;
  20 +
16 /** 21 /**
17 * @description: 离线事件监听器,监听到离线后,修改设备离在线状态。 设备离线有两个来源: 22 * @description: 离线事件监听器,监听到离线后,修改设备离在线状态。 设备离线有两个来源:
18 * 1、设备主动注销,发送注销指令,{@link com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.RegisterRequestProcessor} 23 * 1、设备主动注销,发送注销指令,{@link com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.RegisterRequestProcessor}
@@ -34,6 +39,9 @@ public class OfflineEventListener implements ApplicationListener&lt;OfflineEvent&gt; { @@ -34,6 +39,9 @@ public class OfflineEventListener implements ApplicationListener&lt;OfflineEvent&gt; {
34 @Autowired 39 @Autowired
35 private UserSetup userSetup; 40 private UserSetup userSetup;
36 41
  42 + @Autowired
  43 + private EventPublisher eventPublisher;
  44 +
37 @Override 45 @Override
38 public void onApplicationEvent(OfflineEvent event) { 46 public void onApplicationEvent(OfflineEvent event) {
39 47
@@ -58,6 +66,8 @@ public class OfflineEventListener implements ApplicationListener&lt;OfflineEvent&gt; { @@ -58,6 +66,8 @@ public class OfflineEventListener implements ApplicationListener&lt;OfflineEvent&gt; {
58 } 66 }
59 } 67 }
60 68
  69 + List<DeviceChannel> deviceChannelList = storager.queryOnlineChannelsByDeviceId(event.getDeviceId());
  70 + eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.OFF);
61 // 处理离线监听 71 // 处理离线监听
62 storager.outline(event.getDeviceId()); 72 storager.outline(event.getDeviceId());
63 73
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java
@@ -3,6 +3,10 @@ package com.genersoft.iot.vmp.gb28181.event.online; @@ -3,6 +3,10 @@ package com.genersoft.iot.vmp.gb28181.event.online;
3 import com.genersoft.iot.vmp.conf.SipConfig; 3 import com.genersoft.iot.vmp.conf.SipConfig;
4 import com.genersoft.iot.vmp.conf.UserSetup; 4 import com.genersoft.iot.vmp.conf.UserSetup;
5 import com.genersoft.iot.vmp.gb28181.bean.Device; 5 import com.genersoft.iot.vmp.gb28181.bean.Device;
  6 +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
  7 +import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  8 +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  9 +import com.genersoft.iot.vmp.service.IDeviceService;
6 import com.genersoft.iot.vmp.storager.dao.dto.User; 10 import com.genersoft.iot.vmp.storager.dao.dto.User;
7 import org.slf4j.Logger; 11 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory; 12 import org.slf4j.LoggerFactory;
@@ -15,6 +19,7 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorager; @@ -15,6 +19,7 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
15 import com.genersoft.iot.vmp.utils.redis.RedisUtil; 19 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
16 20
17 import java.text.SimpleDateFormat; 21 import java.text.SimpleDateFormat;
  22 +import java.util.List;
18 23
19 /** 24 /**
20 * @description: 在线事件监听器,监听到离线后,修改设备离在线状态。 设备在线有两个来源: 25 * @description: 在线事件监听器,监听到离线后,修改设备离在线状态。 设备在线有两个来源:
@@ -40,6 +45,9 @@ public class OnlineEventListener implements ApplicationListener&lt;OnlineEvent&gt; { @@ -40,6 +45,9 @@ public class OnlineEventListener implements ApplicationListener&lt;OnlineEvent&gt; {
40 @Autowired 45 @Autowired
41 private UserSetup userSetup; 46 private UserSetup userSetup;
42 47
  48 + @Autowired
  49 + private EventPublisher eventPublisher;
  50 +
43 private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 51 private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
44 52
45 @Override 53 @Override
@@ -76,6 +84,11 @@ public class OnlineEventListener implements ApplicationListener&lt;OnlineEvent&gt; { @@ -76,6 +84,11 @@ public class OnlineEventListener implements ApplicationListener&lt;OnlineEvent&gt; {
76 } 84 }
77 85
78 device.setOnline(1); 86 device.setOnline(1);
  87 + Device deviceInstore = storager.queryVideoDevice(device.getDeviceId());
  88 + if (deviceInstore.getOnline() == 0) {
  89 + List<DeviceChannel> deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId());
  90 + eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON);
  91 + }
79 // 处理上线监听 92 // 处理上线监听
80 storager.updateDevice(device); 93 storager.updateDevice(device);
81 94
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java 0 → 100644
  1 +package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog;
  2 +
  3 +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
  4 +import com.genersoft.iot.vmp.gb28181.bean.GbStream;
  5 +import org.springframework.context.ApplicationEvent;
  6 +
  7 +import java.util.List;
  8 +
  9 +public class CatalogEvent extends ApplicationEvent {
  10 + public CatalogEvent(Object source) {
  11 + super(source);
  12 + }
  13 +
  14 + public static final String ON = "ON"; // 上线
  15 + public static final String OFF = "OFF"; // 离线
  16 + public static final String VLOST = "VLOST"; // 视频丢失
  17 + public static final String DEFECT = "DEFECT"; // 故障
  18 + public static final String ADD = "ADD"; // 增加
  19 + public static final String DEL = "DEL"; // 删除
  20 + public static final String UPDATE = "UPDATE"; // 更新
  21 +
  22 + private List<DeviceChannel> deviceChannels;
  23 + private List<GbStream> gbStreams;
  24 + private String type;
  25 + private String platformId;
  26 +
  27 + public List<DeviceChannel> getDeviceChannels() {
  28 + return deviceChannels;
  29 + }
  30 +
  31 + public void setDeviceChannels(List<DeviceChannel> deviceChannels) {
  32 + this.deviceChannels = deviceChannels;
  33 + }
  34 +
  35 + public String getType() {
  36 + return type;
  37 + }
  38 +
  39 + public void setType(String type) {
  40 + this.type = type;
  41 + }
  42 +
  43 + public String getPlatformId() {
  44 + return platformId;
  45 + }
  46 +
  47 + public void setPlatformId(String platformId) {
  48 + this.platformId = platformId;
  49 + }
  50 +
  51 + public List<GbStream> getGbStreams() {
  52 + return gbStreams;
  53 + }
  54 +
  55 + public void setGbStreams(List<GbStream> gbStreams) {
  56 + this.gbStreams = gbStreams;
  57 + }
  58 +}
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java 0 → 100644
  1 +package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog;
  2 +
  3 +import com.genersoft.iot.vmp.common.VideoManagerConstants;
  4 +import com.genersoft.iot.vmp.conf.SipConfig;
  5 +import com.genersoft.iot.vmp.conf.UserSetup;
  6 +import com.genersoft.iot.vmp.gb28181.bean.*;
  7 +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
  8 +import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent;
  9 +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
  10 +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
  11 +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  12 +import com.genersoft.iot.vmp.service.IGbStreamService;
  13 +import com.genersoft.iot.vmp.service.IMediaServerService;
  14 +import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  15 +import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
  16 +import org.slf4j.Logger;
  17 +import org.slf4j.LoggerFactory;
  18 +import org.springframework.beans.factory.annotation.Autowired;
  19 +import org.springframework.context.ApplicationListener;
  20 +import org.springframework.stereotype.Component;
  21 +
  22 +import java.util.*;
  23 +
  24 +/**
  25 + * catalog事件
  26 + */
  27 +@Component
  28 +public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
  29 +
  30 + private final static Logger logger = LoggerFactory.getLogger(CatalogEventLister.class);
  31 +
  32 + @Autowired
  33 + private IVideoManagerStorager storager;
  34 + @Autowired
  35 + private IRedisCatchStorage redisCatchStorage;
  36 + @Autowired
  37 + private IMediaServerService mediaServerService;
  38 +
  39 + @Autowired
  40 + private SIPCommanderFroPlatform sipCommanderFroPlatform;
  41 +
  42 + @Autowired
  43 + private ZLMRTPServerFactory zlmrtpServerFactory;
  44 +
  45 + @Autowired
  46 + private SipConfig config;
  47 +
  48 + @Autowired
  49 + private UserSetup userSetup;
  50 +
  51 + @Autowired
  52 + private IGbStreamService gbStreamService;
  53 +
  54 + @Override
  55 + public void onApplicationEvent(CatalogEvent event) {
  56 + SubscribeInfo subscribe = null;
  57 + ParentPlatform parentPlatform = null;
  58 +
  59 + Map<String, List<ParentPlatform>> parentPlatformMap = new HashMap<>();
  60 + if (event.getPlatformId() != null) {
  61 + parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId());
  62 + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + event.getPlatformId();
  63 + subscribe = redisCatchStorage.getSubscribe(key);
  64 + }else {
  65 + // 获取所用订阅
  66 + List<String> platforms = redisCatchStorage.getAllSubscribePlatform();
  67 + if (event.getDeviceChannels() != null) {
  68 + if (platforms.size() > 0) {
  69 + for (DeviceChannel deviceChannel : event.getDeviceChannels()) {
  70 + List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(deviceChannel.getChannelId(), platforms);
  71 + parentPlatformMap.put(deviceChannel.getChannelId(), parentPlatformsForGB);
  72 + }
  73 + }
  74 + }else if (event.getGbStreams() != null) {
  75 + if (platforms.size() > 0) {
  76 + for (GbStream gbStream : event.getGbStreams()) {
  77 + List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForStreamWithGBId(gbStream.getApp(),gbStream.getStream(), platforms);
  78 + parentPlatformMap.put(gbStream.getGbId(), parentPlatformsForGB);
  79 + }
  80 + }
  81 + }
  82 +
  83 + }
  84 + switch (event.getType()) {
  85 + case CatalogEvent.ON:
  86 + case CatalogEvent.OFF:
  87 + case CatalogEvent.DEL:
  88 +
  89 + if (parentPlatform != null || subscribe != null) {
  90 + List<DeviceChannel> deviceChannelList = new ArrayList<>();
  91 + if (event.getDeviceChannels() != null) {
  92 + deviceChannelList.addAll(event.getDeviceChannels());
  93 + }
  94 + if (event.getGbStreams().size() > 0){
  95 + for (GbStream gbStream : event.getGbStreams()) {
  96 + DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform.getDeviceGBId());
  97 + deviceChannelList.add(deviceChannelByStream);
  98 + }
  99 + }
  100 + if (deviceChannelList.size() > 0) {
  101 + logger.info("[Catalog事件: {}]平台:{},影响通道{}个", event.getType(), event.getPlatformId(), deviceChannelList.size());
  102 + sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), parentPlatform, deviceChannelList, subscribe);
  103 + }
  104 + }else if (parentPlatformMap.keySet().size() > 0) {
  105 + for (String gbId : parentPlatformMap.keySet()) {
  106 + List<ParentPlatform> parentPlatforms = parentPlatformMap.get(gbId);
  107 + if (parentPlatforms != null && parentPlatforms.size() > 0) {
  108 + for (ParentPlatform platform : parentPlatforms) {
  109 + logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
  110 + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId();
  111 + SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
  112 + List<DeviceChannel> deviceChannelList = new ArrayList<>();
  113 + DeviceChannel deviceChannel = new DeviceChannel();
  114 + deviceChannel.setChannelId(gbId);
  115 + deviceChannelList.add(deviceChannel);
  116 + sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), platform, deviceChannelList, subscribeInfo);
  117 + }
  118 + }
  119 + }
  120 + }
  121 + break;
  122 + case CatalogEvent.VLOST:
  123 + break;
  124 + case CatalogEvent.DEFECT:
  125 + break;
  126 + case CatalogEvent.ADD:
  127 + case CatalogEvent.UPDATE:
  128 + if (parentPlatform != null || subscribe != null) {
  129 + List<DeviceChannel> deviceChannelList = new ArrayList<>();
  130 + if (event.getDeviceChannels() != null) {
  131 + deviceChannelList.addAll(event.getDeviceChannels());
  132 + }
  133 + if (event.getGbStreams().size() > 0){
  134 + for (GbStream gbStream : event.getGbStreams()) {
  135 + DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform.getDeviceGBId());
  136 + deviceChannelList.add(deviceChannelByStream);
  137 + }
  138 + }
  139 + if (deviceChannelList.size() > 0) {
  140 + logger.info("[Catalog事件: {}]平台:{},影响通道{}个", event.getType(), event.getPlatformId(), deviceChannelList.size());
  141 + sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), parentPlatform, deviceChannelList, subscribe);
  142 + }
  143 + }else if (parentPlatformMap.keySet().size() > 0) {
  144 + for (String gbId : parentPlatformMap.keySet()) {
  145 + List<ParentPlatform> parentPlatforms = parentPlatformMap.get(gbId);
  146 + if (parentPlatforms != null && parentPlatforms.size() > 0) {
  147 + for (ParentPlatform platform : parentPlatforms) {
  148 + logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
  149 + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId();
  150 + SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
  151 + List<DeviceChannel> deviceChannelList = new ArrayList<>();
  152 + DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(platform.getServerGBId(), gbId);
  153 + deviceChannelList.add(deviceChannel);
  154 + GbStream gbStream = storager.queryStreamInParentPlatform(platform.getServerGBId(), gbId);
  155 + DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), platform.getDeviceGBId());
  156 + deviceChannelList.add(deviceChannelByStream);
  157 + sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), platform, deviceChannelList, subscribeInfo);
  158 + }
  159 + }
  160 + }
  161 + }
  162 + break;
  163 + default:
  164 + break;
  165 + }
  166 + }
  167 +}
src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java
@@ -48,7 +48,7 @@ public class GPSSubscribeTask implements Runnable{ @@ -48,7 +48,7 @@ public class GPSSubscribeTask implements Runnable{
48 if (gbStream.isStatus()) { 48 if (gbStream.isStatus()) {
49 if (gpsMsgInfo != null) { 49 if (gpsMsgInfo != null) {
50 // 发送GPS消息 50 // 发送GPS消息
51 - sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe); 51 + sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
52 }else { 52 }else {
53 // 没有在redis找到新的消息就使用数据库的消息 53 // 没有在redis找到新的消息就使用数据库的消息
54 gpsMsgInfo = new GPSMsgInfo(); 54 gpsMsgInfo = new GPSMsgInfo();
@@ -56,7 +56,7 @@ public class GPSSubscribeTask implements Runnable{ @@ -56,7 +56,7 @@ public class GPSSubscribeTask implements Runnable{
56 gpsMsgInfo.setLat(gbStream.getLongitude()); 56 gpsMsgInfo.setLat(gbStream.getLongitude());
57 gpsMsgInfo.setLng(gbStream.getLongitude()); 57 gpsMsgInfo.setLng(gbStream.getLongitude());
58 // 发送GPS消息 58 // 发送GPS消息
59 - sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe); 59 + sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
60 } 60 }
61 } 61 }
62 62
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
@@ -14,6 +14,7 @@ import org.springframework.stereotype.Component; @@ -14,6 +14,7 @@ import org.springframework.stereotype.Component;
14 import javax.sip.*; 14 import javax.sip.*;
15 import javax.sip.header.CSeqHeader; 15 import javax.sip.header.CSeqHeader;
16 import javax.sip.header.CallIdHeader; 16 import javax.sip.header.CallIdHeader;
  17 +import javax.sip.header.Header;
17 import javax.sip.message.Response; 18 import javax.sip.message.Response;
18 import java.util.Map; 19 import java.util.Map;
19 import java.util.concurrent.ConcurrentHashMap; 20 import java.util.concurrent.ConcurrentHashMap;
@@ -140,6 +141,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { @@ -140,6 +141,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
140 */ 141 */
141 @Override 142 @Override
142 public void processTimeout(TimeoutEvent timeoutEvent) { 143 public void processTimeout(TimeoutEvent timeoutEvent) {
  144 + System.out.println("processTimeout");
143 if(timeoutProcessor != null) { 145 if(timeoutProcessor != null) {
144 timeoutProcessor.process(timeoutEvent); 146 timeoutProcessor.process(timeoutEvent);
145 } 147 }
@@ -147,14 +149,31 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { @@ -147,14 +149,31 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
147 149
148 @Override 150 @Override
149 public void processIOException(IOExceptionEvent exceptionEvent) { 151 public void processIOException(IOExceptionEvent exceptionEvent) {
  152 + System.out.println("processIOException");
150 } 153 }
151 154
152 @Override 155 @Override
153 public void processTransactionTerminated(TransactionTerminatedEvent transactionTerminatedEvent) { 156 public void processTransactionTerminated(TransactionTerminatedEvent transactionTerminatedEvent) {
  157 +// Transaction transaction = null;
  158 +// System.out.println("processTransactionTerminated");
  159 +// if (transactionTerminatedEvent.isServerTransaction()) {
  160 +// transaction = transactionTerminatedEvent.getServerTransaction();
  161 +// }else {
  162 +// transaction = transactionTerminatedEvent.getClientTransaction();
  163 +// }
  164 +//
  165 +// System.out.println(transaction.getBranchId());
  166 +// System.out.println(transaction.getState());
  167 +// System.out.println(transaction.getRequest().getMethod());
  168 +// CallIdHeader header = (CallIdHeader)transaction.getRequest().getHeader(CallIdHeader.NAME);
  169 +// SipSubscribe.EventResult<TransactionTerminatedEvent> terminatedEventEventResult = new SipSubscribe.EventResult<>(transactionTerminatedEvent);
  170 +
  171 +// sipSubscribe.getErrorSubscribe(header.getCallId()).response(terminatedEventEventResult);
154 } 172 }
155 173
156 @Override 174 @Override
157 public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) { 175 public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) {
  176 + System.out.println("processDialogTerminated");
158 CallIdHeader callId = dialogTerminatedEvent.getDialog().getCallId(); 177 CallIdHeader callId = dialogTerminatedEvent.getDialog().getCallId();
159 } 178 }
160 179
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
@@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
7 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; 7 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
8 8
9 import javax.sip.header.WWWAuthenticateHeader; 9 import javax.sip.header.WWWAuthenticateHeader;
  10 +import java.util.List;
10 11
11 public interface ISIPCommanderForPlatform { 12 public interface ISIPCommanderForPlatform {
12 13
@@ -70,5 +71,20 @@ public interface ISIPCommanderForPlatform { @@ -70,5 +71,20 @@ public interface ISIPCommanderForPlatform {
70 * @param subscribeInfo 订阅相关的信息 71 * @param subscribeInfo 订阅相关的信息
71 * @return 72 * @return
72 */ 73 */
73 - boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo); 74 + boolean sendNotifyMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo);
  75 +
  76 + /**
  77 + * 回复catalog事件-增加/更新
  78 + * @param parentPlatform
  79 + * @param deviceChannels
  80 + */
  81 + boolean sendNotifyForCatalogAddOrUpdate(String type, ParentPlatform parentPlatform, List<DeviceChannel> deviceChannels, SubscribeInfo subscribeInfo);
  82 +
  83 + /**
  84 + * 回复catalog事件-删除
  85 + * @param parentPlatform
  86 + * @param deviceChannels
  87 + */
  88 + boolean sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List<DeviceChannel> deviceChannels, SubscribeInfo subscribeInfo);
  89 +
74 } 90 }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -23,6 +23,7 @@ import javax.sip.header.CallIdHeader; @@ -23,6 +23,7 @@ import javax.sip.header.CallIdHeader;
23 import javax.sip.header.WWWAuthenticateHeader; 23 import javax.sip.header.WWWAuthenticateHeader;
24 import javax.sip.message.Request; 24 import javax.sip.message.Request;
25 import java.text.ParseException; 25 import java.text.ParseException;
  26 +import java.util.List;
26 import java.util.UUID; 27 import java.util.UUID;
27 28
28 @Component 29 @Component
@@ -96,7 +97,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @@ -96,7 +97,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
96 97
97 request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, null, callIdHeader); 98 request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, null, callIdHeader);
98 // 将 callid 写入缓存, 等注册成功可以更新状态 99 // 将 callid 写入缓存, 等注册成功可以更新状态
99 - redisCatchStorage.updatePlatformRegisterInfo(callIdHeader.getCallId(), parentPlatform.getServerGBId()); 100 + String callIdFromHeader = callIdHeader.getCallId();
  101 + redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, parentPlatform.getServerGBId());
100 102
101 sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (event)->{ 103 sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (event)->{
102 if (event != null) { 104 if (event != null) {
@@ -104,6 +106,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @@ -104,6 +106,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
104 parentPlatform.getServerGBId(), 106 parentPlatform.getServerGBId(),
105 event.msg); 107 event.msg);
106 } 108 }
  109 + redisCatchStorage.delPlatformRegisterInfo(callIdFromHeader);
107 if (errorEvent != null ) { 110 if (errorEvent != null ) {
108 errorEvent.response(event); 111 errorEvent.response(event);
109 } 112 }
@@ -219,8 +222,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @@ -219,8 +222,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
219 catalogXml.append("<Owner>" + channel.getOwner() + "</Owner>\r\n"); 222 catalogXml.append("<Owner>" + channel.getOwner() + "</Owner>\r\n");
220 catalogXml.append("<CivilCode>" + channel.getCivilCode() + "</CivilCode>\r\n"); 223 catalogXml.append("<CivilCode>" + channel.getCivilCode() + "</CivilCode>\r\n");
221 catalogXml.append("<Address>" + channel.getAddress() + "</Address>\r\n"); 224 catalogXml.append("<Address>" + channel.getAddress() + "</Address>\r\n");
222 - catalogXml.append("<Parental>" + channel.getParental() + "</Parental>\r\n");// TODO 当前不能添加分组, 所以暂时没有父节点  
223 - catalogXml.append("<ParentID>" + channel.getParentId() + "</ParentID>\r\n"); // TODO 当前不能添加分组, 所以暂时没有父节点 225 + catalogXml.append("<Parental>" + channel.getParental() + "</Parental>\r\n");
  226 + catalogXml.append("<ParentID>" + channel.getParentId() + "</ParentID>\r\n");
224 catalogXml.append("<Secrecy>" + channel.getSecrecy() + "</Secrecy>\r\n"); 227 catalogXml.append("<Secrecy>" + channel.getSecrecy() + "</Secrecy>\r\n");
225 catalogXml.append("<RegisterWay>" + channel.getRegisterWay() + "</RegisterWay>\r\n"); 228 catalogXml.append("<RegisterWay>" + channel.getRegisterWay() + "</RegisterWay>\r\n");
226 catalogXml.append("<Status>" + (channel.getStatus() == 0?"OFF":"ON") + "</Status>\r\n"); 229 catalogXml.append("<Status>" + (channel.getStatus() == 0?"OFF":"ON") + "</Status>\r\n");
@@ -329,7 +332,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @@ -329,7 +332,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
329 } 332 }
330 333
331 @Override 334 @Override
332 - public boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) { 335 + public boolean sendNotifyMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) {
333 if (parentPlatform == null) { 336 if (parentPlatform == null) {
334 return false; 337 return false;
335 } 338 }
@@ -364,4 +367,110 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @@ -364,4 +367,110 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
364 } 367 }
365 return true; 368 return true;
366 } 369 }
  370 +
  371 + @Override
  372 + public boolean sendNotifyForCatalogAddOrUpdate(String type, ParentPlatform parentPlatform, List<DeviceChannel> deviceChannels, SubscribeInfo subscribeInfo) {
  373 + if (parentPlatform == null) {
  374 + return false;
  375 + }
  376 + if (deviceChannels == null || deviceChannels.size() == 0) {
  377 + return false;
  378 + }
  379 + for (DeviceChannel channel : deviceChannels) {
  380 + try {
  381 + StringBuffer catalogXml = new StringBuffer(600);
  382 + catalogXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
  383 + catalogXml.append("<Notify>\r\n");
  384 + catalogXml.append("<CmdType>Catalog</CmdType>\r\n");
  385 + catalogXml.append("<SN>" + (int) ((Math.random() * 9 + 1) * 100000) + "</SN>\r\n");
  386 + catalogXml.append("<SumNum>" + deviceChannels.size() + "</SumNum>\r\n");
  387 + catalogXml.append("<DeviceList Num=\"1\">\r\n");
  388 + catalogXml.append("<Item>\r\n");
  389 + catalogXml.append("<DeviceID>" + channel.getChannelId() + "</DeviceID>\r\n");
  390 + catalogXml.append("<Event>" + type + "</Event>\r\n");
  391 + catalogXml.append("<Name>" + channel.getName() + "</Name>\r\n");
  392 + catalogXml.append("<Manufacturer>" + channel.getManufacture() + "</Manufacturer>\r\n");
  393 + catalogXml.append("<Model>" + channel.getModel() + "</Model>\r\n");
  394 + catalogXml.append("<Owner>" + channel.getOwner() + "</Owner>\r\n");
  395 + catalogXml.append("<CivilCode>" + channel.getCivilCode() + "</CivilCode>\r\n");
  396 + catalogXml.append("<Address>" + channel.getAddress() + "</Address>\r\n");
  397 + catalogXml.append("<Parental>" + channel.getParental() + "</Parental>\r\n");
  398 + catalogXml.append("<ParentID>" + channel.getParentId() + "</ParentID>\r\n");
  399 + catalogXml.append("<Secrecy>" + channel.getSecrecy() + "</Secrecy>\r\n");
  400 + catalogXml.append("<RegisterWay>" + channel.getRegisterWay() + "</RegisterWay>\r\n");
  401 + catalogXml.append("<Status>" + (channel.getStatus() == 0 ? "OFF" : "ON") + "</Status>\r\n");
  402 + catalogXml.append("<Longitude>" + channel.getLongitude() + "</Longitude>\r\n");
  403 + catalogXml.append("<Latitude>" + channel.getLatitude() + "</Latitude>\r\n");
  404 + catalogXml.append("<IPAddress>" + channel.getIpAddress() + "</IPAddress>\r\n");
  405 + catalogXml.append("<Port>" + channel.getPort() + "</Port>\r\n");
  406 + catalogXml.append("<Info>\r\n");
  407 + catalogXml.append("<PTZType>" + channel.getPTZType() + "</PTZType>\r\n");
  408 + catalogXml.append("</Info>\r\n");
  409 + catalogXml.append("</Item>\r\n");
  410 + catalogXml.append("</DeviceList>\r\n");
  411 + catalogXml.append("</Notify>\r\n");
  412 +
  413 + CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
  414 + : udpSipProvider.getNewCallId();
  415 + callIdHeader.setCallId(subscribeInfo.getCallId());
  416 +
  417 + String tm = Long.toString(System.currentTimeMillis());
  418 +
  419 + Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXml.toString(), subscribeInfo.getToTag(), subscribeInfo.getFromTag(), callIdHeader);
  420 + transmitRequest(parentPlatform, request);
  421 + Thread.sleep(100);
  422 + } catch (SipException | ParseException | InvalidArgumentException e) {
  423 + e.printStackTrace();
  424 + return false;
  425 + } catch (InterruptedException e) {
  426 + e.printStackTrace();
  427 + }
  428 + }
  429 + return true;
  430 + }
  431 +
  432 + @Override
  433 + public boolean sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List<DeviceChannel> deviceChannels, SubscribeInfo subscribeInfo) {
  434 + if (parentPlatform == null) {
  435 + return false;
  436 + }
  437 + if (deviceChannels == null || deviceChannels.size() == 0) {
  438 + return false;
  439 + }
  440 +
  441 + for (DeviceChannel channel : deviceChannels) {
  442 + try {
  443 + StringBuffer catalogXml = new StringBuffer(600);
  444 + catalogXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
  445 + catalogXml.append("<Notify>\r\n");
  446 + catalogXml.append("<CmdType>Catalog</CmdType>\r\n");
  447 + catalogXml.append("<SN>" + (int) ((Math.random() * 9 + 1) * 100000) + "</SN>\r\n");
  448 + catalogXml.append("<SumNum>" + deviceChannels.size() + "</SumNum>\r\n");
  449 + catalogXml.append("<DeviceList Num=\"1\">\r\n");
  450 + catalogXml.append("<Item>\r\n");
  451 + catalogXml.append("<DeviceID>" + channel.getChannelId() + "</DeviceID>\r\n");
  452 + catalogXml.append("<Event>" + type + "</Event>\r\n");
  453 + catalogXml.append("</Item>\r\n");
  454 + catalogXml.append("</DeviceList>\r\n");
  455 + catalogXml.append("</Notify>\r\n");
  456 +
  457 + CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
  458 + : udpSipProvider.getNewCallId();
  459 + callIdHeader.setCallId(subscribeInfo.getCallId());
  460 +
  461 + String tm = Long.toString(System.currentTimeMillis());
  462 +
  463 + Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXml.toString(), subscribeInfo.getToTag(), subscribeInfo.getFromTag(), callIdHeader);
  464 + transmitRequest(parentPlatform, request);
  465 + Thread.sleep(100);
  466 + } catch (SipException | ParseException | InvalidArgumentException e) {
  467 + e.printStackTrace();
  468 + return false;
  469 + } catch (InterruptedException e) {
  470 + e.printStackTrace();
  471 + }
  472 + }
  473 + return true;
  474 + }
  475 +
367 } 476 }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -106,9 +106,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -106,9 +106,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
106 if (platform != null) { 106 if (platform != null) {
107 // 查询平台下是否有该通道 107 // 查询平台下是否有该通道
108 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); 108 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
109 - List<GbStream> gbStreams = storager.queryStreamInParentPlatform(requesterId, channelId); 109 + GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
110 PlatformCatalog catalog = storager.getCatalog(channelId); 110 PlatformCatalog catalog = storager.getCatalog(channelId);
111 - GbStream gbStream = gbStreams.size() > 0? gbStreams.get(0):null;  
112 MediaServerItem mediaServerItem = null; 111 MediaServerItem mediaServerItem = null;
113 // 不是通道可能是直播流 112 // 不是通道可能是直播流
114 if (channel != null && gbStream == null ) { 113 if (channel != null && gbStream == null ) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.conf.UserSetup; @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.conf.UserSetup;
6 import com.genersoft.iot.vmp.gb28181.bean.*; 6 import com.genersoft.iot.vmp.gb28181.bean.*;
7 import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; 7 import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
8 import com.genersoft.iot.vmp.gb28181.event.EventPublisher; 8 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  9 +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
9 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; 10 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
10 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; 11 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
11 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; 12 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
@@ -51,6 +52,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @@ -51,6 +52,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
51 private IVideoManagerStorager storager; 52 private IVideoManagerStorager storager;
52 53
53 @Autowired 54 @Autowired
  55 + private EventPublisher eventPublisher;
  56 +
  57 + @Autowired
54 private SipConfig sipConfig; 58 private SipConfig sipConfig;
55 59
56 @Autowired 60 @Autowired
@@ -259,39 +263,39 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @@ -259,39 +263,39 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
259 channel.setDeviceId(device.getDeviceId()); 263 channel.setDeviceId(device.getDeviceId());
260 logger.debug("收到来自设备【{}】的通道: {}【{}】", device.getDeviceId(), channel.getName(), channel.getChannelId()); 264 logger.debug("收到来自设备【{}】的通道: {}【{}】", device.getDeviceId(), channel.getName(), channel.getChannelId());
261 switch (eventElement.getText().toUpperCase()) { 265 switch (eventElement.getText().toUpperCase()) {
262 - case "ON" : // 上线 266 + case CatalogEvent.ON: // 上线
263 logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId()); 267 logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId());
264 storager.deviceChannelOnline(deviceId, channel.getChannelId()); 268 storager.deviceChannelOnline(deviceId, channel.getChannelId());
265 // 回复200 OK 269 // 回复200 OK
266 responseAck(evt, Response.OK); 270 responseAck(evt, Response.OK);
267 break; 271 break;
268 - case "OFF" : // 离线 272 + case CatalogEvent.OFF : // 离线
269 logger.info("收到来自设备【{}】的通道【{}】离线通知", device.getDeviceId(), channel.getChannelId()); 273 logger.info("收到来自设备【{}】的通道【{}】离线通知", device.getDeviceId(), channel.getChannelId());
270 storager.deviceChannelOffline(deviceId, channel.getChannelId()); 274 storager.deviceChannelOffline(deviceId, channel.getChannelId());
271 // 回复200 OK 275 // 回复200 OK
272 responseAck(evt, Response.OK); 276 responseAck(evt, Response.OK);
273 break; 277 break;
274 - case "VLOST" : // 视频丢失 278 + case CatalogEvent.VLOST: // 视频丢失
275 logger.info("收到来自设备【{}】的通道【{}】视频丢失通知", device.getDeviceId(), channel.getChannelId()); 279 logger.info("收到来自设备【{}】的通道【{}】视频丢失通知", device.getDeviceId(), channel.getChannelId());
276 storager.deviceChannelOffline(deviceId, channel.getChannelId()); 280 storager.deviceChannelOffline(deviceId, channel.getChannelId());
277 // 回复200 OK 281 // 回复200 OK
278 responseAck(evt, Response.OK); 282 responseAck(evt, Response.OK);
279 break; 283 break;
280 - case "DEFECT" : // 故障 284 + case CatalogEvent.DEFECT: // 故障
281 // 回复200 OK 285 // 回复200 OK
282 responseAck(evt, Response.OK); 286 responseAck(evt, Response.OK);
283 break; 287 break;
284 - case "ADD" : // 增加 288 + case CatalogEvent.ADD: // 增加
285 logger.info("收到来自设备【{}】的增加通道【{}】通知", device.getDeviceId(), channel.getChannelId()); 289 logger.info("收到来自设备【{}】的增加通道【{}】通知", device.getDeviceId(), channel.getChannelId());
286 storager.updateChannel(deviceId, channel); 290 storager.updateChannel(deviceId, channel);
287 responseAck(evt, Response.OK); 291 responseAck(evt, Response.OK);
288 break; 292 break;
289 - case "DEL" : // 删除 293 + case CatalogEvent.DEL: // 删除
290 logger.info("收到来自设备【{}】的删除通道【{}】通知", device.getDeviceId(), channel.getChannelId()); 294 logger.info("收到来自设备【{}】的删除通道【{}】通知", device.getDeviceId(), channel.getChannelId());
291 storager.delChannel(deviceId, channel.getChannelId()); 295 storager.delChannel(deviceId, channel.getChannelId());
292 responseAck(evt, Response.OK); 296 responseAck(evt, Response.OK);
293 break; 297 break;
294 - case "UPDATE" : // 更新 298 + case CatalogEvent.UPDATE: // 更新
295 logger.info("收到来自设备【{}】的更新通道【{}】通知", device.getDeviceId(), channel.getChannelId()); 299 logger.info("收到来自设备【{}】的更新通道【{}】通知", device.getDeviceId(), channel.getChannelId());
296 storager.updateChannel(deviceId, channel); 300 storager.updateChannel(deviceId, channel);
297 responseAck(evt, Response.OK); 301 responseAck(evt, Response.OK);
@@ -300,6 +304,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @@ -300,6 +304,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
300 responseAck(evt, Response.BAD_REQUEST, "event not found"); 304 responseAck(evt, Response.BAD_REQUEST, "event not found");
301 305
302 } 306 }
  307 + // 转发变化信息
  308 + eventPublisher.catalogEventPublish(null, channel, eventElement.getText().toUpperCase());
303 309
304 } 310 }
305 311
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
@@ -85,9 +85,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme @@ -85,9 +85,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
85 // } else if (CmdType.ALARM.equals(cmd)) { 85 // } else if (CmdType.ALARM.equals(cmd)) {
86 // logger.info("接收到Alarm订阅"); 86 // logger.info("接收到Alarm订阅");
87 // processNotifyAlarm(evt, rootElement); 87 // processNotifyAlarm(evt, rootElement);
88 -// } else if (CmdType.CATALOG.equals(cmd)) {  
89 -// logger.info("接收到Catalog订阅");  
90 -// processNotifyCatalogList(evt, rootElement); 88 + } else if (CmdType.CATALOG.equals(cmd)) {
  89 + logger.info("接收到Catalog订阅");
  90 + processNotifyCatalogList(evt, rootElement);
91 } else { 91 } else {
92 logger.info("接收到消息:" + cmd); 92 logger.info("接收到消息:" + cmd);
93 // responseAck(evt, Response.OK); 93 // responseAck(evt, Response.OK);
@@ -177,7 +177,40 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme @@ -177,7 +177,40 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
177 } 177 }
178 178
179 private void processNotifyCatalogList(RequestEvent evt, Element rootElement) { 179 private void processNotifyCatalogList(RequestEvent evt, Element rootElement) {
  180 + String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
  181 + String deviceID = XmlUtil.getText(rootElement, "DeviceID");
  182 + SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
  183 + String sn = XmlUtil.getText(rootElement, "SN");
  184 + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platformId;
  185 +
  186 + StringBuilder resultXml = new StringBuilder(200);
  187 + resultXml.append("<?xml version=\"1.0\" ?>\r\n")
  188 + .append("<Response>\r\n")
  189 + .append("<CmdType>Catalog</CmdType>\r\n")
  190 + .append("<SN>" + sn + "</SN>\r\n")
  191 + .append("<DeviceID>" + deviceID + "</DeviceID>\r\n")
  192 + .append("<Result>OK</Result>\r\n")
  193 + .append("</Response>\r\n");
180 194
  195 + if (subscribeInfo.getExpires() > 0) {
  196 + redisCatchStorage.updateSubscribe(key, subscribeInfo);
  197 + }else if (subscribeInfo.getExpires() == 0) {
  198 + redisCatchStorage.delSubscribe(key);
  199 + }
  200 +
  201 + try {
  202 + Response response = responseXmlAck(evt, resultXml.toString());
  203 + ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
  204 + subscribeInfo.setToTag(toHeader.getTag());
  205 + redisCatchStorage.updateSubscribe(key, subscribeInfo);
  206 +
  207 + } catch (SipException e) {
  208 + e.printStackTrace();
  209 + } catch (InvalidArgumentException e) {
  210 + e.printStackTrace();
  211 + } catch (ParseException e) {
  212 + e.printStackTrace();
  213 + }
181 } 214 }
182 215
183 } 216 }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
1 package com.genersoft.iot.vmp.media.zlm; 1 package com.genersoft.iot.vmp.media.zlm;
2 2
  3 +import java.util.ArrayList;
3 import java.util.List; 4 import java.util.List;
4 import java.util.UUID; 5 import java.util.UUID;
5 6
@@ -8,6 +9,9 @@ import com.genersoft.iot.vmp.common.StreamInfo; @@ -8,6 +9,9 @@ import com.genersoft.iot.vmp.common.StreamInfo;
8 import com.genersoft.iot.vmp.conf.MediaConfig; 9 import com.genersoft.iot.vmp.conf.MediaConfig;
9 import com.genersoft.iot.vmp.conf.UserSetup; 10 import com.genersoft.iot.vmp.conf.UserSetup;
10 import com.genersoft.iot.vmp.gb28181.bean.Device; 11 import com.genersoft.iot.vmp.gb28181.bean.Device;
  12 +import com.genersoft.iot.vmp.gb28181.bean.GbStream;
  13 +import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  14 +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
11 import com.genersoft.iot.vmp.media.zlm.dto.*; 15 import com.genersoft.iot.vmp.media.zlm.dto.*;
12 import com.genersoft.iot.vmp.service.*; 16 import com.genersoft.iot.vmp.service.*;
13 import com.genersoft.iot.vmp.service.bean.SSRCInfo; 17 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -65,7 +69,7 @@ public class ZLMHttpHookListener { @@ -65,7 +69,7 @@ public class ZLMHttpHookListener {
65 private IMediaService mediaService; 69 private IMediaService mediaService;
66 70
67 @Autowired 71 @Autowired
68 - private ZLMRESTfulUtils zlmresTfulUtils; 72 + private EventPublisher eventPublisher;
69 73
70 @Autowired 74 @Autowired
71 private ZLMMediaListManager zlmMediaListManager; 75 private ZLMMediaListManager zlmMediaListManager;
@@ -341,29 +345,52 @@ public class ZLMHttpHookListener { @@ -341,29 +345,52 @@ public class ZLMHttpHookListener {
341 if (!"rtp".equals(app)){ 345 if (!"rtp".equals(app)){
342 String type = OriginType.values()[item.getOriginType()].getType(); 346 String type = OriginType.values()[item.getOriginType()].getType();
343 MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); 347 MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  348 +
344 if (mediaServerItem != null){ 349 if (mediaServerItem != null){
345 if (regist) { 350 if (regist) {
  351 + StreamPushItem streamPushItem = null;
346 redisCatchStorage.addStream(mediaServerItem, type, app, streamId, item); 352 redisCatchStorage.addStream(mediaServerItem, type, app, streamId, item);
347 if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() 353 if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
348 || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() 354 || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
349 || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { 355 || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
350 - zlmMediaListManager.addPush(item); 356 + streamPushItem = zlmMediaListManager.addPush(item);
  357 + }
  358 + List<GbStream> gbStreams = new ArrayList<>();
  359 + if (streamPushItem == null || streamPushItem.getGbId() == null) {
  360 + GbStream gbStream = storager.getGbStream(app, streamId);
  361 + gbStreams.add(gbStream);
  362 + }else {
  363 + if (streamPushItem.getGbId() != null) {
  364 + gbStreams.add(streamPushItem);
  365 + }
351 } 366 }
  367 + if (gbStreams.size() > 0) {
  368 + eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON);
  369 + }
  370 +
352 }else { 371 }else {
353 // 兼容流注销时类型从redis记录获取 372 // 兼容流注销时类型从redis记录获取
354 MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, streamId, mediaServerId); 373 MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, streamId, mediaServerId);
355 - type = OriginType.values()[mediaItem.getOriginType()].getType(); 374 + if (mediaItem != null) {
  375 + type = OriginType.values()[mediaItem.getOriginType()].getType();
  376 + redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId);
  377 + }
  378 + GbStream gbStream = storager.getGbStream(app, streamId);
  379 + if (gbStream != null) {
  380 + eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
  381 + }
356 zlmMediaListManager.removeMedia(app, streamId); 382 zlmMediaListManager.removeMedia(app, streamId);
357 - redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId);  
358 } 383 }
359 - // 发送流变化redis消息  
360 - JSONObject jsonObject = new JSONObject();  
361 - jsonObject.put("serverId", userSetup.getServerId());  
362 - jsonObject.put("app", app);  
363 - jsonObject.put("stream", streamId);  
364 - jsonObject.put("register", regist);  
365 - jsonObject.put("mediaServerId", mediaServerId);  
366 - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); 384 + if (type != null) {
  385 + // 发送流变化redis消息
  386 + JSONObject jsonObject = new JSONObject();
  387 + jsonObject.put("serverId", userSetup.getServerId());
  388 + jsonObject.put("app", app);
  389 + jsonObject.put("stream", streamId);
  390 + jsonObject.put("register", regist);
  391 + jsonObject.put("mediaServerId", mediaServerId);
  392 + redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  393 + }
367 } 394 }
368 } 395 }
369 } 396 }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
@@ -105,7 +105,7 @@ public class ZLMMediaListManager { @@ -105,7 +105,7 @@ public class ZLMMediaListManager {
105 updateMedia(mediaServerItem, app, streamId); 105 updateMedia(mediaServerItem, app, streamId);
106 } 106 }
107 107
108 - public void addPush(MediaItem mediaItem) { 108 + public StreamPushItem addPush(MediaItem mediaItem) {
109 // 查找此直播流是否存在redis预设gbId 109 // 查找此直播流是否存在redis预设gbId
110 StreamPushItem transform = streamPushService.transform(mediaItem); 110 StreamPushItem transform = streamPushService.transform(mediaItem);
111 // 从streamId取出查询关键值 111 // 从streamId取出查询关键值
@@ -130,7 +130,6 @@ public class ZLMMediaListManager { @@ -130,7 +130,6 @@ public class ZLMMediaListManager {
130 for (GbStream gbStream : gbStreams) { 130 for (GbStream gbStream : gbStreams) {
131 // 出现使用相同国标Id的视频流时,使用新流替换旧流, 131 // 出现使用相同国标Id的视频流时,使用新流替换旧流,
132 gbStreamMapper.del(gbStream.getApp(), gbStream.getStream()); 132 gbStreamMapper.del(gbStream.getApp(), gbStream.getStream());
133 - platformGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream());  
134 if (!gbStream.isStatus()) { 133 if (!gbStream.isStatus()) {
135 streamPushMapper.del(gbStream.getApp(), gbStream.getStream()); 134 streamPushMapper.del(gbStream.getApp(), gbStream.getStream());
136 } 135 }
@@ -142,6 +141,7 @@ public class ZLMMediaListManager { @@ -142,6 +141,7 @@ public class ZLMMediaListManager {
142 gbStreamMapper.add(transform); 141 gbStreamMapper.add(transform);
143 } 142 }
144 } 143 }
  144 + return transform;
145 } 145 }
146 146
147 147
src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java
1 package com.genersoft.iot.vmp.service; 1 package com.genersoft.iot.vmp.service;
2 2
  3 +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
3 import com.genersoft.iot.vmp.gb28181.bean.GbStream; 4 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
4 import com.github.pagehelper.PageInfo; 5 import com.github.pagehelper.PageInfo;
5 6
@@ -35,6 +36,11 @@ public interface IGbStreamService { @@ -35,6 +36,11 @@ public interface IGbStreamService {
35 /** 36 /**
36 * 移除国标关联 37 * 移除国标关联
37 * @param gbStreams 38 * @param gbStreams
  39 + * @param platformId
38 */ 40 */
39 - boolean delPlatformInfo(List<GbStream> gbStreams); 41 + boolean delPlatformInfo(String platformId, List<GbStream> gbStreams);
  42 +
  43 + DeviceChannel getDeviceChannelListByStream(GbStream gbStream, String catalogId, String deviceGBId);
  44 +
  45 + void sendCatalogMsg(GbStream gbStream, String type);
40 } 46 }
src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java
1 package com.genersoft.iot.vmp.service.impl; 1 package com.genersoft.iot.vmp.service.impl;
2 2
  3 +import com.genersoft.iot.vmp.conf.SipConfig;
  4 +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
3 import com.genersoft.iot.vmp.gb28181.bean.GbStream; 5 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
  6 +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
  7 +import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  8 +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  9 +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
4 import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; 10 import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
  11 +import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
5 import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; 12 import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
6 import com.genersoft.iot.vmp.service.IGbStreamService; 13 import com.genersoft.iot.vmp.service.IGbStreamService;
7 import com.github.pagehelper.PageHelper; 14 import com.github.pagehelper.PageHelper;
@@ -14,6 +21,7 @@ import org.springframework.stereotype.Service; @@ -14,6 +21,7 @@ import org.springframework.stereotype.Service;
14 import org.springframework.transaction.TransactionDefinition; 21 import org.springframework.transaction.TransactionDefinition;
15 import org.springframework.transaction.TransactionStatus; 22 import org.springframework.transaction.TransactionStatus;
16 23
  24 +import java.util.ArrayList;
17 import java.util.List; 25 import java.util.List;
18 26
19 @Service 27 @Service
@@ -33,6 +41,15 @@ public class GbStreamServiceImpl implements IGbStreamService { @@ -33,6 +41,15 @@ public class GbStreamServiceImpl implements IGbStreamService {
33 @Autowired 41 @Autowired
34 private PlatformGbStreamMapper platformGbStreamMapper; 42 private PlatformGbStreamMapper platformGbStreamMapper;
35 43
  44 + @Autowired
  45 + private ParentPlatformMapper platformMapper;
  46 +
  47 + @Autowired
  48 + private SipConfig sipConfig;
  49 +
  50 + @Autowired
  51 + private EventPublisher eventPublisher;
  52 +
36 @Override 53 @Override
37 public PageInfo<GbStream> getAll(Integer page, Integer count) { 54 public PageInfo<GbStream> getAll(Integer page, Integer count) {
38 PageHelper.startPage(page, count); 55 PageHelper.startPage(page, count);
@@ -51,32 +68,62 @@ public class GbStreamServiceImpl implements IGbStreamService { @@ -51,32 +68,62 @@ public class GbStreamServiceImpl implements IGbStreamService {
51 // 放在事务内执行 68 // 放在事务内执行
52 boolean result = false; 69 boolean result = false;
53 TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); 70 TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  71 + ParentPlatform parentPlatform = platformMapper.getParentPlatByServerGBId(platformId);
54 try { 72 try {
  73 + List<DeviceChannel> deviceChannelList = new ArrayList<>();
55 for (GbStream gbStream : gbStreams) { 74 for (GbStream gbStream : gbStreams) {
56 gbStream.setCatalogId(catalogId); 75 gbStream.setCatalogId(catalogId);
57 gbStream.setPlatformId(platformId); 76 gbStream.setPlatformId(platformId);
58 // TODO 修改为批量提交 77 // TODO 修改为批量提交
59 platformGbStreamMapper.add(gbStream); 78 platformGbStreamMapper.add(gbStream);
  79 + DeviceChannel deviceChannelListByStream = getDeviceChannelListByStream(gbStream, catalogId, parentPlatform.getDeviceGBId());
  80 + deviceChannelList.add(deviceChannelListByStream);
60 } 81 }
61 dataSourceTransactionManager.commit(transactionStatus); //手动提交 82 dataSourceTransactionManager.commit(transactionStatus); //手动提交
  83 + eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.ADD);
62 result = true; 84 result = true;
63 }catch (Exception e) { 85 }catch (Exception e) {
64 logger.error("批量保存流与平台的关系时错误", e); 86 logger.error("批量保存流与平台的关系时错误", e);
65 dataSourceTransactionManager.rollback(transactionStatus); 87 dataSourceTransactionManager.rollback(transactionStatus);
66 } 88 }
67 return result; 89 return result;
  90 + }
68 91
  92 + @Override
  93 + public DeviceChannel getDeviceChannelListByStream(GbStream gbStream, String catalogId, String deviceGBId) {
  94 + DeviceChannel deviceChannel = new DeviceChannel();
  95 + deviceChannel.setChannelId(gbStream.getGbId());
  96 + deviceChannel.setName(gbStream.getName());
  97 + deviceChannel.setLongitude(gbStream.getLongitude());
  98 + deviceChannel.setLatitude(gbStream.getLatitude());
  99 + deviceChannel.setDeviceId(deviceGBId);
  100 + deviceChannel.setManufacture("wvp-pro");
  101 + deviceChannel.setStatus(gbStream.isStatus()?1:0);
  102 + deviceChannel.setParentId(catalogId ==null?gbStream.getCatalogId():catalogId);
  103 + deviceChannel.setRegisterWay(1);
  104 + deviceChannel.setCivilCode(sipConfig.getDomain());
  105 + deviceChannel.setModel("live");
  106 + deviceChannel.setOwner("wvp-pro");
  107 + deviceChannel.setParental(0);
  108 + deviceChannel.setSecrecy("0");
  109 + return deviceChannel;
69 } 110 }
70 111
71 @Override 112 @Override
72 - public boolean delPlatformInfo(List<GbStream> gbStreams) { 113 + public boolean delPlatformInfo(String platformId, List<GbStream> gbStreams) {
73 // 放在事务内执行 114 // 放在事务内执行
74 boolean result = false; 115 boolean result = false;
75 TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); 116 TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
76 try { 117 try {
  118 + List<DeviceChannel> deviceChannelList = new ArrayList<>();
77 for (GbStream gbStream : gbStreams) { 119 for (GbStream gbStream : gbStreams) {
78 platformGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream()); 120 platformGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream());
  121 + DeviceChannel deviceChannel = new DeviceChannel();
  122 + deviceChannel.setChannelId(gbStream.getGbId());
  123 + deviceChannelList.add(deviceChannel);
  124 + eventPublisher.catalogEventPublish(platformId, deviceChannel, CatalogEvent.DEL);
79 } 125 }
  126 +
80 dataSourceTransactionManager.commit(transactionStatus); //手动提交 127 dataSourceTransactionManager.commit(transactionStatus); //手动提交
81 result = true; 128 result = true;
82 }catch (Exception e) { 129 }catch (Exception e) {
@@ -85,4 +132,27 @@ public class GbStreamServiceImpl implements IGbStreamService { @@ -85,4 +132,27 @@ public class GbStreamServiceImpl implements IGbStreamService {
85 } 132 }
86 return result; 133 return result;
87 } 134 }
  135 +
  136 + @Override
  137 + public void sendCatalogMsg(GbStream gbStream, String type) {
  138 + List<GbStream> gbStreams = new ArrayList<>();
  139 + if (gbStream.getGbId() != null) {
  140 + gbStreams.add(gbStream);
  141 + }else {
  142 + StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(gbStream.getApp(), gbStream.getStream());
  143 + if (streamProxyItem != null && streamProxyItem.getGbId() != null){
  144 + gbStreams.add(streamProxyItem);
  145 + }
  146 + }
  147 + if (gbStreams.size() > 0) {
  148 + for (GbStream gs : gbStreams) {
  149 + List<ParentPlatform> parentPlatforms = platformGbStreamMapper.selectByAppAndStream(gs.getApp(), gs.getStream());
  150 + if (parentPlatforms.size() > 0) {
  151 + for (ParentPlatform parentPlatform : parentPlatforms) {
  152 + eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), gs, type);
  153 + }
  154 + }
  155 + }
  156 + }
  157 + }
88 } 158 }
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -2,9 +2,13 @@ package com.genersoft.iot.vmp.service.impl; @@ -2,9 +2,13 @@ package com.genersoft.iot.vmp.service.impl;
2 2
3 import com.alibaba.fastjson.JSONObject; 3 import com.alibaba.fastjson.JSONObject;
4 import com.genersoft.iot.vmp.common.StreamInfo; 4 import com.genersoft.iot.vmp.common.StreamInfo;
  5 +import com.genersoft.iot.vmp.conf.SipConfig;
5 import com.genersoft.iot.vmp.conf.UserSetup; 6 import com.genersoft.iot.vmp.conf.UserSetup;
  7 +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
6 import com.genersoft.iot.vmp.gb28181.bean.GbStream; 8 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
7 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; 9 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
  10 +import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  11 +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
8 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; 12 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
9 import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; 13 import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
10 import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; 14 import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
@@ -58,12 +62,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @@ -58,12 +62,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
58 private UserSetup userSetup; 62 private UserSetup userSetup;
59 63
60 @Autowired 64 @Autowired
  65 + private SipConfig sipConfig;
  66 +
  67 + @Autowired
61 private GbStreamMapper gbStreamMapper; 68 private GbStreamMapper gbStreamMapper;
62 69
63 @Autowired 70 @Autowired
64 private PlatformGbStreamMapper platformGbStreamMapper; 71 private PlatformGbStreamMapper platformGbStreamMapper;
65 72
66 @Autowired 73 @Autowired
  74 + private EventPublisher eventPublisher;
  75 +
  76 + @Autowired
67 private ParentPlatformMapper parentPlatformMapper; 77 private ParentPlatformMapper parentPlatformMapper;
68 78
69 @Autowired 79 @Autowired
@@ -146,6 +156,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @@ -146,6 +156,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
146 StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(param.getApp(), stream, parentPlatform.getServerGBId()); 156 StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(param.getApp(), stream, parentPlatform.getServerGBId());
147 if (streamProxyItems == null) { 157 if (streamProxyItems == null) {
148 platformGbStreamMapper.add(param); 158 platformGbStreamMapper.add(param);
  159 + eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), param, CatalogEvent.ADD);
149 } 160 }
150 } 161 }
151 } 162 }
@@ -194,6 +205,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @@ -194,6 +205,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
194 public void del(String app, String stream) { 205 public void del(String app, String stream) {
195 StreamProxyItem streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream); 206 StreamProxyItem streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream);
196 if (streamProxyItem != null) { 207 if (streamProxyItem != null) {
  208 + gbStreamService.sendCatalogMsg(streamProxyItem, CatalogEvent.DEL);
197 videoManagerStorager.deleteStreamProxy(app, stream); 209 videoManagerStorager.deleteStreamProxy(app, stream);
198 JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem); 210 JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
199 if (jsonObject != null && jsonObject.getInteger("code") == 0) { 211 if (jsonObject != null && jsonObject.getInteger("code") == 0) {
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -5,12 +5,16 @@ import com.alibaba.fastjson.JSONObject; @@ -5,12 +5,16 @@ import com.alibaba.fastjson.JSONObject;
5 import com.alibaba.fastjson.TypeReference; 5 import com.alibaba.fastjson.TypeReference;
6 import com.genersoft.iot.vmp.common.StreamInfo; 6 import com.genersoft.iot.vmp.common.StreamInfo;
7 import com.genersoft.iot.vmp.conf.UserSetup; 7 import com.genersoft.iot.vmp.conf.UserSetup;
  8 +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
8 import com.genersoft.iot.vmp.gb28181.bean.GbStream; 9 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
9 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; 10 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
  11 +import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  12 +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
10 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; 13 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
11 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; 14 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
12 import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; 15 import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
13 import com.genersoft.iot.vmp.media.zlm.dto.*; 16 import com.genersoft.iot.vmp.media.zlm.dto.*;
  17 +import com.genersoft.iot.vmp.service.IGbStreamService;
14 import com.genersoft.iot.vmp.service.IMediaServerService; 18 import com.genersoft.iot.vmp.service.IMediaServerService;
15 import com.genersoft.iot.vmp.service.IStreamPushService; 19 import com.genersoft.iot.vmp.service.IStreamPushService;
16 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 20 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -42,6 +46,12 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -42,6 +46,12 @@ public class StreamPushServiceImpl implements IStreamPushService {
42 private PlatformGbStreamMapper platformGbStreamMapper; 46 private PlatformGbStreamMapper platformGbStreamMapper;
43 47
44 @Autowired 48 @Autowired
  49 + private IGbStreamService gbStreamService;
  50 +
  51 + @Autowired
  52 + private EventPublisher eventPublisher;
  53 +
  54 + @Autowired
45 private ZLMRESTfulUtils zlmresTfulUtils; 55 private ZLMRESTfulUtils zlmresTfulUtils;
46 56
47 @Autowired 57 @Autowired
@@ -115,6 +125,7 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -115,6 +125,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
115 stream.setStreamType("push"); 125 stream.setStreamType("push");
116 stream.setStatus(true); 126 stream.setStatus(true);
117 int add = gbStreamMapper.add(stream); 127 int add = gbStreamMapper.add(stream);
  128 +
118 // 查找开启了全部直播流共享的上级平台 129 // 查找开启了全部直播流共享的上级平台
119 List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream(); 130 List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
120 if (parentPlatforms.size() > 0) { 131 if (parentPlatforms.size() > 0) {
@@ -122,18 +133,30 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -122,18 +133,30 @@ public class StreamPushServiceImpl implements IStreamPushService {
122 stream.setCatalogId(parentPlatform.getCatalogId()); 133 stream.setCatalogId(parentPlatform.getCatalogId());
123 stream.setPlatformId(parentPlatform.getServerGBId()); 134 stream.setPlatformId(parentPlatform.getServerGBId());
124 String streamId = stream.getStream(); 135 String streamId = stream.getStream();
125 - StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId());  
126 - if (streamProxyItems == null) { 136 + StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId());
  137 + if (streamProxyItem == null) {
127 platformGbStreamMapper.add(stream); 138 platformGbStreamMapper.add(stream);
  139 + eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
  140 + }else {
  141 + if (!streamProxyItem.getGbId().equals(stream.getGbId())) {
  142 + // 此流使用另一个国标Id已经与该平台关联,移除此记录
  143 + platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId());
  144 + platformGbStreamMapper.add(stream);
  145 + eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
  146 + }
128 } 147 }
129 } 148 }
130 } 149 }
  150 +
131 return add > 0; 151 return add > 0;
132 } 152 }
133 153
134 @Override 154 @Override
135 public boolean removeFromGB(GbStream stream) { 155 public boolean removeFromGB(GbStream stream) {
  156 + // 判断是否需要发送事件
  157 + gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
136 int del = gbStreamMapper.del(stream.getApp(), stream.getStream()); 158 int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
  159 + platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
137 MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); 160 MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
138 JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream()); 161 JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
139 if (mediaList == null) { 162 if (mediaList == null) {
@@ -152,6 +175,8 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -152,6 +175,8 @@ public class StreamPushServiceImpl implements IStreamPushService {
152 @Override 175 @Override
153 public boolean stop(String app, String streamId) { 176 public boolean stop(String app, String streamId) {
154 StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); 177 StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
  178 + gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
  179 +
155 int delStream = streamPushMapper.del(app, streamId); 180 int delStream = streamPushMapper.del(app, streamId);
156 gbStreamMapper.del(app, streamId); 181 gbStreamMapper.del(app, streamId);
157 platformGbStreamMapper.delByAppAndStream(app, streamId); 182 platformGbStreamMapper.delByAppAndStream(app, streamId);
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -210,4 +210,8 @@ public interface IRedisCatchStorage { @@ -210,4 +210,8 @@ public interface IRedisCatchStorage {
210 void delSubscribe(String key); 210 void delSubscribe(String key);
211 211
212 MediaItem getStreamInfo(String app, String streamId, String mediaServerId); 212 MediaItem getStreamInfo(String app, String streamId, String mediaServerId);
  213 +
  214 + List<SubscribeInfo> getAllSubscribe();
  215 +
  216 + List<String> getAllSubscribePlatform();
213 } 217 }
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java
@@ -100,6 +100,7 @@ public interface IVideoManagerStorager { @@ -100,6 +100,7 @@ public interface IVideoManagerStorager {
100 * @return 100 * @return
101 */ 101 */
102 public List<DeviceChannel> queryChannelsByDeviceId(String deviceId); 102 public List<DeviceChannel> queryChannelsByDeviceId(String deviceId);
  103 + public List<DeviceChannel> queryOnlineChannelsByDeviceId(String deviceId);
103 104
104 /** 105 /**
105 * 获取某个设备的通道 106 * 获取某个设备的通道
@@ -341,7 +342,7 @@ public interface IVideoManagerStorager { @@ -341,7 +342,7 @@ public interface IVideoManagerStorager {
341 * @param channelId 342 * @param channelId
342 * @return 343 * @return
343 */ 344 */
344 - List<GbStream> queryStreamInParentPlatform(String platformId, String channelId); 345 + GbStream queryStreamInParentPlatform(String platformId, String channelId);
345 346
346 /** 347 /**
347 * 获取平台关联的直播流 348 * 获取平台关联的直播流
@@ -459,4 +460,10 @@ public interface IVideoManagerStorager { @@ -459,4 +460,10 @@ public interface IVideoManagerStorager {
459 int delRelation(PlatformCatalog platformCatalog); 460 int delRelation(PlatformCatalog platformCatalog);
460 461
461 int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfo); 462 int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfo);
  463 +
  464 + List<ParentPlatform> queryPlatFormListForGBWithGBId(String channelId, List<String> platforms);
  465 +
  466 + List<ParentPlatform> queryPlatFormListForStreamWithGBId(String app, String stream, List<String> platforms);
  467 +
  468 + GbStream getGbStream(String app, String streamId);
462 } 469 }
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -54,17 +54,22 @@ public interface DeviceChannelMapper { @@ -54,17 +54,22 @@ public interface DeviceChannelMapper {
54 int update(DeviceChannel channel); 54 int update(DeviceChannel channel);
55 55
56 @Select(value = {" <script>" + 56 @Select(value = {" <script>" +
57 - "SELECT * FROM ( "+  
58 - " SELECT * , (SELECT count(0) FROM device_channel WHERE parentId=dc.channelId) as subCount FROM device_channel dc " +  
59 - " WHERE dc.deviceId=#{deviceId} " +  
60 - " <if test='query != null'> AND (dc.channelId LIKE '%${query}%' OR dc.name LIKE '%${query}%' OR dc.name LIKE '%${query}%')</if> " +  
61 - " <if test='parentChannelId != null'> AND dc.parentId=#{parentChannelId} </if> " +  
62 - " <if test='online == true' > AND dc.status=1</if>" +  
63 - " <if test='online == false' > AND dc.status=0</if>) dcr" +  
64 - " WHERE 1=1 " + 57 + "SELECT " +
  58 + "dc1.*, " +
  59 + "COUNT(dc2.channelId) as subCount " +
  60 + "from " +
  61 + "device_channel dc1 " +
  62 + "left join device_channel dc2 on " +
  63 + "dc1.channelId = dc2.parentId " +
  64 + "WHERE " +
  65 + "dc1.deviceId = #{deviceId} " +
  66 + " <if test='query != null'> AND (dc1.channelId LIKE '%${query}%' OR dc1.name LIKE '%${query}%' OR dc1.name LIKE '%${query}%')</if> " +
  67 + " <if test='parentChannelId != null'> AND dc1.parentId=#{parentChannelId} </if> " +
  68 + " <if test='online == true' > AND dc1.status=1</if>" +
  69 + " <if test='online == false' > AND dc1.status=0</if>" +
65 " <if test='hasSubChannel == true' > AND subCount >0</if>" + 70 " <if test='hasSubChannel == true' > AND subCount >0</if>" +
66 " <if test='hasSubChannel == false' > AND subCount=0</if>" + 71 " <if test='hasSubChannel == false' > AND subCount=0</if>" +
67 - " ORDER BY channelId ASC" + 72 + "GROUP BY dc1.channelId " +
68 " </script>"}) 73 " </script>"})
69 List<DeviceChannel> queryChannels(String deviceId, String parentChannelId, String query, Boolean hasSubChannel, Boolean online); 74 List<DeviceChannel> queryChannels(String deviceId, String parentChannelId, String query, Boolean hasSubChannel, Boolean online);
70 75
@@ -170,19 +175,30 @@ public interface DeviceChannelMapper { @@ -170,19 +175,30 @@ public interface DeviceChannelMapper {
170 "</script>"}) 175 "</script>"})
171 int batchUpdate(List<DeviceChannel> updateChannels); 176 int batchUpdate(List<DeviceChannel> updateChannels);
172 177
  178 +
173 @Select(value = {" <script>" + 179 @Select(value = {" <script>" +
174 - "SELECT * FROM ( "+  
175 - " SELECT * , (SELECT count(0) FROM device_channel WHERE parentId=dc.channelId) as subCount FROM device_channel dc " +  
176 - " WHERE dc.deviceId=#{deviceId} " +  
177 - " <if test='query != null'> AND (dc.channelId LIKE '%${query}%' OR dc.name LIKE '%${query}%' OR dc.name LIKE '%${query}%')</if> " +  
178 - " <if test='parentChannelId != null'> AND dc.parentId=#{parentChannelId} </if> " +  
179 - " <if test='online == true' > AND dc.status=1</if>" +  
180 - " <if test='online == false' > AND dc.status=0</if>) dcr" +  
181 - " WHERE 1=1 " + 180 + "SELECT " +
  181 + "dc1.*, " +
  182 + "COUNT(dc2.channelId) as subCount " +
  183 + "from " +
  184 + "device_channel dc1 " +
  185 + "left join device_channel dc2 on " +
  186 + "dc1.channelId = dc2.parentId " +
  187 + "WHERE " +
  188 + "dc1.deviceId = #{deviceId} " +
  189 + " <if test='query != null'> AND (dc1.channelId LIKE '%${query}%' OR dc1.name LIKE '%${query}%' OR dc1.name LIKE '%${query}%')</if> " +
  190 + " <if test='parentChannelId != null'> AND dc1.parentId=#{parentChannelId} </if> " +
  191 + " <if test='online == true' > AND dc1.status=1</if>" +
  192 + " <if test='online == false' > AND dc1.status=0</if>" +
182 " <if test='hasSubChannel == true' > AND subCount >0</if>" + 193 " <if test='hasSubChannel == true' > AND subCount >0</if>" +
183 " <if test='hasSubChannel == false' > AND subCount=0</if>" + 194 " <if test='hasSubChannel == false' > AND subCount=0</if>" +
184 - " ORDER BY channelId ASC" +  
185 - " LIMIT #{limit} OFFSET #{start}" + 195 + "GROUP BY dc1.channelId " +
  196 + "ORDER BY dc1.channelId ASC " +
  197 + "Limit #{limit} OFFSET #{start}" +
186 " </script>"}) 198 " </script>"})
187 - List<DeviceChannel> queryChannelsByDeviceIdWithStartAndLimit(String deviceId, String parentChannelId, String query, Boolean hasSubChannel, Boolean online, int start, int limit); 199 + List<DeviceChannel> queryChannelsByDeviceIdWithStartAndLimit(String deviceId, String parentChannelId, String query,
  200 + Boolean hasSubChannel, Boolean online, int start, int limit);
  201 +
  202 + @Select("SELECT * FROM device_channel WHERE deviceId=#{deviceId} AND status=1")
  203 + List<DeviceChannel> queryOnlineChannelsByDeviceId(String deviceId);
188 } 204 }
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -13,7 +13,7 @@ import java.util.List; @@ -13,7 +13,7 @@ import java.util.List;
13 @Repository 13 @Repository
14 public interface GbStreamMapper { 14 public interface GbStreamMapper {
15 15
16 - @Insert("INSERT INTO gb_stream (app, stream, gbId, name, " + 16 + @Insert("REPLACE INTO gb_stream (app, stream, gbId, name, " +
17 "longitude, latitude, streamType, mediaServerId, status) VALUES" + 17 "longitude, latitude, streamType, mediaServerId, status) VALUES" +
18 "('${app}', '${stream}', '${gbId}', '${name}', " + 18 "('${app}', '${stream}', '${gbId}', '${name}', " +
19 "'${longitude}', '${latitude}', '${streamType}', " + 19 "'${longitude}', '${latitude}', '${streamType}', " +
@@ -48,7 +48,7 @@ public interface GbStreamMapper { @@ -48,7 +48,7 @@ public interface GbStreamMapper {
48 @Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs " + 48 @Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs " +
49 "LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " + 49 "LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " +
50 "WHERE gs.gbId = '${gbId}' AND pgs.platformId = '${platformId}'") 50 "WHERE gs.gbId = '${gbId}' AND pgs.platformId = '${platformId}'")
51 - List<GbStream> queryStreamInPlatform(String platformId, String gbId); 51 + GbStream queryStreamInPlatform(String platformId, String gbId);
52 52
53 @Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs " + 53 @Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs " +
54 "LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " + 54 "LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " +
src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java
@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.storager.dao; @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.storager.dao;
2 2
3 import com.genersoft.iot.vmp.gb28181.bean.Device; 3 import com.genersoft.iot.vmp.gb28181.bean.Device;
4 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; 4 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
  5 +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
5 import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; 6 import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
6 import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; 7 import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
7 import org.apache.ibatis.annotations.Delete; 8 import org.apache.ibatis.annotations.Delete;
@@ -73,4 +74,18 @@ public interface PlatformChannelMapper { @@ -73,4 +74,18 @@ public interface PlatformChannelMapper {
73 "DELETE FROM platform_gb_channel WHERE catalogId=#{parentId} AND platformId=#{platformId} AND channelId=#{id}" + 74 "DELETE FROM platform_gb_channel WHERE catalogId=#{parentId} AND platformId=#{platformId} AND channelId=#{id}" +
74 "</script>") 75 "</script>")
75 int delByCatalogIdAndChannelIdAndPlatformId(PlatformCatalog platformCatalog); 76 int delByCatalogIdAndChannelIdAndPlatformId(PlatformCatalog platformCatalog);
  77 +
  78 + @Select("<script> " +
  79 + "SELECT " +
  80 + "pp.* " +
  81 + "FROM " +
  82 + "parent_platform pp " +
  83 + "left join platform_gb_channel pgc on " +
  84 + "pp.serverGBId = pgc.platformId " +
  85 + "WHERE " +
  86 + "pgc.channelId = #{channelId} " +
  87 + "AND pp.serverGBId IN" +
  88 + "<foreach collection='platforms' item='item' open='(' separator=',' close=')' > #{item}</foreach>" +
  89 + "</script> ")
  90 + List<ParentPlatform> queryPlatFormListForGBWithGBId(String channelId, List<String> platforms);
76 } 91 }
src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java
1 package com.genersoft.iot.vmp.storager.dao; 1 package com.genersoft.iot.vmp.storager.dao;
2 2
3 import com.genersoft.iot.vmp.gb28181.bean.GbStream; 3 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
  4 +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
4 import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; 5 import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
5 import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream; 6 import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream;
6 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; 7 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
@@ -14,7 +15,7 @@ import java.util.List; @@ -14,7 +15,7 @@ import java.util.List;
14 @Repository 15 @Repository
15 public interface PlatformGbStreamMapper { 16 public interface PlatformGbStreamMapper {
16 17
17 - @Insert("INSERT INTO platform_gb_stream (app, stream, platformId, catalogId) VALUES" + 18 + @Insert("REPLACE INTO platform_gb_stream (app, stream, platformId, catalogId) VALUES" +
18 "('${app}', '${stream}', '${platformId}', '${catalogId}')") 19 "('${app}', '${stream}', '${platformId}', '${catalogId}')")
19 int add(PlatformGbStream platformGbStream); 20 int add(PlatformGbStream platformGbStream);
20 21
@@ -24,10 +25,20 @@ public interface PlatformGbStreamMapper { @@ -24,10 +25,20 @@ public interface PlatformGbStreamMapper {
24 @Delete("DELETE FROM platform_gb_stream WHERE platformId=#{platformId}") 25 @Delete("DELETE FROM platform_gb_stream WHERE platformId=#{platformId}")
25 int delByPlatformId(String platformId); 26 int delByPlatformId(String platformId);
26 27
27 - @Select("SELECT * FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream}")  
28 - List<StreamProxyItem> selectByAppAndStream(String app, String stream); 28 + @Select("SELECT " +
  29 + "pp.* " +
  30 + "FROM " +
  31 + "platform_gb_stream pgs " +
  32 + "LEFT JOIN parent_platform pp ON pp.serverGBId = pgs.platformId " +
  33 + "WHERE " +
  34 + "pgs.app =#{app} " +
  35 + "AND pgs.stream =#{stream} " +
  36 + "GROUP BY pp.serverGBId")
  37 + List<ParentPlatform> selectByAppAndStream(String app, String stream);
29 38
30 - @Select("SELECT * FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream} AND platformId=#{serverGBId}") 39 + @Select("SELECT pgs.*, gs.gbId FROM platform_gb_stream pgs " +
  40 + "LEFT JOIN gb_stream gs ON pgs.app = gs.app AND pgs.stream = gs.stream " +
  41 + "WHERE pgs.app=#{app} AND pgs.stream=#{stream} AND pgs.platformId=#{serverGBId}")
31 StreamProxyItem selectOne(String app, String stream, String serverGBId); 42 StreamProxyItem selectOne(String app, String stream, String serverGBId);
32 43
33 @Select("select gs.* \n" + 44 @Select("select gs.* \n" +
@@ -47,4 +58,21 @@ public interface PlatformGbStreamMapper { @@ -47,4 +58,21 @@ public interface PlatformGbStreamMapper {
47 @Delete("DELETE FROM platform_gb_stream WHERE catalogId=#{id}") 58 @Delete("DELETE FROM platform_gb_stream WHERE catalogId=#{id}")
48 int delByCatalogId(String id); 59 int delByCatalogId(String id);
49 60
  61 + @Select("<script> " +
  62 + "SELECT " +
  63 + "pp.* " +
  64 + "FROM " +
  65 + "parent_platform pp " +
  66 + "left join platform_gb_stream pgs on " +
  67 + "pp.serverGBId = pgs.platformId " +
  68 + "WHERE " +
  69 + "pgs.app = #{app} " +
  70 + "AND pgs.stream = #{stream}" +
  71 + "AND pp.serverGBId IN" +
  72 + "<foreach collection='platforms' item='item' open='(' separator=',' close=')' > #{item}</foreach>" +
  73 + "</script> ")
  74 + List<ParentPlatform> queryPlatFormListForGBWithGBId(String app, String stream, List<String> platforms);
  75 +
  76 + @Select("SELECT * FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream} AND platformId=#{platformId}")
  77 + int delByAppAndStreamAndPlatform(String app, String streamId, String platformId);
50 } 78 }
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -250,7 +250,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @@ -250,7 +250,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
250 @Override 250 @Override
251 public void updatePlatformRegisterInfo(String callId, String platformGbId) { 251 public void updatePlatformRegisterInfo(String callId, String platformGbId) {
252 String key = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetup.getServerId() + "_" + callId; 252 String key = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetup.getServerId() + "_" + callId;
253 - redis.set(key, platformGbId); 253 + redis.set(key, platformGbId, 30);
254 } 254 }
255 255
256 256
@@ -508,4 +508,30 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @@ -508,4 +508,30 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
508 508
509 return result; 509 return result;
510 } 510 }
  511 +
  512 + @Override
  513 + public List<SubscribeInfo> getAllSubscribe() {
  514 + String scanKey = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_*";
  515 + List<SubscribeInfo> result = new ArrayList<>();
  516 + List<Object> keys = redis.scan(scanKey);
  517 + for (int i = 0; i < keys.size(); i++) {
  518 + String key = (String) keys.get(i);
  519 + SubscribeInfo subscribeInfo = (SubscribeInfo) redis.get(key);
  520 + result.add(subscribeInfo);
  521 + }
  522 + return result;
  523 + }
  524 +
  525 + @Override
  526 + public List<String> getAllSubscribePlatform() {
  527 + String scanKey = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_*";
  528 + List<String> result = new ArrayList<>();
  529 + List<Object> keys = redis.scan(scanKey);
  530 + for (int i = 0; i < keys.size(); i++) {
  531 + String key = (String) keys.get(i);
  532 + String platformId = key.substring(scanKey.length() - 1);
  533 + result.add(platformId);
  534 + }
  535 + return result;
  536 + }
511 } 537 }
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
1 package com.genersoft.iot.vmp.storager.impl; 1 package com.genersoft.iot.vmp.storager.impl;
2 2
  3 +import com.genersoft.iot.vmp.conf.SipConfig;
3 import com.genersoft.iot.vmp.gb28181.bean.*; 4 import com.genersoft.iot.vmp.gb28181.bean.*;
  5 +import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  6 +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
4 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; 7 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
5 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 8 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
6 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; 9 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
@@ -27,9 +30,9 @@ import java.text.SimpleDateFormat; @@ -27,9 +30,9 @@ import java.text.SimpleDateFormat;
27 import java.util.*; 30 import java.util.*;
28 31
29 /** 32 /**
30 - * @description:视频设备数据存储-jdbc实现  
31 - * @author: swwheihei  
32 - * @date: 2020年5月6日 下午2:31:42 33 + * 视频设备数据存储-jdbc实现
  34 + * swwheihei
  35 + * 2020年5月6日 下午2:31:42
33 */ 36 */
34 @SuppressWarnings("rawtypes") 37 @SuppressWarnings("rawtypes")
35 @Component 38 @Component
@@ -38,6 +41,12 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -38,6 +41,12 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
38 private Logger logger = LoggerFactory.getLogger(VideoManagerStoragerImpl.class); 41 private Logger logger = LoggerFactory.getLogger(VideoManagerStoragerImpl.class);
39 42
40 @Autowired 43 @Autowired
  44 + EventPublisher eventPublisher;
  45 +
  46 + @Autowired
  47 + SipConfig sipConfig;
  48 +
  49 + @Autowired
41 DataSourceTransactionManager dataSourceTransactionManager; 50 DataSourceTransactionManager dataSourceTransactionManager;
42 51
43 @Autowired 52 @Autowired
@@ -134,6 +143,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -134,6 +143,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
134 return deviceMapper.add(device) > 0; 143 return deviceMapper.add(device) > 0;
135 }else { 144 }else {
136 redisCatchStorage.updateDevice(device); 145 redisCatchStorage.updateDevice(device);
  146 +
137 return deviceMapper.update(device) > 0; 147 return deviceMapper.update(device) > 0;
138 } 148 }
139 149
@@ -408,6 +418,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -408,6 +418,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
408 device.setOnline(1); 418 device.setOnline(1);
409 logger.info("更新设备在线: " + deviceId); 419 logger.info("更新设备在线: " + deviceId);
410 redisCatchStorage.updateDevice(device); 420 redisCatchStorage.updateDevice(device);
  421 + List<DeviceChannel> deviceChannelList = deviceChannelMapper.queryOnlineChannelsByDeviceId(deviceId);
  422 + eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON);
411 return deviceMapper.update(device) > 0; 423 return deviceMapper.update(device) > 0;
412 } 424 }
413 425
@@ -514,7 +526,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -514,7 +526,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
514 if (parentPlatform.isShareAllLiveStream()) { 526 if (parentPlatform.isShareAllLiveStream()) {
515 gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId()); 527 gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId());
516 }else { 528 }else {
517 - gbStreamService.delPlatformInfo(gbStreams); 529 + gbStreamService.delPlatformInfo(parentPlatform.getServerGBId(), gbStreams);
518 } 530 }
519 } 531 }
520 } 532 }
@@ -590,6 +602,9 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -590,6 +602,9 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
590 int result = 0; 602 int result = 0;
591 if (channelReducesToAdd.size() > 0) { 603 if (channelReducesToAdd.size() > 0) {
592 result = platformChannelMapper.addChannels(platformId, channelReducesToAdd); 604 result = platformChannelMapper.addChannels(platformId, channelReducesToAdd);
  605 + // TODO 后续给平台增加控制开关以控制是否响应目录订阅
  606 + List<DeviceChannel> deviceChannelList = getDeviceChannelListByChannelReduceList(channelReducesToAdd, catalogId);
  607 + eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.ADD);
593 } 608 }
594 609
595 return result; 610 return result;
@@ -600,7 +615,13 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -600,7 +615,13 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
600 public int delChannelForGB(String platformId, List<ChannelReduce> channelReduces) { 615 public int delChannelForGB(String platformId, List<ChannelReduce> channelReduces) {
601 616
602 int result = platformChannelMapper.delChannelForGB(platformId, channelReduces); 617 int result = platformChannelMapper.delChannelForGB(platformId, channelReduces);
603 - 618 + List<DeviceChannel> deviceChannelList = new ArrayList<>();
  619 + for (ChannelReduce channelReduce : channelReduces) {
  620 + DeviceChannel deviceChannel = new DeviceChannel();
  621 + deviceChannel.setChannelId(channelReduce.getChannelId());
  622 + deviceChannelList.add(deviceChannel);
  623 + }
  624 + eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.DEL);
604 return result; 625 return result;
605 } 626 }
606 627
@@ -739,7 +760,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -739,7 +760,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
739 * @return 760 * @return
740 */ 761 */
741 @Override 762 @Override
742 - public List<GbStream> queryStreamInParentPlatform(String platformId, String gbId) { 763 + public GbStream queryStreamInParentPlatform(String platformId, String gbId) {
743 return gbStreamMapper.queryStreamInPlatform(platformId, gbId); 764 return gbStreamMapper.queryStreamInPlatform(platformId, gbId);
744 } 765 }
745 766
@@ -771,7 +792,11 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -771,7 +792,11 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
771 streamPushMapper.addAll(streamPushItems); 792 streamPushMapper.addAll(streamPushItems);
772 // TODO 待优化 793 // TODO 待优化
773 for (int i = 0; i < streamPushItems.size(); i++) { 794 for (int i = 0; i < streamPushItems.size(); i++) {
774 - gbStreamMapper.setStatus(streamPushItems.get(i).getApp(), streamPushItems.get(i).getStream(), true); 795 + int onlineResult = gbStreamMapper.setStatus(streamPushItems.get(i).getApp(), streamPushItems.get(i).getStream(), true);
  796 + if (onlineResult > 0) {
  797 + // 发送上线通知
  798 + eventPublisher.catalogEventPublishForStream(null, streamPushItems.get(i), CatalogEvent.ON);
  799 + }
775 } 800 }
776 } 801 }
777 802
@@ -780,6 +805,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -780,6 +805,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
780 streamPushMapper.del(streamPushItem.getApp(), streamPushItem.getStream()); 805 streamPushMapper.del(streamPushItem.getApp(), streamPushItem.getStream());
781 streamPushMapper.add(streamPushItem); 806 streamPushMapper.add(streamPushItem);
782 gbStreamMapper.setStatus(streamPushItem.getApp(), streamPushItem.getStream(), true); 807 gbStreamMapper.setStatus(streamPushItem.getApp(), streamPushItem.getStream(), true);
  808 +
783 if(!StringUtils.isEmpty(streamPushItem.getGbId() )){ 809 if(!StringUtils.isEmpty(streamPushItem.getGbId() )){
784 // 查找开启了全部直播流共享的上级平台 810 // 查找开启了全部直播流共享的上级平台
785 List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream(); 811 List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
@@ -858,7 +884,12 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -858,7 +884,12 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
858 884
859 @Override 885 @Override
860 public int addCatalog(PlatformCatalog platformCatalog) { 886 public int addCatalog(PlatformCatalog platformCatalog) {
861 - return catalogMapper.add(platformCatalog); 887 + int result = catalogMapper.add(platformCatalog);
  888 + if (result > 0) {
  889 + DeviceChannel deviceChannel = getDeviceChannelByCatalog(platformCatalog);
  890 + eventPublisher.catalogEventPublish(platformCatalog.getPlatformId(), deviceChannel, CatalogEvent.ADD);
  891 + }
  892 + return result;
862 } 893 }
863 894
864 @Override 895 @Override
@@ -873,23 +904,56 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -873,23 +904,56 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
873 List<PlatformCatalog> platformCatalogList = catalogMapper.selectByParentId(platformCatalog.getPlatformId(), platformCatalog.getId()); 904 List<PlatformCatalog> platformCatalogList = catalogMapper.selectByParentId(platformCatalog.getPlatformId(), platformCatalog.getId());
874 for (PlatformCatalog catalog : platformCatalogList) { 905 for (PlatformCatalog catalog : platformCatalogList) {
875 if (catalog.getChildrenCount() == 0) { 906 if (catalog.getChildrenCount() == 0) {
876 - catalogMapper.del(catalog.getId());  
877 - platformGbStreamMapper.delByCatalogId(catalog.getId());  
878 - platformChannelMapper.delByCatalogId(catalog.getId()); 907 + delCatalogExecute(catalog.getId(), catalog.getPlatformId());
879 }else { 908 }else {
880 delCatalog(catalog.getId()); 909 delCatalog(catalog.getId());
881 } 910 }
882 } 911 }
883 } 912 }
  913 + return delCatalogExecute(id, platformCatalog.getPlatformId());
  914 + }
  915 + private int delCatalogExecute(String id, String platformId) {
884 int delresult = catalogMapper.del(id); 916 int delresult = catalogMapper.del(id);
  917 + DeviceChannel deviceChannelForCatalog = new DeviceChannel();
  918 + if (delresult > 0){
  919 + deviceChannelForCatalog.setChannelId(id);
  920 + eventPublisher.catalogEventPublish(platformId, deviceChannelForCatalog, CatalogEvent.DEL);
  921 + }
  922 +
  923 + List<GbStream> gbStreams = platformGbStreamMapper.queryChannelInParentPlatformAndCatalog(platformId, id);
  924 + if (gbStreams.size() > 0){
  925 + List<DeviceChannel> deviceChannelList = new ArrayList<>();
  926 + for (GbStream gbStream : gbStreams) {
  927 + DeviceChannel deviceChannel = new DeviceChannel();
  928 + deviceChannel.setChannelId(gbStream.getGbId());
  929 + deviceChannelList.add(deviceChannel);
  930 + }
  931 + eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.DEL);
  932 + }
885 int delStreamresult = platformGbStreamMapper.delByCatalogId(id); 933 int delStreamresult = platformGbStreamMapper.delByCatalogId(id);
886 - int delChanneresult = platformChannelMapper.delByCatalogId(id);  
887 - return delresult + delChanneresult + delStreamresult; 934 + List<PlatformCatalog> platformCatalogs = platformChannelMapper.queryChannelInParentPlatformAndCatalog(platformId, id);
  935 + if (platformCatalogs.size() > 0){
  936 + List<DeviceChannel> deviceChannelList = new ArrayList<>();
  937 + for (PlatformCatalog platformCatalog : platformCatalogs) {
  938 + DeviceChannel deviceChannel = new DeviceChannel();
  939 + deviceChannel.setChannelId(platformCatalog.getId());
  940 + deviceChannelList.add(deviceChannel);
  941 + }
  942 + eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.DEL);
  943 + }
  944 + int delChannelresult = platformChannelMapper.delByCatalogId(id);
  945 + return delresult + delChannelresult + delStreamresult;
888 } 946 }
889 947
  948 +
890 @Override 949 @Override
891 public int updateCatalog(PlatformCatalog platformCatalog) { 950 public int updateCatalog(PlatformCatalog platformCatalog) {
892 - return catalogMapper.update(platformCatalog); 951 + int result = catalogMapper.update(platformCatalog);
  952 + if (result > 0) {
  953 + DeviceChannel deviceChannel = getDeviceChannelByCatalog(platformCatalog);
  954 + eventPublisher.catalogEventPublish(platformCatalog.getPlatformId(), deviceChannel, CatalogEvent.UPDATE);
  955 + }
  956 + return result;
893 } 957 }
894 958
895 @Override 959 @Override
@@ -905,11 +969,17 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -905,11 +969,17 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
905 @Override 969 @Override
906 public int delRelation(PlatformCatalog platformCatalog) { 970 public int delRelation(PlatformCatalog platformCatalog) {
907 if (platformCatalog.getType() == 1) { 971 if (platformCatalog.getType() == 1) {
  972 + DeviceChannel deviceChannel = new DeviceChannel();
  973 + deviceChannel.setChannelId(platformCatalog.getId());
  974 + eventPublisher.catalogEventPublish(platformCatalog.getPlatformId(), deviceChannel, CatalogEvent.DEL);
908 return platformChannelMapper.delByCatalogIdAndChannelIdAndPlatformId(platformCatalog); 975 return platformChannelMapper.delByCatalogIdAndChannelIdAndPlatformId(platformCatalog);
909 }else if (platformCatalog.getType() == 2) { 976 }else if (platformCatalog.getType() == 2) {
910 List<GbStream> gbStreams = platformGbStreamMapper.queryChannelInParentPlatformAndCatalog(platformCatalog.getPlatformId(), platformCatalog.getParentId()); 977 List<GbStream> gbStreams = platformGbStreamMapper.queryChannelInParentPlatformAndCatalog(platformCatalog.getPlatformId(), platformCatalog.getParentId());
911 for (GbStream gbStream : gbStreams) { 978 for (GbStream gbStream : gbStreams) {
912 if (gbStream.getGbId().equals(platformCatalog.getId())) { 979 if (gbStream.getGbId().equals(platformCatalog.getId())) {
  980 + DeviceChannel deviceChannel = new DeviceChannel();
  981 + deviceChannel.setChannelId(gbStream.getGbId());
  982 + eventPublisher.catalogEventPublish(platformCatalog.getPlatformId(), deviceChannel, CatalogEvent.DEL);
913 return platformGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream()); 983 return platformGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream());
914 } 984 }
915 } 985 }
@@ -921,4 +991,57 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -921,4 +991,57 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
921 public int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfos) { 991 public int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfos) {
922 return gbStreamMapper.updateStreamGPS(gpsMsgInfos); 992 return gbStreamMapper.updateStreamGPS(gpsMsgInfos);
923 } 993 }
  994 +
  995 + private List<DeviceChannel> getDeviceChannelListByChannelReduceList(List<ChannelReduce> channelReduces, String catalogId) {
  996 + List<DeviceChannel> deviceChannelList = new ArrayList<>();
  997 + if (channelReduces.size() > 0){
  998 + for (ChannelReduce channelReduce : channelReduces) {
  999 + DeviceChannel deviceChannel = queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId());
  1000 + deviceChannel.setParental(1);
  1001 + deviceChannel.setParentId(catalogId);
  1002 + deviceChannelList.add(deviceChannel);
  1003 + }
  1004 + }
  1005 + return deviceChannelList;
  1006 + }
  1007 +
  1008 + private DeviceChannel getDeviceChannelByCatalog(PlatformCatalog catalog) {
  1009 + ParentPlatform parentPlatByServerGBId = platformMapper.getParentPlatByServerGBId(catalog.getPlatformId());
  1010 + DeviceChannel deviceChannel = new DeviceChannel();
  1011 + deviceChannel.setChannelId(catalog.getId());
  1012 + deviceChannel.setName(catalog.getName());
  1013 + deviceChannel.setLongitude(0.0);
  1014 + deviceChannel.setLatitude(0.0);
  1015 + deviceChannel.setDeviceId(parentPlatByServerGBId.getDeviceGBId());
  1016 + deviceChannel.setManufacture("wvp-pro");
  1017 + deviceChannel.setStatus(1);
  1018 + deviceChannel.setParental(1);
  1019 + deviceChannel.setParentId(catalog.getParentId());
  1020 + deviceChannel.setRegisterWay(1);
  1021 + deviceChannel.setCivilCode(sipConfig.getDomain());
  1022 + deviceChannel.setModel("live");
  1023 + deviceChannel.setOwner("wvp-pro");
  1024 + deviceChannel.setSecrecy("0");
  1025 + return deviceChannel;
  1026 + }
  1027 +
  1028 + @Override
  1029 + public List<DeviceChannel> queryOnlineChannelsByDeviceId(String deviceId) {
  1030 + return deviceChannelMapper.queryOnlineChannelsByDeviceId(deviceId);
  1031 + }
  1032 +
  1033 + @Override
  1034 + public List<ParentPlatform> queryPlatFormListForGBWithGBId(String channelId, List<String> platforms) {
  1035 + return platformChannelMapper.queryPlatFormListForGBWithGBId(channelId, platforms);
  1036 + }
  1037 +
  1038 + @Override
  1039 + public List<ParentPlatform> queryPlatFormListForStreamWithGBId(String app, String stream, List<String> platforms) {
  1040 + return platformGbStreamMapper.queryPlatFormListForGBWithGBId(app, stream, platforms);
  1041 + }
  1042 +
  1043 + @Override
  1044 + public GbStream getGbStream(String app, String streamId) {
  1045 + return gbStreamMapper.selectOne(app, streamId);
  1046 + }
924 } 1047 }
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java
@@ -62,7 +62,7 @@ public class GbStreamController { @@ -62,7 +62,7 @@ public class GbStreamController {
62 @DeleteMapping(value = "/del") 62 @DeleteMapping(value = "/del")
63 @ResponseBody 63 @ResponseBody
64 public Object del(@RequestBody GbStreamParam gbStreamParam){ 64 public Object del(@RequestBody GbStreamParam gbStreamParam){
65 - if (gbStreamService.delPlatformInfo(gbStreamParam.getGbStreams())) { 65 + if (gbStreamService.delPlatformInfo(gbStreamParam.getPlatformId(), gbStreamParam.getGbStreams())) {
66 return "success"; 66 return "success";
67 }else { 67 }else {
68 return "fail"; 68 return "fail";
web_src/package-lock.json
@@ -5094,7 +5094,8 @@ @@ -5094,7 +5094,8 @@
5094 }, 5094 },
5095 "js-yaml": { 5095 "js-yaml": {
5096 "version": "3.7.0", 5096 "version": "3.7.0",
5097 - "resolved": "", 5097 + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.7.0.tgz",
  5098 + "integrity": "sha1-XJZ93YN6m/3KXy3oQlOr6KHAO4A=",
5098 "dev": true, 5099 "dev": true,
5099 "requires": { 5100 "requires": {
5100 "argparse": "^1.0.7", 5101 "argparse": "^1.0.7",
web_src/src/components/dialog/chooseChannelForStream.vue
@@ -147,6 +147,7 @@ export default { @@ -147,6 +147,7 @@ export default {
147 method:"delete", 147 method:"delete",
148 url:"/api/gbStream/del", 148 url:"/api/gbStream/del",
149 data:{ 149 data:{
  150 + platformId: that.platformId,
150 gbStreams: delData, 151 gbStreams: delData,
151 } 152 }
152 }).then((res)=>{ 153 }).then((res)=>{