Commit 935221ab4112894ad3bc92ed91eff3af8bd2226b

Authored by 648540858
Committed by GitHub
2 parents a580ff6f 7ecc86b3

Merge pull request #567 from mrjackwang/wvp-28181-2.0

更新上级级联查看直播视频及代理拉流视频流bug
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
... ... @@ -17,9 +17,11 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
17 17 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
18 18 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
19 19 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  20 +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
20 21 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
21 22 import com.genersoft.iot.vmp.service.IMediaServerService;
22 23 import com.genersoft.iot.vmp.service.IPlayService;
  24 +import com.genersoft.iot.vmp.service.IStreamProxyService;
23 25 import com.genersoft.iot.vmp.service.IStreamPushService;
24 26 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
25 27 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
... ... @@ -65,6 +67,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
65 67  
66 68 @Autowired
67 69 private IStreamPushService streamPushService;
  70 + @Autowired
  71 + private IStreamProxyService streamProxyService;
68 72  
69 73 @Autowired
70 74 private IRedisCatchStorage redisCatchStorage;
... ... @@ -142,6 +146,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
142 146  
143 147 MediaServerItem mediaServerItem = null;
144 148 StreamPushItem streamPushItem = null;
  149 + StreamProxyItem proxyByAppAndStream =null;
145 150 // 不是通道可能是直播流
146 151 if (channel != null && gbStream == null) {
147 152 if (channel.getStatus() == 0) {
... ... @@ -175,6 +180,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
175 180 responseAck(evt, Response.GONE);
176 181 return;
177 182 }
  183 + }else if("proxy".equals(gbStream.getStreamType())){
  184 + proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream());
  185 + if (proxyByAppAndStream == null) {
  186 + logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
  187 + responseAck(evt, Response.GONE);
  188 + return;
  189 + }
178 190 }
179 191 }
180 192 responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
... ... @@ -416,14 +428,33 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
416 428 }
417 429 }
418 430 } else if (gbStream != null) {
419   - if (streamPushItem != null && streamPushItem.isPushIng()) {
420   - // 推流状态
421   - pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
422   - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
423   - } else {
424   - // 未推流 拉起
425   - notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
426   - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  431 + if("push".equals(gbStream.getStreamType())) {
  432 + if (streamPushItem != null && streamPushItem.isPushIng()) {
  433 + // 推流状态
  434 + pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
  435 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  436 + } else {
  437 + // 未推流 拉起
  438 + notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
  439 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  440 + }
  441 + }else if ("proxy".equals(gbStream.getStreamType())){
  442 + if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){
  443 + pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
  444 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  445 + }else{
  446 + //开启代理拉流
  447 + boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
  448 + if(start1) {
  449 + pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
  450 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  451 + }else{
  452 + //失败后通知
  453 + notifyStreamOnline(evt, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
  454 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  455 + }
  456 + }
  457 +
427 458 }
428 459 }
429 460 }
... ... @@ -442,7 +473,39 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
442 473 /**
443 474 * 安排推流
444 475 */
  476 + private void pushProxyStream(RequestEvent evt, GbStream gbStream, ParentPlatform platform,
  477 + CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
  478 + int port, Boolean tcpActive, boolean mediaTransmissionTCP,
  479 + String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
  480 + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
  481 + if (streamReady) {
  482 + // 自平台内容
  483 + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
  484 + gbStream.getApp(), gbStream.getStream(), channelId,
  485 + mediaTransmissionTCP);
445 486  
  487 + if (sendRtpItem == null) {
  488 + logger.warn("服务器端口资源不足");
  489 + responseAck(evt, Response.BUSY_HERE);
  490 + return;
  491 + }
  492 + if (tcpActive != null) {
  493 + sendRtpItem.setTcpActive(tcpActive);
  494 + }
  495 + sendRtpItem.setPlayType(InviteStreamType.PUSH);
  496 + // 写入redis, 超时时回复
  497 + sendRtpItem.setStatus(1);
  498 + sendRtpItem.setCallId(callIdHeader.getCallId());
  499 + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
  500 + sendRtpItem.setDialog(dialogByteArray);
  501 + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
  502 + sendRtpItem.setTransaction(transactionByteArray);
  503 + redisCatchStorage.updateSendRTPSever(sendRtpItem);
  504 + sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
  505 +
  506 + }
  507 +
  508 + }
446 509 private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
447 510 CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
448 511 int port, Boolean tcpActive, boolean mediaTransmissionTCP,
... ... @@ -487,7 +550,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
487 550 }
488 551  
489 552 }
490   -
491 553 /**
492 554 * 通知流上线
493 555 */
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
... ... @@ -8,6 +8,7 @@ import java.util.List;
8 8 import java.util.Map;
9 9 import java.util.Set;
10 10  
  11 +import com.genersoft.iot.vmp.media.zlm.ZLMRunner;
