Commit e0344ccf9725fe3d22a90ab11257396071e7f55f

Authored by 648540858
1 parent c827d151

国标级联推送推流 支持多wvp间自动选择与推送

Showing 37 changed files with 1860 additions and 693 deletions
sql/update.sql
1   -alter table parent_platform
2   - add startOfflinePush int default 0 null;
  1 +alter table stream_push
  2 + add serverId varchar(50) not null;
3 3  
4   -alter table parent_platform
5   - add administrativeDivision varchar(50) not null;
6   -
7   -alter table parent_platform
8   - add catalogGroup int default 1 null;
9   -
10   -alter table device
11   - add ssrcCheck int default 0 null;
12 4  
... ...
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
... ... @@ -97,4 +97,5 @@ public class VideoManagerConstants {
97 97 //************************** 第三方 ****************************************
98 98 public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
99 99 public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";
  100 +
100 101 }
... ...
src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java
... ... @@ -2,7 +2,9 @@ package com.genersoft.iot.vmp.conf;
2 2  
3 3 import com.genersoft.iot.vmp.common.VideoManagerConstants;
4 4 import com.genersoft.iot.vmp.service.impl.RedisAlarmMsgListener;
5   -import com.genersoft.iot.vmp.service.impl.RedisGPSMsgListener;
  5 +import com.genersoft.iot.vmp.service.impl.RedisGpsMsgListener;
  6 +import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
  7 +import com.genersoft.iot.vmp.service.impl.RedisStreamMsgListener;
