Commit e2f9ee8f7b2c8b210c75fcd328b2d42c37f9d737

Authored by 648540858
1 parent 9ccce016

修复国标视频点播三种点播方式(自动点播,上级点播,接口点播)并发情况下失败的问题

Showing 26 changed files with 919 additions and 520 deletions
src/main/java/com/genersoft/iot/vmp/common/InviteInfo.java 0 → 100644
  1 +package com.genersoft.iot.vmp.common;
  2 +
  3 +import com.genersoft.iot.vmp.service.bean.SSRCInfo;
  4 +
  5 +/**
  6 + * 记录每次发送invite消息的状态
  7 + */
  8 +public class InviteInfo {
  9 +
  10 + private String deviceId;
  11 +
  12 + private String channelId;
  13 +
  14 + private String stream;
  15 +
  16 + private SSRCInfo ssrcInfo;
  17 +
  18 + private String receiveIp;
  19 +
  20 + private Integer receivePort;
  21 +
  22 + private String streamMode;
  23 +
  24 + private InviteSessionType type;
  25 +
  26 + private InviteSessionStatus status;
  27 +
  28 + private StreamInfo streamInfo;
  29 +
  30 +
  31 + public static InviteInfo getinviteInfo(String deviceId, String channelId, String stream, SSRCInfo ssrcInfo,
  32 + String receiveIp, Integer receivePort, String streamMode,
  33 + InviteSessionType type, InviteSessionStatus status) {
  34 + InviteInfo inviteInfo = new InviteInfo();
  35 + inviteInfo.setDeviceId(deviceId);
  36 + inviteInfo.setChannelId(channelId);
  37 + inviteInfo.setStream(stream);
  38 + inviteInfo.setSsrcInfo(ssrcInfo);
  39 + inviteInfo.setReceiveIp(receiveIp);
  40 + inviteInfo.setReceivePort(receivePort);
  41 + inviteInfo.setStreamMode(streamMode);
  42 + inviteInfo.setType(type);
  43 + inviteInfo.setStatus(status);
  44 + return inviteInfo;
  45 + }
  46 +
  47 + public String getDeviceId() {
  48 + return deviceId;
  49 + }
  50 +
  51 + public void setDeviceId(String deviceId) {
  52 + this.deviceId = deviceId;
  53 + }
  54 +
  55 + public String getChannelId() {
  56 + return channelId;
  57 + }
  58 +
  59 + public void setChannelId(String channelId) {
  60 + this.channelId = channelId;
  61 + }
  62 +
  63 + public InviteSessionType getType() {
  64 + return type;
  65 + }
  66 +
  67 + public void setType(InviteSessionType type) {
  68 + this.type = type;
  69 + }
  70 +
  71 + public InviteSessionStatus getStatus() {
  72 + return status;
  73 + }
  74 +
  75 + public void setStatus(InviteSessionStatus status) {
  76 + this.status = status;
  77 + }
  78 +
  79 + public StreamInfo getStreamInfo() {
  80 + return streamInfo;
  81 + }
  82 +
  83 + public void setStreamInfo(StreamInfo streamInfo) {
  84 + this.streamInfo = streamInfo;
  85 + }
  86 +
  87 + public String getStream() {
  88 + return stream;
  89 + }
  90 +
  91 + public void setStream(String stream) {
  92 + this.stream = stream;
  93 + }
  94 +
  95 + public SSRCInfo getSsrcInfo() {
  96 + return ssrcInfo;
  97 + }
  98 +
  99 + public void setSsrcInfo(SSRCInfo ssrcInfo) {
  100 + this.ssrcInfo = ssrcInfo;
  101 + }
  102 +
  103 + public String getReceiveIp() {
  104 + return receiveIp;
  105 + }
  106 +
  107 + public void setReceiveIp(String receiveIp) {
  108 + this.receiveIp = receiveIp;
  109 + }
  110 +
  111 + public Integer getReceivePort() {
  112 + return receivePort;
  113 + }
  114 +
  115 + public void setReceivePort(Integer receivePort) {
  116 + this.receivePort = receivePort;
  117 + }
  118 +
  119 + public String getStreamMode() {
  120 + return streamMode;
  121 + }
  122 +
  123 + public void setStreamMode(String streamMode) {
  124 + this.streamMode = streamMode;
  125 + }
  126 +}
src/main/java/com/genersoft/iot/vmp/common/InviteSessionStatus.java 0 → 100644
  1 +package com.genersoft.iot.vmp.common;
  2 +
  3 +/**
  4 + * 标识invite消息发出后的各个状态,
  5 + * 收到ok钱停止invite发送cancel,
  6 + * 收到200ok后发送BYE停止invite
  7 + */
  8 +public enum InviteSessionStatus {
  9 + ready,
  10 + ok,
  11 +}
src/main/java/com/genersoft/iot/vmp/common/InviteSessionType.java 0 → 100644
  1 +package com.genersoft.iot.vmp.common;
  2 +
  3 +public enum InviteSessionType {
  4 + PLAY,
  5 + PLAYBACK,
  6 + DOWNLOAD
  7 +}
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -16,8 +16,6 @@ public class VideoManagerConstants { @@ -16,8 +16,6 @@ public class VideoManagerConstants {
16 16
17 public static final String MEDIA_SERVERS_ONLINE_PREFIX = "VMP_MEDIA_ONLINE_SERVERS_"; 17 public static final String MEDIA_SERVERS_ONLINE_PREFIX = "VMP_MEDIA_ONLINE_SERVERS_";
18 18
19 - public static final String MEDIA_STREAM_PREFIX = "VMP_MEDIA_STREAM";  
20 -  
21 public static final String DEVICE_PREFIX = "VMP_DEVICE_"; 19 public static final String DEVICE_PREFIX = "VMP_DEVICE_";
22 20
23 // 设备同步完成 21 // 设备同步完成
@@ -28,9 +26,10 @@ public class VideoManagerConstants { @@ -28,9 +26,10 @@ public class VideoManagerConstants {
28 public static final String KEEPLIVEKEY_PREFIX = "VMP_KEEPALIVE_"; 26 public static final String KEEPLIVEKEY_PREFIX = "VMP_KEEPALIVE_";
29 27
30 // TODO 此处多了一个_,暂不修改 28 // TODO 此处多了一个_,暂不修改
31 - public static final String PLAYER_PREFIX = "VMP_PLAYER_";  
32 - public static final String PLAY_BLACK_PREFIX = "VMP_PLAYBACK_";  
33 - public static final String DOWNLOAD_PREFIX = "VMP_DOWNLOAD_"; 29 + public static final String INVITE_PREFIX = "VMP_INVITE";
  30 + public static final String PLAYER_PREFIX = "VMP_INVITE_PLAY_";
  31 + public static final String PLAY_BLACK_PREFIX = "VMP_INVITE_PLAYBACK_";
  32 + public static final String DOWNLOAD_PREFIX = "VMP_INVITE_DOWNLOAD_";
34 33
35 public static final String PLATFORM_KEEPALIVE_PREFIX = "VMP_PLATFORM_KEEPALIVE_"; 34 public static final String PLATFORM_KEEPALIVE_PREFIX = "VMP_PLATFORM_KEEPALIVE_";
36 35
src/main/java/com/genersoft/iot/vmp/conf/ProxyServletConfig.java
@@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.conf; @@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.conf;
2 2
3 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 3 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
4 import com.genersoft.iot.vmp.service.IMediaServerService; 4 import com.genersoft.iot.vmp.service.IMediaServerService;
5 -import org.apache.catalina.connector.ClientAbortException;  
6 import org.apache.http.HttpHost; 5 import org.apache.http.HttpHost;
7 import org.apache.http.HttpRequest; 6 import org.apache.http.HttpRequest;
8 import org.apache.http.HttpResponse; 7 import org.apache.http.HttpResponse;
@@ -194,11 +193,11 @@ public class ProxyServletConfig { @@ -194,11 +193,11 @@ public class ProxyServletConfig {
194 } catch (IOException ioException) { 193 } catch (IOException ioException) {
195 if (ioException instanceof ConnectException) { 194 if (ioException instanceof ConnectException) {
196 logger.error("录像服务 连接失败"); 195 logger.error("录像服务 连接失败");
197 - }else if (ioException instanceof ClientAbortException) {  
198 - /**  
199 - * TODO 使用这个代理库实现代理在遇到代理视频文件时,如果是206结果,会遇到报错蛋市目前功能正常,  
200 - * TODO 暂时去除异常处理。后续使用其他代理框架修改测试  
201 - */ 196 +// }else if (ioException instanceof ClientAbortException) {
  197 +// /**
  198 +// * TODO 使用这个代理库实现代理在遇到代理视频文件时,如果是206结果,会遇到报错蛋市目前功能正常,
  199 +// * TODO 暂时去除异常处理。后续使用其他代理框架修改测试
  200 +// */
202 201
203 }else { 202 }else {
204 logger.error("录像服务 代理失败: ", e); 203 logger.error("录像服务 代理失败: ", e);
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java
1 package com.genersoft.iot.vmp.gb28181.bean; 1 package com.genersoft.iot.vmp.gb28181.bean;
2 2
3 -import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; 3 +import com.genersoft.iot.vmp.common.InviteSessionType;
4 4
5 public class SsrcTransaction { 5 public class SsrcTransaction {
6 6
@@ -13,7 +13,7 @@ public class SsrcTransaction { @@ -13,7 +13,7 @@ public class SsrcTransaction {
13 13
14 private SipTransactionInfo sipTransactionInfo; 14 private SipTransactionInfo sipTransactionInfo;
15 15
16 - private VideoStreamSessionManager.SessionType type; 16 + private InviteSessionType type;
17 17
18 public String getDeviceId() { 18 public String getDeviceId() {
19 return deviceId; 19 return deviceId;
@@ -63,11 +63,11 @@ public class SsrcTransaction { @@ -63,11 +63,11 @@ public class SsrcTransaction {
63 this.ssrc = ssrc; 63 this.ssrc = ssrc;
64 } 64 }
65 65
66 - public VideoStreamSessionManager.SessionType getType() { 66 + public InviteSessionType getType() {
67 return type; 67 return type;
68 } 68 }
69 69
70 - public void setType(VideoStreamSessionManager.SessionType type) { 70 + public void setType(InviteSessionType type) {
71 this.type = type; 71 this.type = type;
72 } 72 }
73 73
src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java
1 package com.genersoft.iot.vmp.gb28181.session; 1 package com.genersoft.iot.vmp.gb28181.session;
2 2
  3 +import com.genersoft.iot.vmp.common.InviteSessionType;
3 import com.genersoft.iot.vmp.common.VideoManagerConstants; 4 import com.genersoft.iot.vmp.common.VideoManagerConstants;
4 import com.genersoft.iot.vmp.conf.UserSetting; 5 import com.genersoft.iot.vmp.conf.UserSetting;
5 import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; 6 import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
@@ -27,12 +28,6 @@ public class VideoStreamSessionManager { @@ -27,12 +28,6 @@ public class VideoStreamSessionManager {
27 @Autowired 28 @Autowired
28 private RedisTemplate<Object, Object> redisTemplate; 29 private RedisTemplate<Object, Object> redisTemplate;
29 30
30 - public enum SessionType {  
31 - play,  
32 - playback,  
33 - download  
34 - }  
35 -  
36 /** 31 /**
37 * 添加一个点播/回放的事务信息 32 * 添加一个点播/回放的事务信息
38 * 后续可以通过流Id/callID 33 * 后续可以通过流Id/callID
@@ -43,7 +38,7 @@ public class VideoStreamSessionManager { @@ -43,7 +38,7 @@ public class VideoStreamSessionManager {
43 * @param mediaServerId 所使用的流媒体ID 38 * @param mediaServerId 所使用的流媒体ID
44 * @param response 回复 39 * @param response 回复
45 */ 40 */
46 - public void put(String deviceId, String channelId, String callId, String stream, String ssrc, String mediaServerId, SIPResponse response, SessionType type){ 41 + public void put(String deviceId, String channelId, String callId, String stream, String ssrc, String mediaServerId, SIPResponse response, InviteSessionType type){
47 SsrcTransaction ssrcTransaction = new SsrcTransaction(); 42 SsrcTransaction ssrcTransaction = new SsrcTransaction();
48 ssrcTransaction.setDeviceId(deviceId); 43 ssrcTransaction.setDeviceId(deviceId);
49 ssrcTransaction.setChannelId(channelId); 44 ssrcTransaction.setChannelId(channelId);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
1 package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; 1 package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
2 2
3 import com.alibaba.fastjson2.JSONObject; 3 import com.alibaba.fastjson2.JSONObject;
  4 +import com.genersoft.iot.vmp.common.InviteSessionType;
4 import com.genersoft.iot.vmp.common.StreamInfo; 5 import com.genersoft.iot.vmp.common.StreamInfo;
5 import com.genersoft.iot.vmp.conf.SipConfig; 6 import com.genersoft.iot.vmp.conf.SipConfig;
6 import com.genersoft.iot.vmp.conf.UserSetting; 7 import com.genersoft.iot.vmp.conf.UserSetting;
@@ -350,7 +351,7 @@ public class SIPCommander implements ISIPCommander { @@ -350,7 +351,7 @@ public class SIPCommander implements ISIPCommander {
350 // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 351 // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
351 ResponseEvent responseEvent = (ResponseEvent) e.event; 352 ResponseEvent responseEvent = (ResponseEvent) e.event;
352 SIPResponse response = (SIPResponse) responseEvent.getResponse(); 353 SIPResponse response = (SIPResponse) responseEvent.getResponse();
353 - streamSession.put(device.getDeviceId(), channelId, "play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play); 354 + streamSession.put(device.getDeviceId(), channelId, "play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAY);
354 okEvent.response(e); 355 okEvent.response(e);
355 }); 356 });
356 } 357 }
@@ -452,7 +453,7 @@ public class SIPCommander implements ISIPCommander { @@ -452,7 +453,7 @@ public class SIPCommander implements ISIPCommander {
452 sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> { 453 sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
453 ResponseEvent responseEvent = (ResponseEvent) event.event; 454 ResponseEvent responseEvent = (ResponseEvent) event.event;
454 SIPResponse response = (SIPResponse) responseEvent.getResponse(); 455 SIPResponse response = (SIPResponse) responseEvent.getResponse();
455 - streamSession.put(device.getDeviceId(), channelId,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.playback); 456 + streamSession.put(device.getDeviceId(), channelId,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAYBACK);
456 okEvent.response(event); 457 okEvent.response(event);
457 }); 458 });
458 if (inviteStreamCallback != null) { 459 if (inviteStreamCallback != null) {
@@ -580,7 +581,7 @@ public class SIPCommander implements ISIPCommander { @@ -580,7 +581,7 @@ public class SIPCommander implements ISIPCommander {
580 if (ssrcIndex >= 0) { 581 if (ssrcIndex >= 0) {
581 ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); 582 ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
582 } 583 }
583 - streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download); 584 + streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD);
584 okEvent.response(event); 585 okEvent.response(event);
585 }); 586 });
586 } 587 }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; 1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
2 2
3 -import com.genersoft.iot.vmp.common.StreamInfo; 3 +import com.genersoft.iot.vmp.common.InviteInfo;
  4 +import com.genersoft.iot.vmp.common.InviteSessionType;
4 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; 5 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
5 import com.genersoft.iot.vmp.gb28181.bean.Device; 6 import com.genersoft.iot.vmp.gb28181.bean.Device;
6 import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; 7 import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
@@ -15,6 +16,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP @@ -15,6 +16,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP
15 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; 16 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
16 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 17 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
17 import com.genersoft.iot.vmp.service.IDeviceService; 18 import com.genersoft.iot.vmp.service.IDeviceService;
  19 +import com.genersoft.iot.vmp.service.IInviteStreamService;
18 import com.genersoft.iot.vmp.service.IMediaServerService; 20 import com.genersoft.iot.vmp.service.IMediaServerService;
19 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; 21 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
20 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 22 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -26,7 +28,9 @@ import org.springframework.beans.factory.InitializingBean; @@ -26,7 +28,9 @@ import org.springframework.beans.factory.InitializingBean;
26 import org.springframework.beans.factory.annotation.Autowired; 28 import org.springframework.beans.factory.annotation.Autowired;
27 import org.springframework.stereotype.Component; 29 import org.springframework.stereotype.Component;
28 30
29 -import javax.sip.*; 31 +import javax.sip.InvalidArgumentException;
  32 +import javax.sip.RequestEvent;
  33 +import javax.sip.SipException;
30 import javax.sip.address.SipURI; 34 import javax.sip.address.SipURI;
31 import javax.sip.header.CallIdHeader; 35 import javax.sip.header.CallIdHeader;
32 import javax.sip.header.FromHeader; 36 import javax.sip.header.FromHeader;
@@ -53,6 +57,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @@ -53,6 +57,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
53 private IRedisCatchStorage redisCatchStorage; 57 private IRedisCatchStorage redisCatchStorage;
54 58
55 @Autowired 59 @Autowired
  60 + private IInviteStreamService inviteStreamService;
  61 +
  62 + @Autowired
56 private IDeviceService deviceService; 63 private IDeviceService deviceService;
57 64
58 @Autowired 65 @Autowired
@@ -136,11 +143,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @@ -136,11 +143,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
136 Device device = storager.queryVideoDeviceByChannelId(platformGbId); 143 Device device = storager.queryVideoDeviceByChannelId(platformGbId);
137 if (device != null) { 144 if (device != null) {
138 storager.stopPlay(device.getDeviceId(), channelId); 145 storager.stopPlay(device.getDeviceId(), channelId);
139 - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);  
140 - if (streamInfo != null) {  
141 - redisCatchStorage.stopPlay(streamInfo);  
142 - mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream());  
143 - }  
144 SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); 146 SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
145 if (ssrcTransactionForPlay != null){ 147 if (ssrcTransactionForPlay != null){
146 if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){ 148 if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){
@@ -151,6 +153,14 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @@ -151,6 +153,14 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
151 } 153 }
152 streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream()); 154 streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream());
153 } 155 }
  156 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  157 +
  158 + if (inviteInfo != null) {
  159 + inviteStreamService.removeInviteInfo(inviteInfo);
  160 + if (inviteInfo.getStreamInfo() != null) {
  161 + mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
  162 + }
  163 + }
154 } 164 }
155 SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null); 165 SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null);
156 if (ssrcTransactionForPlayBack != null) { 166 if (ssrcTransactionForPlayBack != null) {
@@ -160,6 +170,14 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @@ -160,6 +170,14 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
160 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc()); 170 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc());
161 } 171 }
162 streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream()); 172 streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream());
  173 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, device.getDeviceId(), channelId);
  174 +
  175 + if (inviteInfo != null) {
  176 + inviteStreamService.removeInviteInfo(inviteInfo);
  177 + if (inviteInfo.getStreamInfo() != null) {
  178 + mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
  179 + }
  180 + }
