Commit 14c4a3c7e81e9a3898a3f301b9303c381a806932

Authored by wangyimeng
1 parent 8acd64ea

优化历史录像下载,目前已测试大华国标级联下载

src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -541,11 +541,14 @@ public class SIPCommander implements ISIPCommander { @@ -541,11 +541,14 @@ public class SIPCommander implements ISIPCommander {
541 content.append("a=downloadspeed:" + downloadSpeed + "\r\n"); 541 content.append("a=downloadspeed:" + downloadSpeed + "\r\n");
542 542
543 content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc 543 content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc
544 - 544 + logger.debug("此时请求下载信令的ssrc===>{}",ssrcInfo.getSsrc());
545 HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId()); 545 HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId());
546 // 添加订阅 546 // 添加订阅
  547 + CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport());
  548 + String callId=newCallIdHeader.getCallId();
547 subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { 549 subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
548 - hookEvent.call(new InviteStreamInfo(mediaServerItem, json,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), "rtp", ssrcInfo.getStream())); 550 + logger.debug("sipc 添加订阅===callId {}",callId);
  551 + hookEvent.call(new InviteStreamInfo(mediaServerItem, json,callId, "rtp", ssrcInfo.getStream()));
549 subscribe.removeSubscribe(hookSubscribe); 552 subscribe.removeSubscribe(hookSubscribe);
550 hookSubscribe.getContent().put("regist", false); 553 hookSubscribe.getContent().put("regist", false);
551 hookSubscribe.getContent().put("schema", "rtsp"); 554 hookSubscribe.getContent().put("schema", "rtsp");
@@ -554,7 +557,7 @@ public class SIPCommander implements ISIPCommander { @@ -554,7 +557,7 @@ public class SIPCommander implements ISIPCommander {
554 (MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd) -> { 557 (MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd) -> {
555 logger.info("[录像]下载结束, 发送BYE"); 558 logger.info("[录像]下载结束, 发送BYE");
556 try { 559 try {
557 - streamByeCmd(device, channelId, ssrcInfo.getStream(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId()); 560 + streamByeCmd(device, channelId, ssrcInfo.getStream(),callId);
558 } catch (InvalidArgumentException | ParseException | SipException | 561 } catch (InvalidArgumentException | ParseException | SipException |
559 SsrcTransactionNotFoundException e) { 562 SsrcTransactionNotFoundException e) {
560 logger.error("[录像]下载结束, 发送BYE失败 {}", e.getMessage()); 563 logger.error("[录像]下载结束, 发送BYE失败 {}", e.getMessage());
@@ -562,15 +565,23 @@ public class SIPCommander implements ISIPCommander { @@ -562,15 +565,23 @@ public class SIPCommander implements ISIPCommander {
562 }); 565 });
563 }); 566 });
564 567
565 - Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc()); 568 + Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc());
566 if (inviteStreamCallback != null) { 569 if (inviteStreamCallback != null) {
567 - inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), "rtp", ssrcInfo.getStream())); 570 + inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null,callId, "rtp", ssrcInfo.getStream()));
568 } 571 }
569 572
570 sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> { 573 sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
571 ResponseEvent responseEvent = (ResponseEvent) event.event; 574 ResponseEvent responseEvent = (ResponseEvent) event.event;
572 SIPResponse response = (SIPResponse) responseEvent.getResponse(); 575 SIPResponse response = (SIPResponse) responseEvent.getResponse();
573 - streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download); 576 + String contentString =new String(response.getRawContent());
  577 + int ssrcIndex = contentString.indexOf("y=");
  578 + String ssrc=ssrcInfo.getSsrc();
  579 + if (ssrcIndex >= 0) {
  580 + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  581 + }
  582 + logger.debug("接收到的下载响应ssrc====>{}",ssrcInfo.getSsrc());
  583 + logger.debug("接收到的下载响应ssrc====>{}",ssrc);
  584 + streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download);
574 okEvent.response(event); 585 okEvent.response(event);
575 }); 586 });
576 } 587 }
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -635,23 +635,23 @@ public class PlayServiceImpl implements IPlayService { @@ -635,23 +635,23 @@ public class PlayServiceImpl implements IPlayService {
635 hookCallBack.call(downloadResult); 635 hookCallBack.call(downloadResult);
636 streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); 636 streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
637 }; 637 };
638 - 638 + InviteStreamCallback hookEvent = (InviteStreamInfo inviteStreamInfo) -> {
  639 + logger.info("收到订阅消息: " + inviteStreamInfo.getCallId());
  640 + dynamicTask.stop(downLoadTimeOutTaskKey);
  641 + StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  642 + streamInfo.setStartTime(startTime);
  643 + streamInfo.setEndTime(endTime);
  644 + redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
  645 + downloadResult.setCode(ErrorCode.SUCCESS.getCode());
  646 + downloadResult.setMsg(ErrorCode.SUCCESS.getMsg());
  647 + downloadResult.setData(streamInfo);
  648 + downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
  649 + downloadResult.setResponse(inviteStreamInfo.getResponse());
  650 + hookCallBack.call(downloadResult);
  651 + };