6 8 import org.apache.commons.lang3.StringUtils;
7 9 import org.springframework.beans.factory.annotation.Autowired;
8 10 import org.springframework.beans.factory.annotation.Value;
... ... @@ -47,11 +49,17 @@ public class RedisConfig extends CachingConfigurerSupport {
47 49 private int poolMaxWait;
48 50  
49 51 @Autowired
50   - private RedisGPSMsgListener redisGPSMsgListener;
  52 + private RedisGpsMsgListener redisGPSMsgListener;
51 53  
52 54 @Autowired
53 55 private RedisAlarmMsgListener redisAlarmMsgListener;
54 56  
  57 + @Autowired
  58 + private RedisStreamMsgListener redisStreamMsgListener;
  59 +
  60 + @Autowired
  61 + private RedisGbPlayMsgListener redisGbPlayMsgListener;
  62 +
55 63 @Bean
56 64 public JedisPool jedisPool() {
57 65 if (StringUtils.isBlank(password)) {
... ... @@ -98,6 +106,8 @@ public class RedisConfig extends CachingConfigurerSupport {
98 106 container.setConnectionFactory(connectionFactory);
99 107 container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS));
100 108 container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
  109 + container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
  110 + container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
101 111 return container;
102 112 }
103 113  
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
... ... @@ -72,6 +72,11 @@ public class SendRtpItem {
72 72 private String mediaServerId;
73 73  
74 74 /**
  75 + * 使用的服务的ID
  76 + */
  77 + private String serverId;
  78 +
  79 + /**
75 80 * invite的callId
76 81 */
77 82 private String CallId;
... ... @@ -259,4 +264,12 @@ public class SendRtpItem {
259 264 public void setOnlyAudio(boolean onlyAudio) {
260 265 this.onlyAudio = onlyAudio;
261 266 }
  267 +
  268 + public String getServerId() {
  269 + return serverId;
  270 + }
  271 +
  272 + public void setServerId(String serverId) {
  273 + this.serverId = serverId;
  274 + }
262 275 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
... ... @@ -71,7 +71,9 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
71 71 String gbId = gbStream.getGbId();
72 72 GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
73 73 if (gpsMsgInfo != null) { // 无最新位置不发送
74   - logger.info("无最新位置不发送");
  74 + if (logger.isDebugEnabled()) {
  75 + logger.debug("无最新位置不发送");
  76 + }
75 77 // 经纬度都为0不发送
76 78 if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) {
77 79 continue;
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
... ... @@ -16,6 +16,8 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
16 16 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
17 17 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
18 18 import com.genersoft.iot.vmp.service.IMediaServerService;
  19 +import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
  20 +import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
19 21 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
20 22 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
21 23 import com.genersoft.iot.vmp.utils.SerializeUtils;
... ... @@ -43,7 +45,7 @@ import java.util.*;
43 45 public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
44 46  
45 47 private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class);
46   - private String method = "ACK";
  48 + private final String method = "ACK";
47 49  
48 50 @Autowired
49 51 private SIPProcessorObserver sipProcessorObserver;
... ... @@ -78,6 +80,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
78 80 @Autowired
79 81 private ISIPCommanderForPlatform commanderForPlatform;
80 82  
  83 + @Autowired
  84 + private RedisGbPlayMsgListener redisGbPlayMsgListener;
  85 +
81 86  
82 87 /**
83 88 * 处理 ACK请求
... ... @@ -114,78 +119,41 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
114 119 param.put("pt", sendRtpItem.getPt());
115 120 param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
116 121 param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
117   - JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
118   - if (jsonObject == null) {
119   - logger.error("RTP推流失败: 请检查ZLM服务");
120   - } else if (jsonObject.getInteger("code") == 0) {
121   - logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
122   - byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
123   - sendRtpItem.setDialog(dialogByteArray);
124   - byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
125   - sendRtpItem.setTransaction(transactionByteArray);
126   - redisCatchStorage.updateSendRTPSever(sendRtpItem);
127   - } else {
128   - logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param));
129   - if (sendRtpItem.isOnlyAudio()) {
130   - // TODO 可能是语音对讲
131   - }else {
132   - // 向上级平台
133   - commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
134   - }
  122 + if (mediaInfo == null) {
  123 + RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
  124 + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(),
  125 + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
  126 + sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
  127 + redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, jsonObject->{
  128 + startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader);
  129 + });
  130 + }else {
  131 + JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
  132 + startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader);
135 133 }
136 134  
137 135  
138   -// if (streamInfo == null) { // 流还没上来,对方就回复ack
139   -// logger.info("监听流以等待流上线1 rtp/{}", sendRtpItem.getStreamId());
140   -// // 监听流上线
141   -// // 添加订阅
142   -// JSONObject subscribeKey = new JSONObject();
143   -// subscribeKey.put("app", "rtp");
144   -// subscribeKey.put("stream", sendRtpItem.getStreamId());
145   -// subscribeKey.put("regist", true);
146   -// subscribeKey.put("schema", "rtmp");
147   -// subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId());
148   -// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
149   -// (MediaServerItem mediaServerItemInUse, JSONObject json)->{
150   -// Map<String, Object> param = new HashMap<>();
151   -// param.put("vhost","__defaultVhost__");
152   -// param.put("app",json.getString("app"));
153   -// param.put("stream",json.getString("stream"));
154   -// param.put("ssrc", sendRtpItem.getSsrc());
155   -// param.put("dst_url",sendRtpItem.getIp());
156   -// param.put("dst_port", sendRtpItem.getPort());
157   -// param.put("is_udp", is_Udp);
158   -// param.put("src_port", sendRtpItem.getLocalPort());
159   -// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
160   -// });
161   -// }else {
162   -// Map<String, Object> param = new HashMap<>();
163   -// param.put("vhost","__defaultVhost__");
164   -// param.put("app",streamInfo.getApp());
165   -// param.put("stream",streamInfo.getStream());
166   -// param.put("ssrc", sendRtpItem.getSsrc());
167   -// param.put("dst_url",sendRtpItem.getIp());
168   -// param.put("dst_port", sendRtpItem.getPort());
169   -// param.put("is_udp", is_Udp);
170   -// param.put("src_port", sendRtpItem.getLocalPort());
171   -//
172   -// JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
173   -// if (jsonObject.getInteger("code") != 0) {
174   -// logger.info("监听流以等待流上线2 {}/{}", streamInfo.getApp(), streamInfo.getStream());
175   -// // 监听流上线
176   -// // 添加订阅
177   -// JSONObject subscribeKey = new JSONObject();
178   -// subscribeKey.put("app", "rtp");
179   -// subscribeKey.put("stream", streamInfo.getStream());
180   -// subscribeKey.put("regist", true);
181   -// subscribeKey.put("schema", "rtmp");
182   -// subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId());
183   -// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
184   -// (MediaServerItem mediaServerItemInUse, JSONObject json)->{
185   -// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
186   -// });
187   -// }
188   -// }
  136 + }
  137 + }
  138 + private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
  139 + JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
  140 + if (jsonObject == null) {
  141 + logger.error("RTP推流失败: 请检查ZLM服务");
  142 + } else if (jsonObject.getInteger("code") == 0) {
  143 + logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
  144 + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
  145 + sendRtpItem.setDialog(dialogByteArray);
  146 + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
  147 + sendRtpItem.setTransaction(transactionByteArray);
  148 + redisCatchStorage.updateSendRTPSever(sendRtpItem);
  149 + } else {
  150 + logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param));
  151 + if (sendRtpItem.isOnlyAudio()) {
  152 + // TODO 可能是语音对讲
  153 + }else {
  154 + // 向上级平台
  155 + commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
  156 + }
189 157 }
190 158 }
191 159 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
... ... @@ -107,13 +107,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
107 107 cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null);
108 108 }
109 109 if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
110   - MessageForPushChannel messageForPushChannel = new MessageForPushChannel();
111   - messageForPushChannel.setType(0);
112   - messageForPushChannel.setGbId(sendRtpItem.getChannelId());
113   - messageForPushChannel.setApp(sendRtpItem.getApp());
114   - messageForPushChannel.setStream(sendRtpItem.getStreamId());
115   - messageForPushChannel.setMediaServerId(sendRtpItem.getMediaServerId());
116   - messageForPushChannel.setPlatFormId(sendRtpItem.getPlatformId());
  110 + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
  111 + sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
  112 + sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
117 113 redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
118 114 }
119 115 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java
... ... @@ -15,7 +15,7 @@ import javax.sip.RequestEvent;
15 15 @Component
16 16 public class CancelRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
17 17  
18   - private String method = "CANCEL";
  18 + private final String method = "CANCEL";
19 19  
20 20 @Autowired
21 21 private SIPProcessorObserver sipProcessorObserver;
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
... ... @@ -17,10 +17,13 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
17 17 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
18 18 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
19 19 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  20 +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
20 21 import com.genersoft.iot.vmp.service.IMediaServerService;
21 22 import com.genersoft.iot.vmp.service.IPlayService;
  23 +import com.genersoft.iot.vmp.service.IStreamPushService;
22 24 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
23 25 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
  26 +import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
24 27 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
25 28 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
26 29 import com.genersoft.iot.vmp.utils.DateUtil;
... ... @@ -50,562 +53,709 @@ import java.util.Vector;
50 53 @Component
51 54 public class InviteRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
52 55  
53   - private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class);
54   -
55   - private String method = "INVITE";
56   -
57   - @Autowired
58   - private SIPCommanderFroPlatform cmderFroPlatform;
59   -
60   - @Autowired
61   - private IVideoManagerStorage storager;
62   -
63   - @Autowired
64   - private IRedisCatchStorage redisCatchStorage;
65   -
66   - @Autowired
67   - private DynamicTask dynamicTask;
68   -
69   - @Autowired
70   - private SIPCommander cmder;
71   -
72   - @Autowired
73   - private IPlayService playService;
74   -
75   - @Autowired
76   - private ISIPCommander commander;
77   -
78   - @Autowired
79   - private ZLMRTPServerFactory zlmrtpServerFactory;
80   -
81   - @Autowired
82   - private IMediaServerService mediaServerService;
83   -
84   - @Autowired
85   - private SIPProcessorObserver sipProcessorObserver;
86   -
87   - @Autowired
88   - private VideoStreamSessionManager sessionManager;
89   -
90   - @Autowired
91   - private UserSetting userSetting;
92   -
93   - @Autowired
94   - private ZLMMediaListManager mediaListManager;
95   -
96   -
97   - @Override
98   - public void afterPropertiesSet() throws Exception {
99   - // 添加消息处理的订阅
100   - sipProcessorObserver.addRequestProcessor(method, this);
101   - }
102   -
103   - /**
104   - * 处理invite请求
105   - *
106   - * @param evt
107   - * 请求消息
108   - */
109   - @Override
110   - public void process(RequestEvent evt) {
111   - // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令
112   - try {
113   - Request request = evt.getRequest();
114   - SipURI sipURI = (SipURI) request.getRequestURI();
115   - //从subject读取channelId,不再从request-line读取。 有些平台request-line是平台国标编码,不是设备国标编码。
116   - //String channelId = sipURI.getUser();
117   - String channelId = SipUtils.getChannelIdFromHeader(request);
118   - String requesterId = SipUtils.getUserIdFromFromHeader(request);
119   - CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
120   - if (requesterId == null || channelId == null) {
121   - logger.info("无法从FromHeader的Address中获取到平台id,返回400");
122   - responseAck(evt, Response.BAD_REQUEST); // 参数不全, 发400,请求错误
123   - return;
124   - }
125   -
126   - // 查询请求是否来自上级平台\设备
127   - ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
128   - if (platform == null) {
129   - inviteFromDeviceHandle(evt, requesterId);
130   - }else {
131   - // 查询平台下是否有该通道
132   - DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
133   - GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
134   - PlatformCatalog catalog = storager.getCatalog(channelId);
135   - MediaServerItem mediaServerItem = null;
136   - // 不是通道可能是直播流
137   - if (channel != null && gbStream == null ) {
138   - if (channel.getStatus() == 0) {
139   - logger.info("通道离线,返回400");
140   - responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline");
141   - return;
142   - }
143   - responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
144   - }else if(channel == null && gbStream != null){
145   - String mediaServerId = gbStream.getMediaServerId();
146   - mediaServerItem = mediaServerService.getOne(mediaServerId);
147   - if (mediaServerItem == null) {
148   - logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId);
149   - responseAck(evt, Response.GONE);
150   - return;
151   - }
152   - responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
153   - }else if (catalog != null) {
154   - responseAck(evt, Response.BAD_REQUEST, "catalog channel can not play"); // 目录不支持点播
155   - return;
156   - } else {
157   - logger.info("通道不存在,返回404");
158   - responseAck(evt, Response.NOT_FOUND); // 通道不存在,发404,资源不存在
159   - return;
160   - }
161   - // 解析sdp消息, 使用jainsip 自带的sdp解析方式
162   - String contentString = new String(request.getRawContent());
163   -
164   - // jainSip不支持y=字段, 移除以解析。
165   - int ssrcIndex = contentString.indexOf("y=");
166   - // 检查是否有y字段
167   - String ssrcDefault = "0000000000";
168   - String ssrc;
169   - SessionDescription sdp;
170   - if (ssrcIndex >= 0) {
171   - //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段
172   - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
173   - String substring = contentString.substring(0, contentString.indexOf("y="));
174   - sdp = SdpFactory.getInstance().createSessionDescription(substring);
175   - }else {
176   - ssrc = ssrcDefault;
177   - sdp = SdpFactory.getInstance().createSessionDescription(contentString);
178   - }
179   - String sessionName = sdp.getSessionName().getValue();
180   -
181   - Long startTime = null;
182   - Long stopTime = null;
183   - Instant start = null;
184   - Instant end = null;
185   - if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) {
186   - TimeDescriptionImpl timeDescription = (TimeDescriptionImpl)(sdp.getTimeDescriptions(false).get(0));
187   - TimeField startTimeFiled = (TimeField)timeDescription.getTime();
188   - startTime = startTimeFiled.getStartTime();
189   - stopTime = startTimeFiled.getStopTime();
190   -
191   - start = Instant.ofEpochSecond(startTime);
192   - end = Instant.ofEpochSecond(stopTime);
193   - }
194   - // 获取支持的格式
195   - Vector mediaDescriptions = sdp.getMediaDescriptions(true);
196   - // 查看是否支持PS 负载96
197   - //String ip = null;
198   - int port = -1;
199   - boolean mediaTransmissionTCP = false;
200   - Boolean tcpActive = null;
201   - for (Object description : mediaDescriptions) {
202   - MediaDescription mediaDescription = (MediaDescription) description;
203   - Media media = mediaDescription.getMedia();
204   -
205   - Vector mediaFormats = media.getMediaFormats(false);
206   - if (mediaFormats.contains("96")) {
207   - port = media.getMediaPort();
208   - //String mediaType = media.getMediaType();
209   - String protocol = media.getProtocol();
210   -
211   - // 区分TCP发流还是udp, 当前默认udp
212   - if ("TCP/RTP/AVP".equals(protocol)) {
213   - String setup = mediaDescription.getAttribute("setup");
214   - if (setup != null) {
215   - mediaTransmissionTCP = true;
216   - if ("active".equals(setup)) {
217   - tcpActive = true;
218   - // 不支持tcp主动
219   - responseAck(evt, Response.NOT_IMPLEMENTED, "tcp active not support"); // 目录不支持点播
220   - return;
221   - } else if ("passive".equals(setup)) {
222   - tcpActive = false;
223   - }
224   - }
225   - }
226   - break;
227   - }
228   - }
229   - if (port == -1) {
230   - logger.info("不支持的媒体格式,返回415");
231   - // 回复不支持的格式
232   - responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415
233   - return;
234   - }
235   - String username = sdp.getOrigin().getUsername();
236   - String addressStr = sdp.getOrigin().getAddress();
237   -
238   - logger.info("[上级点播]用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc);
239   - Device device = null;
240   - // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
241   - if (channel != null) {
242   - device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
243   - if (device == null) {
244   - logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel);
245   - responseAck(evt, Response.SERVER_INTERNAL_ERROR);
246   - return;
247   - }
248   - mediaServerItem = playService.getNewMediaServerItem(device);
249   - if (mediaServerItem == null) {
250   - logger.warn("未找到可用的zlm");
251   - responseAck(evt, Response.BUSY_HERE);
252   - return;
253   - }
254   - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
255   - device.getDeviceId(), channelId,
256   - mediaTransmissionTCP);
257   - if (tcpActive != null) {
258   - sendRtpItem.setTcpActive(tcpActive);
259   - }
260   - if (sendRtpItem == null) {
261   - logger.warn("服务器端口资源不足");
262   - responseAck(evt, Response.BUSY_HERE);
263   - return;
264   - }
265   - sendRtpItem.setCallId(callIdHeader.getCallId());
266   - sendRtpItem.setPlayType("Play".equals(sessionName)?InviteStreamType.PLAY:InviteStreamType.PLAYBACK);
267   -
268   - Long finalStartTime = startTime;
269   - Long finalStopTime = stopTime;
270   - ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{
271   - String app = responseJSON.getString("app");
272   - String stream = responseJSON.getString("stream");
273   - logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", app, stream);
274   - // * 0 等待设备推流上来
275   - // * 1 下级已经推流,等待上级平台回复ack
276   - // * 2 推流中
277   - sendRtpItem.setStatus(1);
278   - redisCatchStorage.updateSendRTPSever(sendRtpItem);
279   -
280   - StringBuffer content = new StringBuffer(200);
281   - content.append("v=0\r\n");
282   - content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n");
283   - content.append("s=" + sessionName+"\r\n");
284   - content.append("c=IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n");
285   - if ("Playback".equals(sessionName)) {
286   - content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n");
287   - }else {
288   - content.append("t=0 0\r\n");
289   - }
290   - content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
291   - content.append("a=sendonly\r\n");
292   - content.append("a=rtpmap:96 PS/90000\r\n");
293   - content.append("y="+ ssrc + "\r\n");
294   - content.append("f=\r\n");
295   -
296   - try {
297   - // 超时未收到Ack应该回复bye,当前等待时间为10秒
298   - dynamicTask.startDelay(callIdHeader.getCallId(), ()->{
299   - logger.info("Ack 等待超时");
300   - mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), ssrc);
301   - // 回复bye
302   - cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
303   - }, 60*1000);
304   - responseSdpAck(evt, content.toString(), platform);
305   -
306   - } catch (SipException e) {
307   - e.printStackTrace();
308   - } catch (InvalidArgumentException e) {
309   - e.printStackTrace();
310   - } catch (ParseException e) {
311   - e.printStackTrace();
312   - }
313   - };
314   - SipSubscribe.Event errorEvent = ((event) -> {
315   - // 未知错误。直接转发设备点播的错误
316   - Response response = null;
317   - try {
318   - response = getMessageFactory().createResponse(event.statusCode, evt.getRequest());
319   - ServerTransaction serverTransaction = getServerTransaction(evt);
320   - serverTransaction.sendResponse(response);
321   - if (serverTransaction.getDialog() != null) {
322   - serverTransaction.getDialog().delete();
323   - }
324   - } catch (ParseException | SipException | InvalidArgumentException e) {
325   - e.printStackTrace();
326   - }
327   - });
328   - sendRtpItem.setApp("rtp");
329   - if ("Playback".equals(sessionName)) {
330   - sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
331   - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true);
332   - sendRtpItem.setStreamId(ssrcInfo.getStream());
333   - // 写入redis, 超时时回复
334   - redisCatchStorage.updateSendRTPSever(sendRtpItem);
335   - playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
336   - DateUtil.formatter.format(end), null, result -> {
337   - if (result.getCode() != 0){
338   - logger.warn("录像回放失败");
339   - if (result.getEvent() != null) {
340   - errorEvent.response(result.getEvent());
341   - }
342   - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
343   - try {
344   - responseAck(evt, Response.REQUEST_TIMEOUT);
345   - } catch (SipException e) {
346   - e.printStackTrace();
347   - } catch (InvalidArgumentException e) {
348   - e.printStackTrace();
349   - } catch (ParseException e) {
350   - e.printStackTrace();
351   - }
352   - }else {
353   - if (result.getMediaServerItem() != null) {
354   - hookEvent.response(result.getMediaServerItem(), result.getResponse());
355   - }
356   - }
357   - });
358   - }else {
359   - sendRtpItem.setPlayType(InviteStreamType.PLAY);
360   - SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
361   - if (playTransaction != null) {
362   - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream());
363   - if (!streamReady) {
364   - playTransaction = null;
365   - }
366   - }
367   - if (playTransaction == null) {
368   - String streamId = null;
369   - if (mediaServerItem.isRtpEnable()) {
370   - streamId = String.format("%s_%s", device.getDeviceId(), channelId);
371   - }
372   - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false);
373   - sendRtpItem.setStreamId(ssrcInfo.getStream());
374   - // 写入redis, 超时时回复
375   - redisCatchStorage.updateSendRTPSever(sendRtpItem);
376   - playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg)->{
377   - logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId);
378   - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
379   - }, null);
380   - }else {
381   - sendRtpItem.setStreamId(playTransaction.getStream());
382   - // 写入redis, 超时时回复
383   - redisCatchStorage.updateSendRTPSever(sendRtpItem);
384   - JSONObject jsonObject = new JSONObject();
385   - jsonObject.put("app", sendRtpItem.getApp());
386   - jsonObject.put("stream", sendRtpItem.getStreamId());
387   - hookEvent.response(mediaServerItem, jsonObject);
388   - }
389   - }
390   - }else if (gbStream != null) {
391   -
392   - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
393   - if (!streamReady ) {
394   - if ("proxy".equals(gbStream.getStreamType())) {
395   - // TODO 控制启用以使设备上线
396   - logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流",gbStream.getApp(), gbStream.getStream());
397   - responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
398   - }else if ("push".equals(gbStream.getStreamType())) {
399   - if (!platform.isStartOfflinePush()) {
400   - responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable");
401   - return;
402   - }
403   - // 发送redis消息以使设备上线
404   - logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流",gbStream.getApp(), gbStream.getStream());
405   - MessageForPushChannel messageForPushChannel = new MessageForPushChannel();
406   - messageForPushChannel.setType(1);
407   - messageForPushChannel.setGbId(gbStream.getGbId());
408   - messageForPushChannel.setApp(gbStream.getApp());
409   - messageForPushChannel.setStream(gbStream.getStream());
410   - // TODO 获取低负载的节点
411   - messageForPushChannel.setMediaServerId(gbStream.getMediaServerId());
412   - messageForPushChannel.setPlatFormId(platform.getServerGBId());
413   - messageForPushChannel.setPlatFormName(platform.getName());
414   - redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
415   - // 设置超时
416   - dynamicTask.startDelay(callIdHeader.getCallId(), ()->{
417   - logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
418   - try {
419   - mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId());
420   - responseAck(evt, Response.REQUEST_TIMEOUT); // 超时
421   - } catch (SipException e) {
422   - e.printStackTrace();
423   - } catch (InvalidArgumentException e) {
424   - e.printStackTrace();
425   - } catch (ParseException e) {
426   - e.printStackTrace();
427   - }
428   - }, userSetting.getPlatformPlayTimeout());
429   - // 添加监听
430   - MediaServerItem finalMediaServerItem = mediaServerItem;
431   - int finalPort = port;
432   - boolean finalMediaTransmissionTCP = mediaTransmissionTCP;
433   - Boolean finalTcpActive = tcpActive;
434   - mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream)->{
435   - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(finalMediaServerItem, addressStr, finalPort, ssrc, requesterId,
436   - app, stream, channelId, finalMediaTransmissionTCP);
437   -
438   - if (sendRtpItem == null) {
439   - logger.warn("服务器端口资源不足");
440   - try {
441   - responseAck(evt, Response.BUSY_HERE);
442   - } catch (SipException e) {
443   - e.printStackTrace();
444   - } catch (InvalidArgumentException e) {
445   - e.printStackTrace();
446   - } catch (ParseException e) {
447   - e.printStackTrace();
448   - }
449   - return;
450   - }
451   - if (finalTcpActive != null) {
452   - sendRtpItem.setTcpActive(finalTcpActive);
453   - }
454   - sendRtpItem.setPlayType(InviteStreamType.PUSH);
455   - // 写入redis, 超时时回复
456   - sendRtpItem.setStatus(1);
457   - sendRtpItem.setCallId(callIdHeader.getCallId());
458   - byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
459   - sendRtpItem.setDialog(dialogByteArray);
460   - byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
461   - sendRtpItem.setTransaction(transactionByteArray);
462   - redisCatchStorage.updateSendRTPSever(sendRtpItem);
463   - sendStreamAck(finalMediaServerItem, sendRtpItem, platform, evt);
464   -
465   - });
466   - }
467   - }else {
468   - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
469   - gbStream.getApp(), gbStream.getStream(), channelId,
470   - mediaTransmissionTCP);
471   -
472   -
473   - if (sendRtpItem == null) {
474   - logger.warn("服务器端口资源不足");
475   - responseAck(evt, Response.BUSY_HERE);
476   - return;
477   - }
478   - if (tcpActive != null) {
479   - sendRtpItem.setTcpActive(tcpActive);
480   - }
481   - sendRtpItem.setPlayType(InviteStreamType.PUSH);
482   - // 写入redis, 超时时回复
483   - sendRtpItem.setStatus(1);
484   - sendRtpItem.setCallId(callIdHeader.getCallId());
485   - byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
486   - sendRtpItem.setDialog(dialogByteArray);
487   - byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
488   - sendRtpItem.setTransaction(transactionByteArray);
489   - redisCatchStorage.updateSendRTPSever(sendRtpItem);
490   - sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
491   - }
492   -
493   -
494   - }
495   -
496   - }
497   -
498   - } catch (SipException | InvalidArgumentException | ParseException e) {
499   - e.printStackTrace();
500   - logger.warn("sdp解析错误");
501   - e.printStackTrace();
502   - } catch (SdpParseException e) {
503   - e.printStackTrace();
504   - } catch (SdpException e) {
505   - e.printStackTrace();
506   - }
507   - }
508   -
509   - public void sendStreamAck(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt){
510   -
511   - StringBuffer content = new StringBuffer(200);
512   - content.append("v=0\r\n");
513   - content.append("o="+ sendRtpItem.getChannelId() +" 0 0 IN IP4 "+ mediaServerItem.getSdpIp()+"\r\n");
514   - content.append("s=Play\r\n");
515   - content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
516   - content.append("t=0 0\r\n");
517   - content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
518   - content.append("a=sendonly\r\n");
519   - content.append("a=rtpmap:96 PS/90000\r\n");
520   - if (sendRtpItem.isTcp()) {
521   - content.append("a=connection:new\r\n");
522   - if (!sendRtpItem.isTcpActive()) {
523   - content.append("a=setup:active\r\n");
524   - }else {
525   - content.append("a=setup:passive\r\n");
526   - }
527   - }
528   - content.append("y="+ sendRtpItem.getSsrc() + "\r\n");
529   - content.append("f=\r\n");
530   -
531   - try {
532   - responseSdpAck(evt, content.toString(), platform);
533   - } catch (SipException e) {
534   - e.printStackTrace();
535   - } catch (InvalidArgumentException e) {
536   - e.printStackTrace();
537   - } catch (ParseException e) {
538   - e.printStackTrace();
539   - }
540   - }
541   -
542   - public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException {
543   -
544   - // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
545   - Device device = redisCatchStorage.getDevice(requesterId);
546   - Request request = evt.getRequest();
547   - if (device != null) {
548   - logger.info("收到设备" + requesterId + "的语音广播Invite请求");
549   - responseAck(evt, Response.TRYING);
550   -
551   - String contentString = new String(request.getRawContent());
552   - // jainSip不支持y=字段, 移除移除以解析。
553   - String substring = contentString;
554   - String ssrc = "0000000404";
555   - int ssrcIndex = contentString.indexOf("y=");
556   - if (ssrcIndex > 0) {
557   - substring = contentString.substring(0, ssrcIndex);
558   - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
559   - }
560   - ssrcIndex = substring.indexOf("f=");
561   - if (ssrcIndex > 0) {
562   - substring = contentString.substring(0, ssrcIndex);
563   - }
564   - SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
565   -
566   - // 获取支持的格式
567   - Vector mediaDescriptions = sdp.getMediaDescriptions(true);
568   - // 查看是否支持PS 负载96
569   - int port = -1;
570   - //boolean recvonly = false;
571   - boolean mediaTransmissionTCP = false;
572   - Boolean tcpActive = null;
573   - for (int i = 0; i < mediaDescriptions.size(); i++) {
574   - MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i);
575   - Media media = mediaDescription.getMedia();
576   -
577   - Vector mediaFormats = media.getMediaFormats(false);
578   - if (mediaFormats.contains("8")) {
579   - port = media.getMediaPort();
580   - String protocol = media.getProtocol();
581   - // 区分TCP发流还是udp, 当前默认udp
582   - if ("TCP/RTP/AVP".equals(protocol)) {
583   - String setup = mediaDescription.getAttribute("setup");
584   - if (setup != null) {
585   - mediaTransmissionTCP = true;
586   - if ("active".equals(setup)) {
587   - tcpActive = true;
588   - } else if ("passive".equals(setup)) {
589   - tcpActive = false;
590   - }
591   - }
592   - }
593   - break;
594   - }
595   - }
596   - if (port == -1) {
597   - logger.info("不支持的媒体格式,返回415");
598   - // 回复不支持的格式
599   - responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415
600   - return;
601   - }
602   - String username = sdp.getOrigin().getUsername();
603   - String addressStr = sdp.getOrigin().getAddress();
604   - logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc);
605   -
606   - } else {
607   - logger.warn("来自无效设备/平台的请求");
608   - responseAck(evt, Response.BAD_REQUEST);
609   - }
610   - }
  56 + private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class);
  57 +
  58 + private final String method = "INVITE";
  59 +
  60 + @Autowired
  61 + private SIPCommanderFroPlatform cmderFroPlatform;
  62 +
  63 + @Autowired
  64 + private IVideoManagerStorage storager;
  65 +
  66 + @Autowired
  67 + private IStreamPushService streamPushService;
  68 +
  69 + @Autowired
  70 + private IRedisCatchStorage redisCatchStorage;
  71 +
  72 + @Autowired
  73 + private DynamicTask dynamicTask;
  74 +
  75 + @Autowired
  76 + private SIPCommander cmder;
  77 +
  78 + @Autowired
  79 + private IPlayService playService;
  80 +
  81 + @Autowired
  82 + private ISIPCommander commander;
  83 +
  84 + @Autowired
  85 + private ZLMRTPServerFactory zlmrtpServerFactory;
  86 +
  87 + @Autowired
  88 + private IMediaServerService mediaServerService;
  89 +
  90 + @Autowired
  91 + private SIPProcessorObserver sipProcessorObserver;
  92 +
  93 + @Autowired
  94 + private VideoStreamSessionManager sessionManager;
  95 +
  96 + @Autowired
  97 + private UserSetting userSetting;
  98 +
  99 + @Autowired
  100 + private ZLMMediaListManager mediaListManager;
  101 +
  102 +
  103 + @Autowired
  104 + private RedisGbPlayMsgListener redisGbPlayMsgListener;
  105 +
  106 +
  107 + @Override
  108 + public void afterPropertiesSet() throws Exception {
  109 + // 添加消息处理的订阅
  110 + sipProcessorObserver.addRequestProcessor(method, this);
  111 + }
  112 +
  113 + /**
  114 + * 处理invite请求
  115 + *
  116 + * @param evt 请求消息
  117 + */
  118 + @Override
  119 + public void process(RequestEvent evt) {
  120 + // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令
  121 + try {
  122 + Request request = evt.getRequest();
  123 + SipURI sipUri = (SipURI) request.getRequestURI();
  124 + //从subject读取channelId,不再从request-line读取。 有些平台request-line是平台国标编码,不是设备国标编码。
  125 + //String channelId = sipURI.getUser();
  126 + String channelId = SipUtils.getChannelIdFromHeader(request);
  127 + String requesterId = SipUtils.getUserIdFromFromHeader(request);
  128 + CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
  129 + if (requesterId == null || channelId == null) {
  130 + logger.info("无法从FromHeader的Address中获取到平台id,返回400");
  131 + // 参数不全, 发400,请求错误
  132 + responseAck(evt, Response.BAD_REQUEST);
  133 + return;
  134 + }
  135 +
  136 + // 查询请求是否来自上级平台\设备
  137 + ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
  138 + if (platform == null) {
  139 + inviteFromDeviceHandle(evt, requesterId);
  140 + } else {
  141 + // 查询平台下是否有该通道
  142 + DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
  143 + GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
  144 + PlatformCatalog catalog = storager.getCatalog(channelId);
  145 +
  146 + MediaServerItem mediaServerItem = null;
  147 + StreamPushItem streamPushItem = null;
  148 + // 不是通道可能是直播流
  149 + if (channel != null && gbStream == null) {
  150 + if (channel.getStatus() == 0) {
  151 + logger.info("通道离线,返回400");
  152 + responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline");
  153 + return;
  154 + }
  155 + responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
  156 + } else if (channel == null && gbStream != null) {
  157 +
  158 + String mediaServerId = gbStream.getMediaServerId();
  159 + mediaServerItem = mediaServerService.getOne(mediaServerId);
  160 + if (mediaServerItem == null) {
  161 + if ("proxy".equals(gbStream.getStreamType())) {
  162 + logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
  163 + responseAck(evt, Response.GONE);
  164 + return;
  165 + } else {
  166 + streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
  167 + if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) {
  168 + logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
  169 + responseAck(evt, Response.GONE);
  170 + return;
  171 + }
  172 + }
  173 + } else {
  174 + if ("push".equals(gbStream.getStreamType())) {
  175 + streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
  176 + if (streamPushItem == null) {
  177 + logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
  178 + responseAck(evt, Response.GONE);
  179 + return;
  180 + }
  181 + }
  182 + }
  183 + responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
  184 + } else if (catalog != null) {
  185 + responseAck(evt, Response.BAD_REQUEST, "catalog channel can not play"); // 目录不支持点播
  186 + return;
  187 + } else {
  188 + logger.info("通道不存在,返回404");
  189 + responseAck(evt, Response.NOT_FOUND); // 通道不存在,发404,资源不存在
  190 + return;
  191 + }
  192 + // 解析sdp消息, 使用jainsip 自带的sdp解析方式
  193 + String contentString = new String(request.getRawContent());
  194 +
  195 + // jainSip不支持y=字段, 移除以解析。
  196 + int ssrcIndex = contentString.indexOf("y=");
  197 + // 检查是否有y字段
  198 + String ssrcDefault = "0000000000";
  199 + String ssrc;
  200 + SessionDescription sdp;
  201 + if (ssrcIndex >= 0) {
  202 + //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段
  203 + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  204 + String substring = contentString.substring(0, contentString.indexOf("y="));
  205 + sdp = SdpFactory.getInstance().createSessionDescription(substring);
  206 + } else {
  207 + ssrc = ssrcDefault;
  208 + sdp = SdpFactory.getInstance().createSessionDescription(contentString);
  209 + }
  210 + String sessionName = sdp.getSessionName().getValue();
  211 +
  212 + Long startTime = null;
  213 + Long stopTime = null;
  214 + Instant start = null;
  215 + Instant end = null;
  216 + if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) {
  217 + TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) (sdp.getTimeDescriptions(false).get(0));
  218 + TimeField startTimeFiled = (TimeField) timeDescription.getTime();
  219 + startTime = startTimeFiled.getStartTime();
  220 + stopTime = startTimeFiled.getStopTime();
  221 +
  222 + start = Instant.ofEpochSecond(startTime);
  223 + end = Instant.ofEpochSecond(stopTime);
  224 + }
  225 + // 获取支持的格式
  226 + Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  227 + // 查看是否支持PS 负载96
  228 + //String ip = null;
  229 + int port = -1;
  230 + boolean mediaTransmissionTCP = false;
  231 + Boolean tcpActive = null;
  232 + for (Object description : mediaDescriptions) {
  233 + MediaDescription mediaDescription = (MediaDescription) description;
  234 + Media media = mediaDescription.getMedia();
  235 +
  236 + Vector mediaFormats = media.getMediaFormats(false);
  237 + if (mediaFormats.contains("96")) {
  238 + port = media.getMediaPort();
  239 + //String mediaType = media.getMediaType();
  240 + String protocol = media.getProtocol();
  241 +
  242 + // 区分TCP发流还是udp, 当前默认udp
  243 + if ("TCP/RTP/AVP".equals(protocol)) {
  244 + String setup = mediaDescription.getAttribute("setup");
  245 + if (setup != null) {
  246 + mediaTransmissionTCP = true;
  247 + if ("active".equals(setup)) {
  248 + tcpActive = true;
  249 + // 不支持tcp主动
  250 + responseAck(evt, Response.NOT_IMPLEMENTED, "tcp active not support"); // 目录不支持点播
  251 + return;
  252 + } else if ("passive".equals(setup)) {
  253 + tcpActive = false;
  254 + }
  255 + }
  256 + }
  257 + break;
  258 + }
  259 + }
  260 + if (port == -1) {
  261 + logger.info("不支持的媒体格式,返回415");
  262 + // 回复不支持的格式
  263 + responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415
  264 + return;
  265 + }
  266 + String username = sdp.getOrigin().getUsername();
  267 + String addressStr = sdp.getOrigin().getAddress();
  268 +
  269 + logger.info("[上级点播]用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc);
  270 + Device device = null;
  271 + // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
  272 + if (channel != null) {
  273 + device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
  274 + if (device == null) {
  275 + logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel);
  276 + responseAck(evt, Response.SERVER_INTERNAL_ERROR);
  277 + return;
  278 + }
  279 + mediaServerItem = playService.getNewMediaServerItem(device);
  280 + if (mediaServerItem == null) {
  281 + logger.warn("未找到可用的zlm");
  282 + responseAck(evt, Response.BUSY_HERE);
  283 + return;
  284 + }
  285 + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
  286 + device.getDeviceId(), channelId,
  287 + mediaTransmissionTCP);
  288 + if (tcpActive != null) {
  289 + sendRtpItem.setTcpActive(tcpActive);
  290 + }
  291 + if (sendRtpItem == null) {
  292 + logger.warn("服务器端口资源不足");
  293 + responseAck(evt, Response.BUSY_HERE);
  294 + return;
  295 + }
  296 + sendRtpItem.setCallId(callIdHeader.getCallId());
  297 + sendRtpItem.setPlayType("Play".equals(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK);
  298 +
  299 + Long finalStartTime = startTime;
  300 + Long finalStopTime = stopTime;
  301 + ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> {
  302 + String app = responseJSON.getString("app");
  303 + String stream = responseJSON.getString("stream");
  304 + logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", app, stream);
  305 + // * 0 等待设备推流上来
  306 + // * 1 下级已经推流,等待上级平台回复ack
  307 + // * 2 推流中
  308 + sendRtpItem.setStatus(1);
  309 + redisCatchStorage.updateSendRTPSever(sendRtpItem);
  310 +
  311 + StringBuffer content = new StringBuffer(200);
  312 + content.append("v=0\r\n");
  313 + content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
  314 + content.append("s=" + sessionName + "\r\n");
  315 + content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
  316 + if ("Playback".equals(sessionName)) {
  317 + content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n");
  318 + } else {
  319 + content.append("t=0 0\r\n");
  320 + }
  321 + content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n");
  322 + content.append("a=sendonly\r\n");
  323 + content.append("a=rtpmap:96 PS/90000\r\n");
  324 + content.append("y=" + ssrc + "\r\n");
  325 + content.append("f=\r\n");
  326 +
  327 + try {
  328 + // 超时未收到Ack应该回复bye,当前等待时间为10秒
  329 + dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
  330 + logger.info("Ack 等待超时");
  331 + mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), ssrc);
  332 + // 回复bye
  333 + cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
  334 + }, 60 * 1000);
  335 + responseSdpAck(evt, content.toString(), platform);
  336 +
  337 + } catch (SipException e) {
  338 + e.printStackTrace();
  339 + } catch (InvalidArgumentException e) {
  340 + e.printStackTrace();
  341 + } catch (ParseException e) {
  342 + e.printStackTrace();
  343 + }
  344 + };
  345 + SipSubscribe.Event errorEvent = ((event) -> {
  346 + // 未知错误。直接转发设备点播的错误
  347 + Response response = null;
  348 + try {
  349 + response = getMessageFactory().createResponse(event.statusCode, evt.getRequest());
  350 + ServerTransaction serverTransaction = getServerTransaction(evt);
  351 + serverTransaction.sendResponse(response);
  352 + if (serverTransaction.getDialog() != null) {
  353 + serverTransaction.getDialog().delete();
  354 + }
  355 + } catch (ParseException | SipException | InvalidArgumentException e) {
  356 + e.printStackTrace();
  357 + }
  358 + });
  359 + sendRtpItem.setApp("rtp");
  360 + if ("Playback".equals(sessionName)) {
  361 + sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
  362 + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true);
  363 + sendRtpItem.setStreamId(ssrcInfo.getStream());
  364 + // 写入redis, 超时时回复
  365 + redisCatchStorage.updateSendRTPSever(sendRtpItem);
  366 + playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
  367 + DateUtil.formatter.format(end), null, result -> {
  368 + if (result.getCode() != 0) {
  369 + logger.warn("录像回放失败");
  370 + if (result.getEvent() != null) {
  371 + errorEvent.response(result.getEvent());
  372 + }
  373 + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
  374 + try {
  375 + responseAck(evt, Response.REQUEST_TIMEOUT);
  376 + } catch (SipException e) {
  377 + e.printStackTrace();
  378 + } catch (InvalidArgumentException e) {
  379 + e.printStackTrace();
  380 + } catch (ParseException e) {
  381 + e.printStackTrace();
  382 + }
  383 + } else {
  384 + if (result.getMediaServerItem() != null) {
  385 + hookEvent.response(result.getMediaServerItem(), result.getResponse());
  386 + }
  387 + }
  388 + });
  389 + } else {
  390 + sendRtpItem.setPlayType(InviteStreamType.PLAY);
  391 + SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
  392 + if (playTransaction != null) {
  393 + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream());
  394 + if (!streamReady) {
  395 + playTransaction = null;
  396 + }
  397 + }
  398 + if (playTransaction == null) {
  399 + String streamId = null;
  400 + if (mediaServerItem.isRtpEnable()) {
  401 + streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  402 + }
  403 + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false);
  404 + sendRtpItem.setStreamId(ssrcInfo.getStream());
  405 + // 写入redis, 超时时回复
  406 + redisCatchStorage.updateSendRTPSever(sendRtpItem);
  407 + playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> {
  408 + logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId);
  409 + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
  410 + }, null);
  411 + } else {
  412 + sendRtpItem.setStreamId(playTransaction.getStream());
  413 + // 写入redis, 超时时回复
  414 + redisCatchStorage.updateSendRTPSever(sendRtpItem);
  415 + JSONObject jsonObject = new JSONObject();
  416 + jsonObject.put("app", sendRtpItem.getApp());
  417 + jsonObject.put("stream", sendRtpItem.getStreamId());
  418 + hookEvent.response(mediaServerItem, jsonObject);
  419 + }
  420 + }
  421 + } else if (gbStream != null) {
  422 + if (streamPushItem.isStatus()) {
  423 + // 在线状态
  424 + pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
  425 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  426 + } else {
  427 + // 不在线 拉起
  428 + notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
  429 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  430 + }
  431 +
  432 + }
  433 +
  434 + }
  435 +
  436 + } catch (SipException | InvalidArgumentException | ParseException e) {
  437 + e.printStackTrace();
  438 + logger.warn("sdp解析错误");
  439 + e.printStackTrace();
  440 + } catch (SdpParseException e) {
  441 + e.printStackTrace();
  442 + } catch (SdpException e) {
  443 + e.printStackTrace();
  444 + }
  445 + }
  446 +
  447 + /**
  448 + * 安排推流
  449 + */
  450 +
  451 + private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
  452 + CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
  453 + int port, Boolean tcpActive, boolean mediaTransmissionTCP,
  454 + String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
  455 + // 推流
  456 + if (streamPushItem.getServerId().equals(userSetting.getServerId())) {
  457 + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
  458 + if (streamReady) {
  459 + // 自平台内容
  460 + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
  461 + gbStream.getApp(), gbStream.getStream(), channelId,
  462 + mediaTransmissionTCP);
  463 +
  464 + if (sendRtpItem == null) {
  465 + logger.warn("服务器端口资源不足");
  466 + responseAck(evt, Response.BUSY_HERE);
  467 + return;
  468 + }
  469 + if (tcpActive != null) {
  470 + sendRtpItem.setTcpActive(tcpActive);
  471 + }
  472 + sendRtpItem.setPlayType(InviteStreamType.PUSH);
  473 + // 写入redis, 超时时回复
  474 + sendRtpItem.setStatus(1);
  475 + sendRtpItem.setCallId(callIdHeader.getCallId());
  476 + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
  477 + sendRtpItem.setDialog(dialogByteArray);
  478 + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
  479 + sendRtpItem.setTransaction(transactionByteArray);
  480 + redisCatchStorage.updateSendRTPSever(sendRtpItem);
  481 + sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
  482 + } else {
  483 + // 不在线 拉起
  484 + notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
  485 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  486 + }
  487 +
  488 + } else {
  489 + // 其他平台内容
  490 + otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
  491 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  492 + }
  493 +
  494 + }
  495 +
  496 + /**
  497 + * 通知流上线
  498 + */
  499 + private void notifyStreamOnline(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
  500 + CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
  501 + int port, Boolean tcpActive, boolean mediaTransmissionTCP,
  502 + String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
  503 + if ("proxy".equals(gbStream.getStreamType())) {
  504 + // TODO 控制启用以使设备上线
  505 + logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
  506 + responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
  507 + } else if ("push".equals(gbStream.getStreamType())) {
  508 + if (!platform.isStartOfflinePush()) {
  509 + responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable");
  510 + return;
  511 + }
  512 + // 发送redis消息以使设备上线
  513 + logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
  514 +
  515 + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
  516 + gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
  517 + platform.getName(), null, gbStream.getMediaServerId());
  518 + redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
  519 + // 设置超时
  520 + dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
  521 + logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
  522 + try {
  523 + mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId());
  524 + responseAck(evt, Response.REQUEST_TIMEOUT); // 超时
  525 + } catch (SipException e) {
  526 + e.printStackTrace();
  527 + } catch (InvalidArgumentException e) {
  528 + e.printStackTrace();
  529 + } catch (ParseException e) {
  530 + e.printStackTrace();
  531 + }
  532 + }, userSetting.getPlatformPlayTimeout());
  533 + // 添加监听
  534 + int finalPort = port;
  535 + Boolean finalTcpActive = tcpActive;
  536 +
  537 + // 添加在本机上线的通知
  538 + mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream, serverId) -> {
  539 + dynamicTask.stop(callIdHeader.getCallId());
  540 + if (serverId.equals(userSetting.getServerId())) {
  541 + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
  542 + app, stream, channelId, mediaTransmissionTCP);
  543 +
  544 + if (sendRtpItem == null) {
  545 + logger.warn("服务器端口资源不足");
  546 + try {
  547 + responseAck(evt, Response.BUSY_HERE);
  548 + } catch (SipException e) {
  549 + e.printStackTrace();
  550 + } catch (InvalidArgumentException e) {
  551 + e.printStackTrace();
  552 + } catch (ParseException e) {
  553 + e.printStackTrace();
  554 + }
  555 + return;
  556 + }
  557 + if (finalTcpActive != null) {
  558 + sendRtpItem.setTcpActive(finalTcpActive);
  559 + }
  560 + sendRtpItem.setPlayType(InviteStreamType.PUSH);
  561 + // 写入redis, 超时时回复
  562 + sendRtpItem.setStatus(1);
  563 + sendRtpItem.setCallId(callIdHeader.getCallId());
  564 + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
  565 + sendRtpItem.setDialog(dialogByteArray);
  566 + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
  567 + sendRtpItem.setTransaction(transactionByteArray);
  568 + redisCatchStorage.updateSendRTPSever(sendRtpItem);
  569 + sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
  570 + } else {
  571 + // 其他平台内容
  572 + otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
  573 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  574 + }
  575 + });
  576 + }
  577 + }
  578 +
  579 + /**
  580 + * 来自其他wvp的推流
  581 + */
  582 + private void otherWvpPushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
  583 + CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
  584 + int port, Boolean tcpActive, boolean mediaTransmissionTCP,
  585 + String channelId, String addressStr, String ssrc, String requesterId) {
  586 + logger.info("[级联点播]直播流来自其他平台,发送redis消息");
  587 + // 发送redis消息
  588 + redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
  589 + streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
  590 + channelId, mediaTransmissionTCP, null, responseSendItemMsg -> {
  591 + SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
  592 + if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
  593 + logger.warn("服务器端口资源不足");
  594 + try {
  595 + responseAck(evt, Response.BUSY_HERE);
  596 + } catch (SipException e) {
  597 + e.printStackTrace();
  598 + } catch (InvalidArgumentException e) {
  599 + e.printStackTrace();
  600 + } catch (ParseException e) {
  601 + e.printStackTrace();
  602 + }
  603 + return;
  604 + }
  605 + // 收到sendItem
  606 + if (tcpActive != null) {
  607 + sendRtpItem.setTcpActive(tcpActive);
  608 + }
  609 + sendRtpItem.setPlayType(InviteStreamType.PUSH);
  610 + // 写入redis, 超时时回复
  611 + sendRtpItem.setStatus(1);
  612 + sendRtpItem.setCallId(callIdHeader.getCallId());
  613 + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
  614 + sendRtpItem.setDialog(dialogByteArray);
  615 + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
  616 + sendRtpItem.setTransaction(transactionByteArray);
  617 + redisCatchStorage.updateSendRTPSever(sendRtpItem);
  618 + sendStreamAck(responseSendItemMsg.getMediaServerItem(), sendRtpItem, platform, evt);
  619 + }, (wvpResult) -> {
  620 + try {
  621 + // 错误
  622 + if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
  623 + // 离线
  624 + // 查询是否在本机上线了
  625 + StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
  626 + if (currentStreamPushItem.isStatus()) {
  627 + // 在线状态
  628 + pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
  629 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  630 +
  631 + } else {
  632 + // 不在线 拉起
  633 + notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
  634 + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
  635 + }
  636 + }
  637 + } catch (InvalidArgumentException e) {
  638 + throw new RuntimeException(e);
  639 + } catch (ParseException e) {
  640 + throw new RuntimeException(e);
  641 + } catch (SipException e) {
  642 + throw new RuntimeException(e);
  643 + }
  644 +
  645 +
  646 + try {
  647 + responseAck(evt, Response.BUSY_HERE);
  648 + } catch (SipException e) {
  649 + e.printStackTrace();
  650 + } catch (InvalidArgumentException e) {
  651 + e.printStackTrace();
  652 + } catch (ParseException e) {
  653 + e.printStackTrace();
  654 + }
  655 + return;
  656 + });
  657 + }
  658 +
  659 + public void sendStreamAck(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) {
  660 +
  661 + StringBuffer content = new StringBuffer(200);
  662 + content.append("v=0\r\n");
  663 + content.append("o=" + sendRtpItem.getChannelId() + " 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
  664 + content.append("s=Play\r\n");
  665 + content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
  666 + content.append("t=0 0\r\n");
  667 + content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n");
  668 + content.append("a=sendonly\r\n");
  669 + content.append("a=rtpmap:96 PS/90000\r\n");
  670 + if (sendRtpItem.isTcp()) {
  671 + content.append("a=connection:new\r\n");
  672 + if (!sendRtpItem.isTcpActive()) {
  673 + content.append("a=setup:active\r\n");
  674 + } else {
  675 + content.append("a=setup:passive\r\n");
  676 + }
  677 + }
  678 + content.append("y=" + sendRtpItem.getSsrc() + "\r\n");
  679 + content.append("f=\r\n");
  680 +
  681 + try {
  682 + responseSdpAck(evt, content.toString(), platform);
  683 + } catch (SipException e) {
  684 + e.printStackTrace();
  685 + } catch (InvalidArgumentException e) {
  686 + e.printStackTrace();
  687 + } catch (ParseException e) {
  688 + e.printStackTrace();
  689 + }
  690 + }
  691 +
  692 + public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException {
  693 +
  694 + // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
  695 + Device device = redisCatchStorage.getDevice(requesterId);
  696 + Request request = evt.getRequest();
  697 + if (device != null) {
  698 + logger.info("收到设备" + requesterId + "的语音广播Invite请求");
  699 + responseAck(evt, Response.TRYING);
  700 +
  701 + String contentString = new String(request.getRawContent());
  702 + // jainSip不支持y=字段, 移除移除以解析。
  703 + String substring = contentString;
  704 + String ssrc = "0000000404";
  705 + int ssrcIndex = contentString.indexOf("y=");
  706 + if (ssrcIndex > 0) {
  707 + substring = contentString.substring(0, ssrcIndex);
  708 + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  709 + }
  710 + ssrcIndex = substring.indexOf("f=");
  711 + if (ssrcIndex > 0) {
  712 + substring = contentString.substring(0, ssrcIndex);
  713 + }
  714 + SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
  715 +
  716 + // 获取支持的格式
  717 + Vector mediaDescriptions = sdp.getMediaDescriptions(true);
  718 + // 查看是否支持PS 负载96
  719 + int port = -1;
  720 + //boolean recvonly = false;
  721 + boolean mediaTransmissionTCP = false;
  722 + Boolean tcpActive = null;
  723 + for (int i = 0; i < mediaDescriptions.size(); i++) {
  724 + MediaDescription mediaDescription = (MediaDescription) mediaDescriptions.get(i);
  725 + Media media = mediaDescription.getMedia();
  726 +
  727 + Vector mediaFormats = media.getMediaFormats(false);
  728 + if (mediaFormats.contains("8")) {
  729 + port = media.getMediaPort();
  730 + String protocol = media.getProtocol();
  731 + // 区分TCP发流还是udp, 当前默认udp
  732 + if ("TCP/RTP/AVP".equals(protocol)) {
  733 + String setup = mediaDescription.getAttribute("setup");
  734 + if (setup != null) {
  735 + mediaTransmissionTCP = true;
  736 + if ("active".equals(setup)) {
  737 + tcpActive = true;
  738 + } else if ("passive".equals(setup)) {
  739 + tcpActive = false;
  740 + }
  741 + }
  742 + }
  743 + break;
  744 + }
  745 + }
  746 + if (port == -1) {
  747 + logger.info("不支持的媒体格式,返回415");
  748 + // 回复不支持的格式
  749 + responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415
  750 + return;
  751 + }
  752 + String username = sdp.getOrigin().getUsername();
  753 + String addressStr = sdp.getOrigin().getAddress();
  754 + logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc);
  755 +
  756 + } else {
  757 + logger.warn("来自无效设备/平台的请求");
  758 + responseAck(evt, Response.BAD_REQUEST);
  759 + }
  760 + }