163 } 181 }
164 } 182 }
165 183
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; 1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
2 2
3 -import com.alibaba.fastjson2.JSONObject; 3 +import com.genersoft.iot.vmp.common.StreamInfo;
4 import com.genersoft.iot.vmp.conf.DynamicTask; 4 import com.genersoft.iot.vmp.conf.DynamicTask;
5 import com.genersoft.iot.vmp.conf.UserSetting; 5 import com.genersoft.iot.vmp.conf.UserSetting;
6 import com.genersoft.iot.vmp.gb28181.bean.*; 6 import com.genersoft.iot.vmp.gb28181.bean.*;
7 -import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;  
8 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; 7 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
9 -import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;  
10 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; 8 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
11 import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; 9 import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
12 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; 10 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
@@ -21,6 +19,8 @@ import com.genersoft.iot.vmp.service.IMediaServerService; @@ -21,6 +19,8 @@ import com.genersoft.iot.vmp.service.IMediaServerService;
21 import com.genersoft.iot.vmp.service.IPlayService; 19 import com.genersoft.iot.vmp.service.IPlayService;
22 import com.genersoft.iot.vmp.service.IStreamProxyService; 20 import com.genersoft.iot.vmp.service.IStreamProxyService;
23 import com.genersoft.iot.vmp.service.IStreamPushService; 21 import com.genersoft.iot.vmp.service.IStreamPushService;
  22 +import com.genersoft.iot.vmp.service.bean.InviteErrorCallback;
  23 +import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
24 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; 24 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
25 import com.genersoft.iot.vmp.service.bean.SSRCInfo; 25 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
26 import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; 26 import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
@@ -102,9 +102,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -102,9 +102,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
102 private SIPProcessorObserver sipProcessorObserver; 102 private SIPProcessorObserver sipProcessorObserver;
103 103
104 @Autowired 104 @Autowired
105 - private VideoStreamSessionManager sessionManager;  
106 -  
107 - @Autowired  
108 private UserSetting userSetting; 105 private UserSetting userSetting;
109 106
110 @Autowired 107 @Autowired
@@ -380,10 +377,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -380,10 +377,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
380 377
381 Long finalStartTime = startTime; 378 Long finalStartTime = startTime;
382 Long finalStopTime = stopTime; 379 Long finalStopTime = stopTime;
383 - ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> {  
384 - String app = responseJSON.getString("app");  
385 - String stream = responseJSON.getString("stream");  
386 - logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", app, stream); 380 + InviteErrorCallback<Object> hookEvent = (code, msg, data) -> {
  381 + StreamInfo streamInfo = (StreamInfo)data;
  382 + MediaServerItem mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId());
  383 + logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", streamInfo.getApp(), streamInfo.getStream());
387 // * 0 等待设备推流上来 384 // * 0 等待设备推流上来
388 // * 1 下级已经推流,等待上级平台回复ack 385 // * 1 下级已经推流,等待上级平台回复ack
389 // * 2 推流中 386 // * 2 推流中
@@ -429,10 +426,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -429,10 +426,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
429 logger.error("[命令发送失败] 国标级联 回复SdpAck", e); 426 logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
430 } 427 }
431 }; 428 };
432 - SipSubscribe.Event errorEvent = ((event) -> { 429 + InviteErrorCallback<Object> errorEvent = ((statusCode, msg, data) -> {
433 // 未知错误。直接转发设备点播的错误 430 // 未知错误。直接转发设备点播的错误
434 try { 431 try {
435 - Response response = getMessageFactory().createResponse(event.statusCode, evt.getRequest()); 432 + Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
436 sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response); 433 sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
437 } catch (ParseException | SipException e) { 434 } catch (ParseException | SipException e) {
438 logger.error("未处理的异常 ", e); 435 logger.error("未处理的异常 ", e);
@@ -450,7 +447,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -450,7 +447,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
450 if (result.getCode() != 0) { 447 if (result.getCode() != 0) {
451 logger.warn("录像回放失败"); 448 logger.warn("录像回放失败");
452 if (result.getEvent() != null) { 449 if (result.getEvent() != null) {
453 - errorEvent.response(result.getEvent()); 450 +// errorEvent.response(result.getEvent());
454 } 451 }
455 redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); 452 redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
456 try { 453 try {
@@ -460,53 +457,31 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -460,53 +457,31 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
460 } 457 }
461 } else { 458 } else {
462 if (result.getMediaServerItem() != null) { 459 if (result.getMediaServerItem() != null) {
463 - hookEvent.response(result.getMediaServerItem(), result.getResponse()); 460 +// hookEvent.response(result.getMediaServerItem(), result.getResponse());
464 } 461 }
465 } 462 }
466 }); 463 });
467 } else { 464 } else {
468 sendRtpItem.setPlayType(InviteStreamType.PLAY); 465 sendRtpItem.setPlayType(InviteStreamType.PLAY);
469 - SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);  
470 - if (playTransaction != null) {  
471 - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream());  
472 - if (!streamReady) {  
473 - boolean hasRtpServer = mediaServerService.checkRtpServer(mediaServerItem, "rtp", playTransaction.getStream());  
474 - if (hasRtpServer) {  
475 - logger.info("[上级点播]已经开启rtpServer但是尚未收到流,开启监听流的到来");  
476 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", playTransaction.getStream(), true, "rtsp", mediaServerItem.getId());  
477 - zlmHttpHookSubscribe.addSubscribe(hookSubscribe, hookEvent);  
478 - }else {  
479 - playTransaction = null;  
480 - }  
481 - } 466 + String streamId = null;
  467 + if (mediaServerItem.isRtpEnable()) {
  468 + streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  469 + }else {
  470 + streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase();
482 } 471 }
483 - if (playTransaction == null) {  
484 - // 被点播的通道目前未被点播,则开始点播  
485 - String streamId = null;  
486 - if (mediaServerItem.isRtpEnable()) {  
487 - streamId = String.format("%s_%s", device.getDeviceId(), channelId);  
488 - }  
489 - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());  
490 - logger.info(JSONObject.toJSONString(ssrcInfo));  
491 - sendRtpItem.setStreamId(ssrcInfo.getStream());  
492 -// sendRtpItem.setSsrc(ssrcInfo.getSsrc());  
493 -  
494 - // 写入redis, 超时时回复  
495 - redisCatchStorage.updateSendRTPSever(sendRtpItem);  
496 - playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> { 472 + sendRtpItem.setStreamId(streamId);
  473 + redisCatchStorage.updateSendRTPSever(sendRtpItem);
  474 + playService.play(mediaServerItem, device.getDeviceId(), channelId, ((code, msg, data) -> {
  475 + if (code == InviteErrorCode.SUCCESS.getCode()){
  476 + hookEvent.run(code, msg, data);
  477 + }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){
497 logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); 478 logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId);
498 redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); 479 redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
499 - });  
500 - } else { 480 + }else {
  481 + errorEvent.run(code, msg, data);
  482 + }
  483 + }));
