Commit 81f69eb6f47b69cd89da7621889629f4f456dce1

Authored by 648540858
1 parent 0b1cae75

支持从redis消息更新推流设备状态

src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
... ... @@ -104,6 +104,10 @@ public class VideoManagerConstants {
104 104 // 设备状态订阅的通知
105 105 public static final String VM_MSG_SUBSCRIBE_DEVICE_STATUS = "device";
106 106  
  107 +
  108 +
  109 +
  110 +
107 111 //************************** 第三方 ****************************************
108 112 public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
109 113 public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";
... ...
src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java
1 1 package com.genersoft.iot.vmp.conf;
2 2  
3 3 import com.genersoft.iot.vmp.common.VideoManagerConstants;
4   -import com.genersoft.iot.vmp.service.impl.RedisAlarmMsgListener;
5   -import com.genersoft.iot.vmp.service.impl.RedisGpsMsgListener;
6   -import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
7   -import com.genersoft.iot.vmp.service.impl.RedisStreamMsgListener;
  4 +import com.genersoft.iot.vmp.service.impl.*;
8 5 import org.apache.commons.lang3.StringUtils;
9 6 import org.springframework.beans.factory.annotation.Autowired;
10 7 import org.springframework.beans.factory.annotation.Value;
... ... @@ -60,6 +57,9 @@ public class RedisConfig extends CachingConfigurerSupport {
60 57 @Autowired
61 58 private RedisGbPlayMsgListener redisGbPlayMsgListener;
62 59  
  60 + @Autowired
  61 + private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
  62 +
63 63 @Bean
64 64 public JedisPool jedisPool() {
65 65 if (StringUtils.isBlank(password)) {
... ... @@ -108,6 +108,7 @@ public class RedisConfig extends CachingConfigurerSupport {
108 108 container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
109 109 container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
110 110 container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
  111 + container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
111 112 return container;
112 113 }
113 114  
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
... ... @@ -58,7 +58,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
58 58 ParentPlatform parentPlatform = null;
59 59  
60 60 Map<String, List<ParentPlatform>> parentPlatformMap = new HashMap<>();
61   - if (event.getPlatformId() != null) {
  61 + if (!StringUtils.isEmpty(event.getPlatformId())) {
62 62 parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId());
63 63 if (parentPlatform != null && !parentPlatform.isStatus()) {
64 64 return;
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
... ... @@ -292,7 +292,7 @@ public class ZLMRTPServerFactory {
292 292 logger.warn("查询流({}/{})是否有其它观看者时得到: {}", app, streamId, mediaInfo.getString("msg"));
293 293 return -1;
294 294 }
295   - if ( code == 0 && ! mediaInfo.getBoolean("online")) {
  295 + if ( code == 0 && mediaInfo.getBoolean("online") != null && !mediaInfo.getBoolean("online")) {
296 296 logger.warn("查询流({}/{})是否有其它观看者时得到: {}", app, streamId, mediaInfo.getString("msg"));
297 297 return -1;
298 298 }
... ...
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
... ... @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
5 5 import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
6 6 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
7 7 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
  8 +import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
8 9 import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
9 10 import com.github.pagehelper.PageInfo;
10 11  
... ... @@ -44,31 +45,55 @@ public interface IStreamPushService {
44 45 * 停止一路推流
45 46 * @param app 应用名
46 47 * @param streamId 流ID
47   - * @return
48 48 */
49 49 boolean stop(String app, String streamId);
50 50  
51 51 /**
52 52 * 新的节点加入
53   - * @param mediaServerId
54   - * @return
55 53 */
56 54 void zlmServerOnline(String mediaServerId);
57 55  
58 56 /**
59 57 * 节点离线
60   - * @param mediaServerId
61   - * @return
62 58 */
63 59 void zlmServerOffline(String mediaServerId);
64 60  
  61 + /**
  62 + * 清空
  63 + */
65 64 void clean();
66 65  
  66 +
67 67 boolean saveToRandomGB();
68 68  
  69 + /**
  70 + * 批量添加
  71 + */
69 72 void batchAdd(List<StreamPushItem> streamPushExcelDtoList);
70 73  
  74 + /**
  75 + * 中止多个推流
  76 + */
71 77 boolean batchStop(List<GbStream> streamPushItems);
72 78  
  79 + /**
  80 + * 导入时批量增加
  81 + */
73 82 void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll);
  83 +
  84 + /**
  85 + * 全部离线
  86 + */
  87 + void allStreamOffline();
  88 +
  89 + /**
  90 + * 推流离线
  91 + */
  92 + void offline(List<StreamPushItemFromRedis> offlineStreams);
  93 +
  94 + /**
  95 + * 推流上线
  96 + */
  97 + void online(List<StreamPushItemFromRedis> onlineStreams);
  98 +
74 99 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java 0 → 100644
  1 +package com.genersoft.iot.vmp.service.impl;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.alibaba.fastjson.JSONObject;
  5 +import com.genersoft.iot.vmp.common.VideoManagerConstants;
  6 +import com.genersoft.iot.vmp.conf.DynamicTask;
  7 +import com.genersoft.iot.vmp.conf.UserSetting;
  8 +import com.genersoft.iot.vmp.gb28181.bean.GbStream;
  9 +import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  10 +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  11 +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
  12 +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
  13 +import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
  14 +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
  15 +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
  16 +import com.genersoft.iot.vmp.service.IStreamPushService;
  17 +import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto;
  18 +import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  19 +import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  20 +import org.slf4j.Logger;
  21 +import org.slf4j.LoggerFactory;
  22 +import org.springframework.beans.factory.annotation.Autowired;
  23 +import org.springframework.boot.ApplicationArguments;
  24 +import org.springframework.boot.ApplicationRunner;
  25 +import org.springframework.data.redis.connection.Message;
  26 +import org.springframework.data.redis.connection.MessageListener;
  27 +import org.springframework.stereotype.Component;
  28 +
  29 +import java.util.List;
  30 +
  31 +
  32 +/**
  33 + * 接收redis发送的推流设备上线下线通知
  34 + * @author lin
  35 + */
  36 +@Component
  37 +public class RedisPushStreamStatusMsgListener implements MessageListener, ApplicationRunner {
  38 +
  39 + private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class);
  40 +
  41 + @Autowired
  42 + private IRedisCatchStorage redisCatchStorage;
  43 +
  44 + @Autowired
  45 + private IStreamPushService streamPushService;
  46 +
  47 + @Autowired
  48 + private EventPublisher eventPublisher;
  49 +
  50 + @Autowired
  51 + private UserSetting userSetting;
  52 +
  53 + @Autowired
  54 + private DynamicTask dynamicTask;
  55 +
  56 + @Override
  57 + public void onMessage(Message message, byte[] bytes) {
  58 +
  59 + PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(message.getBody(), PushStreamStatusChangeFromRedisDto.class);
  60 + if (statusChangeFromPushStream == null) {
  61 + logger.warn("[REDIS 消息]推流设备状态变化消息解析失败");
  62 + return;
  63 + }
  64 + if (statusChangeFromPushStream.isSetAllOffline()) {
  65 + // 所有设备离线
  66 + streamPushService.allStreamOffline();
  67 + }
  68 + if (statusChangeFromPushStream.getOfflineStreams().size() > 0) {
  69 + // 更新部分设备离线
  70 + streamPushService.offline(statusChangeFromPushStream.getOfflineStreams());
  71 + }
  72 + if (statusChangeFromPushStream.getOnlineStreams().size() > 0) {
  73 + // 更新部分设备上线
  74 + streamPushService.online(statusChangeFromPushStream.getOnlineStreams());
  75 + }
  76 + }
  77 +
  78 + @Override
  79 + public void run(ApplicationArguments args) throws Exception {
  80 + // 启动时设置所有推流通道离线,发起查询请求
  81 + redisCatchStorage.sendStreamPushRequestedMsgForStatus();
  82 + dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, ()->{
  83 + logger.info("[REDIS 消息]未收到redis回复推流设备状态,执行推流设备离线");
  84 + // 五秒收不到请求就设置通道离线,然后通知上级离线
  85 + streamPushService.allStreamOffline();
  86 + }, 5000);
  87 + }
  88 +
  89 +}
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java
... ... @@ -3,16 +3,12 @@ package com.genersoft.iot.vmp.service.impl;
3 3 import com.alibaba.fastjson.JSON;
4 4 import com.alibaba.fastjson.JSONObject;
5 5 import com.genersoft.iot.vmp.conf.UserSetting;
6   -import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
7   -import com.genersoft.iot.vmp.gb28181.bean.Device;
8   -import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
9   -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
  6 +
10 7 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
11 8 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
12 9 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
13 10 import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
14 11 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
15   -import com.genersoft.iot.vmp.utils.DateUtil;
16 12 import org.slf4j.Logger;
17 13 import org.slf4j.LoggerFactory;
18 14 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -22,6 +18,7 @@ import org.springframework.stereotype.Component;
22 18  
23 19  
24 20 /**
  21 + * 接收其他wvp发送流变化通知
25 22 * @author lin
26 23 */
27 24 @Component
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
... ... @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.*;
13 13 import com.genersoft.iot.vmp.service.IGbStreamService;
14 14 import com.genersoft.iot.vmp.service.IMediaServerService;
15 15 import com.genersoft.iot.vmp.service.IStreamPushService;
  16 +import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
16 17 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
17 18 import com.genersoft.iot.vmp.storager.dao.*;
18 19 import com.genersoft.iot.vmp.utils.DateUtil;
... ... @@ -181,7 +182,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
181 182  
182 183 @Override
183 184 public StreamPushItem getPush(String app, String streamId) {
184   -
185 185 return streamPushMapper.selectOne(app, streamId);
186 186 }
187 187  
... ... @@ -481,4 +481,34 @@ public class StreamPushServiceImpl implements IStreamPushService {
481 481 }
482 482 return true;
483 483 }
  484 +
  485 + @Override
  486 + public void allStreamOffline() {
  487 + List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
  488 + if (onlinePushers.size() == 0) {
  489 + return;
  490 + }
  491 + streamPushMapper.allStreamOffline();
  492 +
  493 + // 发送通知
  494 + eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
  495 + }
  496 +
  497 + @Override
  498 + public void offline(List<StreamPushItemFromRedis> offlineStreams) {
  499 + // 更新部分设备离线
  500 + List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams);
  501 + streamPushMapper.offline(offlineStreams);
  502 + // 发送通知
  503 + eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
  504 + }
  505 +
  506 + @Override
  507 + public void online(List<StreamPushItemFromRedis> onlineStreams) {
  508 + // 更新部分设备上线streamPushService
  509 + List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams);
  510 + streamPushMapper.online(onlineStreams);
  511 + // 发送通知
  512 + eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON);
  513 + }
484 514 }
... ...
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
... ... @@ -233,4 +233,9 @@ public interface IRedisCatchStorage {
233 233 * @return
234 234 */
235 235 StreamAuthorityInfo getStreamAuthorityInfo(String app, String stream);
  236 +
  237 + /**
  238 + * 发送redis消息 查询所有推流设备的状态
  239 + */
  240 + void sendStreamPushRequestedMsgForStatus();
236 241 }
... ...
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
... ... @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.storager.dao;
2 2  
3 3 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
4 4 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
  5 +import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