611 761 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
... ... @@ -41,7 +41,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
41 41  
42 42 private final Logger logger = LoggerFactory.getLogger(RegisterRequestProcessor.class);
43 43  
44   - public String method = "REGISTER";
  44 + public final String method = "REGISTER";
45 45  
46 46 @Autowired
47 47 private SipConfig sipConfig;
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
... ... @@ -40,9 +40,7 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
40 40 public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
41 41  
42 42 private Logger logger = LoggerFactory.getLogger(RecordInfoResponseMessageHandler.class);
43   - public static volatile List<String> threadNameList = new ArrayList();
44 43 private final String cmdType = "RecordInfo";
45   - private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_";
46 44  
47 45 private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
48 46  
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java
... ... @@ -17,7 +17,7 @@ import javax.sip.ResponseEvent;
17 17 @Component
18 18 public class ByeResponseProcessor extends SIPResponseProcessorAbstract {
19 19  
20   - private String method = "BYE";
  20 + private final String method = "BYE";
21 21  
22 22 @Autowired
23 23 private SipLayer sipLayer;
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java
... ... @@ -17,7 +17,7 @@ import javax.sip.ResponseEvent;
17 17 @Component
18 18 public class CancelResponseProcessor extends SIPResponseProcessorAbstract {
19 19  
20   - private String method = "CANCEL";
  20 + private final String method = "CANCEL";
21 21  
22 22 @Autowired
23 23 private SipLayer sipLayer;
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java
... ... @@ -31,7 +31,7 @@ import java.text.ParseException;
31 31 public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
32 32  
33 33 private final static Logger logger = LoggerFactory.getLogger(InviteResponseProcessor.class);
34   - private String method = "INVITE";
  34 + private final String method = "INVITE";
35 35  
36 36 @Autowired
37 37 private SipLayer sipLayer;
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
... ... @@ -27,7 +27,7 @@ import javax.sip.message.Response;
27 27 public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
28 28  
29 29 private Logger logger = LoggerFactory.getLogger(RegisterResponseProcessor.class);
30   - private String method = "REGISTER";
  30 + private final String method = "REGISTER";
31 31  
32 32 @Autowired
33 33 private ISIPCommanderForPlatform sipCommanderForPlatform;
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
... ... @@ -397,21 +397,22 @@ public class ZLMHttpHookListener {
397 397 if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
398 398 || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
399 399 || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
400   - streamPushItem = zlmMediaListManager.addPush(item);
  400 + item.setSeverId(userSetting.getServerId());
  401 + zlmMediaListManager.addPush(item);
401 402 }
402 403  
403   - List<GbStream> gbStreams = new ArrayList<>();
404   - if (streamPushItem == null || streamPushItem.getGbId() == null) {
405   - GbStream gbStream = storager.getGbStream(app, streamId);
406   - gbStreams.add(gbStream);
407   - }else {
408   - if (streamPushItem.getGbId() != null) {
409   - gbStreams.add(streamPushItem);
410   - }
411   - }
412   - if (gbStreams.size() > 0) {
  404 +// List<GbStream> gbStreams = new ArrayList<>();
  405 +// if (streamPushItem == null || streamPushItem.getGbId() == null) {
  406 +// GbStream gbStream = storager.getGbStream(app, streamId);
  407 +// gbStreams.add(gbStream);
  408 +// }else {
  409 +// if (streamPushItem.getGbId() != null) {
  410 +// gbStreams.add(streamPushItem);
  411 +// }
  412 +// }
  413 +// if (gbStreams.size() > 0) {
413 414 // eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON);
414   - }
  415 +// }
415 416  
416 417 }else {
417 418 // 兼容流注销时类型从redis记录获取
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
... ... @@ -24,6 +24,9 @@ import java.util.concurrent.ConcurrentHashMap;
24 24 import java.util.regex.Matcher;
25 25 import java.util.regex.Pattern;
26 26  
  27 +/**
  28 + * @author lin
  29 + */
27 30 @Component
28 31 public class ZLMMediaListManager {
29 32  
... ... @@ -147,7 +150,6 @@ public class ZLMMediaListManager {
147 150 }
148 151 }
149 152 }
150   - // StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(transform.getApp(), transform.getStream());
151 153 List<GbStream> gbStreamList = gbStreamMapper.selectByGBId(transform.getGbId());
152 154 if (gbStreamList != null && gbStreamList.size() == 1) {
153 155 transform.setGbStreamId(gbStreamList.get(0).getGbStreamId());
... ... @@ -162,13 +164,12 @@ public class ZLMMediaListManager {
162 164 }
163 165 if (transform != null) {
164 166 if (channelOnlineEvents.get(transform.getGbId()) != null) {
165   - channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream());
  167 + channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream(), transform.getServerId());
166 168 channelOnlineEvents.remove(transform.getGbId());
167 169 }
168 170 }
169 171 }
170 172  
171   -
172 173 storager.updateMedia(transform);
173 174 return transform;
174 175 }
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
... ... @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.media.zlm;
2 2  
3 3 import com.alibaba.fastjson.JSONArray;
4 4 import com.alibaba.fastjson.JSONObject;
  5 +import com.genersoft.iot.vmp.conf.UserSetting;