501 484
502 - sendRtpItem.setStreamId(playTransaction.getStream());  
503 - // 写入redis, 超时时回复  
504 - redisCatchStorage.updateSendRTPSever(sendRtpItem);  
505 - JSONObject jsonObject = new JSONObject();  
506 - jsonObject.put("app", sendRtpItem.getApp());  
507 - jsonObject.put("stream", sendRtpItem.getStreamId());  
508 - hookEvent.response(mediaServerItem, jsonObject);  
509 - }  
510 } 485 }
511 } else if (gbStream != null) { 486 } else if (gbStream != null) {
512 487
@@ -559,7 +534,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -559,7 +534,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
559 int port, Boolean tcpActive, boolean mediaTransmissionTCP, 534 int port, Boolean tcpActive, boolean mediaTransmissionTCP,
560 String channelId, String addressStr, String ssrc, String requesterId) { 535 String channelId, String addressStr, String ssrc, String requesterId) {
561 Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); 536 Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
562 - if (streamReady) { 537 + if (streamReady != null && streamReady) {
563 // 自平台内容 538 // 自平台内容
564 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, 539 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
565 gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); 540 gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
@@ -598,7 +573,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -598,7 +573,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
598 // 推流 573 // 推流
599 if (streamPushItem.isSelf()) { 574 if (streamPushItem.isSelf()) {
600 Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); 575 Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
601 - if (streamReady) { 576 + if (streamReady != null && streamReady) {
602 // 自平台内容 577 // 自平台内容
603 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, 578 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
604 gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); 579 gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -2,6 +2,8 @@ package com.genersoft.iot.vmp.media.zlm; @@ -2,6 +2,8 @@ package com.genersoft.iot.vmp.media.zlm;
2 2
3 import com.alibaba.fastjson2.JSON; 3 import com.alibaba.fastjson2.JSON;
4 import com.alibaba.fastjson2.JSONObject; 4 import com.alibaba.fastjson2.JSONObject;
  5 +import com.genersoft.iot.vmp.common.InviteInfo;
  6 +import com.genersoft.iot.vmp.common.InviteSessionType;
5 import com.genersoft.iot.vmp.common.StreamInfo; 7 import com.genersoft.iot.vmp.common.StreamInfo;
6 import com.genersoft.iot.vmp.conf.UserSetting; 8 import com.genersoft.iot.vmp.conf.UserSetting;
7 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; 9 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@@ -22,10 +24,8 @@ import com.genersoft.iot.vmp.media.zlm.dto.hook.*; @@ -22,10 +24,8 @@ import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
22 import com.genersoft.iot.vmp.service.*; 24 import com.genersoft.iot.vmp.service.*;
23 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 25 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
24 import com.genersoft.iot.vmp.storager.IVideoManagerStorage; 26 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
25 -import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;  
26 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; 27 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
27 import com.genersoft.iot.vmp.vmanager.bean.StreamContent; 28 import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
28 -import com.genersoft.iot.vmp.vmanager.bean.WVPResult;  
29 import org.slf4j.Logger; 29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory; 30 import org.slf4j.LoggerFactory;
31 import org.springframework.beans.factory.annotation.Autowired; 31 import org.springframework.beans.factory.annotation.Autowired;
@@ -71,6 +71,9 @@ public class ZLMHttpHookListener { @@ -71,6 +71,9 @@ public class ZLMHttpHookListener {
71 private IRedisCatchStorage redisCatchStorage; 71 private IRedisCatchStorage redisCatchStorage;
72 72
73 @Autowired 73 @Autowired
  74 + private IInviteStreamService inviteStreamService;
  75 +
  76 + @Autowired
74 private IDeviceService deviceService; 77 private IDeviceService deviceService;
75 78
76 @Autowired 79 @Autowired
@@ -252,7 +255,7 @@ public class ZLMHttpHookListener { @@ -252,7 +255,7 @@ public class ZLMHttpHookListener {
252 result.setEnable_audio(deviceChannel.isHasAudio()); 255 result.setEnable_audio(deviceChannel.isHasAudio());
253 } 256 }
254 // 如果是录像下载就设置视频间隔十秒 257 // 如果是录像下载就设置视频间隔十秒
255 - if (ssrcTransactionForAll.get(0).getType() == VideoStreamSessionManager.SessionType.download) { 258 + if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) {
256 result.setMp4_max_second(10); 259 result.setMp4_max_second(10);
257 result.setEnable_audio(true); 260 result.setEnable_audio(true);
258 result.setEnable_mp4(true); 261 result.setEnable_mp4(true);
@@ -342,17 +345,10 @@ public class ZLMHttpHookListener { @@ -342,17 +345,10 @@ public class ZLMHttpHookListener {
342 } 345 }
343 346
344 if ("rtp".equals(param.getApp()) && !param.isRegist()) { 347 if ("rtp".equals(param.getApp()) && !param.isRegist()) {
345 - StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(param.getStream());  
346 - if (streamInfo != null) {  
347 - redisCatchStorage.stopPlay(streamInfo);  
348 - storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());  
349 - } else {  
350 - streamInfo = redisCatchStorage.queryPlayback(null, null,  
351 - param.getStream(), null);  
352 - if (streamInfo != null) {  
353 - redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(),  
354 - streamInfo.getStream(), null);  
355 - } 348 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
  349 + if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
  350 + inviteStreamService.removeInviteInfo(inviteInfo);
  351 + storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
356 } 352 }
357 } else { 353 } else {
358 if (!"rtp".equals(param.getApp())) { 354 if (!"rtp".equals(param.getApp())) {
@@ -450,13 +446,15 @@ public class ZLMHttpHookListener { @@ -450,13 +446,15 @@ public class ZLMHttpHookListener {
450 if ("rtp".equals(param.getApp())) { 446 if ("rtp".equals(param.getApp())) {
451 ret.put("close", userSetting.getStreamOnDemand()); 447 ret.put("close", userSetting.getStreamOnDemand());
452 // 国标流, 点播/录像回放/录像下载 448 // 国标流, 点播/录像回放/录像下载
453 - StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(param.getStream()); 449 +// StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(param.getStream());
  450 +
  451 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
454 // 点播 452 // 点播
455 - if (streamInfoForPlayCatch != null) { 453 + if (inviteInfo != null) {
456 // 收到无人观看说明流也没有在往上级推送 454 // 收到无人观看说明流也没有在往上级推送
457 - if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) { 455 + if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
458 List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( 456 List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
459 - streamInfoForPlayCatch.getChannelId()); 457 + inviteInfo.getChannelId());
460 if (sendRtpItems.size() > 0) { 458 if (sendRtpItems.size() > 0) {
461 for (SendRtpItem sendRtpItem : sendRtpItems) { 459 for (SendRtpItem sendRtpItem : sendRtpItems) {
462 ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); 460 ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
@@ -470,19 +468,22 @@ public class ZLMHttpHookListener { @@ -470,19 +468,22 @@ public class ZLMHttpHookListener {
470 } 468 }
471 } 469 }
472 } 470 }
473 - Device device = deviceService.getDevice(streamInfoForPlayCatch.getDeviceID()); 471 + Device device = deviceService.getDevice(inviteInfo.getDeviceId());
474 if (device != null) { 472 if (device != null) {
475 try { 473 try {
476 - cmder.streamByeCmd(device, streamInfoForPlayCatch.getChannelId(),  
477 - streamInfoForPlayCatch.getStream(), null); 474 + if (inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()) != null) {
  475 + cmder.streamByeCmd(device, inviteInfo.getChannelId(),
  476 + inviteInfo.getStream(), null);
  477 + }
478 } catch (InvalidArgumentException | ParseException | SipException | 478 } catch (InvalidArgumentException | ParseException | SipException |
479 SsrcTransactionNotFoundException e) { 479 SsrcTransactionNotFoundException e) {
480 logger.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage()); 480 logger.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage());
481 } 481 }
482 } 482 }
483 483
484 - redisCatchStorage.stopPlay(streamInfoForPlayCatch);  
485 - storager.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId()); 484 + inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
  485 + inviteInfo.getChannelId(), inviteInfo.getStream());
  486 + storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
486 return ret; 487 return ret;
487 } 488 }
488 // 录像回放 489 // 录像回放
@@ -582,6 +583,7 @@ public class ZLMHttpHookListener { @@ -582,6 +583,7 @@ public class ZLMHttpHookListener {
582 return defaultResult; 583 return defaultResult;
583 } 584 }
584 logger.info("[ZLM HOOK] 流未找到, 发起自动点播:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); 585 logger.info("[ZLM HOOK] 流未找到, 发起自动点播:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
  586 +
585 RequestMessage msg = new RequestMessage(); 587 RequestMessage msg = new RequestMessage();
586 String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; 588 String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
587 boolean exist = resultHolder.exist(key, null); 589 boolean exist = resultHolder.exist(key, null);
@@ -589,31 +591,22 @@ public class ZLMHttpHookListener { @@ -589,31 +591,22 @@ public class ZLMHttpHookListener {
589 String uuid = UUID.randomUUID().toString(); 591 String uuid = UUID.randomUUID().toString();
590 msg.setId(uuid); 592 msg.setId(uuid);
591 DeferredResult<HookResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); 593 DeferredResult<HookResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
592 - DeferredResultEx<HookResult> deferredResultEx = new DeferredResultEx<>(result);  
593 594
594 result.onTimeout(() -> { 595 result.onTimeout(() -> {
595 - logger.info("点播接口等待超时"); 596 + logger.info("[ZLM HOOK] 自动点播, 等待超时");
596 // 释放rtpserver 597 // 释放rtpserver
597 msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时")); 598 msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时"));
598 resultHolder.invokeResult(msg); 599 resultHolder.invokeResult(msg);
599 }); 600 });
600 - // TODO 在点播未成功的情况下在此调用接口点播会导致返回的流地址ip错误  
601 - deferredResultEx.setFilter(result1 -> {  
602 - WVPResult<StreamInfo> wvpResult1 = (WVPResult<StreamInfo>) result1;  
603 - HookResult resultForEnd = new HookResult();  
604 - resultForEnd.setCode(wvpResult1.getCode());  
605 - resultForEnd.setMsg(wvpResult1.getMsg());  
606 - return resultForEnd;  
607 - });  
608 601
609 // 录像查询以channelId作为deviceId查询 602 // 录像查询以channelId作为deviceId查询
610 - resultHolder.put(key, uuid, deferredResultEx); 603 + resultHolder.put(key, uuid, result);
611 604
612 if (!exist) { 605 if (!exist) {
613 - playService.play(mediaInfo, deviceId, channelId, null, eventResult -> {  
614 - msg.setData(new HookResult(eventResult.statusCode, eventResult.msg)); 606 + playService.play(mediaInfo, deviceId, channelId, (code, message, data) -> {
  607 + msg.setData(new HookResult(code, message));
615 resultHolder.invokeResult(msg); 608 resultHolder.invokeResult(msg);
616 - }, null); 609 + });
617 } 610 }
618 return result; 611 return result;
619 } else { 612 } else {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
@@ -97,7 +97,8 @@ public class ZLMMediaListManager { @@ -97,7 +97,8 @@ public class ZLMMediaListManager {
97 public void sendStreamEvent(String app, String stream, String mediaServerId) { 97 public void sendStreamEvent(String app, String stream, String mediaServerId) {
98 MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); 98 MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
99 // 查看推流状态 99 // 查看推流状态
100 - if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) { 100 + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream);
  101 + if (streamReady != null && streamReady) {
101 ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream); 102 ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream);
102 if (channelOnlineEventLister != null) { 103 if (channelOnlineEventLister != null) {
103 try { 104 try {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -330,6 +330,9 @@ public class ZLMRTPServerFactory { @@ -330,6 +330,9 @@ public class ZLMRTPServerFactory {
330 */ 330 */
331 public Boolean isRtpReady(MediaServerItem mediaServerItem, String streamId) { 331 public Boolean isRtpReady(MediaServerItem mediaServerItem, String streamId) {
332 JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem,"rtp", "rtsp", streamId); 332 JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem,"rtp", "rtsp", streamId);
  333 + if (mediaInfo.getInteger("code") == -2) {
  334 + return null;
  335 + }
333 return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")); 336 return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
334 } 337 }
335 338
@@ -338,8 +341,10 @@ public class ZLMRTPServerFactory { @@ -338,8 +341,10 @@ public class ZLMRTPServerFactory {
338 */ 341 */
339 public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) { 342 public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) {
340 JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId); 343 JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId);
341 - return mediaInfo != null && (mediaInfo.getInteger("code") == 0  
342 - 344 + if (mediaInfo == null || (mediaInfo.getInteger("code") == -2)) {
  345 + return null;
  346 + }
  347 + return (mediaInfo.getInteger("code") == 0
343 && mediaInfo.getJSONArray("data") != null 348 && mediaInfo.getJSONArray("data") != null
344 && mediaInfo.getJSONArray("data").size() > 0); 349 && mediaInfo.getJSONArray("data").size() > 0);
345 } 350 }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java
@@ -134,9 +134,10 @@ public class ZlmHttpHookSubscribe { @@ -134,9 +134,10 @@ public class ZlmHttpHookSubscribe {
134 /** 134 /**
135 * 对订阅数据进行过期清理 135 * 对订阅数据进行过期清理
136 */ 136 */
137 - @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次 137 +// @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次
  138 + @Scheduled(fixedRate = 2 * 1000)
138 public void execute(){ 139 public void execute(){
139 - 140 + System.out.println(allSubscribes.size());
140 Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5)); 141 Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
141 int total = 0; 142 int total = 0;
142 for (HookType hookType : allSubscribes.keySet()) { 143 for (HookType hookType : allSubscribes.keySet()) {
src/main/java/com/genersoft/iot/vmp/service/IInviteStreamService.java 0 → 100644
  1 +package com.genersoft.iot.vmp.service;
  2 +
  3 +import com.genersoft.iot.vmp.common.InviteInfo;
  4 +import com.genersoft.iot.vmp.common.InviteSessionType;
  5 +import com.genersoft.iot.vmp.service.bean.InviteErrorCallback;
  6 +
  7 +/**
  8 + * 记录国标点播的状态,包括实时预览,下载,录像回放
  9 + */
  10 +public interface IInviteStreamService {
  11 +
  12 + /**
  13 + * 更新点播的状态信息
  14 + */
  15 + void updateInviteInfo(InviteInfo inviteInfo);
  16 +
  17 + /**
  18 + * 获取点播的状态信息
  19 + */
  20 + InviteInfo getInviteInfo(InviteSessionType type,
  21 + String deviceId,
  22 + String channelId,
  23 + String stream);
  24 +
  25 + /**
  26 + * 移除点播的状态信息
  27 + */
  28 + void removeInviteInfo(InviteSessionType type,
  29 + String deviceId,
  30 + String channelId,
  31 + String stream);
  32 + /**
  33 + * 移除点播的状态信息
  34 + */
  35 + void removeInviteInfo(InviteInfo inviteInfo);
  36 + /**
  37 + * 移除点播的状态信息
  38 + */
  39 + void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId);
  40 +
  41 + /**
  42 + * 获取点播的状态信息
  43 + */
  44 + InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type,
  45 + String deviceId,
  46 + String channelId);
  47 +
  48 + /**
  49 + * 获取点播的状态信息
  50 + */
  51 + InviteInfo getInviteInfoByStream(InviteSessionType type, String stream);
  52 +
  53 +
  54 + /**
  55 + * 添加一个invite回调
  56 + */
  57 + void once(InviteSessionType type, String deviceId, String channelId, String stream, InviteErrorCallback<Object> callback);
  58 +
  59 + /**
  60 + * 调用一个invite回调
  61 + */
  62 + void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data);
  63 +}
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
1 package com.genersoft.iot.vmp.service; 1 package com.genersoft.iot.vmp.service;
2 2
3 -import com.alibaba.fastjson2.JSONObject;  
4 import com.genersoft.iot.vmp.common.StreamInfo; 3 import com.genersoft.iot.vmp.common.StreamInfo;
5 import com.genersoft.iot.vmp.conf.exception.ServiceException; 4 import com.genersoft.iot.vmp.conf.exception.ServiceException;
6 import com.genersoft.iot.vmp.gb28181.bean.Device; 5 import com.genersoft.iot.vmp.gb28181.bean.Device;
7 import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback; 6 import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
8 -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;  
9 -import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;  
10 -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;  
11 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 7 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
12 -import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; 8 +import com.genersoft.iot.vmp.service.bean.InviteErrorCallback;
13 import com.genersoft.iot.vmp.service.bean.PlayBackCallback; 9 import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
14 import com.genersoft.iot.vmp.service.bean.SSRCInfo; 10 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
15 11
@@ -22,12 +18,9 @@ import java.text.ParseException; @@ -22,12 +18,9 @@ import java.text.ParseException;
22 */ 18 */
23 public interface IPlayService { 19 public interface IPlayService {
24 20
25 - void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId);  
26 -  
27 void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, 21 void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
28 - ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,  
29 - InviteTimeOutCallback timeoutCallback);  
30 - void play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback); 22 + InviteErrorCallback<Object> callback);
  23 + SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, InviteErrorCallback<Object> callback);
31 24
32 MediaServerItem getNewMediaServerItem(Device device); 25 MediaServerItem getNewMediaServerItem(Device device);
33 26
@@ -36,8 +29,6 @@ public interface IPlayService { @@ -36,8 +29,6 @@ public interface IPlayService {
36 */ 29 */
37 MediaServerItem getNewMediaServerItemHasAssist(Device device); 30 MediaServerItem getNewMediaServerItemHasAssist(Device device);
38 31
39 - void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String toString);  
40 -  
41 void playBack(String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback playBackCallback); 32 void playBack(String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback playBackCallback);
42 void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack); 33 void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
43 34
src/main/java/com/genersoft/iot/vmp/service/bean/InviteErrorCallback.java 0 → 100644
  1 +package com.genersoft.iot.vmp.service.bean;
  2 +
  3 +public interface InviteErrorCallback<T> {
  4 +
  5 + void run(int code, String msg, T data);
  6 +}
src/main/java/com/genersoft/iot/vmp/service/bean/InviteErrorCode.java 0 → 100644
  1 +package com.genersoft.iot.vmp.service.bean;
  2 +
  3 +/**
  4 + * 全局错误码
  5 + */
  6 +public enum InviteErrorCode {
  7 + SUCCESS(0, "成功"),
  8 + ERROR_FOR_SIGNALLING_TIMEOUT(-1, "点播超时"),
  9 + ERROR_FOR_STREAM_TIMEOUT(-2, "收流超时"),
  10 + ERROR_FOR_RESOURCE_EXHAUSTION(-3, "资源耗尽"),
  11 + ERROR_FOR_CATCH_DATA(-4, "缓存数据异常"),
  12 + ERROR_FOR_SIGNALLING_ERROR(-5, "收到信令错误"),
  13 + ERROR_FOR_STREAM_PARSING_EXCEPTIONS(-6, "流地址解析错误"),
  14 + ERROR_FOR_SDP_PARSING_EXCEPTIONS(-7, "SDP信息解析失败"),
  15 + ERROR_FOR_SSRC_UNAVAILABLE(-8, "SSRC不可用"),
  16 + ERROR_FOR_RESET_SSRC(-9, "重新设置收流信息失败"),
  17 + ERROR_FOR_SIP_SENDING_FAILED(-10, "命令发送失败");
  18 +
  19 + private final int code;
  20 + private final String msg;
  21 +
  22 + InviteErrorCode(int code, String msg) {
  23 + this.code = code;
  24 + this.msg = msg;
  25 + }
  26 +
  27 + public int getCode() {
  28 + return code;
  29 + }
  30 +
  31 + public String getMsg() {
  32 + return msg;
  33 + }
  34 +}
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
1 package com.genersoft.iot.vmp.service.impl; 1 package com.genersoft.iot.vmp.service.impl;
2 2
3 -import com.genersoft.iot.vmp.common.StreamInfo; 3 +import com.genersoft.iot.vmp.common.InviteInfo;
  4 +import com.genersoft.iot.vmp.common.InviteSessionType;
4 import com.genersoft.iot.vmp.gb28181.bean.Device; 5 import com.genersoft.iot.vmp.gb28181.bean.Device;
5 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; 6 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
6 import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; 7 import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
7 import com.genersoft.iot.vmp.service.IDeviceChannelService; 8 import com.genersoft.iot.vmp.service.IDeviceChannelService;
  9 +import com.genersoft.iot.vmp.service.IInviteStreamService;
8 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 10 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
9 import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; 11 import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
10 import com.genersoft.iot.vmp.storager.dao.DeviceMapper; 12 import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
@@ -33,6 +35,9 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @@ -33,6 +35,9 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
33 private IRedisCatchStorage redisCatchStorage; 35 private IRedisCatchStorage redisCatchStorage;
34 36
35 @Autowired 37 @Autowired
  38 + private IInviteStreamService inviteStreamService;
  39 +
  40 + @Autowired
36 private DeviceChannelMapper channelMapper; 41 private DeviceChannelMapper channelMapper;
37 42
38 @Autowired 43 @Autowired
@@ -78,9 +83,10 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @@ -78,9 +83,10 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
78 public void updateChannel(String deviceId, DeviceChannel channel) { 83 public void updateChannel(String deviceId, DeviceChannel channel) {
79 String channelId = channel.getChannelId(); 84 String channelId = channel.getChannelId();
80 channel.setDeviceId(deviceId); 85 channel.setDeviceId(deviceId);
81 - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);  
82 - if (streamInfo != null) {  
83 - channel.setStreamId(streamInfo.getStream()); 86 +// StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
  87 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  88 + if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
  89 + channel.setStreamId(inviteInfo.getStreamInfo().getStream());
84 } 90 }
85 String now = DateUtil.getNow(); 91 String now = DateUtil.getNow();
86 channel.setUpdateTime(now); 92 channel.setUpdateTime(now);
@@ -106,9 +112,9 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @@ -106,9 +112,9 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
106 if (channelList.size() == 0) { 112 if (channelList.size() == 0) {
107 for (DeviceChannel channel : channels) { 113 for (DeviceChannel channel : channels) {
108 channel.setDeviceId(deviceId); 114 channel.setDeviceId(deviceId);
109 - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId());  
110 - if (streamInfo != null) {  
111 - channel.setStreamId(streamInfo.getStream()); 115 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channel.getChannelId());
  116 + if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
  117 + channel.setStreamId(inviteInfo.getStreamInfo().getStream());
112 } 118 }
113 String now = DateUtil.getNow(); 119 String now = DateUtil.getNow();
114 channel.setUpdateTime(now); 120 channel.setUpdateTime(now);
@@ -122,9 +128,9 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @@ -122,9 +128,9 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
122 } 128 }
123 for (DeviceChannel channel : channels) { 129 for (DeviceChannel channel : channels) {
124 channel.setDeviceId(deviceId); 130 channel.setDeviceId(deviceId);
125 - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId());  
126 - if (streamInfo != null) {  
127 - channel.setStreamId(streamInfo.getStream()); 131 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channel.getChannelId());
  132 + if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
  133 + channel.setStreamId(inviteInfo.getStreamInfo().getStream());
128 } 134 }
129 String now = DateUtil.getNow(); 135 String now = DateUtil.getNow();
130 channel.setUpdateTime(now); 136 channel.setUpdateTime(now);
src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java 0 → 100644
  1 +package com.genersoft.iot.vmp.service.impl;
  2 +
  3 +import com.alibaba.fastjson2.JSON;
  4 +import com.genersoft.iot.vmp.common.InviteInfo;
  5 +import com.genersoft.iot.vmp.common.InviteSessionStatus;
  6 +import com.genersoft.iot.vmp.common.InviteSessionType;
  7 +import com.genersoft.iot.vmp.common.VideoManagerConstants;
  8 +import com.genersoft.iot.vmp.service.IInviteStreamService;
  9 +import com.genersoft.iot.vmp.service.bean.InviteErrorCallback;
  10 +import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  11 +import org.slf4j.Logger;
  12 +import org.slf4j.LoggerFactory;
  13 +import org.springframework.beans.factory.annotation.Autowired;
  14 +import org.springframework.data.redis.core.RedisTemplate;
  15 +import org.springframework.stereotype.Service;
  16 +
  17 +import java.util.List;
  18 +import java.util.Map;
  19 +import java.util.concurrent.ConcurrentHashMap;
  20 +import java.util.concurrent.CopyOnWriteArrayList;
  21 +
  22 +@Service
  23 +public class InviteStreamServiceImpl implements IInviteStreamService {
  24 +
  25 + private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class);
  26 +
  27 + private final Map<String, List<InviteErrorCallback<Object>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
  28 +
  29 + @Autowired
  30 + private RedisTemplate<Object, Object> redisTemplate;
  31 +
  32 + @Override
  33 + public void updateInviteInfo(InviteInfo inviteInfo) {
  34 + if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
  35 + logger.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
  36 + return;
  37 + }
  38 + InviteInfo inviteInfoForUpdate = null;
  39 +
  40 + if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
  41 + if (inviteInfo.getDeviceId() == null
  42 + || inviteInfo.getChannelId() == null
  43 + || inviteInfo.getType() == null
  44 + || inviteInfo.getStream() == null
  45 + ) {
  46 + return;
  47 + }
  48 + inviteInfoForUpdate = inviteInfo;
  49 + } else {
  50 + InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
  51 + inviteInfo.getChannelId(), inviteInfo.getStream());
  52 + if (inviteInfoInRedis == null) {
  53 + logger.warn("[更新Invite信息],未从缓存中读取到Invite信息: deviceId: {}, channel: {}, stream: {}",
  54 + inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  55 + return;
  56 + }
  57 + if (inviteInfo.getStreamInfo() != null) {
  58 + inviteInfoInRedis.setStreamInfo(inviteInfo.getStreamInfo());
  59 + }
  60 + if (inviteInfo.getSsrcInfo() != null) {
  61 + inviteInfoInRedis.setSsrcInfo(inviteInfo.getSsrcInfo());
  62 + }
  63 + if (inviteInfo.getStreamMode() != null) {
  64 + inviteInfoInRedis.setStreamMode(inviteInfo.getStreamMode());
  65 + }
  66 + if (inviteInfo.getReceiveIp() != null) {
  67 + inviteInfoInRedis.setReceiveIp(inviteInfo.getReceiveIp());
  68 + }
  69 + if (inviteInfo.getReceivePort() != null) {
  70 + inviteInfoInRedis.setReceivePort(inviteInfo.getReceivePort());
  71 + }
  72 + if (inviteInfo.getStatus() != null) {
  73 + inviteInfoInRedis.setStatus(inviteInfo.getStatus());
  74 + }
  75 +
  76 + inviteInfoForUpdate = inviteInfoInRedis;
  77 +
  78 + }
  79 + String key = VideoManagerConstants.INVITE_PREFIX +
  80 + "_" + inviteInfoForUpdate.getType() +
  81 + "_" + inviteInfoForUpdate.getDeviceId() +
  82 + "_" + inviteInfoForUpdate.getChannelId() +
  83 + "_" + inviteInfoForUpdate.getStream();
  84 + redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
  85 + }
  86 +
  87 + @Override
  88 + public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
  89 + String key = VideoManagerConstants.INVITE_PREFIX +
  90 + "_" + (type != null ? type : "*") +
  91 + "_" + (deviceId != null ? deviceId : "*") +
  92 + "_" + (channelId != null ? channelId : "*") +
  93 + "_" + (stream != null ? stream : "*");
  94 + List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
  95 + if (scanResult.size() != 1) {
  96 + return null;
  97 + }
  98 +
  99 + return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
  100 + }
  101 +
  102 + @Override
  103 + public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId) {
  104 + return getInviteInfo(type, deviceId, channelId, null);
  105 + }
  106 +
  107 + @Override
  108 + public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream) {
  109 + return getInviteInfo(type, null, null, stream);
  110 + }
  111 +
  112 + @Override
  113 + public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
  114 + String scanKey = VideoManagerConstants.INVITE_PREFIX +
  115 + "_" + (type != null ? type : "*") +
  116 + "_" + (deviceId != null ? deviceId : "*") +
  117 + "_" + (channelId != null ? channelId : "*") +
  118 + "_" + (stream != null ? stream : "*");
  119 + List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
  120 + if (scanResult.size() > 0) {
  121 + for (Object keyObj : scanResult) {
  122 + String key = (String) keyObj;
  123 + InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key);
  124 + if (inviteInfo == null) {
  125 + continue;
  126 + }
  127 + redisTemplate.delete(key);
  128 + inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream()));
  129 + }
  130 + }
  131 + }
  132 +
  133 + @Override
  134 + public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId) {
  135 + removeInviteInfo(inviteSessionType, deviceId, channelId, null);
  136 + }
  137 +
  138 + @Override
  139 + public void removeInviteInfo(InviteInfo inviteInfo) {
  140 + removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  141 + }
  142 +
  143 + @Override
  144 + public void once(InviteSessionType type, String deviceId, String channelId, String stream, InviteErrorCallback<Object> callback) {
  145 + String key = buildKey(type, deviceId, channelId, stream);
  146 + List<InviteErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
  147 + if (callbacks == null) {
  148 + callbacks = new CopyOnWriteArrayList<>();
  149 + inviteErrorCallbackMap.put(key, callbacks);
  150 + }
  151 + callbacks.add(callback);
  152 +
  153 + }
  154 +
  155 + @Override
  156 + public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
  157 + String key = buildKey(type, deviceId, channelId, stream);
  158 + List<InviteErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
  159 + if (callbacks == null) {
  160 + return;
  161 + }
  162 + for (InviteErrorCallback<Object> callback : callbacks) {
  163 + callback.run(code, msg, data);
  164 + }
  165 + inviteErrorCallbackMap.remove(key);
  166 + }
  167 +
  168 + private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) {
  169 + String key = type + "_" + deviceId + "_" + channelId;
  170 + // 如果ssrc未null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
  171 + if (stream != null) {
  172 + key += ("_" + stream);
  173 + }
  174 + return key;
  175 + }
  176 +
  177 +
  178 +}
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -3,6 +3,9 @@ package com.genersoft.iot.vmp.service.impl; @@ -3,6 +3,9 @@ package com.genersoft.iot.vmp.service.impl;
3 import com.alibaba.fastjson2.JSON; 3 import com.alibaba.fastjson2.JSON;
4 import com.alibaba.fastjson2.JSONArray; 4 import com.alibaba.fastjson2.JSONArray;
5 import com.alibaba.fastjson2.JSONObject; 5 import com.alibaba.fastjson2.JSONObject;
  6 +import com.genersoft.iot.vmp.common.InviteInfo;
  7 +import com.genersoft.iot.vmp.common.InviteSessionStatus;
  8 +import com.genersoft.iot.vmp.common.InviteSessionType;