5 6 import org.apache.ibatis.annotations.*;
6 7 // import org.omg.PortableInterceptor.INACTIVE;
7 8 import org.springframework.stereotype.Repository;
... ... @@ -117,4 +118,45 @@ public interface StreamPushMapper {
117 118 "SET status=#{status} " +
118 119 "WHERE mediaServerId=#{mediaServerId}")
119 120 void updateStatusByMediaServerId(String mediaServerId, boolean status);
  121 +
  122 +
  123 + @Select("<script> "+
  124 + "SELECT gs.* FROM stream_push sp left join gb_stream gs on sp.app = gs.app AND sp.stream = gs.stream " +
  125 + "where sp.status = 1 and (gs.app, gs.stream) in" +
  126 + "<foreach collection='offlineStreams' item='item' separator=','>" +
  127 + "(#{item.app}, {item.stream}) " +
  128 + "</foreach>" +
  129 + "</script>")
  130 + List<GbStream> getOnlinePusherForGbInList(List<StreamPushItemFromRedis> offlineStreams);
  131 +
  132 + @Update("<script> "+
  133 + "UPDATE stream_push SET status=0 where (app, stream) in" +
  134 + "<foreach collection='offlineStreams' item='item' separator=','>" +
  135 + "(#{item.app}, {item.stream}) " +
  136 + "</foreach>" +
  137 + "</script>")
  138 + void offline(List<StreamPushItemFromRedis> offlineStreams);
  139 +
  140 + @Select("<script> "+
  141 + "SELECT * FROM stream_push sp left join gb_stream gs on sp.app = gs.app AND sp.stream = gs.stream " +
  142 + "where sp.status = 0 and (gs.app, gs.stream) in" +
  143 + "<foreach collection='offlineStreams' item='item' separator=','>" +
  144 + "(#{item.app}, {item.stream}) " +
  145 + "</foreach>" +
  146 + "</script>")
  147 + List<GbStream> getOfflinePusherForGbInList(List<StreamPushItemFromRedis> onlineStreams);
  148 +
  149 + @Update("<script> "+
  150 + "UPDATE stream_push SET status=1 where (app, stream) in" +
  151 + "<foreach collection='offlineStreams' item='item' separator=','>" +
  152 + "(#{item.app}, {item.stream}) " +
  153 + "</foreach>" +
  154 + "</script>")
  155 + void online(List<StreamPushItemFromRedis> onlineStreams);
  156 +
  157 + @Select("SELECT gs.* FROM stream_push sp left join gb_stream gs on sp.app = gs.app AND sp.stream = gs.stream")
  158 + List<GbStream> getOnlinePusherForGb();
  159 +
  160 + @Update("UPDATE stream_push SET status=0")
  161 + void allStreamOffline();
120 162 }
... ...