5 6 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
6 7 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
7 8 import org.slf4j.Logger;
... ... @@ -20,6 +21,9 @@ public class ZLMRTPServerFactory {
20 21 @Autowired
21 22 private ZLMRESTfulUtils zlmresTfulUtils;
22 23  
  24 + @Autowired
  25 + private UserSetting userSetting;
  26 +
23 27 private int[] portRangeArray = new int[2];
24 28  
25 29 public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List<Integer> usedFreelist) {
... ... @@ -197,6 +201,7 @@ public class ZLMRTPServerFactory {
197 201 sendRtpItem.setTcp(tcp);
198 202 sendRtpItem.setApp("rtp");
199 203 sendRtpItem.setLocalPort(localPort);
  204 + sendRtpItem.setServerId(userSetting.getServerId());
200 205 sendRtpItem.setMediaServerId(serverItem.getId());
201 206 return sendRtpItem;
202 207 }
... ... @@ -238,6 +243,7 @@ public class ZLMRTPServerFactory {
238 243 sendRtpItem.setChannelId(channelId);
239 244 sendRtpItem.setTcp(tcp);
240 245 sendRtpItem.setLocalPort(localPort);
  246 + sendRtpItem.setServerId(userSetting.getServerId());
241 247 sendRtpItem.setMediaServerId(serverItem.getId());
242 248 return sendRtpItem;
243 249 }
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java
1 1 package com.genersoft.iot.vmp.media.zlm.dto;
2 2  
  3 +/**
  4 + * @author lin
  5 + */
3 6 public interface ChannelOnlineEvent {
4 7  
5   - void run(String app, String stream);
  8 + void run(String app, String stream, String serverId);
6 9 }
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java
... ... @@ -61,11 +61,16 @@ public class MediaItem {
61 61 private String originUrl;
62 62  
63 63 /**
64   - * 服务器id
  64 + * 流媒体服务器id
65 65 */
66 66 private String mediaServerId;
67 67  
68 68 /**
  69 + * 服务器id
  70 + */
  71 + private String severId;
  72 +
  73 + /**
69 74 * GMT unix系统时间戳,单位秒
70 75 */
71 76 private Long createStamp;
... ... @@ -414,4 +419,12 @@ public class MediaItem {
414 419 public void setStreamInfo(StreamInfo streamInfo) {
415 420 this.streamInfo = streamInfo;
416 421 }
  422 +
  423 + public String getSeverId() {
  424 + return severId;
  425 + }
  426 +
  427 + public void setSeverId(String severId) {
  428 + this.severId = severId;
  429 + }
417 430 }
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java
... ... @@ -81,6 +81,11 @@ public class StreamPushItem extends GbStream implements Comparable&lt;StreamPushIte
81 81 */
82 82 private String mediaServerId;
83 83  
  84 + /**
  85 + * 使用的服务ID
  86 + */
  87 + private String serverId;
  88 +
84 89 public String getVhost() {
85 90 return vhost;
86 91 }
... ... @@ -219,5 +224,13 @@ public class StreamPushItem extends GbStream implements Comparable&lt;StreamPushIte
219 224 public void setMediaServerId(String mediaServerId) {
220 225 this.mediaServerId = mediaServerId;
221 226 }
  227 +
  228 + public String getServerId() {
  229 + return serverId;
  230 + }
  231 +
  232 + public void setServerId(String serverId) {
  233 + this.serverId = serverId;
  234 + }
222 235 }
223 236  
... ...
src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java
... ... @@ -23,7 +23,6 @@ public class StreamGPSSubscribeTask {
23 23 private IVideoManagerStorage storager;
24 24  
25 25  
26   -
27 26 @Scheduled(fixedRate = 30 * 1000) //每30秒执行一次
28 27 public void execute(){
29 28 List<GPSMsgInfo> gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo();
... ...
src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java
1 1 package com.genersoft.iot.vmp.service.bean;
2 2  
  3 +import java.util.stream.Stream;
  4 +
3 5 /**
4 6 * 当上级平台
  7 + * @author lin
5 8 */
6 9 public class MessageForPushChannel {
7 10 /**
... ... @@ -45,6 +48,20 @@ public class MessageForPushChannel {
45 48 */
46 49 private String mediaServerId;
47 50  
  51 + public static MessageForPushChannel getInstance(int type, String app, String stream, String gbId,
  52 + String platFormId, String platFormName, String serverId,
  53 + String mediaServerId){
  54 + MessageForPushChannel messageForPushChannel = new MessageForPushChannel();
  55 + messageForPushChannel.setType(type);
  56 + messageForPushChannel.setGbId(gbId);
  57 + messageForPushChannel.setApp(app);
  58 + messageForPushChannel.setStream(stream);
  59 + messageForPushChannel.setMediaServerId(mediaServerId);
  60 + messageForPushChannel.setPlatFormId(platFormId);
  61 + messageForPushChannel.setPlatFormName(platFormName);
  62 + return messageForPushChannel;
  63 + }
  64 +
48 65  
49 66 public int getType() {
50 67 return type;
... ...
src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java 0 → 100644
  1 +package com.genersoft.iot.vmp.service.bean;
  2 +
  3 +/**
  4 + * redis消息:请求下级推送流信息
  5 + * @author lin
  6 + */
  7 +public class RequestPushStreamMsg {
  8 +
  9 +
  10 + /**
  11 + * 下级服务ID
  12 + */
  13 + private String mediaServerId;
  14 +
  15 + /**
  16 + * 流ID
  17 + */
  18 + private String app;
  19 +
  20 + /**
  21 + * 应用名
  22 + */
  23 + private String stream;
  24 +
  25 + /**
  26 + * 目标IP
  27 + */
  28 + private String ip;
  29 +
  30 + /**
  31 + * 目标端口
  32 + */
  33 + private int port;
  34 +
  35 + /**
  36 + * ssrc
  37 + */
  38 + private String ssrc;
  39 +
  40 + /**
  41 + * 是否使用TCP方式
  42 + */
  43 + private boolean tcp;
  44 +
  45 + /**
  46 + * 本地使用的端口
  47 + */
  48 + private int srcPort;
  49 +
  50 + /**
  51 + * 发送时,rtp的pt(uint8_t),不传时默认为96
  52 + */
  53 + private int pt;
  54 +
  55 + /**
  56 + * 发送时,rtp的负载类型。为true时,负载为ps;为false时,为es;
  57 + */
  58 + private boolean ps;
  59 +
  60 + /**
  61 + * 是否只有音频
  62 + */
  63 + private boolean onlyAudio;
  64 +
  65 +
  66 + public static RequestPushStreamMsg getInstance(String mediaServerId, String app, String stream, String ip, int port, String ssrc,
  67 + boolean tcp, int srcPort, int pt, boolean ps, boolean onlyAudio) {
  68 + RequestPushStreamMsg requestPushStreamMsg = new RequestPushStreamMsg();
  69 + requestPushStreamMsg.setMediaServerId(mediaServerId);
  70 + requestPushStreamMsg.setApp(app);
  71 + requestPushStreamMsg.setStream(stream);
  72 + requestPushStreamMsg.setIp(ip);
  73 + requestPushStreamMsg.setPort(port);
  74 + requestPushStreamMsg.setSsrc(ssrc);
  75 + requestPushStreamMsg.setTcp(tcp);
  76 + requestPushStreamMsg.setSrcPort(srcPort);
  77 + requestPushStreamMsg.setPt(pt);
  78 + requestPushStreamMsg.setPs(ps);
  79 + requestPushStreamMsg.setOnlyAudio(onlyAudio);
  80 + return requestPushStreamMsg;
  81 + }
  82 +
  83 + public String getMediaServerId() {
  84 + return mediaServerId;
  85 + }
  86 +
  87 + public void setMediaServerId(String mediaServerId) {
  88 + this.mediaServerId = mediaServerId;
  89 + }
  90 +
  91 + public String getApp() {
  92 + return app;
  93 + }
  94 +
  95 + public void setApp(String app) {
  96 + this.app = app;
  97 + }
  98 +
  99 + public String getStream() {
  100 + return stream;
  101 + }
  102 +
  103 + public void setStream(String stream) {
  104 + this.stream = stream;
  105 + }
  106 +
  107 + public String getIp() {
  108 + return ip;
  109 + }
  110 +
  111 + public void setIp(String ip) {
  112 + this.ip = ip;
  113 + }
  114 +
  115 + public int getPort() {
  116 + return port;
  117 + }
  118 +
  119 + public void setPort(int port) {
  120 + this.port = port;
  121 + }
  122 +
  123 + public String getSsrc() {
  124 + return ssrc;
  125 + }
  126 +
  127 + public void setSsrc(String ssrc) {
  128 + this.ssrc = ssrc;
  129 + }
  130 +
  131 + public boolean isTcp() {
  132 + return tcp;
  133 + }
  134 +
  135 + public void setTcp(boolean tcp) {
  136 + this.tcp = tcp;
  137 + }
  138 +
  139 + public int getSrcPort() {
  140 + return srcPort;
  141 + }
  142 +
  143 + public void setSrcPort(int srcPort) {
  144 + this.srcPort = srcPort;
  145 + }
  146 +
  147 + public int getPt() {
  148 + return pt;
  149 + }
  150 +
  151 + public void setPt(int pt) {
  152 + this.pt = pt;
  153 + }
  154 +
  155 + public boolean isPs() {
  156 + return ps;
  157 + }
  158 +
  159 + public void setPs(boolean ps) {
  160 + this.ps = ps;
  161 + }
  162 +
  163 + public boolean isOnlyAudio() {
  164 + return onlyAudio;
  165 + }
  166 +
  167 + public void setOnlyAudio(boolean onlyAudio) {
  168 + this.onlyAudio = onlyAudio;
  169 + }
  170 +}
... ...
src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java 0 → 100644
  1 +package com.genersoft.iot.vmp.service.bean;
  2 +
  3 +/**
  4 + * redis消息:请求下级回复推送信息
  5 + * @author lin
  6 + */
  7 +public class RequestSendItemMsg {
  8 +
  9 + /**
  10 + * 下级服务ID
  11 + */
  12 + private String serverId;
  13 +
  14 + /**
  15 + * 下级服务ID
  16 + */
  17 + private String mediaServerId;
  18 +
  19 + /**
  20 + * 流ID
  21 + */
  22 + private String app;
  23 +
  24 + /**
  25 + * 应用名
  26 + */
  27 + private String stream;
  28 +
  29 + /**
  30 + * 目标IP
  31 + */
  32 + private String ip;
  33 +
  34 + /**
  35 + * 目标端口
  36 + */
  37 + private int port;
  38 +
  39 + /**
  40 + * ssrc
  41 + */
  42 + private String ssrc;
  43 +
  44 + /**
  45 + * 平台国标编号
  46 + */
  47 + private String platformId;
  48 +
  49 + /**
  50 + * 平台名称
  51 + */
  52 + private String platformName;
  53 +
  54 + /**
  55 + * 通道ID
  56 + */
  57 + private String channelId;
  58 +
  59 +
  60 + /**
  61 + * 是否使用TCP
  62 + */
  63 + private Boolean isTcp;
  64 +
  65 +
  66 +
  67 +
  68 + public static RequestSendItemMsg getInstance(String serverId, String mediaServerId, String app, String stream, String ip, int port,
  69 + String ssrc, String platformId, String channelId, Boolean isTcp, String platformName) {
  70 + RequestSendItemMsg requestSendItemMsg = new RequestSendItemMsg();
  71 + requestSendItemMsg.setServerId(serverId);
  72 + requestSendItemMsg.setMediaServerId(mediaServerId);
  73 + requestSendItemMsg.setApp(app);
  74 + requestSendItemMsg.setStream(stream);
  75 + requestSendItemMsg.setIp(ip);
  76 + requestSendItemMsg.setPort(port);
  77 + requestSendItemMsg.setSsrc(ssrc);
  78 + requestSendItemMsg.setPlatformId(platformId);
  79 + requestSendItemMsg.setPlatformName(platformName);
  80 + requestSendItemMsg.setChannelId(channelId);
  81 + requestSendItemMsg.setTcp(isTcp);
  82 +
  83 + return requestSendItemMsg;
  84 + }
  85 +
  86 + public String getServerId() {
  87 + return serverId;
  88 + }
  89 +
  90 + public void setServerId(String serverId) {
  91 + this.serverId = serverId;
  92 + }
  93 +
  94 + public String getMediaServerId() {
  95 + return mediaServerId;
  96 + }
  97 +
  98 + public void setMediaServerId(String mediaServerId) {
  99 + this.mediaServerId = mediaServerId;
  100 + }
  101 +
  102 + public String getApp() {
  103 + return app;
  104 + }
  105 +
  106 + public void setApp(String app) {
  107 + this.app = app;
  108 + }
  109 +
  110 + public String getStream() {
  111 + return stream;
  112 + }
  113 +
  114 + public void setStream(String stream) {
  115 + this.stream = stream;
  116 + }
  117 +
  118 + public String getIp() {
  119 + return ip;
  120 + }
  121 +
  122 + public void setIp(String ip) {
  123 + this.ip = ip;
  124 + }
  125 +
  126 + public int getPort() {
  127 + return port;
  128 + }
  129 +
  130 + public void setPort(int port) {
  131 + this.port = port;
  132 + }
  133 +
  134 + public String getSsrc() {
  135 + return ssrc;
  136 + }
  137 +
  138 + public void setSsrc(String ssrc) {
  139 + this.ssrc = ssrc;
  140 + }
  141 +
  142 + public String getPlatformId() {
  143 + return platformId;
  144 + }
  145 +
  146 + public void setPlatformId(String platformId) {
  147 + this.platformId = platformId;
  148 + }
  149 +
  150 + public String getPlatformName() {
  151 + return platformName;
  152 + }
  153 +
  154 + public void setPlatformName(String platformName) {
  155 + this.platformName = platformName;
  156 + }
  157 +
  158 + public String getChannelId() {
  159 + return channelId;
  160 + }
  161 +
  162 + public void setChannelId(String channelId) {
  163 + this.channelId = channelId;
  164 + }
  165 +
  166 + public Boolean getTcp() {
  167 + return isTcp;
  168 + }
  169 +
  170 + public void setTcp(Boolean tcp) {
  171 + isTcp = tcp;
  172 + }
  173 +}
... ...
src/main/java/com/genersoft/iot/vmp/service/bean/ResponseSendItemMsg.java 0 → 100644
  1 +package com.genersoft.iot.vmp.service.bean;
  2 +
  3 +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
  4 +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  5 +
  6 +/**
  7 + * redis消息:下级回复推送信息
  8 + * @author lin
  9 + */
  10 +public class ResponseSendItemMsg {
  11 +
  12 + private SendRtpItem sendRtpItem;
  13 +
  14 + private MediaServerItem mediaServerItem;
  15 +
  16 + public SendRtpItem getSendRtpItem() {
  17 + return sendRtpItem;
  18 + }
  19 +
  20 + public void setSendRtpItem(SendRtpItem sendRtpItem) {
  21 + this.sendRtpItem = sendRtpItem;
  22 + }
  23 +
  24 + public MediaServerItem getMediaServerItem() {
  25 + return mediaServerItem;
  26 + }
  27 +
  28 + public void setMediaServerItem(MediaServerItem mediaServerItem) {
  29 + this.mediaServerItem = mediaServerItem;
  30 + }
  31 +}
... ...
src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsg.java 0 → 100644
  1 +package com.genersoft.iot.vmp.service.bean;
  2 +
  3 +/**
  4 + * @author lin
  5 + */
  6 +public class WvpRedisMsg {
  7 +
  8 + public static WvpRedisMsg getInstance(String fromId, String toId, String type, String cmd, String serial, String content){
  9 + WvpRedisMsg wvpRedisMsg = new WvpRedisMsg();
  10 + wvpRedisMsg.setFromId(fromId);
  11 + wvpRedisMsg.setToId(toId);
  12 + wvpRedisMsg.setType(type);
  13 + wvpRedisMsg.setCmd(cmd);
  14 + wvpRedisMsg.setSerial(serial);
  15 + wvpRedisMsg.setContent(content);
  16 + return wvpRedisMsg;
  17 + }
  18 +
  19 + private String fromId;
  20 +
  21 + private String toId;
  22 + /**
  23 + * req 请求, res 回复
  24 + */
  25 + private String type;
  26 + private String cmd;
  27 +
  28 + /**
  29 + * 消息的ID
  30 + */
  31 + private String serial;
  32 + private Object content;
  33 +
  34 + private final static String requestTag = "req";
  35 + private final static String responseTag = "res";
  36 +
  37 + public static WvpRedisMsg getRequestInstance(String fromId, String toId, String cmd, String serial, Object content) {
  38 + WvpRedisMsg wvpRedisMsg = new WvpRedisMsg();
  39 + wvpRedisMsg.setType(requestTag);
  40 + wvpRedisMsg.setFromId(fromId);
  41 + wvpRedisMsg.setToId(toId);
  42 + wvpRedisMsg.setCmd(cmd);
  43 + wvpRedisMsg.setSerial(serial);
  44 + wvpRedisMsg.setContent(content);
  45 + return wvpRedisMsg;
  46 + }
  47 +
  48 + public static WvpRedisMsg getResponseInstance() {
  49 + WvpRedisMsg wvpRedisMsg = new WvpRedisMsg();
  50 + wvpRedisMsg.setType(responseTag);
  51 + return wvpRedisMsg;
  52 + }
  53 +
  54 + public static WvpRedisMsg getResponseInstance(String fromId, String toId, String cmd, String serial, Object content) {
  55 + WvpRedisMsg wvpRedisMsg = new WvpRedisMsg();
  56 + wvpRedisMsg.setType(responseTag);
  57 + wvpRedisMsg.setFromId(fromId);
  58 + wvpRedisMsg.setToId(toId);
  59 + wvpRedisMsg.setCmd(cmd);
  60 + wvpRedisMsg.setSerial(serial);
  61 + wvpRedisMsg.setContent(content);
  62 + return wvpRedisMsg;
  63 + }
  64 +
  65 + public static boolean isRequest(WvpRedisMsg wvpRedisMsg) {
  66 + return requestTag.equals(wvpRedisMsg.getType());
  67 + }
  68 +
  69 + public String getSerial() {
  70 + return serial;
  71 + }
  72 +
  73 + public void setSerial(String serial) {
  74 + this.serial = serial;
  75 + }
  76 +
  77 + public String getFromId() {
  78 + return fromId;
  79 + }
  80 +
  81 + public void setFromId(String fromId) {
  82 + this.fromId = fromId;
  83 + }
  84 +
  85 + public String getToId() {
  86 + return toId;
  87 + }
  88 +
  89 + public void setToId(String toId) {
  90 + this.toId = toId;
  91 + }
  92 +
  93 + public String getType() {
  94 + return type;
  95 + }
  96 +
  97 + public void setType(String type) {
  98 + this.type = type;
  99 + }
  100 +
  101 + public String getCmd() {
  102 + return cmd;
  103 + }
  104 +
  105 + public void setCmd(String cmd) {
  106 + this.cmd = cmd;
  107 + }
  108 +
  109 + public Object getContent() {
  110 + return content;
  111 + }
  112 +
  113 + public void setContent(Object content) {
  114 + this.content = content;
  115 + }
  116 +}
... ...
src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java 0 → 100644
  1 +package com.genersoft.iot.vmp.service.bean;
  2 +
  3 +/**
  4 + * @author lin
  5 + */
  6 +
  7 +public class WvpRedisMsgCmd {
  8 +
  9 + public static final String GET_SEND_ITEM = "GetSendItem";
  10 + public static final String REQUEST_PUSH_STREAM = "RequestPushStream";
  11 +
  12 +}
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java 0 → 100644
  1 +package com.genersoft.iot.vmp.service.impl;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.alibaba.fastjson.JSONObject;
  5 +import com.genersoft.iot.vmp.conf.DynamicTask;
  6 +import com.genersoft.iot.vmp.conf.UserSetting;
  7 +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
  8 +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
  9 +import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
  10 +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
  11 +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  12 +import com.genersoft.iot.vmp.service.IMediaServerService;
  13 +import com.genersoft.iot.vmp.service.bean.*;
  14 +import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  15 +import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  16 +import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  17 +import org.slf4j.Logger;
  18 +import org.slf4j.LoggerFactory;
  19 +import org.springframework.beans.factory.annotation.Autowired;
  20 +import org.springframework.data.redis.connection.Message;
  21 +import org.springframework.data.redis.connection.MessageListener;
  22 +import org.springframework.stereotype.Component;
  23 +
  24 +import javax.sip.InvalidArgumentException;
  25 +import javax.sip.SipException;
  26 +import java.text.ParseException;
  27 +import java.util.HashMap;
  28 +import java.util.Map;
  29 +import java.util.UUID;
  30 +import java.util.concurrent.ConcurrentHashMap;
  31 +
  32 +
  33 +/**
  34 + * 监听下级发送推送信息,并发送国标推流消息上级
  35 + * @author lin
  36 + */
  37 +@Component
  38 +public class RedisGbPlayMsgListener implements MessageListener {
  39 +
  40 + private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class);
  41 +
  42 + public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM";
  43 +
  44 + /**
  45 + * 流媒体不存在的错误玛
  46 + */
  47 + public static final int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1;
  48 +
  49 + /**
  50 + * 离线的错误玛
  51 + */
  52 + public static final int ERROR_CODE_OFFLINE = -2;
  53 +
  54 + /**
  55 + * 超时的错误玛
  56 + */
  57 + public static final int ERROR_CODE_TIMEOUT = -3;
  58 +
  59 + private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>();
  60 + private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>();
  61 + private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>();
  62 +
  63 + @Autowired
  64 + private UserSetting userSetting;
  65 +
  66 + @Autowired
  67 + private RedisUtil redis;
  68 +
  69 + @Autowired
  70 + private ZLMMediaListManager zlmMediaListManager;
  71 +
  72 + @Autowired
  73 + private ZLMRTPServerFactory zlmrtpServerFactory;
  74 +
  75 + @Autowired
  76 + private IMediaServerService mediaServerService;
  77 +
  78 + @Autowired
  79 + private IRedisCatchStorage redisCatchStorage;
  80 +
  81 + @Autowired
  82 + private DynamicTask dynamicTask;
  83 +
  84 + @Autowired
  85 + private ZLMMediaListManager mediaListManager;
  86 +
  87 + @Autowired
  88 + private ZLMHttpHookSubscribe subscribe;
  89 +
  90 +
  91 + public interface PlayMsgCallback{
  92 + void handler(ResponseSendItemMsg responseSendItemMsg);
  93 + }
  94 +
  95 + public interface PlayMsgCallbackForStartSendRtpStream{
  96 + void handler(JSONObject jsonObject);
  97 + }
  98 +
  99 + public interface PlayMsgErrorCallback{
  100 + void handler(WVPResult wvpResult);
  101 + }
  102 +
  103 + @Override
  104 + public void onMessage(Message message, byte[] bytes) {
  105 + JSONObject msgJSON = JSON.parseObject(message.getBody(), JSONObject.class);
  106 + WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class);
  107 + if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
  108 + return;
  109 + }
  110 + if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
  111 + logger.info("[收到REDIS通知] 请求: {}", new String(message.getBody()));
  112 +
  113 + switch (wvpRedisMsg.getCmd()){
  114 + case WvpRedisMsgCmd.GET_SEND_ITEM:
  115 + RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class);
  116 + requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
  117 + break;
  118 + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
  119 + RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);;
  120 + requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
  121 + break;
  122 + default:
  123 + break;
  124 + }
  125 +
  126 + }else {
  127 + logger.info("[收到REDIS通知] 回复: {}", new String(message.getBody()));
  128 + switch (wvpRedisMsg.getCmd()){
  129 + case WvpRedisMsgCmd.GET_SEND_ITEM:
  130 +
  131 + WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);
  132 +
  133 + String key = wvpRedisMsg.getSerial();
  134 + switch (content.getCode()) {
  135 + case 0:
  136 + ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class);
  137 + PlayMsgCallback playMsgCallback = callbacks.get(key);
  138 + if (playMsgCallback != null) {
  139 + callbacksForError.remove(key);
  140 + playMsgCallback.handler(responseSendItemMsg);
  141 + }
  142 + break;
  143 + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
  144 + case ERROR_CODE_OFFLINE:
  145 + case ERROR_CODE_TIMEOUT:
  146 + PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
  147 + if (errorCallback != null) {
  148 + callbacks.remove(key);
  149 + errorCallback.handler(content);
  150 + }
  151 + break;
  152 + default:
  153 + break;
  154 + }
  155 + break;
  156 + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
  157 + WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);
  158 + String serial = wvpRedisMsg.getSerial();
  159 + switch (wvpResult.getCode()) {
  160 + case 0:
  161 + JSONObject jsonObject = (JSONObject)wvpResult.getData();
  162 + PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
  163 + if (playMsgCallback != null) {
  164 + callbacksForError.remove(serial);
  165 + playMsgCallback.handler(jsonObject);
  166 + }
  167 + break;
  168 + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
  169 + case ERROR_CODE_OFFLINE:
  170 + case ERROR_CODE_TIMEOUT:
  171 + PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
  172 + if (errorCallback != null) {
  173 + callbacks.remove(serial);
  174 + errorCallback.handler(wvpResult);
  175 + }
  176 + break;
  177 + default:
  178 + break;
  179 + }
  180 + break;
  181 + default:
  182 + break;
  183 + }
  184 + }
  185 +
  186 +
  187 +
  188 +
  189 + }
  190 +
  191 + /**
  192 + * 处理收到的请求推流的请求
  193 + */
  194 + private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) {
  195 + MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
  196 + if (mediaInfo == null) {
  197 + // TODO 回复错误
  198 + return;
  199 + }
  200 + String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1";
  201 + Map<String, Object> param = new HashMap<>();
  202 + param.put("vhost","__defaultVhost__");
  203 + param.put("app",requestPushStreamMsg.getApp());
  204 + param.put("stream",requestPushStreamMsg.getStream());
  205 + param.put("ssrc", requestPushStreamMsg.getSsrc());
  206 + param.put("dst_url",requestPushStreamMsg.getIp());
  207 + param.put("dst_port", requestPushStreamMsg.getPort());
  208 + param.put("is_udp", is_Udp);
  209 + param.put("src_port", requestPushStreamMsg.getSrcPort());
  210 + param.put("pt", requestPushStreamMsg.getPt());
  211 + param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0");
  212 + param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0");
  213 + JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
  214 + // 回复消息
  215 + responsePushStream(jsonObject, fromId, serial);
  216 + }
  217 +
  218 + private void responsePushStream(JSONObject content, String toId, String serial) {
  219 +
  220 + WVPResult<JSONObject> result = new WVPResult<>();
  221 + result.setCode(0);
  222 + result.setData(content);
  223 +
  224 + WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
  225 + WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result);
  226 + JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
  227 + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
  228 + }
  229 +
  230 + /**
  231 + * 处理收到的请求sendItem的请求
  232 + */
  233 + private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) {
  234 + MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId());
  235 + if (mediaServerItem == null) {
  236 + logger.info("[回复推流信息] 流媒体{}不存在 ", content.getMediaServerId());
  237 +
  238 + WVPResult<SendRtpItem> result = new WVPResult<>();
  239 + result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND);
  240 + result.setMsg("流媒体不存在");
  241 +
  242 + WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
  243 + WvpRedisMsgCmd.GET_SEND_ITEM, serial, result);
  244 +
  245 + JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
  246 + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
  247 + return;
  248 + }
  249 + // 确定流是否在线
  250 + boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream());
  251 + if (streamReady) {
  252 + logger.info("[回复推流信息] {}/{}", content.getApp(), content.getStream());
  253 + responseSendItem(mediaServerItem, content, toId, serial);
  254 + }else {
  255 + // 流已经离线
  256 + // 发送redis消息以使设备上线
  257 + logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流",content.getApp(), content.getStream());
  258 +
  259 + String taskKey = UUID.randomUUID().toString();
  260 + // 设置超时
  261 + dynamicTask.startDelay(taskKey, ()->{
  262 + logger.info("[ app={}, stream={} ] 等待设备开始推流超时", content.getApp(), content.getStream());
  263 + WVPResult<SendRtpItem> result = new WVPResult<>();
  264 + result.setCode(ERROR_CODE_TIMEOUT);
  265 + WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
  266 + userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result
  267 + );
  268 + JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
  269 + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
  270 + }, userSetting.getPlatformPlayTimeout());
  271 +
  272 + // 添加订阅
  273 + JSONObject subscribeKey = new JSONObject();
  274 + subscribeKey.put("app", content.getApp());
  275 + subscribeKey.put("stream", content.getStream());
  276 + subscribeKey.put("regist", true);
  277 + subscribeKey.put("schema", "rtmp");
  278 + subscribeKey.put("mediaServerId", mediaServerItem.getId());
  279 + subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
  280 + (MediaServerItem mediaServerItemInUse, JSONObject json)->{
  281 + dynamicTask.stop(taskKey);
  282 + responseSendItem(mediaServerItem, content, toId, serial);
  283 + });
  284 +
  285 + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(),
  286 + content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(),
  287 + content.getMediaServerId());
  288 + redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
  289 +
  290 + }
  291 + }
  292 +
  293 + /**
  294 + * 将获取到的sendItem发送出去
  295 + */
  296 + private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
  297 + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
  298 + content.getPort(), content.getSsrc(), content.getPlatformId(),
  299 + content.getApp(), content.getStream(), content.getChannelId(),
  300 + content.getTcp());
  301 +
  302 + WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
  303 + result.setCode(0);
  304 + ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg();
  305 + responseSendItemMsg.setSendRtpItem(sendRtpItem);
  306 + responseSendItemMsg.setMediaServerItem(mediaServerItem);
  307 + result.setData(responseSendItemMsg);
  308 +
  309 + WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
  310 + userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result
  311 + );
  312 + JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
  313 + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
  314 + }
  315 +
  316 + /**
  317 + * 发送消息要求下级生成推流信息
  318 + * @param serverId 下级服务ID
  319 + * @param app 应用名
  320 + * @param stream 流ID
  321 + * @param ip 目标IP
  322 + * @param port 目标端口
  323 + * @param ssrc ssrc
  324 + * @param platformId 平台国标编号
  325 + * @param channelId 通道ID
  326 + * @param isTcp 是否使用TCP
  327 + * @param callback 得到信息的回调
  328 + */
  329 + public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc,
  330 + String platformId, String channelId, boolean isTcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) {
  331 + RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance(
  332 + serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, platformName);
  333 + requestSendItemMsg.setServerId(serverId);
  334 + String key = UUID.randomUUID().toString();
  335 + WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM,
  336 + key, requestSendItemMsg);
  337 +
  338 + JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
  339 + logger.info("[请求推流SendItem] {}: {}", serverId, jsonObject);
  340 + callbacks.put(key, callback);
  341 + callbacksForError.put(key, errorCallback);
  342 + dynamicTask.startDelay(key, ()->{
  343 + callbacks.remove(key);
  344 + callbacksForError.remove(key);
  345 + WVPResult<Object> wvpResult = new WVPResult<>();
  346 + wvpResult.setCode(ERROR_CODE_TIMEOUT);
  347 + wvpResult.setMsg("timeout");
  348 + errorCallback.handler(wvpResult);
  349 + }, userSetting.getPlatformPlayTimeout());
  350 + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
  351 + }
  352 +
  353 + /**
  354 + * 发送请求推流的消息
  355 + * @param param 推流参数
  356 + * @param callback 回调
  357 + */
  358 + public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) {
  359 + String key = UUID.randomUUID().toString();
  360 + WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
  361 + WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, param);
  362 +
  363 + JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
  364 + logger.info("[REDIS 请求其他平台推流] {}: {}", serverId, jsonObject);
  365 + dynamicTask.startDelay(key, ()->{
  366 + callbacksForStartSendRtpStream.remove(key);
  367 + callbacksForError.remove(key);
  368 + }, userSetting.getPlatformPlayTimeout());
  369 + callbacksForStartSendRtpStream.put(key, callback);
  370 + callbacksForError.put(key, (wvpResult)->{
  371 + logger.info("[REDIS 请求其他平台推流] 失败: {}", wvpResult.getMsg());
  372 + callbacksForStartSendRtpStream.remove(key);
  373 + callbacksForError.remove(key);
  374 + });
  375 + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
  376 + }
  377 +}
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java renamed to src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java
... ... @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.service.impl;
3 3 import com.alibaba.fastjson.JSON;
4 4 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
5 5 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  6 +import org.jetbrains.annotations.NotNull;
6 7 import org.slf4j.Logger;
7 8 import org.slf4j.LoggerFactory;
8 9 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -10,17 +11,23 @@ import org.springframework.data.redis.connection.Message;
10 11 import org.springframework.data.redis.connection.MessageListener;
11 12 import org.springframework.stereotype.Component;
12 13  
  14 +/**
  15 + * 接收来自redis的GPS更新通知
  16 + * @author lin
  17 + */