639 try { 652 try {
640 cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, infoCallBack, 653 cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, infoCallBack,
641 - inviteStreamInfo -> {  
642 - logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());  
643 - dynamicTask.stop(downLoadTimeOutTaskKey);  
644 - StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);  
645 - streamInfo.setStartTime(startTime);  
646 - streamInfo.setEndTime(endTime);  
647 - redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());  
648 - downloadResult.setCode(ErrorCode.SUCCESS.getCode());  
649 - downloadResult.setMsg(ErrorCode.SUCCESS.getMsg());  
650 - downloadResult.setData(streamInfo);  
651 - downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());  
652 - downloadResult.setResponse(inviteStreamInfo.getResponse());  
653 - hookCallBack.call(downloadResult);  
654 - }, errorEvent, eventResult -> 654 + hookEvent, errorEvent, eventResult ->
655 { 655 {
656 if (eventResult.type == SipSubscribe.EventResultType.response) { 656 if (eventResult.type == SipSubscribe.EventResultType.response) {
657 ResponseEvent responseEvent = (ResponseEvent) eventResult.event; 657 ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
@@ -690,9 +690,9 @@ public class PlayServiceImpl implements IPlayService { @@ -690,9 +690,9 @@ public class PlayServiceImpl implements IPlayService {
690 subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { 690 subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
691 logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); 691 logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
692 dynamicTask.stop(downLoadTimeOutTaskKey); 692 dynamicTask.stop(downLoadTimeOutTaskKey);
693 - // hook响应,TODO 此处待处理  
694 -// onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, playBackCallback);  
695 -// hookCallBack.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream())); 693 + // hook响应
  694 + onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, hookCallBack);
  695 + hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
696 }); 696 });
697 } 697 }
698 // 关闭rtp server 698 // 关闭rtp server
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -177,12 +177,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @@ -177,12 +177,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
177 @Override 177 @Override
178 public boolean startDownload(StreamInfo stream, String callId) { 178 public boolean startDownload(StreamInfo stream, String callId) {
179 boolean result; 179 boolean result;
  180 + String key=String.format("%S_%s_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,
  181 + userSetting.getServerId(), stream.getMediaServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId);
180 if (stream.getProgress() == 1) { 182 if (stream.getProgress() == 1) {
181 - result = RedisUtil.set(String.format("%S_%s_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,  
182 - userSetting.getServerId(), stream.getMediaServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId), stream); 183 + logger.debug("添加下载缓存==已完成下载=》{}",key);
  184 + result = RedisUtil.set(key, stream);
183 }else { 185 }else {
184 - result = RedisUtil.set(String.format("%S_%s_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,  
185 - userSetting.getServerId(), stream.getMediaServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId), stream, 60*60); 186 + logger.debug("添加下载缓存==未完成下载=》{}",key);
  187 + result = RedisUtil.set(key, stream, 60*60);
186 } 188 }
187 return result; 189 return result;
188 } 190 }
@@ -617,7 +619,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @@ -617,7 +619,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
617 stream, 619 stream,
618 callId 620 callId
619 ); 621 );
620 - List<Object> streamInfoScan = RedisUtil.scan(key); 622 + List<Object> streamInfoScan = RedisUtil.scan2(key);
621 if (streamInfoScan.size() > 0) { 623 if (streamInfoScan.size() > 0) {
622 return (StreamInfo) RedisUtil.get((String) streamInfoScan.get(0)); 624 return (StreamInfo) RedisUtil.get((String) streamInfoScan.get(0));
623 }else { 625 }else {
src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java
@@ -881,7 +881,13 @@ public class RedisUtil { @@ -881,7 +881,13 @@ public class RedisUtil {
881 881
882 return new ArrayList<>(resultKeys); 882 return new ArrayList<>(resultKeys);
883 } 883 }
884 - 884 + public static List<Object> scan2(String query) {
  885 + if (redisTemplate == null) {
  886 + redisTemplate = SpringBeanFactory.getBean("redisTemplate");
  887 + }
  888 + Set<String> keys = redisTemplate.keys(query);
  889 + return new ArrayList<>(keys);
  890 + }
885 // ============================== 消息发送与订阅 ============================== 891 // ============================== 消息发送与订阅 ==============================
886 public static void convertAndSend(String channel, JSONObject msg) { 892 public static void convertAndSend(String channel, JSONObject msg) {
887 if (redisTemplate == null) { 893 if (redisTemplate == null) {