Commit 098dd8a04515c9dbb143ab39678c2a71a03ec427

Authored by lin
1 parent abb60593

优化推流结束时流类型的获取

src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -343,24 +343,16 @@ public class ZLMHttpHookListener { @@ -343,24 +343,16 @@ public class ZLMHttpHookListener {
343 MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); 343 MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
344 if (mediaServerItem != null){ 344 if (mediaServerItem != null){
345 if (regist) { 345 if (regist) {
346 - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);  
347 - redisCatchStorage.addStream(mediaServerItem, type, app, streamId, streamInfo); 346 + redisCatchStorage.addStream(mediaServerItem, type, app, streamId, item);
348 if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() 347 if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
349 || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() 348 || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
350 || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { 349 || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
351 zlmMediaListManager.addPush(item); 350 zlmMediaListManager.addPush(item);
352 } 351 }
353 }else { 352 }else {
354 - // 兼容流注销时类型错误的问题,等zlm更新后删除  
355 - StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);  
356 - if (streamPushItem != null) {  
357 - type = "PUSH";  
358 - }else {  
359 - StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId);  
360 - if (streamProxyByAppAndStream != null) {  
361 - type = "PULL";  
362 - }  
363 - } 353 + // 兼容流注销时类型从redis记录获取
  354 + MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, streamId, mediaServerId);
  355 + type = OriginType.values()[mediaItem.getOriginType()].getType();
