Commit c273a6aa5a8e02c12ad43c2c29108829583ebbbc

Authored by gushouzheng
1 parent d09d8f11

更新级联查看直播视频及代理拉流视频bug

src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -17,9 +17,11 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; @@ -17,9 +17,11 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
17 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; 17 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
18 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; 18 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
19 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 19 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  20 +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
20 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; 21 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
21 import com.genersoft.iot.vmp.service.IMediaServerService; 22 import com.genersoft.iot.vmp.service.IMediaServerService;
22 import com.genersoft.iot.vmp.service.IPlayService; 23 import com.genersoft.iot.vmp.service.IPlayService;
  24 +import com.genersoft.iot.vmp.service.IStreamProxyService;
23 import com.genersoft.iot.vmp.service.IStreamPushService; 25 import com.genersoft.iot.vmp.service.IStreamPushService;
24 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; 26 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
25 import com.genersoft.iot.vmp.service.bean.SSRCInfo; 27 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -65,6 +67,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -65,6 +67,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
65 67
66 @Autowired 68 @Autowired
67 private IStreamPushService streamPushService; 69 private IStreamPushService streamPushService;
  70 + @Autowired
  71 + private IStreamProxyService streamProxyService;
68 72
69 @Autowired 73 @Autowired
70 private IRedisCatchStorage redisCatchStorage; 74 private IRedisCatchStorage redisCatchStorage;
@@ -145,6 +149,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -145,6 +149,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
145 149
146 MediaServerItem mediaServerItem = null; 150 MediaServerItem mediaServerItem = null;
147 StreamPushItem streamPushItem = null; 151 StreamPushItem streamPushItem = null;
  152 + StreamProxyItem proxyByAppAndStream =null;
148 // 不是通道可能是直播流 153 // 不是通道可能是直播流
149 if (channel != null && gbStream == null) { 154 if (channel != null && gbStream == null) {
150 if (channel.getStatus() == 0) { 155 if (channel.getStatus() == 0) {
@@ -178,6 +183,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -178,6 +183,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
178 responseAck(evt, Response.GONE); 183 responseAck(evt, Response.GONE);
179 return; 184 return;
180 } 185 }
  186 + }else if("proxy".equals(gbStream.getStreamType())){
  187 + proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream());
  188 + if (proxyByAppAndStream == null) {
  189 + logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
  190 + responseAck(evt, Response.GONE);
  191 + return;
  192 + }
181 } 193 }
182 } 194 }
183 responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 195 responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
@@ -419,14 +431,33 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -419,14 +431,33 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
419 } 431 }
420 } 432 }
421 } else if (gbStream != null) { 433 } else if (gbStream != null) {
422 - if (streamPushItem != null && streamPushItem.isPushIng()) {  
423 - // 推流状态  
424 - pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,  
425 - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);  
426 - } else {  
427 - // 未推流 拉起  
428 - notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,  
429 - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); 434 + if("push".equals(gbStream.getStreamType())) {
  435 + if (streamPushItem != null && streamPushItem.isPushIng()) {
  436 + // 推流状态
  437 + pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
  438 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  439 + } else {
  440 + // 未推流 拉起
  441 + notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
  442 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  443 + }
  444 + }else if ("proxy".equals(gbStream.getStreamType())){
  445 + if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){
  446 + pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
  447 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  448 + }else{
  449 + //开启代理拉流
  450 + boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
  451 + if(start1) {
  452 + pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
  453 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  454 + }else{
  455 + //失败后通知
  456 + notifyStreamOnline(evt, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
  457 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  458 + }
  459 + }
  460 +
430 } 461 }
431 } 462 }
432 } 463 }
@@ -445,7 +476,39 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -445,7 +476,39 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
445 /** 476 /**
446 * 安排推流 477 * 安排推流
447 */ 478 */
  479 + private void pushProxyStream(RequestEvent evt, GbStream gbStream, ParentPlatform platform,
  480 + CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
  481 + int port, Boolean tcpActive, boolean mediaTransmissionTCP,
  482 + String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
  483 + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
  484 + if (streamReady) {
  485 + // 自平台内容
  486 + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
  487 + gbStream.getApp(), gbStream.getStream(), channelId,
  488 + mediaTransmissionTCP);
448 489
  490 + if (sendRtpItem == null) {
  491 + logger.warn("服务器端口资源不足");
  492 + responseAck(evt, Response.BUSY_HERE);
  493 + return;
  494 + }
  495 + if (tcpActive != null) {
  496 + sendRtpItem.setTcpActive(tcpActive);
  497 + }
  498 + sendRtpItem.setPlayType(InviteStreamType.PUSH);
  499 + // 写入redis, 超时时回复
  500 + sendRtpItem.setStatus(1);
  501 + sendRtpItem.setCallId(callIdHeader.getCallId());
  502 + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
  503 + sendRtpItem.setDialog(dialogByteArray);
  504 + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
  505 + sendRtpItem.setTransaction(transactionByteArray);
  506 + redisCatchStorage.updateSendRTPSever(sendRtpItem);
  507 + sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
  508 +
  509 + }
  510 +
  511 + }
