Commit e94b99d11c46246532edc93cd25cbf8c0b88f03f

Authored by 648540858
1 parent 10f77667

实现国标录像级联播放,优化点播流程,加快点播速度

Showing 19 changed files with 209 additions and 101 deletions
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
... ... @@ -71,6 +71,16 @@ public class SendRtpItem {
71 71 */
72 72 private String mediaServerId;
73 73  
  74 + /**
  75 + * invite的callId
  76 + */
  77 + private String CallId;
  78 +
  79 + /**
  80 + * 是否是play, false是playback
  81 + */
  82 + private boolean isPlay;
  83 +
74 84 public String getIp() {
75 85 return ip;
76 86 }
... ... @@ -174,4 +184,20 @@ public class SendRtpItem {
174 184 public void setMediaServerId(String mediaServerId) {
175 185 this.mediaServerId = mediaServerId;
176 186 }
  187 +
  188 + public String getCallId() {
  189 + return CallId;
  190 + }
  191 +
  192 + public void setCallId(String callId) {
  193 + CallId = callId;
  194 + }
  195 +
  196 + public boolean isPlay() {
  197 + return isPlay;
  198 + }
  199 +
  200 + public void setPlay(boolean play) {
  201 + isPlay = play;
  202 + }
177 203 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/session/SsrcConfig.java
... ... @@ -81,7 +81,6 @@ public class SsrcConfig {
81 81 isUsed.remove(sn);
82 82 notUsed.add(sn);
83 83 }catch (NullPointerException e){
84   - System.out.printf("11111");
85 84 }
86 85 }
87 86  
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java
... ... @@ -36,7 +36,6 @@ public class GPSSubscribeTask implements Runnable{
36 36  
37 37 SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key);
38 38 if (subscribe != null) {
39   - System.out.println("发送GPS消息");
40 39 ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
41 40 if (parentPlatform == null || parentPlatform.isStatus()) {
42 41 // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
... ... @@ -141,7 +141,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
141 141 */
142 142 @Override
143 143 public void processTimeout(TimeoutEvent timeoutEvent) {
144   - System.out.println("processTimeout");
145 144 if(timeoutProcessor != null) {
146 145 timeoutProcessor.process(timeoutEvent);
147 146 }
... ... @@ -173,7 +172,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
173 172  
174 173 @Override
175 174 public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) {
176   - System.out.println("processDialogTerminated");
177 175 CallIdHeader callId = dialogTerminatedEvent.getDialog().getCallId();
178 176 }
179 177  
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
... ... @@ -346,8 +346,11 @@ public class SIPCommander implements ISIPCommander {
346 346 subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
347 347 (MediaServerItem mediaServerItemInUse, JSONObject json)->{
348 348 if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return;
349   - event.response(mediaServerItemInUse, json);
350   - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
  349 + if (event != null) {
  350 + event.response(mediaServerItemInUse, json);
  351 + }
  352 +
  353 +// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
351 354 });
352 355 //
353 356 StringBuffer content = new StringBuffer(200);
... ... @@ -452,9 +455,11 @@ public class SIPCommander implements ISIPCommander {
452 455 logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
453 456 subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
454 457 (MediaServerItem mediaServerItemInUse, JSONObject json)->{
  458 + System.out.println(344444);
455 459 if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return;
456   - event.response(mediaServerItemInUse, json);
457   - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
  460 + if (event != null) {
  461 + event.response(mediaServerItemInUse, json);
  462 + }
458 463 });
459 464  
460 465 StringBuffer content = new StringBuffer(200);
... ... @@ -466,8 +471,6 @@ public class SIPCommander implements ISIPCommander {
466 471 content.append("t="+DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime)+" "
467 472 +DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime) +"\r\n");
468 473  
469   -
470   -
471 474 String streamMode = device.getStreamMode().toUpperCase();
472 475  
473 476 if (userSetup.isSeniorSdp()) {
... ... @@ -1202,7 +1205,6 @@ public class SIPCommander implements ISIPCommander {
1202 1205 if (type == null) {
1203 1206 type = "all";
1204 1207 }
1205   -
1206 1208 try {
1207 1209 StringBuffer recordInfoXml = new StringBuffer(200);
1208 1210 recordInfoXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
... ... @@ -508,9 +508,6 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
508 508 // callid
509 509 CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
510 510 : udpSipProvider.getNewCallId();
511   - System.out.println(
512   - recordXml.toString()
513   - );
514 511 Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, recordXml.toString(), fromTag, callIdHeader);
515 512 transmitRequest(parentPlatform, request);
516 513  
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
1 1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
2 2  
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.alibaba.fastjson.JSONObject;
3 5 import com.genersoft.iot.vmp.common.StreamInfo;
4 6 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
5 7 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
6 8 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
7 9 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
  10 +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
8 11 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
9 12 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
10 13 import com.genersoft.iot.vmp.service.IMediaServerService;
... ... @@ -24,6 +27,8 @@ import javax.sip.header.HeaderAddress;
24 27 import javax.sip.header.ToHeader;
25 28 import java.util.HashMap;
26 29 import java.util.Map;
  30 +import java.util.Timer;
  31 +import java.util.TimerTask;
27 32  
28 33 /**
29 34 * SIP命令类型: ACK请求
... ... @@ -52,6 +57,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
52 57 @Autowired
53 58 private IMediaServerService mediaServerService;
54 59  
  60 + @Autowired
  61 + private ZLMHttpHookSubscribe subscribe;
  62 +
55 63  
56 64 /**
57 65 * 处理 ACK请求
... ... @@ -60,6 +68,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
60 68 */
61 69 @Override
62 70 public void process(RequestEvent evt) {
  71 + logger.debug("ACK请求: {}", ((System.currentTimeMillis())));
63 72 Dialog dialog = evt.getDialog();
64 73 if (dialog == null) return;
65 74 if (dialog.getState()== DialogState.CONFIRMED) {
... ... @@ -69,16 +78,17 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
69 78 String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
70 79 String deviceId = sendRtpItem.getDeviceId();
71 80 StreamInfo streamInfo = null;
72   - if (deviceId == null) {
  81 + if (sendRtpItem.isPlay()) {
  82 + streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
  83 + }else {
  84 + streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId);
  85 + }
  86 + System.out.println(JSON.toJSON(streamInfo));
  87 + if (streamInfo == null) {
73 88 streamInfo = new StreamInfo();
74 89 streamInfo.setApp(sendRtpItem.getApp());
75 90 streamInfo.setStreamId(sendRtpItem.getStreamId());
76   - }else {
77   - streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
78   - sendRtpItem.setStreamId(streamInfo.getStreamId());
79   - streamInfo.setApp("rtp");
80 91 }
81   -
82 92 redisCatchStorage.updateSendRTPSever(sendRtpItem);
83 93 logger.info(platformGbId);
84 94 logger.info(channelId);
... ... @@ -90,34 +100,42 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
90 100 param.put("dst_url",sendRtpItem.getIp());
91 101 param.put("dst_port", sendRtpItem.getPort());
92 102 param.put("is_udp", is_Udp);
93   - //param.put ("src_port", sendRtpItem.getLocalPort());
94 103 // 设备推流查询,成功后才能转推
95   - boolean rtpPushed = false;
96   - long startTime = System.currentTimeMillis();
97   - while (!rtpPushed) {
98   - try {
99   - if (System.currentTimeMillis() - startTime < 30 * 1000) {
100   - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
101   - if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) {
102   - rtpPushed = true;
103   - logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]",
104   - streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort());
105   - zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
106   - } else {
107   - logger.info("等待设备推流[{}/{}].......",
108   - streamInfo.getApp() ,streamInfo.getStreamId());
109   - Thread.sleep(1000);
110   - continue;
111   - }
112   - } else {
113   - rtpPushed = true;
114   - logger.info("设备推流[{}/{}]超时,终止向上级推流",
115   - streamInfo.getApp() ,streamInfo.getStreamId());
116   - }
117   - } catch (InterruptedException e) {
118   - e.printStackTrace();
119   - }
120   - }
  104 + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
  105 + zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
  106 +// if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) {
  107 +// logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]",
  108 +// streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort());
  109 +// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
  110 +// } else {
  111 +// // 对hook进行订阅
  112 +// logger.info("等待设备推流[{}/{}].......",
  113 +// streamInfo.getApp(), streamInfo.getStreamId());
  114 +// Timer timer = new Timer();
  115 +// timer.schedule(new TimerTask() {
  116 +// @Override
  117 +// public void run() {
  118 +// logger.info("设备推流[{}/{}]超时,终止向上级推流",
  119 +// finalStreamInfo.getApp() , finalStreamInfo.getStreamId());
  120 +//
  121 +// }
  122 +// }, 30*1000L);
  123 +// // 添加订阅
  124 +// JSONObject subscribeKey = new JSONObject();
  125 +// subscribeKey.put("app", "rtp");
  126 +// subscribeKey.put("stream", streamInfo.getStreamId());
  127 +// subscribeKey.put("mediaServerId", streamInfo.getMediaServerId());
  128 +// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, subscribeKey,
  129 +// (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
  130 +// logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]",
  131 +// finalStreamInfo.getApp(), finalStreamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort());
  132 +// timer.cancel();
  133 +// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
  134 +// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
  135 +// });
  136 +// }
  137 +
  138 +
121 139 }
122 140 }
123 141 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
... ... @@ -87,18 +87,29 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
87 87 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
88 88 zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
89 89 redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
90   - if (zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId) == 0) {
  90 + int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
  91 + if (totalReaderCount == 0) {
91 92 logger.info(streamId + "无其它观看者,通知设备停止推流");
92 93 cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId);
  94 + }else if (totalReaderCount == -1){
  95 + logger.warn(streamId + " 查找其它观看者失败");
93 96 }
94 97 }
95 98 // 可能是设备主动停止
96 99 Device device = storager.queryVideoDeviceByChannelId(platformGbId);
97 100 if (device != null) {
98   - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
99   - if (streamInfo != null) {
100   - redisCatchStorage.stopPlay(streamInfo);
  101 + if (sendRtpItem.isPlay()) {
  102 + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
  103 + if (streamInfo != null) {
  104 + redisCatchStorage.stopPlay(streamInfo);
  105 + }
  106 + }else {
  107 + StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(device.getDeviceId(), channelId);
  108 + if (streamInfo != null) {
  109 + redisCatchStorage.stopPlayback(streamInfo);
  110 + }
101 111 }
  112 +
102 113 storager.stopPlay(device.getDeviceId(), channelId);
103 114 mediaServerService.closeRTPServer(device, channelId);
104 115 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
1 1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
2 2  
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.alibaba.fastjson.JSONObject;
  5 +import com.genersoft.iot.vmp.common.StreamInfo;
3 6 import com.genersoft.iot.vmp.gb28181.bean.*;
  7 +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
4 8 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
  9 +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
  10 +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
5 11 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
6 12 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
7 13 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
8 14 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
  15 +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
9 16 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
10 17 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
11 18 import com.genersoft.iot.vmp.service.IMediaServerService;
12 19 import com.genersoft.iot.vmp.service.IPlayService;
  20 +import com.genersoft.iot.vmp.service.bean.SSRCInfo;
13 21 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
14 22 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
15 23 import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
  24 +import gov.nist.javax.sdp.TimeDescriptionImpl;
  25 +import gov.nist.javax.sdp.fields.TimeField;
16 26 import gov.nist.javax.sip.address.AddressImpl;
17 27 import gov.nist.javax.sip.address.SipUri;
18 28 import org.slf4j.Logger;
... ... @@ -27,10 +37,13 @@ import javax.sip.RequestEvent;
27 37 import javax.sip.ServerTransaction;
28 38 import javax.sip.SipException;
29 39 import javax.sip.address.SipURI;
  40 +import javax.sip.header.CallIdHeader;
30 41 import javax.sip.header.FromHeader;
31 42 import javax.sip.message.Request;
32 43 import javax.sip.message.Response;
33 44 import java.text.ParseException;
  45 +import java.text.SimpleDateFormat;
  46 +import java.util.Date;
34 47 import java.util.List;
35 48 import java.util.Vector;
36 49  
... ... @@ -61,6 +74,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
61 74 private IPlayService playService;
62 75  
63 76 @Autowired
  77 + private ISIPCommander commander;
  78 +
  79 + @Autowired
64 80 private ZLMRTPServerFactory zlmrtpServerFactory;
65 81  
66 82 @Autowired
... ... @@ -69,6 +85,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
69 85 @Autowired
70 86 private SIPProcessorObserver sipProcessorObserver;
71 87  
  88 +
72 89 @Override
73 90 public void afterPropertiesSet() throws Exception {
74 91 // 添加消息处理的订阅
... ... @@ -84,6 +101,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
84 101 @Override
85 102 public void process(RequestEvent evt) {
86 103 // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令
  104 + Long startTimeForInvite = System.currentTimeMillis();
87 105 try {
88 106 Request request = evt.getRequest();
89 107 SipURI sipURI = (SipURI) request.getRequestURI();
... ... @@ -91,6 +109,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
91 109 String requesterId = null;
92 110  
93 111 FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
  112 + CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
94 113 AddressImpl address = (AddressImpl) fromHeader.getAddress();
95 114 SipUri uri = (SipUri) address.getURI();
96 115 requesterId = uri.getUser();
... ... @@ -101,7 +120,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
101 120 return;
102 121 }
103 122  
104   - // 查询请求方是否上级平台
  123 + // 查询请求是否来自上级平台\设备
105 124 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
106 125 if (platform != null) {
107 126 // 查询平台下是否有该通道
... ... @@ -158,7 +177,21 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
158 177 ssrc = ssrcDefault;
159 178 sdp = SdpFactory.getInstance().createSessionDescription(contentString);
160 179 }
161   -
  180 + String sessionName = sdp.getSessionName().getValue();
  181 +
  182 + Long startTime = null;
  183 + Long stopTime = null;
  184 + Date start = null;
  185 + Date end = null;
  186 + if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) {
  187 + TimeDescriptionImpl timeDescription = (TimeDescriptionImpl)(sdp.getTimeDescriptions(false).get(0));
  188 + TimeField startTimeFiled = (TimeField)timeDescription.getTime();
  189 + startTime = startTimeFiled.getStartTime();
  190 + stopTime = startTimeFiled.getStopTime();
  191 +
  192 + start = new Date(startTime*1000);
  193 + end = new Date(stopTime*1000);
  194 + }
162 195 // 获取支持的格式
163 196 Vector mediaDescriptions = sdp.getMediaDescriptions(true);
164 197 // 查看是否支持PS 负载96
... ... @@ -228,23 +261,31 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
228 261 responseAck(evt, Response.BUSY_HERE);
229 262 return;
230 263 }
231   -
  264 + sendRtpItem.setCallId(callIdHeader.getCallId());
  265 + sendRtpItem.setPlay("Play".equals(sessionName));
232 266 // 写入redis, 超时时回复
233 267 redisCatchStorage.updateSendRTPSever(sendRtpItem);
234   - // 通知下级推流,
235   - PlayResult playResult = playService.play(mediaServerItem,device.getDeviceId(), channelId, (mediaServerItemInUSe, responseJSON)->{
236   - // 收到推流, 回复200OK, 等待ack
  268 +
  269 + Device finalDevice = device;
  270 + MediaServerItem finalMediaServerItem = mediaServerItem;
  271 + Long finalStartTime = startTime;
  272 + Long finalStopTime = stopTime;
  273 + ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{
  274 + logger.info("[上级点播]收到下级开始点播订阅, {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId());
237 275 // if (sendRtpItem == null) return;
238 276 sendRtpItem.setStatus(1);
239 277 redisCatchStorage.updateSendRTPSever(sendRtpItem);
240   - // TODO 添加对tcp的支持
241 278  
242 279 StringBuffer content = new StringBuffer(200);
243 280 content.append("v=0\r\n");
244 281 content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n");
245   - content.append("s=Play\r\n");
  282 + content.append("s=" + sessionName+"\r\n");
246 283 content.append("c=IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n");
247   - content.append("t=0 0\r\n");
  284 + if ("Playback".equals(sessionName)) {
  285 + content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n");
  286 + }else {
  287 + content.append("t=0 0\r\n");
  288 + }
248 289 content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
249 290 content.append("a=sendonly\r\n");
250 291 content.append("a=rtpmap:96 PS/90000\r\n");
... ... @@ -260,7 +301,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
260 301 } catch (ParseException e) {
261 302 e.printStackTrace();
262 303 }
263   - } ,((event) -> {
  304 + if ("Playback".equals(sessionName) && responseJSON != null) {
  305 + playService.onPublishHandlerForPlayBack(finalMediaServerItem, responseJSON, finalDevice.getDeviceId(), channelId, null);
  306 + }
  307 + };
  308 + SipSubscribe.Event errorEvent = ((event) -> {
264 309 // 未知错误。直接转发设备点播的错误
265 310 Response response = null;
266 311 try {
... ... @@ -271,11 +316,27 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
271 316 } catch (ParseException | SipException | InvalidArgumentException e) {
272 317 e.printStackTrace();
273 318 }
274   - }));
275   - if (logger.isDebugEnabled()) {
276   - logger.debug(playResult.getResult().toString());
  319 + });
  320 + if ("Playback".equals(sessionName)) {
  321 + sendRtpItem.setPlay(false);
  322 + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, sendRtpItem.getSsrc(), true);
  323 + sendRtpItem.setStreamId(ssrc);
  324 + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  325 + commander.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, format.format(start), format.format(end), hookEvent, errorEvent);
  326 + }else {
  327 + sendRtpItem.setPlay(true);
  328 + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
  329 + if (streamInfo == null) {
  330 + if (mediaServerItem.isRtpEnable()) {
  331 + sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId));
  332 + }
  333 + sendRtpItem.setPlay(false);
  334 + playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent,errorEvent);
  335 + }else {
  336 + sendRtpItem.setStreamId(streamInfo.getStreamId());
  337 + hookEvent.response(mediaServerItem, null);
  338 + }