11 12 import org.slf4j.Logger;
12 13 import org.slf4j.LoggerFactory;
13 14 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -54,6 +55,9 @@ public class MediaServerServiceImpl implements IMediaServerService {
54 55 @Autowired
55 56 private SipConfig sipConfig;
56 57  
  58 + @Autowired
  59 + private ZLMRunner zlmRunner;
  60 +
57 61 @Value("${server.ssl.enabled:false}")
58 62 private boolean sslEnabled;
59 63  
... ... @@ -277,7 +281,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
277 281 return null;
278 282 }
279 283 String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
280   - return (MediaServerItem)redisUtil.get(key);
  284 + MediaServerItem serverItem=(MediaServerItem)redisUtil.get(key);
  285 + if(null==serverItem){
  286 + //zlm服务不在线,启动重连
  287 + reloadZlm();
  288 + serverItem=(MediaServerItem)redisUtil.get(key);
  289 + }
  290 + return serverItem;
281 291 }
282 292  
283 293 @Override
... ... @@ -470,8 +480,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
470 480 String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
471 481  
472 482 if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) {
473   - logger.info("获取负载最低的节点时无在线节点");
474   - return null;
  483 + logger.info("获取负载最低的节点时无在线节点,启动重连机制");
  484 + //启动重连
  485 + reloadZlm();
  486 + if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) {
  487 + logger.info("获取负载最低的节点时无在线节点");
  488 + return null;
  489 + }
475 490 }
476 491  
477 492 // 获取分数最低的,及并发最低的
... ... @@ -633,8 +648,14 @@ public class MediaServerServiceImpl implements IMediaServerService {
633 648 MediaServerItem mediaServerItem = getOne(mediaServerId);
634 649 if (mediaServerItem == null) {
635 650 // zlm连接重试
636   - logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息");
637   - return;
  651 + logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息,尝试重连zlm");
  652 + reloadZlm();
  653 + mediaServerItem = getOne(mediaServerId);
  654 + if (mediaServerItem == null) {
  655 + // zlm连接重试
  656 + logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息");
  657 + return;
  658 + }
638 659 }
639 660 String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
640 661 int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2;
... ... @@ -657,4 +678,12 @@ public class MediaServerServiceImpl implements IMediaServerService {
657 678 }
658 679 }
659 680  
  681 + public void reloadZlm(){
  682 + try {
  683 + zlmRunner.run();
  684 + Thread.sleep(500);//延迟0.5秒缓冲时间
  685 + } catch (Exception e) {
  686 + logger.warn("尝试重连zlm失败!",e);
  687 + }
  688 + }
660 689 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
... ... @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
4 4 import com.alibaba.fastjson.JSONArray;
5 5 import com.alibaba.fastjson.JSONObject;
6 6 import com.alibaba.fastjson.TypeReference;
  7 +import com.genersoft.iot.vmp.conf.MediaConfig;
7 8 import com.genersoft.iot.vmp.conf.UserSetting;
8 9 import com.genersoft.iot.vmp.gb28181.bean.*;
9 10 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
... ... @@ -78,6 +79,10 @@ public class StreamPushServiceImpl implements IStreamPushService {
78 79 @Autowired
79 80 TransactionDefinition transactionDefinition;
80 81  
  82 + @Autowired
  83 + private MediaConfig mediaConfig;
  84 +
  85 +
81 86 @Override
82 87 public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
83 88 if (jsonData == null) {
... ... @@ -142,6 +147,8 @@ public class StreamPushServiceImpl implements IStreamPushService {
142 147 stream.setStreamType("push");
143 148 stream.setStatus(true);
144 149 stream.setCreateTime(DateUtil.getNow());
  150 + stream.setStreamType("push");
  151 + stream.setMediaServerId(mediaConfig.getId());
145 152 int add = gbStreamMapper.add(stream);
146 153 return add > 0;
147 154 }
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java
... ... @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.conf.security.dto.LoginUser;
6 6 import com.genersoft.iot.vmp.media.zlm.dto.OnPublishHookParam;
7 7 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
8 8 import com.genersoft.iot.vmp.service.IMediaServerService;
  9 +import com.genersoft.iot.vmp.service.IStreamProxyService;
9 10 import com.genersoft.iot.vmp.service.IStreamPushService;
10 11 import com.genersoft.iot.vmp.service.IMediaService;
11 12 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
... ... @@ -37,6 +38,8 @@ public class MediaController {
37 38  
38 39 @Autowired
39 40 private IMediaService mediaService;
  41 + @Autowired
  42 + private IStreamProxyService streamProxyService;
40 43  
41 44  
42 45 /**
... ... @@ -95,8 +98,30 @@ public class MediaController {
95 98 result.setMsg("scccess");
96 99 result.setData(streamInfo);
97 100 }else {
98   - result.setCode(-1);
99   - result.setMsg("fail");
  101 + //获取流失败,重启拉流后重试一次
  102 + streamProxyService.stop(app,stream);
  103 + boolean start = streamProxyService.start(app, stream);
  104 + try {
  105 + Thread.sleep(1000);
  106 + } catch (InterruptedException e) {
  107 + e.printStackTrace();
  108 + }
  109 + if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) {
  110 + String host = request.getHeader("Host");
  111 + String localAddr = host.split(":")[0];
  112 + logger.info("使用{}作为返回流的ip", localAddr);
  113 + streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority);
  114 + }else {
  115 + streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
  116 + }
  117 + if (streamInfo != null){
  118 + result.setCode(0);
  119 + result.setMsg("scccess");
  120 + result.setData(streamInfo);
  121 + }else {
  122 + result.setCode(-1);
  123 + result.setMsg("fail");
  124 + }
100 125 }
101 126 return result;
102 127 }
... ...