6 import com.genersoft.iot.vmp.common.StreamInfo; 9 import com.genersoft.iot.vmp.common.StreamInfo;
7 import com.genersoft.iot.vmp.conf.DynamicTask; 10 import com.genersoft.iot.vmp.conf.DynamicTask;
8 import com.genersoft.iot.vmp.conf.UserSetting; 11 import com.genersoft.iot.vmp.conf.UserSetting;
@@ -19,18 +22,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; @@ -19,18 +22,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
19 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; 22 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
20 import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; 23 import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
21 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; 24 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  25 +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
22 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; 26 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
23 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; 27 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
24 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; 28 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
25 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 29 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
26 -import com.genersoft.iot.vmp.service.IDeviceService;  
27 -import com.genersoft.iot.vmp.service.IMediaServerService;  
28 -import com.genersoft.iot.vmp.service.IMediaService;  
29 -import com.genersoft.iot.vmp.service.IPlayService;  
30 -import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;  
31 -import com.genersoft.iot.vmp.service.bean.PlayBackCallback;  
32 -import com.genersoft.iot.vmp.service.bean.PlayBackResult;  
33 -import com.genersoft.iot.vmp.service.bean.SSRCInfo; 30 +import com.genersoft.iot.vmp.service.*;
  31 +import com.genersoft.iot.vmp.service.bean.*;