13 18 @Component
14   -public class RedisGPSMsgListener implements MessageListener {
  19 +public class RedisGpsMsgListener implements MessageListener {
15 20  
16   - private final static Logger logger = LoggerFactory.getLogger(RedisGPSMsgListener.class);
  21 + private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class);
17 22  
18 23 @Autowired
19 24 private IRedisCatchStorage redisCatchStorage;
20 25  
21 26 @Override
22   - public void onMessage(Message message, byte[] bytes) {
23   - logger.info("收到来自REDIS的GPS通知: {}", new String(message.getBody()));
  27 + public void onMessage(@NotNull Message message, byte[] bytes) {
  28 + if (logger.isDebugEnabled()) {
  29 + logger.debug("收到来自REDIS的GPS通知: {}", new String(message.getBody()));
  30 + }
24 31 GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class);
25 32 redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
26 33 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java 0 → 100644
  1 +package com.genersoft.iot.vmp.service.impl;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.alibaba.fastjson.JSONObject;
  5 +import com.genersoft.iot.vmp.conf.UserSetting;
  6 +import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
  7 +import com.genersoft.iot.vmp.gb28181.bean.Device;
  8 +import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
  9 +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
  10 +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
  11 +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
  12 +import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
  13 +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
  14 +import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  15 +import com.genersoft.iot.vmp.utils.DateUtil;
  16 +import org.slf4j.Logger;
  17 +import org.slf4j.LoggerFactory;
  18 +import org.springframework.beans.factory.annotation.Autowired;
  19 +import org.springframework.data.redis.connection.Message;
  20 +import org.springframework.data.redis.connection.MessageListener;
  21 +import org.springframework.stereotype.Component;
  22 +
  23 +
  24 +/**
  25 + * @author lin
  26 + */
  27 +@Component
  28 +public class RedisStreamMsgListener implements MessageListener {
  29 +
  30 + private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class);
  31 +
  32 + @Autowired
  33 + private ISIPCommander commander;
  34 +
  35 + @Autowired
  36 + private ISIPCommanderForPlatform commanderForPlatform;
  37 +
  38 + @Autowired
  39 + private IVideoManagerStorage storage;
  40 +
  41 + @Autowired
  42 + private UserSetting userSetting;
  43 +
  44 + @Autowired
  45 + private ZLMMediaListManager zlmMediaListManager;
  46 +
  47 + @Override
  48 + public void onMessage(Message message, byte[] bytes) {
  49 +
  50 + JSONObject steamMsgJson = JSON.parseObject(message.getBody(), JSONObject.class);
  51 + if (steamMsgJson == null) {
  52 + logger.warn("[REDIS的ALARM通知]消息解析失败");
  53 + return;
  54 + }
  55 + String serverId = steamMsgJson.getString("serverId");
  56 +
  57 + if (userSetting.getServerId().equals(serverId)) {
  58 + // 自己发送的消息忽略即可
  59 + return;
  60 + }
  61 + logger.info("[REDIS通知] 流变化: {}", new String(message.getBody()));
  62 + String app = steamMsgJson.getString("app");
  63 + String stream = steamMsgJson.getString("stream");
  64 + boolean register = steamMsgJson.getBoolean("register");
  65 + String mediaServerId = steamMsgJson.getString("mediaServerId");
  66 + MediaItem mediaItem = new MediaItem();
  67 + mediaItem.setSeverId(serverId);
  68 + mediaItem.setApp(app);
  69 + mediaItem.setStream(stream);
  70 + mediaItem.setRegist(register);
  71 + mediaItem.setMediaServerId(mediaServerId);
  72 + mediaItem.setCreateStamp(System.currentTimeMillis()/1000);
  73 + mediaItem.setAliveSecond(0L);
  74 + mediaItem.setTotalReaderCount("0");
  75 + mediaItem.setOriginType(0);
  76 + mediaItem.setOriginTypeStr("0");
  77 + mediaItem.setOriginTypeStr("unknown");
  78 +
  79 + zlmMediaListManager.addPush(mediaItem);
  80 +
  81 +
  82 + }
  83 +}
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
... ... @@ -107,6 +107,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
107 107 streamPushItem.setStatus(true);
108 108 streamPushItem.setStreamType("push");
109 109 streamPushItem.setVhost(item.getVhost());
  110 + streamPushItem.setServerId(item.getSeverId());
110 111 return streamPushItem;
111 112 }
112 113  
... ...
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java
... ... @@ -357,6 +357,15 @@ public interface IVideoManagerStorage {
357 357  
358 358  
359 359 /**
  360 + * 获取但个推流
  361 + * @param app
  362 + * @param stream
  363 + * @return
  364 + */
  365 + StreamPushItem getMedia(String app, String stream);
  366 +
  367 +
  368 + /**
360 369 * 清空推流列表
361 370 */
362 371 void clearMediaList();
... ...
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
... ... @@ -14,9 +14,9 @@ import java.util.List;
14 14 public interface StreamPushMapper {
15 15  
16 16 @Insert("INSERT INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " +
17   - "createStamp, aliveSecond, mediaServerId) VALUES" +
  17 + "createStamp, aliveSecond, mediaServerId, serverId) VALUES" +
18 18 "('${app}', '${stream}', '${totalReaderCount}', '${originType}', '${originTypeStr}', " +
19   - "'${createStamp}', '${aliveSecond}', '${mediaServerId}' )")
  19 + "'${createStamp}', '${aliveSecond}', '${mediaServerId}' , '${serverId}' )")
20 20 int add(StreamPushItem streamPushItem);
21 21  
22 22 @Update("UPDATE stream_push " +
... ...
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
... ... @@ -587,11 +587,11 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
587 587 String scanKey = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId() + "_*";
588 588 List<GPSMsgInfo> result = new ArrayList<>();
589 589 List<Object> keys = redis.scan(scanKey);
590   - for (int i = 0; i < keys.size(); i++) {
591   - String key = (String) keys.get(i);
  590 + for (Object o : keys) {
  591 + String key = (String) o;
592 592 GPSMsgInfo gpsMsgInfo = (GPSMsgInfo) redis.get(key);
593 593 if (!gpsMsgInfo.isStored()) { // 只取没有存过得
594   - result.add((GPSMsgInfo)redis.get(key));
  594 + result.add((GPSMsgInfo) redis.get(key));
595 595 }
596 596 }
597 597  
... ... @@ -667,7 +667,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
667 667 @Override
668 668 public void sendStreamPushRequestedMsg(MessageForPushChannel msg) {
669 669 String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
670   - logger.info("[redis 推流被请求通知] {}: {}-{}", key, msg.getApp(), msg.getStream());
  670 + logger.info("[redis 推流被请求通知] {}: {}/{}", key, msg.getApp(), msg.getStream());
671 671 redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg));
672 672 }
673 673  
... ...
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
... ... @@ -885,6 +885,11 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
885 885 }
886 886  
887 887 @Override
  888 + public StreamPushItem getMedia(String app, String stream) {
  889 + return streamPushMapper.selectOne(app, stream);
  890 + }
  891 +
  892 + @Override