277 339 }
278   -
279 340 }else if (gbStream != null) {
280 341 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
281 342 gbStream.getApp(), gbStream.getStream(), channelId,
... ... @@ -295,7 +356,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
295 356  
296 357 sendRtpItem.setStatus(1);
297 358 redisCatchStorage.updateSendRTPSever(sendRtpItem);
298   - // TODO 添加对tcp的支持
299 359 StringBuffer content = new StringBuffer(200);
300 360 content.append("v=0\r\n");
301 361 content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java
... ... @@ -82,9 +82,6 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
82 82 requestURI.setPort(event.getRemotePort());
83 83 reqAck.setRequestURI(requestURI);
84 84 logger.info("向 " + event.getRemoteIpAddress() + ":" + event.getRemotePort() + "回复ack");
85   - SipURI sipURI = (SipURI)dialog.getRemoteParty().getURI();
86   - String deviceId = requestURI.getUser();
87   - String channelId = sipURI.getUser();
88 85  
89 86 dialog.sendAck(reqAck);
90 87  
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
... ... @@ -181,7 +181,7 @@ public class ZLMHttpHookListener {
181 181 @PostMapping(value = "/on_publish", produces = "application/json;charset=UTF-8")
182 182 public ResponseEntity<String> onPublish(@RequestBody JSONObject json) {
183 183  
184   - logger.debug("[ ZLM HOOK ]on_publish API调用,参数:" + json.toString());
  184 + logger.info("[ ZLM HOOK ]on_publish API调用,参数:" + json.toString());
185 185 JSONObject ret = new JSONObject();
186 186 ret.put("code", 0);
187 187 ret.put("msg", "success");
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java
... ... @@ -77,21 +77,23 @@ public class ZLMHttpHookSubscribe {
77 77 if (eventMap == null) {
78 78 return;
79 79 }
80   - Iterator<Map.Entry<JSONObject, Event>> iterator = eventMap.entrySet().iterator();
81   - while (iterator.hasNext()){
82   - Map.Entry<JSONObject, Event> next = iterator.next();
83   - JSONObject key = next.getKey();
84   - Boolean result = null;
85   - for (String s : key.keySet()) {
86   - if (result == null) {
87   - result = key.getString(s).equals(hookResponse.getString(s));
88   - }else {
89   - if (key.getString(s) == null) continue;
90   - result = result && key.getString(s).equals(hookResponse.getString(s));
  80 +
  81 + Set<Map.Entry<JSONObject, Event>> entries = eventMap.entrySet();
  82 + if (entries.size() > 0) {
  83 + for (Map.Entry<JSONObject, Event> entry : entries) {
  84 + JSONObject key = entry.getKey();
  85 + Boolean result = null;
  86 + for (String s : key.keySet()) {
  87 + if (result == null) {
  88 + result = key.getString(s).equals(hookResponse.getString(s));
  89 + }else {
  90 + if (key.getString(s) == null) continue;
  91 + result = result && key.getString(s).equals(hookResponse.getString(s));
  92 + }
  93 + }
  94 + if (null != result && result){
  95 + entries.remove(entry);
91 96 }
92   - }
93   - if (null != result && result){
94   - iterator.remove();
95 97 }
96 98 }
97 99 }
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
... ... @@ -72,7 +72,6 @@ public class ZLMRESTfulUtils {
72 72 ResponseBody responseBody = response.body();
73 73 if (responseBody != null) {
74 74 String responseStr = responseBody.string();
75   - System.out.println(responseStr);
76 75 responseJSON = JSON.parseObject(responseStr);
77 76 }
78 77 }else {
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
... ... @@ -242,9 +242,18 @@ public class ZLMRTPServerFactory {
242 242 */
243 243 public int totalReaderCount(MediaServerItem mediaServerItem, String app, String streamId) {
244 244 JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtmp", streamId);
  245 + Integer code = mediaInfo.getInteger("code");
245 246 if (mediaInfo == null) {
246 247 return 0;
247 248 }
  249 + if ( code < 0) {
  250 + logger.warn("查询流({}/{})是否有其它观看者时得到: {}", app, streamId, mediaInfo.getString("msg"));
  251 + return -1;
  252 + }
  253 + if ( code == 0 && ! mediaInfo.getBoolean("online")) {
  254 + logger.warn("查询流({}/{})是否有其它观看者时得到: {}", app, streamId, mediaInfo.getString("msg"));
  255 + return -1;
  256 + }
248 257 return mediaInfo.getInteger("totalReaderCount");
249 258 }
250 259  
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
... ... @@ -122,7 +122,6 @@ public class PlayServiceImpl implements IPlayService {
122 122 // 点播结束时调用截图接口
123 123 try {
124 124 String classPath = ResourceUtils.getURL("classpath:").getPath();
125   - // System.out.println(classPath);
126 125 // 兼容打包为jar的class路径
127 126 if(classPath.contains("jar")) {
128 127 classPath = classPath.substring(0, classPath.lastIndexOf("."));
... ... @@ -238,11 +237,11 @@ public class PlayServiceImpl implements IPlayService {
238 237 }
239 238  
240 239 @Override
241   - public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
  240 + public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
242 241 RequestMessage msg = new RequestMessage();
243 242 msg.setId(uuid);
244 243 msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
245   - StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId, uuid);
  244 + StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId, uuid);
246 245 if (streamInfo != null) {
247 246 DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
248 247 if (deviceChannel != null) {
... ...
src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java
... ... @@ -88,7 +88,7 @@ public interface ParentPlatformMapper {
88 88 "</script>"})
89 89 int setDefaultCatalog(String platformId, String catalogId);
90 90  
91   - @Select("select 'channel' as name, count(pgc.platformId) count from platform_gb_channel pgc where pgc.platformId=#{platformId} and pgc.channelId =#{gbId} " +
  91 + @Select("select 'channel' as name, count(pgc.platformId) count from platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId where pgc.platformId=#{platformId} and dc.channelId =#{gbId} " +
92 92 "union " +
93 93 "select 'stream' as name, count(pgs.platformId) count from platform_gb_stream pgs left join gb_stream gs on pgs.gbStreamId = gs.gbStreamId where pgs.platformId=#{platformId} and gs.gbId = #{gbId}")
94 94 List<ChannelSourceInfo> getChannelSource(String platformId, String gbId);
... ...
src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java
... ... @@ -52,8 +52,8 @@ public interface PlatformChannelMapper {
52 52 int cleanChannelForGB(String platformId);
53 53  
54 54  
55   - @Select("SELECT * FROM device_channel WHERE deviceId = (SELECT deviceId FROM platform_gb_channel WHERE " +
56   - "platformId='${platformId}' AND channelId='${channelId}' ) AND channelId='${channelId}'")
  55 + @Select("SELECT dc.* FROM platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId WHERE " +
  56 + "pgc.platformId=#{platformId} AND dc.channelId=#{channelId}")
57 57 DeviceChannel queryChannelInParentPlatform(String platformId, String channelId);
58 58  
59 59  
... ... @@ -62,7 +62,7 @@ public interface PlatformChannelMapper {
62 62 "where pgc.platformId=#{platformId} and pgc.catalogId=#{catalogId}")
63 63 List<PlatformCatalog> queryChannelInParentPlatformAndCatalog(String platformId, String catalogId);
64 64  
65   - @Select("SELECT * FROM device WHERE deviceId = (SELECT deviceId FROM platform_gb_channel WHERE platformId='${platformId}' AND channelId='${channelId}')")
  65 + @Select("SELECT * FROM device WHERE deviceId = (SELECT deviceId FROM platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId WHERE pgc.platformId='${platformId}' AND dc.channelId='${channelId}')")
66 66 Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId);
67 67  
68 68 @Delete("<script> "+
... ... @@ -71,7 +71,7 @@ public interface PlatformChannelMapper {
71 71 int delByCatalogId(String id);
72 72  
73 73 @Delete("<script> "+
74   - "DELETE FROM platform_gb_channel WHERE catalogId=#{parentId} AND platformId=#{platformId} AND channelId=#{id}" +
  74 + "DELETE FROM platform_gb_channel WHERE catalogId=#{parentId} AND platformId=#{platformId} AND channelId=#{id}" +
75 75 "</script>")
76 76 int delByCatalogIdAndChannelIdAndPlatformId(PlatformCatalog platformCatalog);
77 77  
... ...
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
... ... @@ -139,7 +139,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
139 139  
140 140 @Override
141 141 public StreamInfo queryPlayByDevice(String deviceId, String channelId) {
142   -// List<Object> playLeys = redis.keys(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX,
143 142 List<Object> playLeys = redis.scan(String.format("%S_%s_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX,
144 143 userSetup.getServerId(),
145 144 deviceId,
... ...
src/test/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImplTest.java
... ... @@ -50,14 +50,7 @@ class DeviceAlarmServiceImplTest {
50 50 // System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, "1", null,
51 51 // null, null).getSize());
52 52  
53   - System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, null, null,
54   - "2021-01-01 00:00:00", null).getSize());
55 53  
56   - System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, null, null,
57   - null, "2021-04-01 09:00:00").getSize());
58   -
59   - System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, null, null,
60   - "2021-02-01 01:00:00", "2021-04-01 04:00:00").getSize());
61 54 }
62 55  
63 56  
... ...