34 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 32 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
35 import com.genersoft.iot.vmp.storager.IVideoManagerStorage; 33 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
36 import com.genersoft.iot.vmp.utils.DateUtil; 34 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -73,12 +71,18 @@ public class PlayServiceImpl implements IPlayService { @@ -73,12 +71,18 @@ public class PlayServiceImpl implements IPlayService {
73 private IRedisCatchStorage redisCatchStorage; 71 private IRedisCatchStorage redisCatchStorage;
74 72
75 @Autowired 73 @Autowired
  74 + private IInviteStreamService inviteStreamService;
  75 +
  76 + @Autowired
76 private DeferredResultHolder resultHolder; 77 private DeferredResultHolder resultHolder;
77 78
78 @Autowired 79 @Autowired
79 private ZLMRESTfulUtils zlmresTfulUtils; 80 private ZLMRESTfulUtils zlmresTfulUtils;
80 81
81 @Autowired 82 @Autowired
  83 + private ZLMRTPServerFactory zlmrtpServerFactory;
  84 +
  85 + @Autowired
82 private AssistRESTfulUtils assistRESTfulUtils; 86 private AssistRESTfulUtils assistRESTfulUtils;
83 87
84 @Autowired 88 @Autowired
@@ -111,137 +115,122 @@ public class PlayServiceImpl implements IPlayService { @@ -111,137 +115,122 @@ public class PlayServiceImpl implements IPlayService {
111 115
112 116
113 @Override 117 @Override
114 - public void play(MediaServerItem mediaServerItem, String deviceId, String channelId,  
115 - ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,  
116 - Runnable timeoutCallback) { 118 + public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, InviteErrorCallback<Object> callback) {
117 if (mediaServerItem == null) { 119 if (mediaServerItem == null) {
118 throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); 120 throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
119 } 121 }
120 - String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;  
121 -  
122 - RequestMessage msg = new RequestMessage();  
123 - msg.setKey(key);  
124 122
125 Device device = redisCatchStorage.getDevice(deviceId); 123 Device device = redisCatchStorage.getDevice(deviceId);
126 - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);  
127 -  
128 - if (streamInfo != null) {  
129 - String streamId = streamInfo.getStream();  
130 - if (streamId == null) {  
131 - WVPResult wvpResult = new WVPResult();  
132 - wvpResult.setCode(ErrorCode.ERROR100.getCode());  
133 - wvpResult.setMsg("点播失败, redis缓存streamId等于null");  
134 - msg.setData(wvpResult);  
135 - resultHolder.invokeAllResult(msg);  
136 - return;  
137 - }  
138 - String mediaServerId = streamInfo.getMediaServerId();  
139 - MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);  
140 -  
141 - JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);  
142 - if (rtpInfo.getInteger("code") == 0) {  
143 - if (rtpInfo.getBoolean("exist")) {  
144 - int localPort = rtpInfo.getInteger("local_port");  
145 - if (localPort == 0) {  
146 - logger.warn("[点播],点播时发现rtpServer存在,但是尚未开始推流");  
147 - // 此时说明rtpServer已经创建但是流还没有推上来  
148 - WVPResult wvpResult = new WVPResult();  
149 - wvpResult.setCode(ErrorCode.ERROR100.getCode());  
150 - wvpResult.setMsg("点播已经在进行中,请稍候重试");  
151 - msg.setData(wvpResult);  
152 -  
153 - resultHolder.invokeAllResult(msg);  
154 - return;  
155 - } else {  
156 - WVPResult wvpResult = new WVPResult();  
157 - wvpResult.setCode(ErrorCode.SUCCESS.getCode());  
158 - wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());  
159 - wvpResult.setData(streamInfo);  
160 - msg.setData(wvpResult);  
161 - resultHolder.invokeAllResult(msg);  
162 - if (hookEvent != null) {  
163 - hookEvent.response(mediaServerItem, JSON.parseObject(JSON.toJSONString(streamInfo)));  
164 - }  
165 - }  
166 -  
167 - } else {  
168 - redisCatchStorage.stopPlay(streamInfo); 124 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  125 +
  126 + if (inviteInfo != null ) {
  127 + System.out.println("inviteInfo 已存在");
  128 + if (inviteInfo.getStreamInfo() == null) {
  129 + System.out.println("inviteInfo 已存在, StreamInfo 不存在,添加回调等待");
  130 + // 点播发起了但是尚未成功, 仅注册回调等待结果即可
  131 + inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
  132 + return inviteInfo.getSsrcInfo();
  133 + }else {
  134 + StreamInfo streamInfo = inviteInfo.getStreamInfo();
  135 + String streamId = streamInfo.getStream();
  136 + if (streamId == null) {
  137 + callback.run(InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(), "点播失败, redis缓存streamId等于null", null);
  138 + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  139 + InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(),
  140 + "点播失败, redis缓存streamId等于null",
  141 + null);
  142 + return inviteInfo.getSsrcInfo();
  143 + }
  144 + String mediaServerId = streamInfo.getMediaServerId();
  145 + MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
  146 +
  147 + Boolean ready = zlmrtpServerFactory.isStreamReady(mediaInfo, "rtp", streamId);
  148 + if (ready != null && ready) {
  149 + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  150 + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  151 + InviteErrorCode.SUCCESS.getCode(),
  152 + InviteErrorCode.SUCCESS.getMsg(),
  153 + streamInfo);
  154 + return inviteInfo.getSsrcInfo();
  155 + }else {
  156 + // 点播发起了但是尚未成功, 仅注册回调等待结果即可
  157 + inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
169 storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); 158 storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
170 - streamInfo = null; 159 + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
171 } 160 }
172 - } else {  
173 - //zlm连接失败  
174 - redisCatchStorage.stopPlay(streamInfo);  
175 - storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());  
176 - streamInfo = null;  
177 -  
178 } 161 }
179 } 162 }
180 - if (streamInfo == null) {  
181 - String streamId = null;  
182 - if (mediaServerItem.isRtpEnable()) {  
183 - streamId = String.format("%s_%s", device.getDeviceId(), channelId);  
184 - }  
185 - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());  
186 - if (ssrcInfo == null) {  
187 - WVPResult wvpResult = new WVPResult();  
188 - wvpResult.setCode(ErrorCode.ERROR100.getCode());  
189 - wvpResult.setMsg("开启收流失败");  
190 - msg.setData(wvpResult);  
191 -  
192 - resultHolder.invokeAllResult(msg);  
193 - return;  
194 - }  
195 - play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response) -> {  
196 - if (hookEvent != null) {  
197 - hookEvent.response(mediaServerItem, response);  
198 - }  
199 - }, event -> {  
200 - // sip error错误  
201 - WVPResult wvpResult = new WVPResult();  
202 - wvpResult.setCode(ErrorCode.ERROR100.getCode());  
203 - wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));  
204 - msg.setData(wvpResult);  
205 - resultHolder.invokeAllResult(msg);  
206 - if (errorEvent != null) {  
207 - errorEvent.response(event);  
208 - }  
209 - }, (code, msgStr) -> {  
210 - // invite点播超时  
211 - WVPResult wvpResult = new WVPResult();  
212 - wvpResult.setCode(ErrorCode.ERROR100.getCode());  
213 - if (code == 0) {  
214 - wvpResult.setMsg("点播超时,请稍候重试");  
215 - } else if (code == 1) {  
216 - wvpResult.setMsg("收流超时,请稍候重试");  
217 - }  
218 - msg.setData(wvpResult);  
219 - // 回复之前所有的点播请求  
220 - resultHolder.invokeAllResult(msg);  
221 - }); 163 +
  164 + String streamId = null;
  165 + if (mediaServerItem.isRtpEnable()) {
  166 + streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  167 + }
  168 + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
  169 + if (ssrcInfo == null) {
  170 + callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
  171 + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  172 + InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
  173 + InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
  174 + null);
  175 + return null;
222 } 176 }
  177 + // TODO 记录点播的状态
  178 + play(mediaServerItem, ssrcInfo, device, channelId, callback);
  179 + return ssrcInfo;