888 893 public void clearMediaList() {
889 894 streamPushMapper.clear();
890 895 }
... ...
web_src/src/components/dialog/rtcPlayer.vue
... ... @@ -7,11 +7,11 @@
7 7 </template>
8 8  
9 9 <script>
  10 +let webrtcPlayer = null;
10 11 export default {
11 12 name: 'rtcPlayer',
12 13 data() {
13 14 return {
14   - webrtcPlayer: null,
15 15 timer: null
16 16 };
17 17 },
... ... @@ -35,7 +35,7 @@ export default {
35 35 },
36 36 methods: {
37 37 play: function (url) {
38   - this.webrtcPlayer = new ZLMRTCClient.Endpoint({
  38 + webrtcPlayer = new ZLMRTCClient.Endpoint({
39 39 element: document.getElementById('webRtcPlayerBox'),// video 标签
40 40 debug: true,// 是否打印日志
41 41 zlmsdpUrl: url,//流地址
... ... @@ -45,17 +45,17 @@ export default {
45 45 videoEnable: false,
46 46 recvOnly: true,
47 47 })
48   - this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ICE_CANDIDATE_ERROR,(e)=>{// ICE 协商出错
  48 + webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ICE_CANDIDATE_ERROR,(e)=>{// ICE 协商出错
49 49 console.error('ICE 协商出错')
50 50 this.eventcallbacK("ICE ERROR", "ICE 协商出错")
51 51 });
52 52  
53   - this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_REMOTE_STREAMS,(e)=>{//获取到了远端流,可以播放
  53 + webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_REMOTE_STREAMS,(e)=>{//获取到了远端流,可以播放
54 54 console.error('播放成功',e.streams)
55 55 this.eventcallbacK("playing", "播放成功")
56 56 });
57 57  
58   - this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_OFFER_ANWSER_EXCHANGE_FAILED,(e)=>{// offer anwser 交换失败
  58 + webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_OFFER_ANWSER_EXCHANGE_FAILED,(e)=>{// offer anwser 交换失败
59 59 console.error('offer anwser 交换失败',e)
60 60 this.eventcallbacK("OFFER ANSWER ERROR ", "offer anwser 交换失败")
61 61 if (e.code ==-400 && e.msg=="流不存在"){
... ... @@ -68,7 +68,7 @@ export default {
68 68 }
69 69 });
70 70  
71   - this.webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_LOCAL_STREAM,(s)=>{// 获取到了本地流
  71 + webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_LOCAL_STREAM,(s)=>{// 获取到了本地流
72 72  
73 73 // document.getElementById('selfVideo').srcObject=s;
74 74 this.eventcallbacK("LOCAL STREAM", "获取到了本地流")
... ... @@ -76,9 +76,9 @@ export default {
76 76  
77 77 },
78 78 pause: function () {
79   - if (this.webrtcPlayer != null) {
80   - this.webrtcPlayer.close();
81   - this.webrtcPlayer = null;
  79 + if (webrtcPlayer != null) {
  80 + webrtcPlayer.close();
  81 + webrtcPlayer = null;
82 82 }
83 83  
84 84 },
... ...