Commit 534be3f5809f430cb46cb0fcbba99d3d425f2324

Authored by 648540858
1 parent e272fa26

支持redis消息强制关闭流

src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
... ... @@ -107,6 +107,11 @@ public class VideoManagerConstants {
107 107 public static final String VM_MSG_STREAM_PUSH_RESPONSE = "VM_MSG_STREAM_PUSH_RESPONSE";
108 108  
109 109 /**
  110 + * redis 通知平台关闭推流
  111 + */
  112 + public static final String VM_MSG_STREAM_PUSH_CLOSE = "VM_MSG_STREAM_PUSH_CLOSE";
  113 +
  114 + /**
110 115 * redis 消息请求所有的在线通道
111 116 */
112 117 public static final String VM_MSG_GET_ALL_ONLINE_REQUESTED = "VM_MSG_GET_ALL_ONLINE_REQUESTED";
... ...
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
... ... @@ -43,6 +43,9 @@ public class RedisMsgListenConfig {
43 43 @Autowired
44 44 private RedisPushStreamResponseListener redisPushStreamResponseListener;
45 45  
  46 + @Autowired
  47 + private RedisCloseStreamMsgListener redisCloseStreamMsgListener;
  48 +
46 49  
47 50 /**
48 51 * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
... ... @@ -63,6 +66,7 @@ public class RedisMsgListenConfig {
63 66 container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
64 67 container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
65 68 container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
  69 + container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
66 70 return container;
67 71 }
68 72 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
... ... @@ -183,6 +183,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
183 183  
184 184 @Override
185 185 public boolean stop(String app, String streamId) {
  186 + logger.info("[推流 ] 停止流: {}/{}", app, streamId);
186 187 StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
187 188 if (streamPushItem != null) {
188 189 gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
... ...
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java 0 → 100644
  1 +package com.genersoft.iot.vmp.service.redisMsg;
  2 +
  3 +import com.alibaba.fastjson2.JSON;
  4 +import com.alibaba.fastjson2.JSONObject;
  5 +import com.genersoft.iot.vmp.service.IStreamPushService;
  6 +import org.jetbrains.annotations.NotNull;
  7 +import org.slf4j.Logger;
  8 +import org.slf4j.LoggerFactory;
  9 +import org.springframework.beans.factory.annotation.Autowired;
  10 +import org.springframework.beans.factory.annotation.Qualifier;
  11 +import org.springframework.data.redis.connection.Message;
  12 +import org.springframework.data.redis.connection.MessageListener;
  13 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  14 +import org.springframework.stereotype.Component;
  15 +
  16 +import java.util.concurrent.ConcurrentLinkedQueue;
  17 +
  18 +/**
  19 + * 接收来自redis的关闭流更新通知
  20 + * @author lin
  21 + */
  22 +@Component
  23 +public class RedisCloseStreamMsgListener implements MessageListener {
  24 +
  25 + private final static Logger logger = LoggerFactory.getLogger(RedisCloseStreamMsgListener.class);
  26 +
  27 +
  28 + @Autowired
  29 + private IStreamPushService pushService;
  30 +
  31 + private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
  32 +
  33 + @Qualifier("taskExecutor")
  34 + @Autowired
  35 + private ThreadPoolTaskExecutor taskExecutor;
  36 +
  37 + @Override
  38 + public void onMessage(@NotNull Message message, byte[] bytes) {
  39 + boolean isEmpty = taskQueue.isEmpty();
  40 + taskQueue.offer(message);
  41 + if (isEmpty) {
  42 + taskExecutor.execute(() -> {
  43 + while (!taskQueue.isEmpty()) {
  44 + Message msg = taskQueue.poll();
  45 + try {
  46 + JSONObject jsonObject = JSON.parseObject(msg.getBody());
  47 + String app = jsonObject.getString("app");
  48 + String stream = jsonObject.getString("stream");
  49 + pushService.stop(app, stream);
  50 +
  51 + }catch (Exception e) {
  52 + logger.warn("[REDIS的关闭推流通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
  53 + logger.error("[REDIS的关闭推流通知] 异常内容: ", e);
  54 + }
  55 + }
  56 + });
  57 + }
  58 + }
  59 +}
... ...