364 zlmMediaListManager.removeMedia(app, streamId); 356 zlmMediaListManager.removeMedia(app, streamId);
365 redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId); 357 redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId);
366 } 358 }
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -279,18 +279,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @@ -279,18 +279,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
279 String type = "PULL"; 279 String type = "PULL";
280 280
281 // 发送redis消息 281 // 发送redis消息
282 - List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);  
283 - if (streamInfoList.size() > 0) {  
284 - for (StreamInfo streamInfo : streamInfoList) { 282 + List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, type);
  283 + if (mediaItems.size() > 0) {
  284 + for (MediaItem mediaItem : mediaItems) {
285 JSONObject jsonObject = new JSONObject(); 285 JSONObject jsonObject = new JSONObject();
286 jsonObject.put("serverId", userSetup.getServerId()); 286 jsonObject.put("serverId", userSetup.getServerId());
287 - jsonObject.put("app", streamInfo.getApp());  
288 - jsonObject.put("stream", streamInfo.getStreamId()); 287 + jsonObject.put("app", mediaItem.getApp());
  288 + jsonObject.put("stream", mediaItem.getStream());
289 jsonObject.put("register", false); 289 jsonObject.put("register", false);
290 jsonObject.put("mediaServerId", mediaServerId); 290 jsonObject.put("mediaServerId", mediaServerId);
291 redisCatchStorage.sendStreamChangeMsg(type, jsonObject); 291 redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
292 // 移除redis内流的信息 292 // 移除redis内流的信息
293 - redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId()); 293 + redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream());
294 } 294 }
295 } 295 }
296 } 296 }
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -173,16 +173,16 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -173,16 +173,16 @@ public class StreamPushServiceImpl implements IStreamPushService {
173 List<StreamPushItem> pushList = getPushList(mediaServerId); 173 List<StreamPushItem> pushList = getPushList(mediaServerId);
174 Map<String, StreamPushItem> pushItemMap = new HashMap<>(); 174 Map<String, StreamPushItem> pushItemMap = new HashMap<>();
175 // redis记录 175 // redis记录
176 - List<StreamInfo> streamInfoPushList = redisCatchStorage.getStreams(mediaServerId, "PUSH");  
177 - Map<String, StreamInfo> streamInfoPushItemMap = new HashMap<>(); 176 + List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH");
  177 + Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>();
178 if (pushList.size() > 0) { 178 if (pushList.size() > 0) {
179 for (StreamPushItem streamPushItem : pushList) { 179 for (StreamPushItem streamPushItem : pushList) {
180 pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); 180 pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
181 } 181 }
182 } 182 }
183 - if (streamInfoPushList.size() > 0) {  
184 - for (StreamInfo streamInfo : streamInfoPushList) {  
185 - streamInfoPushItemMap.put(streamInfo.getApp() + streamInfo.getStreamId(), streamInfo); 183 + if (mediaItems.size() > 0) {
  184 + for (MediaItem mediaItem : mediaItems) {
  185 + streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem);
186 } 186 }
187 } 187 }
188 zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ 188 zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
@@ -221,19 +221,19 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -221,19 +221,19 @@ public class StreamPushServiceImpl implements IStreamPushService {
221 } 221 }
222 222
223 } 223 }
224 - Collection<StreamInfo> offlineStreamInfoItems = streamInfoPushItemMap.values();  
225 - if (offlineStreamInfoItems.size() > 0) { 224 + Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values();
  225 + if (offlineMediaItemList.size() > 0) {
226 String type = "PUSH"; 226 String type = "PUSH";
227 - for (StreamInfo offlineStreamInfoItem : offlineStreamInfoItems) { 227 + for (MediaItem offlineMediaItem : offlineMediaItemList) {
228 JSONObject jsonObject = new JSONObject(); 228 JSONObject jsonObject = new JSONObject();
229 jsonObject.put("serverId", userSetup.getServerId()); 229 jsonObject.put("serverId", userSetup.getServerId());
230 - jsonObject.put("app", offlineStreamInfoItem.getApp());  
231 - jsonObject.put("stream", offlineStreamInfoItem.getStreamId()); 230 + jsonObject.put("app", offlineMediaItem.getApp());
  231 + jsonObject.put("stream", offlineMediaItem.getStream());
232 jsonObject.put("register", false); 232 jsonObject.put("register", false);
233 jsonObject.put("mediaServerId", mediaServerId); 233 jsonObject.put("mediaServerId", mediaServerId);
234 redisCatchStorage.sendStreamChangeMsg(type, jsonObject); 234 redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
235 // 移除redis内流的信息 235 // 移除redis内流的信息
236 - redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineStreamInfoItem.getApp(), offlineStreamInfoItem.getStreamId()); 236 + redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream());
237 } 237 }
238 } 238 }
239 })); 239 }));
@@ -250,15 +250,15 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -250,15 +250,15 @@ public class StreamPushServiceImpl implements IStreamPushService {
250 // 发送流停止消息 250 // 发送流停止消息
251 String type = "PUSH"; 251 String type = "PUSH";
252 // 发送redis消息 252 // 发送redis消息
253 - List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); 253 + List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
254 if (streamInfoList.size() > 0) { 254 if (streamInfoList.size() > 0) {
255 - for (StreamInfo streamInfo : streamInfoList) { 255 + for (MediaItem mediaItem : streamInfoList) {
256 // 移除redis内流的信息 256 // 移除redis内流的信息
257 - redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId()); 257 + redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream());
258 JSONObject jsonObject = new JSONObject(); 258 JSONObject jsonObject = new JSONObject();
259 jsonObject.put("serverId", userSetup.getServerId()); 259 jsonObject.put("serverId", userSetup.getServerId());
260 - jsonObject.put("app", streamInfo.getApp());  
261 - jsonObject.put("stream", streamInfo.getStreamId()); 260 + jsonObject.put("app", mediaItem.getApp());
  261 + jsonObject.put("stream", mediaItem.getStream());
262 jsonObject.put("register", false); 262 jsonObject.put("register", false);
263 jsonObject.put("mediaServerId", mediaServerId); 263 jsonObject.put("mediaServerId", mediaServerId);
264 redisCatchStorage.sendStreamChangeMsg(type, jsonObject); 264 redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -3,7 +3,9 @@ package com.genersoft.iot.vmp.storager; @@ -3,7 +3,9 @@ package com.genersoft.iot.vmp.storager;
3 import com.alibaba.fastjson.JSONObject; 3 import com.alibaba.fastjson.JSONObject;
4 import com.genersoft.iot.vmp.common.StreamInfo; 4 import com.genersoft.iot.vmp.common.StreamInfo;
5 import com.genersoft.iot.vmp.gb28181.bean.*; 5 import com.genersoft.iot.vmp.gb28181.bean.*;
  6 +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
6 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 7 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  8 +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
7 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; 9 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
8 import com.genersoft.iot.vmp.service.bean.ThirdPartyGB; 10 import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
9 11
@@ -143,7 +145,7 @@ public interface IRedisCatchStorage { @@ -143,7 +145,7 @@ public interface IRedisCatchStorage {
143 * @param app 145 * @param app
144 * @param streamId 146 * @param streamId
145 */ 147 */
146 - void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, StreamInfo streamInfo); 148 + void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, MediaItem item);
147 149
148 /** 150 /**
149 * 移除流信息从redis 151 * 移除流信息从redis
@@ -175,7 +177,7 @@ public interface IRedisCatchStorage { @@ -175,7 +177,7 @@ public interface IRedisCatchStorage {
175 */ 177 */
176 ThirdPartyGB queryMemberNoGBId(String queryKey); 178 ThirdPartyGB queryMemberNoGBId(String queryKey);
177 179
178 - List<StreamInfo> getStreams(String mediaServerId, String pull); 180 + List<MediaItem> getStreams(String mediaServerId, String pull);
179 181
180 /** 182 /**
181 * 将device信息写入redis 183 * 将device信息写入redis
@@ -206,4 +208,6 @@ public interface IRedisCatchStorage { @@ -206,4 +208,6 @@ public interface IRedisCatchStorage {
206 SubscribeInfo getSubscribe(String key); 208 SubscribeInfo getSubscribe(String key);
207 209
208 void delSubscribe(String key); 210 void delSubscribe(String key);
  211 +
  212 + MediaItem getStreamInfo(String app, String streamId, String mediaServerId);
209 } 213 }
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -5,7 +5,9 @@ import com.genersoft.iot.vmp.common.StreamInfo; @@ -5,7 +5,9 @@ import com.genersoft.iot.vmp.common.StreamInfo;
5 import com.genersoft.iot.vmp.common.VideoManagerConstants; 5 import com.genersoft.iot.vmp.common.VideoManagerConstants;
6 import com.genersoft.iot.vmp.conf.UserSetup; 6 import com.genersoft.iot.vmp.conf.UserSetup;
7 import com.genersoft.iot.vmp.gb28181.bean.*; 7 import com.genersoft.iot.vmp.gb28181.bean.*;
  8 +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
8 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 9 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  10 +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
9 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; 11 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
10 import com.genersoft.iot.vmp.service.bean.ThirdPartyGB; 12 import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
11 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 13 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -386,9 +388,9 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @@ -386,9 +388,9 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
386 } 388 }
387 389
388 @Override 390 @Override
389 - public void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, StreamInfo streamInfo) { 391 + public void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, MediaItem mediaItem) {
390 String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId(); 392 String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId();
391 - redis.set(key, streamInfo); 393 + redis.set(key, mediaItem);
392 } 394 }
393 395
394 @Override 396 @Override
@@ -421,13 +423,13 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @@ -421,13 +423,13 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
421 } 423 }
422 424
423 @Override 425 @Override
424 - public List<StreamInfo> getStreams(String mediaServerId, String type) {  
425 - List<StreamInfo> result = new ArrayList<>(); 426 + public List<MediaItem> getStreams(String mediaServerId, String type) {
  427 + List<MediaItem> result = new ArrayList<>();
426 String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_*_*_" + mediaServerId; 428 String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_*_*_" + mediaServerId;
427 List<Object> streams = redis.scan(key); 429 List<Object> streams = redis.scan(key);
428 for (Object stream : streams) { 430 for (Object stream : streams) {
429 - StreamInfo streamInfo = (StreamInfo)redis.get((String) stream);  
430 - result.add(streamInfo); 431 + MediaItem mediaItem = (MediaItem)redis.get((String) stream);
  432 + result.add(mediaItem);
431 } 433 }
432 return result; 434 return result;
433 } 435 }
@@ -492,4 +494,18 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @@ -492,4 +494,18 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
492 494
493 return result; 495 return result;
494 } 496 }
  497 +
  498 + @Override
  499 + public MediaItem getStreamInfo(String app, String streamId, String mediaServerId) {
  500 + String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_*_" + app + "_" + streamId + "_" + mediaServerId;
  501 +
  502 + MediaItem result = null;
  503 + List<Object> keys = redis.scan(scanKey);
  504 + if (keys.size() > 0) {
  505 + String key = (String) keys.get(0);
  506 + result = (MediaItem)redis.get(key);
  507 + }
  508 +
  509 + return result;
  510 + }
495 } 511 }