223 } 180 }
224 181
225 182
226 @Override 183 @Override
227 public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, 184 public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
228 - ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,  
229 - InviteTimeOutCallback timeoutCallback) { 185 + InviteErrorCallback<Object> callback) {
230 186
231 logger.info("[点播开始] deviceId: {}, channelId: {},收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); 187 logger.info("[点播开始] deviceId: {}, channelId: {},收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  188 +
  189 + //端口获取失败的ssrcInfo 没有必要发送点播指令
  190 + if (ssrcInfo.getPort() <= 0) {
  191 + logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
  192 + // 释放ssrc
  193 + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  194 + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  195 +
  196 + callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
  197 + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  198 + InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
  199 + return;
  200 + }
  201 +
  202 + // 初始化redis中的invite消息状态
  203 + InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
  204 + mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
  205 + InviteSessionStatus.ready);
  206 + inviteStreamService.updateInviteInfo(inviteInfo);
232 // 超时处理 207 // 超时处理
233 String timeOutTaskKey = UUID.randomUUID().toString(); 208 String timeOutTaskKey = UUID.randomUUID().toString();
234 dynamicTask.startDelay(timeOutTaskKey, () -> { 209 dynamicTask.startDelay(timeOutTaskKey, () -> {
235 // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况 210 // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
236 - if (redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId) == null) { 211 + InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
  212 + if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) {
237 logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc()); 213 logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
238 // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 214 // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  215 +// InviteInfo inviteInfoForTimeout = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.play, device.getDeviceId(), channelId);
  216 +// if (inviteInfoForTimeout == null) {
  217 +// return;
  218 +// }
  219 +// if (InviteSessionStatus.ok == inviteInfoForTimeout.getStatus() ) {
  220 +// // TODO 发送bye
  221 +// }else {
  222 +// // TODO 发送cancel
  223 +// }
  224 + callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
  225 + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  226 + InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
  227 +
  228 + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
239 try { 229 try {
240 cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); 230 cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
241 } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { 231 } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
242 logger.error("[点播超时], 发送BYE失败 {}", e.getMessage()); 232 logger.error("[点播超时], 发送BYE失败 {}", e.getMessage());
243 } finally { 233 } finally {
244 - timeoutCallback.run(1, "收流超时");  
245 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); 234 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
246 mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); 235 mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
247 streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); 236 streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
@@ -252,28 +241,26 @@ public class PlayServiceImpl implements IPlayService { @@ -252,28 +241,26 @@ public class PlayServiceImpl implements IPlayService {
252 } 241 }
253 } 242 }
254 }, userSetting.getPlayTimeout()); 243 }, userSetting.getPlayTimeout());
255 - //端口获取失败的ssrcInfo 没有必要发送点播指令  
256 - if (ssrcInfo.getPort() <= 0) {  
257 - logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);  
258 - dynamicTask.stop(timeOutTaskKey);  
259 - // 释放ssrc  
260 - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());  
261 - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());  
262 244
263 - RequestMessage msg = new RequestMessage();  
264 - msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + device.getDeviceId() + channelId);  
265 - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "点播端口分配异常"));  
266 - resultHolder.invokeAllResult(msg);  
267 - return;  
268 - }  
269 try { 245 try {
270 cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { 246 cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
271 logger.info("收到订阅消息: " + response.toJSONString()); 247 logger.info("收到订阅消息: " + response.toJSONString());
272 dynamicTask.stop(timeOutTaskKey); 248 dynamicTask.stop(timeOutTaskKey);
273 -  
274 // hook响应 249 // hook响应
275 - onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId);  
276 - hookEvent.response(mediaServerItemInuse, response); 250 + StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId);
  251 + if (streamInfo == null){
  252 + callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  253 + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  254 + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  255 + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  256 + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  257 + return;
  258 + }
  259 + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
  260 + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  261 + InviteErrorCode.SUCCESS.getCode(),
  262 + InviteErrorCode.SUCCESS.getMsg(),
  263 + streamInfo);
277 logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId); 264 logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
278 String streamUrl; 265 String streamUrl;
279 if (mediaServerItemInuse.getRtspPort() != 0) { 266 if (mediaServerItemInuse.getRtspPort() != 0) {
@@ -288,6 +275,8 @@ public class PlayServiceImpl implements IPlayService { @@ -288,6 +275,8 @@ public class PlayServiceImpl implements IPlayService {
288 zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName); 275 zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
289 276
290 }, (event) -> { 277 }, (event) -> {
  278 + inviteInfo.setStatus(InviteSessionStatus.ok);
  279 +
291 ResponseEvent responseEvent = (ResponseEvent) event.event; 280 ResponseEvent responseEvent = (ResponseEvent) event.event;
292 String contentString = new String(responseEvent.getResponse().getRawContent()); 281 String contentString = new String(responseEvent.getResponse().getRawContent());
293 // 获取ssrc 282 // 获取ssrc
@@ -319,6 +308,18 @@ public class PlayServiceImpl implements IPlayService { @@ -319,6 +308,18 @@ public class PlayServiceImpl implements IPlayService {
319 logger.info("[点播-TCP主动连接对方] 结果: {}", jsonObject); 308 logger.info("[点播-TCP主动连接对方] 结果: {}", jsonObject);
320 } catch (SdpException e) { 309 } catch (SdpException e) {
321 logger.error("[点播-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e); 310 logger.error("[点播-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e);
  311 + dynamicTask.stop(timeOutTaskKey);
  312 + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
  313 + // 释放ssrc
  314 + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  315 +
  316 + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  317 +
  318 + callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  319 + InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
  320 + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  321 + InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
  322 + InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
322 } 323 }
323 } 324 }
324 return; 325 return;
@@ -332,9 +333,13 @@ public class PlayServiceImpl implements IPlayService { @@ -332,9 +333,13 @@ public class PlayServiceImpl implements IPlayService {
332 // 释放ssrc 333 // 释放ssrc
333 ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); 334 ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
334 streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); 335 streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
335 - event.msg = "下级自定义了ssrc,但是此ssrc不可用";  
336 - event.statusCode = 400;  
337 - errorEvent.response(event); 336 +
  337 + callback.run(InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
  338 + InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
  339 + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  340 + InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getCode(),
  341 + InviteErrorCode.ERROR_FOR_SSRC_UNAVAILABLE.getMsg(), null);
  342 +
338 return; 343 return;
339 } 344 }
340 // 单端口模式streamId也有变化,重新设置监听即可 345 // 单端口模式streamId也有变化,重新设置监听即可
@@ -342,18 +347,32 @@ public class PlayServiceImpl implements IPlayService { @@ -342,18 +347,32 @@ public class PlayServiceImpl implements IPlayService {
342 // 添加订阅 347 // 添加订阅
343 HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); 348 HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
344 subscribe.removeSubscribe(hookSubscribe); 349 subscribe.removeSubscribe(hookSubscribe);
345 - hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); 350 + String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
  351 + hookSubscribe.getContent().put("stream", stream);
  352 + inviteInfo.setStream(stream);
346 subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { 353 subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
347 logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); 354 logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
348 dynamicTask.stop(timeOutTaskKey); 355 dynamicTask.stop(timeOutTaskKey);
349 // hook响应 356 // hook响应
350 - onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId);  
351 - hookEvent.response(mediaServerItemInUse, response); 357 + StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId);
  358 + if (streamInfo == null){
  359 + callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  360 + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  361 + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  362 + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
  363 + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
  364 + return;
  365 + }
  366 + callback.run(InviteErrorCode.SUCCESS.getCode(),
  367 + InviteErrorCode.SUCCESS.getMsg(), null);
  368 + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  369 + InviteErrorCode.SUCCESS.getCode(),
  370 + InviteErrorCode.SUCCESS.getMsg(),
  371 + streamInfo);
352 }); 372 });
353 return; 373 return;
354 } 374 }
355 375
356 -  
357 // 更新ssrc 376 // 更新ssrc
358 Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse); 377 Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
359 if (!result) { 378 if (!result) {
@@ -370,14 +389,23 @@ public class PlayServiceImpl implements IPlayService { @@ -370,14 +389,23 @@ public class PlayServiceImpl implements IPlayService {
370 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); 389 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
371 390
372 streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); 391 streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
373 - event.msg = "下级自定义了ssrc,重新设置收流信息失败";  
374 - event.statusCode = 500;  
375 - errorEvent.response(event); 392 +
  393 + callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  394 + "下级自定义了ssrc,重新设置收流信息失败", null);
  395 + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  396 + InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  397 + "下级自定义了ssrc,重新设置收流信息失败", null);
  398 +
  399 + }else {
  400 + ssrcInfo.setSsrc(ssrcInResponse);
  401 + inviteInfo.setSsrcInfo(ssrcInfo);
  402 + inviteInfo.setStream(ssrcInfo.getStream());
376 } 403 }
377 }else { 404 }else {
378 logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正"); 405 logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
379 } 406 }
380 } 407 }
  408 + inviteStreamService.updateInviteInfo(inviteInfo);
381 }, (event) -> { 409 }, (event) -> {
382 dynamicTask.stop(timeOutTaskKey); 410 dynamicTask.stop(timeOutTaskKey);
383 mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); 411 mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
@@ -385,7 +413,14 @@ public class PlayServiceImpl implements IPlayService { @@ -385,7 +413,14 @@ public class PlayServiceImpl implements IPlayService {
385 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); 413 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
386 414
387 streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); 415 streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
388 - errorEvent.response(event); 416 +
  417 + callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
  418 + String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  419 + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  420 + InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
  421 + String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
  422 +
  423 + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
389 }); 424 });
390 } catch (InvalidArgumentException | SipException | ParseException e) { 425 } catch (InvalidArgumentException | SipException | ParseException e) {
391 426
@@ -396,40 +431,34 @@ public class PlayServiceImpl implements IPlayService { @@ -396,40 +431,34 @@ public class PlayServiceImpl implements IPlayService {
396 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); 431 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
397 432
398 streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); 433 streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
399 - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();  
400 - eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;  
401 - eventResult.statusCode = -1;  
402 - eventResult.msg = "命令发送失败";  
403 - errorEvent.response(eventResult); 434 +
  435 + callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
  436 + InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
  437 + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
  438 + InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
  439 + InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
  440 +
  441 + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
404 } 442 }
405 } 443 }
406 444
407 - @Override  
408 - public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) { 445 + private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {
409 StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); 446 StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
410 - RequestMessage msg = new RequestMessage();  
411 - msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);  
412 if (streamInfo != null) { 447 if (streamInfo != null) {
413 DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); 448 DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
414 if (deviceChannel != null) { 449 if (deviceChannel != null) {
415 deviceChannel.setStreamId(streamInfo.getStream()); 450 deviceChannel.setStreamId(streamInfo.getStream());
416 storager.startPlay(deviceId, channelId, streamInfo.getStream()); 451 storager.startPlay(deviceId, channelId, streamInfo.getStream());
417 } 452 }
418 - redisCatchStorage.startPlay(streamInfo);  
419 -  
420 - WVPResult wvpResult = new WVPResult();  
421 - wvpResult.setCode(ErrorCode.SUCCESS.getCode());  
422 - wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());  
423 - wvpResult.setData(streamInfo);  
424 -  
425 - msg.setData(wvpResult);  
426 - resultHolder.invokeAllResult(msg);  
427 -  
428 - } else {  
429 - logger.warn("设备预览API调用失败!");  
430 - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!"));  
431 - resultHolder.invokeAllResult(msg); 453 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  454 + if (inviteInfo != null) {
  455 + inviteInfo.setStatus(InviteSessionStatus.ok);
  456 + inviteInfo.setStreamInfo(streamInfo);
  457 + inviteStreamService.updateInviteInfo(inviteInfo);
  458 + }
432 } 459 }
  460 + return streamInfo;
  461 +
433 } 462 }
434 463
435 private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, PlayBackCallback playBackCallback) { 464 private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, PlayBackCallback playBackCallback) {
@@ -442,8 +471,12 @@ public class PlayServiceImpl implements IPlayService { @@ -442,8 +471,12 @@ public class PlayServiceImpl implements IPlayService {
442 deviceChannel.setStreamId(streamInfo.getStream()); 471 deviceChannel.setStreamId(streamInfo.getStream());
443 storager.startPlay(deviceId, channelId, streamInfo.getStream()); 472 storager.startPlay(deviceId, channelId, streamInfo.getStream());
444 } 473 }
445 - redisCatchStorage.startPlay(streamInfo);  
446 - 474 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, deviceId, channelId);
  475 + if (inviteInfo != null) {
  476 + inviteInfo.setStatus(InviteSessionStatus.ok);
  477 + inviteInfo.setStreamInfo(streamInfo);
  478 + inviteStreamService.updateInviteInfo(inviteInfo);
  479 + }
447 480
448 playBackResult.setCode(ErrorCode.SUCCESS.getCode()); 481 playBackResult.setCode(ErrorCode.SUCCESS.getCode());
449 playBackResult.setMsg(ErrorCode.SUCCESS.getMsg()); 482 playBackResult.setMsg(ErrorCode.SUCCESS.getMsg());
@@ -560,6 +593,7 @@ public class PlayServiceImpl implements IPlayService { @@ -560,6 +593,7 @@ public class PlayServiceImpl implements IPlayService {
560 return; 593 return;
561 } 594 }
562 redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId()); 595 redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
  596 +
