Commit 5f7c53823a38ef8a840dff57a01cad70d645dc88

Authored by 648540858
1 parent 6b8ecd1f

清理过期的推流鉴权信息

src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -221,6 +221,12 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -221,6 +221,12 @@ public class StreamPushServiceImpl implements IStreamPushService {
221 streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam); 221 streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam);
222 } 222 }
223 } 223 }
  224 + // 获取所有推流鉴权信息,清理过期的
  225 + List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo();
  226 + Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>();
  227 + for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) {
  228 + streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo);
  229 + }
224 zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ 230 zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
225 if (mediaList == null) { 231 if (mediaList == null) {
226 return; 232 return;
@@ -239,6 +245,7 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -239,6 +245,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
239 for (StreamPushItem streamPushItem : streamPushItems) { 245 for (StreamPushItem streamPushItem : streamPushItems) {
240 pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); 246 pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
241 streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); 247 streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  248 + streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
242 } 249 }
243 } 250 }
244 List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values()); 251 List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
@@ -274,6 +281,14 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -274,6 +281,14 @@ public class StreamPushServiceImpl implements IStreamPushService {
274 redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream()); 281 redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream());
275 } 282 }
276 } 283 }
  284 +
  285 + Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
  286 + if (streamAuthorityInfos.size() > 0) {
  287 + for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) {
  288 + // 移除redis内流的信息
  289 + redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream());
  290 + }
  291 + }
277 })); 292 }));
278 } 293 }
279 294
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -233,6 +233,8 @@ public interface IRedisCatchStorage { @@ -233,6 +233,8 @@ public interface IRedisCatchStorage {
233 */ 233 */
234 StreamAuthorityInfo getStreamAuthorityInfo(String app, String stream); 234 StreamAuthorityInfo getStreamAuthorityInfo(String app, String stream);
235 235
  236 + List<StreamAuthorityInfo> getAllStreamAuthorityInfo();
  237 +
236 /** 238 /**
237 * 发送redis消息 查询所有推流设备的状态 239 * 发送redis消息 查询所有推流设备的状态
238 */ 240 */
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -714,6 +714,18 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @@ -714,6 +714,18 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
714 714
715 } 715 }
716 716
  717 + @Override
  718 + public List<StreamAuthorityInfo> getAllStreamAuthorityInfo() {
  719 + String scanKey = VideoManagerConstants.MEDIA_STREAM_AUTHORITY + userSetting.getServerId() + "_*_*" ;
  720 + List<StreamAuthorityInfo> result = new ArrayList<>();
  721 + List<Object> keys = RedisUtil.scan(scanKey);
  722 + for (Object o : keys) {
  723 + String key = (String) o;
  724 + result.add((StreamAuthorityInfo) RedisUtil.get(key));
  725 + }
  726 + return result;
  727 + }
  728 +
717 729
718 @Override 730 @Override
719 public OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId) { 731 public OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId) {