Commit 1458014fe304e6a492a66c9a7b69600d47efc1d8

Authored by 648540858
1 parent c014a90c

修复合并主线后语音对讲失败的问题

src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
... ... @@ -3,8 +3,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
3 3 import com.alibaba.fastjson2.JSONObject;
4 4 import com.genersoft.iot.vmp.conf.DynamicTask;
5 5 import com.genersoft.iot.vmp.conf.UserSetting;
6   -import com.genersoft.iot.vmp.conf.UserSetting;
7   -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
8 6 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
9 7 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
10 8 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
... ... @@ -16,7 +14,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
16 14 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
17 15 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
18 16 import com.genersoft.iot.vmp.service.IMediaServerService;
19   -import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
20 17 import com.genersoft.iot.vmp.service.IPlayService;
21 18 import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
22 19 import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
... ... @@ -81,9 +78,6 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
81 78 private RedisGbPlayMsgListener redisGbPlayMsgListener;
82 79  
83 80 @Autowired
84   - private UserSetting userSetting;
85   -
86   - @Autowired
87 81 private IPlayService playService;
88 82  
89 83  
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
... ... @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.conf.UserSetting;
6 6 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
7 7 import com.genersoft.iot.vmp.gb28181.bean.*;
8 8 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
9   -import com.genersoft.iot.vmp.gb28181.bean.*;
10 9 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
11 10 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
12 11 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
... ... @@ -16,10 +15,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor
16 15 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
17 16 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
18 17 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
19   -import com.genersoft.iot.vmp.service.IDeviceService;
20   -import com.genersoft.iot.vmp.service.IInviteStreamService;
21   -import com.genersoft.iot.vmp.service.IMediaServerService;
22   -import com.genersoft.iot.vmp.service.IPlayService;
23 18 import com.genersoft.iot.vmp.service.*;
24 19 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
25 20 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
... ... @@ -38,7 +33,6 @@ import javax.sip.header.CallIdHeader;
38 33 import javax.sip.message.Response;
39 34 import java.text.ParseException;
40 35 import java.util.HashMap;
41   -import java.util.List;
42 36 import java.util.Map;
43 37  
44 38 /**
... ... @@ -164,53 +158,54 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
164 158 }
165 159 }
166 160  
167   -
168   -
169 161 // 可能是设备发送的停止
170 162 SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, callIdHeader.getCallId(), null);
171   - if (ssrcTransaction == null) {
  163 + if (ssrcTransaction == null && sendRtpItem == null) {
172 164 logger.info("[收到bye] 但是无法获取推流信息和发流信息,忽略此请求");
173 165 logger.info(request.toString());
174 166 return;
175 167 }
176   - logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
  168 + if (ssrcTransaction != null) {
  169 + logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
177 170  
178   - Device device = deviceService.getDevice(ssrcTransaction.getDeviceId());
179   - if (device == null) {
180   - logger.info("[收到bye] 未找到设备:{} ", ssrcTransaction.getDeviceId());
181   - return;
182   - }
183   - DeviceChannel channel = channelService.getOne(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
184   - if (channel == null) {
185   - logger.info("[收到bye] 未找到通道,设备:{}, 通道:{}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
186   - return;
187   - }
188   - storager.stopPlay(device.getDeviceId(), channel.getChannelId());
189   - InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
190   - if (inviteInfo != null) {
191   - inviteStreamService.removeInviteInfo(inviteInfo);
192   - if (inviteInfo.getStreamInfo() != null) {
193   - mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStreamInfo().getStream());
  171 + Device device = deviceService.getDevice(ssrcTransaction.getDeviceId());
  172 + if (device == null) {
  173 + logger.info("[收到bye] 未找到设备:{} ", ssrcTransaction.getDeviceId());
  174 + return;
194 175 }
195   - }
196   - // 释放ssrc
197   - MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
198   - if (mediaServerItem != null) {
199   - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc());
200   - }
201   - streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcTransaction.getStream());
202   - if (ssrcTransaction.getType() == InviteSessionType.BROADCAST) {
203   - // 查找来源的对讲设备,发送停止
204   - Device sourceDevice = storager.queryVideoDeviceByPlatformIdAndChannelId(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
205   - if (sourceDevice != null) {
206   - playService.stopAudioBroadcast(sourceDevice.getDeviceId(), channel.getChannelId());
  176 + DeviceChannel channel = channelService.getOne(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
  177 + if (channel == null) {
  178 + logger.info("[收到bye] 未找到通道,设备:{}, 通道:{}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
  179 + return;
  180 + }
  181 + storager.stopPlay(device.getDeviceId(), channel.getChannelId());
  182 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
  183 + if (inviteInfo != null) {
  184 + inviteStreamService.removeInviteInfo(inviteInfo);
  185 + if (inviteInfo.getStreamInfo() != null) {
  186 + mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStreamInfo().getStream());
  187 + }
  188 + }
  189 + // 释放ssrc
  190 + MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
  191 + if (mediaServerItem != null) {
  192 + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc());
  193 + }
  194 + streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcTransaction.getStream());
  195 + if (ssrcTransaction.getType() == InviteSessionType.BROADCAST) {
  196 + // 查找来源的对讲设备,发送停止
  197 + Device sourceDevice = storager.queryVideoDeviceByPlatformIdAndChannelId(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
  198 + if (sourceDevice != null) {
  199 + playService.stopAudioBroadcast(sourceDevice.getDeviceId(), channel.getChannelId());
  200 + }
  201 + }
  202 + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(ssrcTransaction.getDeviceId(), channel.getChannelId());
  203 + if (audioBroadcastCatch != null) {
  204 + // 来自上级平台的停止对讲
  205 + logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", ssrcTransaction.getDeviceId(), channel.getChannelId());
  206 + audioBroadcastManager.del(ssrcTransaction.getDeviceId(), channel.getChannelId());
207 207 }
208 208 }
209   - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(ssrcTransaction.getDeviceId(), channel.getChannelId());
210   - if (audioBroadcastCatch != null) {
211   - // 来自上级平台的停止对讲
212   - logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", ssrcTransaction.getDeviceId(), channel.getChannelId());
213   - audioBroadcastManager.del(ssrcTransaction.getDeviceId(), channel.getChannelId());
214   - }
  209 +
215 210 }
216 211 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
... ... @@ -1030,10 +1030,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
1030 1030 }
1031 1031 logger.info("设备{}请求语音流, 收流地址:{}:{},ssrc:{}, {}, 对讲方式:{}", requesterId, addressStr, port, ssrc,
1032 1032 mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue());
1033   -
  1033 + CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
1034 1034 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
1035 1035 device.getDeviceId(), broadcastCatch.getChannelId(),
1036   - mediaTransmissionTCP, false);
  1036 + mediaTransmissionTCP, false, ssrcFromCallback -> {
  1037 + return redisCatchStorage.querySendRTPServer(requesterId, channelId, null, callIdHeader.getCallId()) != null;
  1038 + });
1037 1039  
1038 1040 if (sendRtpItem == null) {
1039 1041 logger.warn("服务器端口资源不足");
... ... @@ -1048,7 +1050,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
1048 1050 }
1049 1051  
1050 1052  
1051   - CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
  1053 +
1052 1054 sendRtpItem.setPlayType(InviteStreamType.BROADCAST);
1053 1055 sendRtpItem.setCallId(callIdHeader.getCallId());
1054 1056 sendRtpItem.setPlatformId(requesterId);
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
... ... @@ -11,9 +11,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
11 11 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
12 12 import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
13 13 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam;
14   -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
15   -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
16   -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
17 14 import org.slf4j.Logger;
18 15 import org.slf4j.LoggerFactory;
19 16 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -309,7 +306,7 @@ public class ZLMRTPServerFactory {
309 306 localPort = jsonObject.getInteger("port");
310 307 HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
311 308 // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
312   - Integer finalLocalPort = localPort;
  309 + int finalLocalPort = localPort;
313 310 hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
314 311 (MediaServerItem mediaServerItem, HookParam hookParam)->{
315 312 logger.info("[上级点播] {}->监听端口到期继续保持监听: {}", ssrc, finalLocalPort);
... ... @@ -324,7 +321,7 @@ public class ZLMRTPServerFactory {
324 321 }
325 322 }
326 323 });
327   - logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);
  324 + logger.info("[上级点播] {}->: {}", ssrc, localPort);
328 325 return localPort;
329 326 }else {
330 327 logger.info("[上级点播] 监听端口失败: {}->{}", ssrc, localPort);
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
... ... @@ -235,8 +235,10 @@ public class PlayServiceImpl implements IPlayService {
235 235 sendRtpItem.setUsePs(false);
236 236 sendRtpItem.setReceiveStream(stream + "_talk");
237 237  
238   -
239   - int port = zlmrtpServerFactory.keepPort(mediaServerItem, playSsrc, null);
  238 + String callId = SipUtils.getNewCallId();
  239 + int port = zlmrtpServerFactory.keepPort(mediaServerItem, playSsrc, 0, ssrcFromCallback ->{
  240 + return redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, callId) != null;
  241 + });
240 242 //端口获取失败的ssrcInfo 没有必要发送点播指令
241 243 if (port <= 0) {
242 244 logger.info("[语音对讲] 端口分配异常,deviceId={},channelId={}", device.getDeviceId(), channelId);
... ... @@ -264,7 +266,7 @@ public class PlayServiceImpl implements IPlayService {
264 266 }
265 267 }, userSetting.getPlayTimeout());
266 268  
267   - String callId = SipUtils.getNewCallId();
  269 +
268 270  
269 271 zlmrtpServerFactory.releasePort(mediaServerItem, playSsrc);
270 272 Map<String, Object> param = new HashMap<>(12);
... ...
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
... ... @@ -80,7 +80,7 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
80 80 for (SendRtpItem sendRtpItem : sendRtpItems) {
81 81 ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
82 82 // 停止向上级推流
83   - String streamId = sendRtpItem.getStreamId();
  83 + String streamId = sendRtpItem.getStream();
84 84 Map<String, Object> param = new HashMap<>();
85 85 param.put("vhost","__defaultVhost__");
86 86 param.put("app",sendRtpItem.getApp());
... ... @@ -88,7 +88,7 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
88 88 param.put("ssrc",sendRtpItem.getSsrc());
89 89 logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId);
90 90 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
91   - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
  91 + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
92 92 zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
93 93  
94 94 try {
... ... @@ -98,7 +98,7 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
98 98 }
99 99 if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
100 100 MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
101   - sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
  101 + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
102 102 sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
103 103 messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
104 104 redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
... ...