563 playBackResult.setCode(ErrorCode.SUCCESS.getCode()); 597 playBackResult.setCode(ErrorCode.SUCCESS.getCode());
564 playBackResult.setMsg(ErrorCode.SUCCESS.getMsg()); 598 playBackResult.setMsg(ErrorCode.SUCCESS.getMsg());
565 playBackResult.setData(streamInfo); 599 playBackResult.setData(streamInfo);
@@ -858,8 +892,7 @@ public class PlayServiceImpl implements IPlayService { @@ -858,8 +892,7 @@ public class PlayServiceImpl implements IPlayService {
858 return streamInfo; 892 return streamInfo;
859 } 893 }
860 894
861 - @Override  
862 - public void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String uuid) { 895 + private void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String uuid) {
863 RequestMessage msg = new RequestMessage(); 896 RequestMessage msg = new RequestMessage();
864 msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId); 897 msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId);
865 msg.setId(uuid); 898 msg.setId(uuid);
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -264,8 +264,8 @@ public class RedisGbPlayMsgListener implements MessageListener { @@ -264,8 +264,8 @@ public class RedisGbPlayMsgListener implements MessageListener {
264 return; 264 return;
265 } 265 }
266 // 确定流是否在线 266 // 确定流是否在线
267 - boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream());  
268 - if (streamReady) { 267 + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream());
  268 + if (streamReady != null && streamReady) {
269 logger.info("[回复推流信息] {}/{}", content.getApp(), content.getStream()); 269 logger.info("[回复推流信息] {}/{}", content.getApp(), content.getStream());
270 responseSendItem(mediaServerItem, content, toId, serial); 270 responseSendItem(mediaServerItem, content, toId, serial);
271 }else { 271 }else {
@@ -301,9 +301,6 @@ public class RedisGbPlayMsgListener implements MessageListener { @@ -301,9 +301,6 @@ public class RedisGbPlayMsgListener implements MessageListener {
301 String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; 301 String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
302 logger.info("[redis发送通知] 推流被请求 {}: {}/{}", key, messageForPushChannel.getApp(), messageForPushChannel.getStream()); 302 logger.info("[redis发送通知] 推流被请求 {}: {}/{}", key, messageForPushChannel.getApp(), messageForPushChannel.getStream());
303 redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel)); 303 redisTemplate.convertAndSend(key, JSON.toJSON(messageForPushChannel));
304 -  
305 -// redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);  
306 -  
307 } 304 }
308 } 305 }
309 306
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
@@ -23,34 +23,6 @@ public interface IRedisCatchStorage { @@ -23,34 +23,6 @@ public interface IRedisCatchStorage {
23 */ 23 */
24 Long getCSEQ(); 24 Long getCSEQ();
25 25
26 - /**  
27 - * 开始播放时将流存入  
28 - *  
29 - * @param stream 流信息  
30 - * @return  
31 - */  
32 - boolean startPlay(StreamInfo stream);  
33 -  
34 -  
35 - /**  
36 - * 停止播放时删除  
37 - *  
38 - * @return  
39 - */  
40 - boolean stopPlay(StreamInfo streamInfo);  
41 -  
42 - /**  
43 - * 查询播放列表  
44 - * @return  
45 - */  
46 - StreamInfo queryPlay(StreamInfo streamInfo);  
47 -  
48 - StreamInfo queryPlayByStreamId(String steamId);  
49 -  
50 - StreamInfo queryPlayByDevice(String deviceId, String channelId);  
51 -  
52 - Map<String, StreamInfo> queryPlayByDeviceId(String deviceId);  
53 -  
54 boolean startPlayback(StreamInfo stream, String callId); 26 boolean startPlayback(StreamInfo stream, String callId);
55 27
56 boolean stopPlayback(String deviceId, String channelId, String stream, String callId); 28 boolean stopPlayback(String deviceId, String channelId, String stream, String callId);
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
@@ -92,87 +92,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @@ -92,87 +92,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
92 } 92 }
93 } 93 }
94 94
95 - /**  
96 - * 开始播放时将流存入redis  
97 - */  
98 - @Override  
99 - public boolean startPlay(StreamInfo stream) {  
100 -  
101 - redisTemplate.opsForValue().set(String.format("%S_%s_%s_%s_%s_%s", VideoManagerConstants.PLAYER_PREFIX, userSetting.getServerId(),  
102 - stream.getMediaServerId(), stream.getStream(), stream.getDeviceID(), stream.getChannelId()),  
103 - stream);  
104 - return true;  
105 - }  
106 -  
107 - /**  
108 - * 停止播放时从redis删除  
109 - */  
110 - @Override  
111 - public boolean stopPlay(StreamInfo streamInfo) {  
112 - if (streamInfo == null) {  
113 - return false;  
114 - }  
115 - Boolean result = redisTemplate.delete(String.format("%S_%s_%s_%s_%s_%s", VideoManagerConstants.PLAYER_PREFIX,  
116 - userSetting.getServerId(),  
117 - streamInfo.getMediaServerId(),  
118 - streamInfo.getStream(),  
119 - streamInfo.getDeviceID(),  
120 - streamInfo.getChannelId()));  
121 - return result != null && result;  
122 - }  
123 -  
124 - /**  
125 - * 查询播放列表  
126 - */  
127 - @Override  
128 - public StreamInfo queryPlay(StreamInfo streamInfo) {  
129 - return (StreamInfo)redisTemplate.opsForValue().get(String.format("%S_%s_%s_%s_%s_%s",  
130 - VideoManagerConstants.PLAYER_PREFIX,  
131 - userSetting.getServerId(),  
132 - streamInfo.getMediaServerId(),  
133 - streamInfo.getStream(),  
134 - streamInfo.getDeviceID(),  
135 - streamInfo.getChannelId()));  
136 - }  
137 - @Override  
138 - public StreamInfo queryPlayByStreamId(String streamId) {  
139 - List<Object> playLeys = RedisUtil.scan(redisTemplate, String.format("%S_%s_*_%s_*", VideoManagerConstants.PLAYER_PREFIX, userSetting.getServerId(), streamId));  
140 - if (playLeys.size() == 0) {  
141 - return null;  
142 - }  
143 - return (StreamInfo)redisTemplate.opsForValue().get(playLeys.get(0).toString());  
144 - }  
145 -  
146 - @Override  
147 - public StreamInfo queryPlayByDevice(String deviceId, String channelId) {  
148 - List<Object> playLeys = RedisUtil.scan(redisTemplate, String.format("%S_%s_*_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX,  
149 - userSetting.getServerId(),  
150 - deviceId,  
151 - channelId));  
152 - if (playLeys.size() == 0) {  
153 - return null;  
154 - }  
155 - return (StreamInfo)redisTemplate.opsForValue().get(playLeys.get(0).toString());  
156 - }  
157 -  
158 - @Override  
159 - public Map<String, StreamInfo> queryPlayByDeviceId(String deviceId) {  
160 - Map<String, StreamInfo> streamInfos = new HashMap<>();  
161 - List<Object> players = RedisUtil.scan(redisTemplate, String.format("%S_%s_*_*_%s_*", VideoManagerConstants.PLAYER_PREFIX, userSetting.getServerId(),deviceId));  
162 - if (players.size() == 0) {  
163 - return streamInfos;  
164 - }  
165 - for (Object player : players) {  
166 - String key = (String) player;  
167 - StreamInfo streamInfo = JsonUtil.redisJsonToObject(redisTemplate, key, StreamInfo.class);  
168 - if (Objects.isNull(streamInfo)) {  
169 - continue;  
170 - }  
171 - streamInfos.put(streamInfo.getDeviceID() + "_" + streamInfo.getChannelId(), streamInfo);  
172 - }  
173 - return streamInfos;  
174 - }  
175 -  
176 95
177 @Override 96 @Override
178 public boolean startPlayback(StreamInfo stream, String callId) { 97 public boolean startPlayback(StreamInfo stream, String callId) {
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
1 package com.genersoft.iot.vmp.vmanager.gb28181.play; 1 package com.genersoft.iot.vmp.vmanager.gb28181.play;
2 2
  3 +import com.alibaba.fastjson2.JSON;
3 import com.alibaba.fastjson2.JSONArray; 4 import com.alibaba.fastjson2.JSONArray;
4 import com.alibaba.fastjson2.JSONObject; 5 import com.alibaba.fastjson2.JSONObject;
  6 +import com.genersoft.iot.vmp.common.InviteInfo;
  7 +import com.genersoft.iot.vmp.common.InviteSessionStatus;
  8 +import com.genersoft.iot.vmp.common.InviteSessionType;
5 import com.genersoft.iot.vmp.common.StreamInfo; 9 import com.genersoft.iot.vmp.common.StreamInfo;
6 import com.genersoft.iot.vmp.conf.UserSetting; 10 import com.genersoft.iot.vmp.conf.UserSetting;
7 import com.genersoft.iot.vmp.conf.exception.ControllerException; 11 import com.genersoft.iot.vmp.conf.exception.ControllerException;
@@ -14,12 +18,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; @@ -14,12 +18,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
14 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; 18 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
15 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; 19 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
16 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 20 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  21 +import com.genersoft.iot.vmp.service.IInviteStreamService;
17 import com.genersoft.iot.vmp.service.IMediaServerService; 22 import com.genersoft.iot.vmp.service.IMediaServerService;
18 import com.genersoft.iot.vmp.service.IMediaService; 23 import com.genersoft.iot.vmp.service.IMediaService;
19 import com.genersoft.iot.vmp.service.IPlayService; 24 import com.genersoft.iot.vmp.service.IPlayService;
  25 +import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
20 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 26 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
21 import com.genersoft.iot.vmp.storager.IVideoManagerStorage; 27 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
22 -import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;  
23 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; 28 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
24 import com.genersoft.iot.vmp.vmanager.bean.StreamContent; 29 import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
25 import com.genersoft.iot.vmp.vmanager.bean.WVPResult; 30 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
@@ -60,6 +65,9 @@ public class PlayController { @@ -60,6 +65,9 @@ public class PlayController {
60 private IRedisCatchStorage redisCatchStorage; 65 private IRedisCatchStorage redisCatchStorage;
61 66
62 @Autowired 67 @Autowired
  68 + private IInviteStreamService inviteStreamService;
  69 +
  70 + @Autowired
63 private ZLMRESTfulUtils zlmresTfulUtils; 71 private ZLMRESTfulUtils zlmresTfulUtils;
64 72
65 @Autowired 73 @Autowired
@@ -88,14 +96,12 @@ public class PlayController { @@ -88,14 +96,12 @@ public class PlayController {
88 Device device = storager.queryVideoDevice(deviceId); 96 Device device = storager.queryVideoDevice(deviceId);
89 MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); 97 MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
90 98
91 - RequestMessage msg = new RequestMessage(); 99 + RequestMessage requestMessage = new RequestMessage();
92 String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; 100 String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
93 - boolean exist = resultHolder.exist(key, null);  
94 - msg.setKey(key); 101 + requestMessage.setKey(key);
95 String uuid = UUID.randomUUID().toString(); 102 String uuid = UUID.randomUUID().toString();
96 - msg.setId(uuid); 103 + requestMessage.setId(uuid);
97 DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); 104 DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
98 - DeferredResultEx<WVPResult<StreamContent>> deferredResultEx = new DeferredResultEx<>(result);  
99 105
100 result.onTimeout(()->{ 106 result.onTimeout(()->{
101 logger.info("点播接口等待超时"); 107 logger.info("点播接口等待超时");
@@ -103,32 +109,36 @@ public class PlayController { @@ -103,32 +109,36 @@ public class PlayController {
103 WVPResult<StreamInfo> wvpResult = new WVPResult<>(); 109 WVPResult<StreamInfo> wvpResult = new WVPResult<>();
104 wvpResult.setCode(ErrorCode.ERROR100.getCode()); 110 wvpResult.setCode(ErrorCode.ERROR100.getCode());
105 wvpResult.setMsg("点播超时"); 111 wvpResult.setMsg("点播超时");
106 - msg.setData(wvpResult);  
107 - resultHolder.invokeResult(msg);  
108 - });  
109 - // TODO 在点播未成功的情况下在此调用接口点播会导致返回的流地址ip错误  
110 - deferredResultEx.setFilter(result1 -> {  
111 - WVPResult<StreamInfo> wvpResult1 = (WVPResult<StreamInfo>)result1;  
112 - WVPResult<StreamContent> resultStream = new WVPResult<>();  
113 - resultStream.setCode(wvpResult1.getCode());  
114 - resultStream.setMsg(wvpResult1.getMsg());  
115 - if (wvpResult1.getCode() == ErrorCode.SUCCESS.getCode()) {  
116 - StreamInfo data = wvpResult1.getData().clone();  
117 - if (userSetting.getUseSourceIpAsStreamIp()) {  
118 - data.channgeStreamIp(request.getLocalName());  
119 - }  
120 - resultStream.setData(new StreamContent(wvpResult1.getData()));  
121 - }  
122 - return resultStream; 112 + requestMessage.setData(wvpResult);
  113 + resultHolder.invokeResult(requestMessage);
123 }); 114 });
124 115
125 -  
126 // 录像查询以channelId作为deviceId查询 116 // 录像查询以channelId作为deviceId查询
127 - resultHolder.put(key, uuid, deferredResultEx); 117 + resultHolder.put(key, uuid, result);
128 118
129 - if (!exist) {  
130 - playService.play(newMediaServerItem, deviceId, channelId, null, null, null);  
131 - } 119 + playService.play(newMediaServerItem, deviceId, channelId, ((code, msg, data) -> {
  120 + System.out.println("controller收到回调");
  121 + System.out.println(JSON.toJSONString(data));
  122 + WVPResult<StreamContent> wvpResult = new WVPResult<>();
  123 + if (code == InviteErrorCode.SUCCESS.getCode()) {
  124 + wvpResult.setCode(ErrorCode.SUCCESS.getCode());
  125 + wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
  126 +
  127 + if (data != null) {
  128 + StreamInfo streamInfo = (StreamInfo)data;
  129 + if (userSetting.getUseSourceIpAsStreamIp()) {
  130 + streamInfo.channgeStreamIp(request.getLocalName());
  131 + }
  132 + wvpResult.setData(new StreamContent(streamInfo));
  133 + }
  134 + }else {
  135 + wvpResult.setCode(code);
  136 + wvpResult.setMsg(msg);
  137 + }
  138 + System.out.println(JSON.toJSONString(wvpResult));
  139 + requestMessage.setData(wvpResult);
  140 + resultHolder.invokeResult(requestMessage);
  141 + }));
132 return result; 142 return result;
133 } 143 }
134 144
@@ -149,21 +159,22 @@ public class PlayController { @@ -149,21 +159,22 @@ public class PlayController {
149 throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备[" + deviceId + "]不存在"); 159 throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备[" + deviceId + "]不存在");
150 } 160 }
151 161
152 - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);  
153 - if (streamInfo == null) { 162 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
  163 + if (inviteInfo == null) {
154 throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到"); 164 throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到");
155 } 165 }
156 -  
157 - try {  
158 - logger.warn("[停止点播] {}/{}", device.getDeviceId(), channelId);  
159 - cmder.streamByeCmd(device, channelId, streamInfo.getStream(), null, null);  
160 - } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {  
161 - logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());  
162 - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); 166 + if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
  167 + try {
  168 + logger.warn("[停止点播] {}/{}", device.getDeviceId(), channelId);
  169 + cmder.streamByeCmd(device, channelId, inviteInfo.getStream(), null, null);
  170 + } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
  171 + logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
  172 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
  173 + }