449 private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, 512 private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
450 CallIdHeader callIdHeader, MediaServerItem mediaServerItem, 513 CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
451 int port, Boolean tcpActive, boolean mediaTransmissionTCP, 514 int port, Boolean tcpActive, boolean mediaTransmissionTCP,
@@ -490,7 +553,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -490,7 +553,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
490 } 553 }
491 554
492 } 555 }
493 -  
494 /** 556 /**
495 * 通知流上线 557 * 通知流上线
496 */ 558 */
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -277,7 +277,11 @@ public class MediaServerServiceImpl implements IMediaServerService { @@ -277,7 +277,11 @@ public class MediaServerServiceImpl implements IMediaServerService {
277 return null; 277 return null;
278 } 278 }
279 String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId; 279 String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
280 - return (MediaServerItem)redisUtil.get(key); 280 + MediaServerItem serverItem=(MediaServerItem)redisUtil.get(key);
  281 + if(null==serverItem){
  282 + serverItem=mediaServerMapper.queryOne(mediaServerId);
  283 + }
  284 + return serverItem;
281 } 285 }
282 286
283 @Override 287 @Override
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON; @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
4 import com.alibaba.fastjson.JSONArray; 4 import com.alibaba.fastjson.JSONArray;
5 import com.alibaba.fastjson.JSONObject; 5 import com.alibaba.fastjson.JSONObject;
6 import com.alibaba.fastjson.TypeReference; 6 import com.alibaba.fastjson.TypeReference;
  7 +import com.genersoft.iot.vmp.conf.MediaConfig;
7 import com.genersoft.iot.vmp.conf.UserSetting; 8 import com.genersoft.iot.vmp.conf.UserSetting;
8 import com.genersoft.iot.vmp.gb28181.bean.*; 9 import com.genersoft.iot.vmp.gb28181.bean.*;
9 import com.genersoft.iot.vmp.gb28181.event.EventPublisher; 10 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
@@ -78,6 +79,10 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -78,6 +79,10 @@ public class StreamPushServiceImpl implements IStreamPushService {
78 @Autowired 79 @Autowired
79 TransactionDefinition transactionDefinition; 80 TransactionDefinition transactionDefinition;
80 81
  82 + @Autowired
  83 + private MediaConfig mediaConfig;
  84 +
  85 +
81 @Override 86 @Override
82 public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) { 87 public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
83 if (jsonData == null) { 88 if (jsonData == null) {
@@ -142,6 +147,8 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -142,6 +147,8 @@ public class StreamPushServiceImpl implements IStreamPushService {
142 stream.setStreamType("push"); 147 stream.setStreamType("push");
143 stream.setStatus(true); 148 stream.setStatus(true);
144 stream.setCreateTime(DateUtil.getNow()); 149 stream.setCreateTime(DateUtil.getNow());
  150 + stream.setStreamType("push");
  151 + stream.setMediaServerId(mediaConfig.getId());
145 int add = gbStreamMapper.add(stream); 152 int add = gbStreamMapper.add(stream);
146 return add > 0; 153 return add > 0;
147 } 154 }