163 } 174 }
164 - redisCatchStorage.stopPlay(streamInfo); 175 + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
165 176
166 - storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); 177 + storager.stopPlay(deviceId, channelId);
167 JSONObject json = new JSONObject(); 178 JSONObject json = new JSONObject();
168 json.put("deviceId", deviceId); 179 json.put("deviceId", deviceId);
169 json.put("channelId", channelId); 180 json.put("channelId", channelId);
@@ -178,15 +189,14 @@ public class PlayController { @@ -178,15 +189,14 @@ public class PlayController {
178 @Parameter(name = "streamId", description = "视频流ID", required = true) 189 @Parameter(name = "streamId", description = "视频流ID", required = true)
179 @PostMapping("/convert/{streamId}") 190 @PostMapping("/convert/{streamId}")
180 public JSONObject playConvert(@PathVariable String streamId) { 191 public JSONObject playConvert(@PathVariable String streamId) {
181 - StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);  
182 - if (streamInfo == null) {  
183 - streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);  
184 - }  
185 - if (streamInfo == null) { 192 +// StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
  193 +
  194 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, streamId);
  195 + if (inviteInfo == null || inviteInfo.getStreamInfo() == null) {
186 logger.warn("视频转码API调用失败!, 视频流已经停止!"); 196 logger.warn("视频转码API调用失败!, 视频流已经停止!");
187 throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到视频流信息, 视频流可能已经停止"); 197 throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到视频流信息, 视频流可能已经停止");
188 } 198 }
189 - MediaServerItem mediaInfo = mediaServerService.getOne(streamInfo.getMediaServerId()); 199 + MediaServerItem mediaInfo = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId());
190 JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId); 200 JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
191 if (!rtpInfo.getBoolean("exist")) { 201 if (!rtpInfo.getBoolean("exist")) {
192 logger.warn("视频转码API调用失败!, 视频流已停止推流!"); 202 logger.warn("视频转码API调用失败!, 视频流已停止推流!");
src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java
1 package com.genersoft.iot.vmp.web.gb28181; 1 package com.genersoft.iot.vmp.web.gb28181;
2 2
3 import com.alibaba.fastjson2.JSONObject; 3 import com.alibaba.fastjson2.JSONObject;
4 -import com.genersoft.iot.vmp.common.StreamInfo; 4 +import com.genersoft.iot.vmp.common.InviteInfo;
  5 +import com.genersoft.iot.vmp.common.InviteSessionType;
5 import com.genersoft.iot.vmp.conf.UserSetting; 6 import com.genersoft.iot.vmp.conf.UserSetting;
6 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; 7 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
7 import com.genersoft.iot.vmp.gb28181.bean.Device; 8 import com.genersoft.iot.vmp.gb28181.bean.Device;
@@ -9,13 +10,18 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; @@ -9,13 +10,18 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
9 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; 10 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
10 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 11 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
11 import com.genersoft.iot.vmp.service.IDeviceService; 12 import com.genersoft.iot.vmp.service.IDeviceService;
  13 +import com.genersoft.iot.vmp.service.IInviteStreamService;
12 import com.genersoft.iot.vmp.service.IPlayService; 14 import com.genersoft.iot.vmp.service.IPlayService;
  15 +import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
13 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 16 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
14 import com.genersoft.iot.vmp.storager.IVideoManagerStorage; 17 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
15 import org.slf4j.Logger; 18 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory; 19 import org.slf4j.LoggerFactory;
17 import org.springframework.beans.factory.annotation.Autowired; 20 import org.springframework.beans.factory.annotation.Autowired;
18 -import org.springframework.web.bind.annotation.*; 21 +import org.springframework.web.bind.annotation.RequestMapping;
  22 +import org.springframework.web.bind.annotation.RequestParam;
  23 +import org.springframework.web.bind.annotation.ResponseBody;
  24 +import org.springframework.web.bind.annotation.RestController;
19 import org.springframework.web.context.request.async.DeferredResult; 25 import org.springframework.web.context.request.async.DeferredResult;
20 26
21 import javax.sip.InvalidArgumentException; 27 import javax.sip.InvalidArgumentException;
@@ -46,6 +52,9 @@ public class ApiStreamController { @@ -46,6 +52,9 @@ public class ApiStreamController {
46 private IRedisCatchStorage redisCatchStorage; 52 private IRedisCatchStorage redisCatchStorage;
47 53
48 @Autowired 54 @Autowired
  55 + private IInviteStreamService inviteStreamService;
  56 +
  57 + @Autowired
49 private IDeviceService deviceService; 58 private IDeviceService deviceService;
50 59
51 @Autowired 60 @Autowired
@@ -111,46 +120,96 @@ public class ApiStreamController { @@ -111,46 +120,96 @@ public class ApiStreamController {
111 return resultDeferredResult; 120 return resultDeferredResult;
112 } 121 }
113 MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); 122 MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
114 - playService.play(newMediaServerItem, serial, code, (mediaServerItem, response)->{  
115 - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(serial, code);  
116 - JSONObject result = new JSONObject();  
117 - result.put("StreamID", streamInfo.getStream());  
118 - result.put("DeviceID", device.getDeviceId());  
119 - result.put("ChannelID", code);  
120 - result.put("ChannelName", deviceChannel.getName());  
121 - result.put("ChannelCustomName", "");  
122 - result.put("FLV", streamInfo.getFlv().getUrl());  
123 - result.put("WS_FLV", streamInfo.getWs_flv().getUrl());  
124 - result.put("RTMP", streamInfo.getRtmp().getUrl());  
125 - result.put("HLS", streamInfo.getHls().getUrl());  
126 - result.put("RTSP", streamInfo.getRtsp().getUrl());  
127 - result.put("WEBRTC", streamInfo.getRtc().getUrl());  
128 - result.put("CDN", "");  
129 - result.put("SnapURL", "");  
130 - result.put("Transport", device.getTransport());  
131 - result.put("StartAt", "");  
132 - result.put("Duration", "");  
133 - result.put("SourceVideoCodecName", "");  
134 - result.put("SourceVideoWidth", "");  
135 - result.put("SourceVideoHeight", "");  
136 - result.put("SourceVideoFrameRate", "");  
137 - result.put("SourceAudioCodecName", "");  
138 - result.put("SourceAudioSampleRate", "");  
139 - result.put("AudioEnable", "");  
140 - result.put("Ondemand", "");  
141 - result.put("InBytes", "");  
142 - result.put("InBitRate", "");  
143 - result.put("OutBytes", "");  
144 - result.put("NumOutputs", "");  
145 - result.put("CascadeSize", "");  
146 - result.put("RelaySize", "");  
147 - result.put("ChannelPTZType", "0");  
148 - resultDeferredResult.setResult(result);  
149 - }, (eventResult) -> {  
150 - JSONObject result = new JSONObject();  
151 - result.put("error", "channel[ " + code + " ] " + eventResult.msg);  
152 - resultDeferredResult.setResult(result);  
153 - }, null); 123 +// playService.play(newMediaServerItem, serial, code, (mediaServerItem, response)->{
  124 +// InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, serial, code);
  125 +// if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
  126 +// JSONObject result = new JSONObject();
  127 +// result.put("StreamID", inviteInfo.getStreamInfo().getStream());
  128 +// result.put("DeviceID", device.getDeviceId());
  129 +// result.put("ChannelID", code);
  130 +// result.put("ChannelName", deviceChannel.getName());
  131 +// result.put("ChannelCustomName", "");
  132 +// result.put("FLV", inviteInfo.getStreamInfo().getFlv().getUrl());
  133 +// result.put("WS_FLV", inviteInfo.getStreamInfo().getWs_flv().getUrl());
  134 +// result.put("RTMP", inviteInfo.getStreamInfo().getRtmp().getUrl());
  135 +// result.put("HLS", inviteInfo.getStreamInfo().getHls().getUrl());
  136 +// result.put("RTSP", inviteInfo.getStreamInfo().getRtsp().getUrl());
  137 +// result.put("WEBRTC", inviteInfo.getStreamInfo().getRtc().getUrl());
  138 +// result.put("CDN", "");
  139 +// result.put("SnapURL", "");
  140 +// result.put("Transport", device.getTransport());
  141 +// result.put("StartAt", "");
  142 +// result.put("Duration", "");
  143 +// result.put("SourceVideoCodecName", "");
  144 +// result.put("SourceVideoWidth", "");
  145 +// result.put("SourceVideoHeight", "");
  146 +// result.put("SourceVideoFrameRate", "");
  147 +// result.put("SourceAudioCodecName", "");
  148 +// result.put("SourceAudioSampleRate", "");
  149 +// result.put("AudioEnable", "");
  150 +// result.put("Ondemand", "");
  151 +// result.put("InBytes", "");
  152 +// result.put("InBitRate", "");
  153 +// result.put("OutBytes", "");
  154 +// result.put("NumOutputs", "");
  155 +// result.put("CascadeSize", "");
  156 +// result.put("RelaySize", "");
  157 +// result.put("ChannelPTZType", "0");
  158 +// resultDeferredResult.setResult(result);
  159 +// }
  160 +//
  161 +// }, (eventResult) -> {
  162 +// JSONObject result = new JSONObject();
  163 +// result.put("error", "channel[ " + code + " ] " + eventResult.msg);
  164 +// resultDeferredResult.setResult(result);
  165 +// }, null);
  166 +
  167 +
  168 + playService.play(newMediaServerItem, serial, code, (errorCode, msg, data) -> {
  169 + if (errorCode == InviteErrorCode.SUCCESS.getCode()) {
  170 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, serial, code);
  171 + if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
  172 + JSONObject result = new JSONObject();
  173 + result.put("StreamID", inviteInfo.getStreamInfo().getStream());
  174 + result.put("DeviceID", device.getDeviceId());
  175 + result.put("ChannelID", code);
  176 + result.put("ChannelName", deviceChannel.getName());
  177 + result.put("ChannelCustomName", "");
  178 + result.put("FLV", inviteInfo.getStreamInfo().getFlv().getUrl());
  179 + result.put("WS_FLV", inviteInfo.getStreamInfo().getWs_flv().getUrl());
  180 + result.put("RTMP", inviteInfo.getStreamInfo().getRtmp().getUrl());
  181 + result.put("HLS", inviteInfo.getStreamInfo().getHls().getUrl());
  182 + result.put("RTSP", inviteInfo.getStreamInfo().getRtsp().getUrl());
  183 + result.put("WEBRTC", inviteInfo.getStreamInfo().getRtc().getUrl());
  184 + result.put("CDN", "");
  185 + result.put("SnapURL", "");
  186 + result.put("Transport", device.getTransport());
  187 + result.put("StartAt", "");
  188 + result.put("Duration", "");
  189 + result.put("SourceVideoCodecName", "");
  190 + result.put("SourceVideoWidth", "");
  191 + result.put("SourceVideoHeight", "");
  192 + result.put("SourceVideoFrameRate", "");
  193 + result.put("SourceAudioCodecName", "");
  194 + result.put("SourceAudioSampleRate", "");
  195 + result.put("AudioEnable", "");
  196 + result.put("Ondemand", "");
  197 + result.put("InBytes", "");
  198 + result.put("InBitRate", "");
  199 + result.put("OutBytes", "");
  200 + result.put("NumOutputs", "");
  201 + result.put("CascadeSize", "");
  202 + result.put("RelaySize", "");
  203 + result.put("ChannelPTZType", "0");
  204 + resultDeferredResult.setResult(result);
  205 + }
  206 + }else {
  207 + JSONObject result = new JSONObject();
  208 + result.put("error", "channel[ " + code + " ] " + msg);
  209 + resultDeferredResult.setResult(result);
  210 + }
  211 + });
  212 +
154 return resultDeferredResult; 213 return resultDeferredResult;
155 } 214 }
156 215
@@ -171,8 +230,8 @@ public class ApiStreamController { @@ -171,8 +230,8 @@ public class ApiStreamController {
171 230
172 ){ 231 ){
173 232
174 - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(serial, code);  
175 - if (streamInfo == null) { 233 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, serial, code);
  234 + if (inviteInfo == null) {
176 JSONObject result = new JSONObject(); 235 JSONObject result = new JSONObject();
177 result.put("error","未找到流信息"); 236 result.put("error","未找到流信息");
178 return result; 237 return result;
@@ -184,14 +243,14 @@ public class ApiStreamController { @@ -184,14 +243,14 @@ public class ApiStreamController {
184 return result; 243 return result;
185 } 244 }
186 try { 245 try {
187 - cmder.streamByeCmd(device, code, streamInfo.getStream(), null); 246 + cmder.streamByeCmd(device, code, inviteInfo.getStream(), null);
188 } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { 247 } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
189 JSONObject result = new JSONObject(); 248 JSONObject result = new JSONObject();
190 result.put("error","发送BYE失败:" + e.getMessage()); 249 result.put("error","发送BYE失败:" + e.getMessage());
191 return result; 250 return result;
192 } 251 }
193 - redisCatchStorage.stopPlay(streamInfo);  
194 - storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); 252 + inviteStreamService.removeInviteInfo(inviteInfo);
  253 + storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
195 return null; 254 return null;
196 } 255 }
197 256