Commit f8f65d473bec182abeecd6fd17a9d4c4c4cfc7c5
1 parent
65fa75fb
优化语音广播流程
Showing
8 changed files
with
158 additions
and
60 deletions
src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java
| 1 | package com.genersoft.iot.vmp.gb28181.bean; | 1 | package com.genersoft.iot.vmp.gb28181.bean; |
| 2 | 2 | ||
| 3 | 3 | ||
| 4 | +import gov.nist.javax.sip.message.SIPRequest; | ||
| 5 | +import gov.nist.javax.sip.stack.SIPDialog; | ||
| 6 | + | ||
| 7 | +import javax.sip.Dialog; | ||
| 8 | + | ||
| 4 | /** | 9 | /** |
| 5 | * 缓存语音广播的状态 | 10 | * 缓存语音广播的状态 |
| 6 | * @author lin | 11 | * @author lin |
| @@ -32,6 +37,16 @@ public class AudioBroadcastCatch { | @@ -32,6 +37,16 @@ public class AudioBroadcastCatch { | ||
| 32 | */ | 37 | */ |
| 33 | private AudioBroadcastCatchStatus status; | 38 | private AudioBroadcastCatchStatus status; |
| 34 | 39 | ||
| 40 | + /** | ||
| 41 | + * 请求信息 | ||
| 42 | + */ | ||
| 43 | + private SIPRequest request; | ||
| 44 | + | ||
| 45 | + /** | ||
| 46 | + * 会话信息 | ||
| 47 | + */ | ||
| 48 | + private SIPDialog dialog; | ||
| 49 | + | ||
| 35 | 50 | ||
| 36 | public String getDeviceId() { | 51 | public String getDeviceId() { |
| 37 | return deviceId; | 52 | return deviceId; |
| @@ -56,4 +71,20 @@ public class AudioBroadcastCatch { | @@ -56,4 +71,20 @@ public class AudioBroadcastCatch { | ||
| 56 | public void setStatus(AudioBroadcastCatchStatus status) { | 71 | public void setStatus(AudioBroadcastCatchStatus status) { |
| 57 | this.status = status; | 72 | this.status = status; |
| 58 | } | 73 | } |
| 74 | + | ||
| 75 | + public void setDialog(SIPDialog dialog) { | ||
| 76 | + this.dialog = dialog; | ||
| 77 | + } | ||
| 78 | + | ||
| 79 | + public SIPDialog getDialog() { | ||
| 80 | + return dialog; | ||
| 81 | + } | ||
| 82 | + | ||
| 83 | + public SIPRequest getRequest() { | ||
| 84 | + return request; | ||
| 85 | + } | ||
| 86 | + | ||
| 87 | + public void setRequest(SIPRequest request) { | ||
| 88 | + this.request = request; | ||
| 89 | + } | ||
| 59 | } | 90 | } |
src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java
| 1 | package com.genersoft.iot.vmp.gb28181.session; | 1 | package com.genersoft.iot.vmp.gb28181.session; |
| 2 | 2 | ||
| 3 | +import com.genersoft.iot.vmp.conf.SipConfig; | ||
| 3 | import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; | 4 | import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; |
| 5 | +import org.springframework.beans.factory.annotation.Autowired; | ||
| 4 | import org.springframework.stereotype.Component; | 6 | import org.springframework.stereotype.Component; |
| 5 | 7 | ||
| 6 | -import java.util.ArrayList; | ||
| 7 | -import java.util.Collection; | ||
| 8 | -import java.util.List; | ||
| 9 | -import java.util.Map; | 8 | +import java.util.*; |
| 10 | import java.util.concurrent.ConcurrentHashMap; | 9 | import java.util.concurrent.ConcurrentHashMap; |
| 10 | +import java.util.stream.Collectors; | ||
| 11 | +import java.util.stream.Stream; | ||
| 11 | 12 | ||
| 12 | /** | 13 | /** |
| 13 | * 语音广播消息管理类 | 14 | * 语音广播消息管理类 |
| @@ -16,6 +17,9 @@ import java.util.concurrent.ConcurrentHashMap; | @@ -16,6 +17,9 @@ import java.util.concurrent.ConcurrentHashMap; | ||
| 16 | @Component | 17 | @Component |
| 17 | public class AudioBroadcastManager { | 18 | public class AudioBroadcastManager { |
| 18 | 19 | ||
| 20 | + @Autowired | ||
| 21 | + private SipConfig config; | ||
| 22 | + | ||
| 19 | public static Map<String, AudioBroadcastCatch> data = new ConcurrentHashMap<>(); | 23 | public static Map<String, AudioBroadcastCatch> data = new ConcurrentHashMap<>(); |
| 20 | 24 | ||
| 21 | public void add(AudioBroadcastCatch audioBroadcastCatch) { | 25 | public void add(AudioBroadcastCatch audioBroadcastCatch) { |
| @@ -54,6 +58,16 @@ public class AudioBroadcastManager { | @@ -54,6 +58,16 @@ public class AudioBroadcastManager { | ||
| 54 | } | 58 | } |
| 55 | 59 | ||
| 56 | public AudioBroadcastCatch get(String deviceId, String channelId) { | 60 | public AudioBroadcastCatch get(String deviceId, String channelId) { |
| 57 | - return data.get(deviceId + channelId); | 61 | + AudioBroadcastCatch audioBroadcastCatch = data.get(deviceId + channelId); |
| 62 | + if (audioBroadcastCatch == null) { | ||
| 63 | + Stream<AudioBroadcastCatch> allAudioBroadcastCatchStreamForDevice = data.values().stream().filter( | ||
| 64 | + audioBroadcastCatchItem -> Objects.equals(audioBroadcastCatchItem.getDeviceId(), deviceId)); | ||
| 65 | + List<AudioBroadcastCatch> audioBroadcastCatchList = allAudioBroadcastCatchStreamForDevice.collect(Collectors.toList()); | ||
| 66 | + if (audioBroadcastCatchList.size() == 1 && Objects.equals(config.getId(), channelId)) { | ||
| 67 | + audioBroadcastCatch = audioBroadcastCatchList.get(0); | ||
| 68 | + } | ||
| 69 | + } | ||
| 70 | + | ||
| 71 | + return audioBroadcastCatch; | ||
| 58 | } | 72 | } |
| 59 | } | 73 | } |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
| @@ -18,6 +18,8 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; | @@ -18,6 +18,8 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; | ||
| 18 | import com.genersoft.iot.vmp.service.IMediaServerService; | 18 | import com.genersoft.iot.vmp.service.IMediaServerService; |
| 19 | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; | 19 | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| 20 | import com.genersoft.iot.vmp.storager.IVideoManagerStorage; | 20 | import com.genersoft.iot.vmp.storager.IVideoManagerStorage; |
| 21 | +import gov.nist.javax.sip.message.SIPRequest; | ||
| 22 | +import gov.nist.javax.sip.stack.SIPDialog; | ||
| 21 | import org.ehcache.shadow.org.terracotta.offheapstore.storage.IntegerStorageEngine; | 23 | import org.ehcache.shadow.org.terracotta.offheapstore.storage.IntegerStorageEngine; |
| 22 | import org.slf4j.Logger; | 24 | import org.slf4j.Logger; |
| 23 | import org.slf4j.LoggerFactory; | 25 | import org.slf4j.LoggerFactory; |
| @@ -28,15 +30,18 @@ import org.springframework.stereotype.Component; | @@ -28,15 +30,18 @@ import org.springframework.stereotype.Component; | ||
| 28 | import javax.sip.Dialog; | 30 | import javax.sip.Dialog; |
| 29 | import javax.sip.DialogState; | 31 | import javax.sip.DialogState; |
| 30 | import javax.sip.RequestEvent; | 32 | import javax.sip.RequestEvent; |
| 33 | +import javax.sip.SipException; | ||
| 31 | import javax.sip.address.SipURI; | 34 | import javax.sip.address.SipURI; |
| 32 | import javax.sip.header.CallIdHeader; | 35 | import javax.sip.header.CallIdHeader; |
| 33 | import javax.sip.header.FromHeader; | 36 | import javax.sip.header.FromHeader; |
| 34 | import javax.sip.header.HeaderAddress; | 37 | import javax.sip.header.HeaderAddress; |
| 35 | import javax.sip.header.ToHeader; | 38 | import javax.sip.header.ToHeader; |
| 39 | +import java.text.ParseException; | ||
| 36 | import java.util.*; | 40 | import java.util.*; |
| 37 | 41 | ||
| 38 | /** | 42 | /** |
| 39 | * SIP命令类型: ACK请求 | 43 | * SIP命令类型: ACK请求 |
| 44 | + * @author lin | ||
| 40 | */ | 45 | */ |
| 41 | @Component | 46 | @Component |
| 42 | public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { | 47 | public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { |
| @@ -96,8 +101,8 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In | @@ -96,8 +101,8 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In | ||
| 96 | ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId); | 101 | ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId); |
| 97 | // 取消设置的超时任务 | 102 | // 取消设置的超时任务 |
| 98 | dynamicTask.stop(callIdHeader.getCallId()); | 103 | dynamicTask.stop(callIdHeader.getCallId()); |
| 99 | - String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); | ||
| 100 | - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); | 104 | +// String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); |
| 105 | + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, null, null, callIdHeader.getCallId()); | ||
| 101 | String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; | 106 | String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; |
| 102 | MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); | 107 | MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); |
| 103 | logger.info("收到ACK,开始向上级推流 rtp/{}", sendRtpItem.getStreamId()); | 108 | logger.info("收到ACK,开始向上级推流 rtp/{}", sendRtpItem.getStreamId()); |
| @@ -121,7 +126,14 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In | @@ -121,7 +126,14 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In | ||
| 121 | } else { | 126 | } else { |
| 122 | logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param)); | 127 | logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param)); |
| 123 | if (sendRtpItem.isOnlyAudio()) { | 128 | if (sendRtpItem.isOnlyAudio()) { |
| 124 | - // TODO 可能是语音对讲 | 129 | + // 语音对讲 |
| 130 | + try { | ||
| 131 | + cmder.streamByeCmd((SIPDialog) evt.getDialog(), (SIPRequest)evt.getRequest(), null); | ||
| 132 | + } catch (SipException e) { | ||
| 133 | + throw new RuntimeException(e); | ||
| 134 | + } catch (ParseException e) { | ||
| 135 | + throw new RuntimeException(e); | ||
| 136 | + } | ||
| 125 | }else { | 137 | }else { |
| 126 | // 向上级平台 | 138 | // 向上级平台 |
| 127 | commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); | 139 | commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
| @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP | @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP | ||
| 13 | import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; | 13 | import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; |
| 14 | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; | 14 | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| 15 | import com.genersoft.iot.vmp.service.IMediaServerService; | 15 | import com.genersoft.iot.vmp.service.IMediaServerService; |
| 16 | +import com.genersoft.iot.vmp.service.IPlayService; | ||
| 16 | import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; | 17 | import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; |
| 17 | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; | 18 | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| 18 | import com.genersoft.iot.vmp.storager.IVideoManagerStorage; | 19 | import com.genersoft.iot.vmp.storager.IVideoManagerStorage; |
| @@ -65,6 +66,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In | @@ -65,6 +66,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In | ||
| 65 | @Autowired | 66 | @Autowired |
| 66 | private VideoStreamSessionManager streamSession; | 67 | private VideoStreamSessionManager streamSession; |
| 67 | 68 | ||
| 69 | + @Autowired | ||
| 70 | + private IPlayService playService; | ||
| 71 | + | ||
| 68 | @Override | 72 | @Override |
| 69 | public void afterPropertiesSet() throws Exception { | 73 | public void afterPropertiesSet() throws Exception { |
| 70 | // 添加消息处理的订阅 | 74 | // 添加消息处理的订阅 |
| @@ -106,6 +110,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In | @@ -106,6 +110,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In | ||
| 106 | if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { | 110 | if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { |
| 107 | cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null); | 111 | cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null); |
| 108 | } | 112 | } |
| 113 | + if (sendRtpItem.isOnlyAudio()) { | ||
| 114 | + playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), channelId); | ||
| 115 | + } | ||
| 109 | if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { | 116 | if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { |
| 110 | MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); | 117 | MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); |
| 111 | messageForPushChannel.setType(0); | 118 | messageForPushChannel.setType(0); |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
| @@ -114,6 +114,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | @@ -114,6 +114,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | ||
| 114 | private SipConfig config; | 114 | private SipConfig config; |
| 115 | 115 | ||
| 116 | 116 | ||
| 117 | + | ||
| 117 | @Override | 118 | @Override |
| 118 | public void afterPropertiesSet() throws Exception { | 119 | public void afterPropertiesSet() throws Exception { |
| 119 | // 添加消息处理的订阅 | 120 | // 添加消息处理的订阅 |
| @@ -492,7 +493,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | @@ -492,7 +493,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | ||
| 492 | gbStream.getApp(), gbStream.getStream(), channelId, | 493 | gbStream.getApp(), gbStream.getStream(), channelId, |
| 493 | mediaTransmissionTCP); | 494 | mediaTransmissionTCP); |
| 494 | 495 | ||
| 495 | - | ||
| 496 | if (sendRtpItem == null) { | 496 | if (sendRtpItem == null) { |
| 497 | logger.warn("服务器端口资源不足"); | 497 | logger.warn("服务器端口资源不足"); |
| 498 | responseAck(evt, Response.BUSY_HERE); | 498 | responseAck(evt, Response.BUSY_HERE); |
| @@ -562,25 +562,16 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | @@ -562,25 +562,16 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | ||
| 562 | } | 562 | } |
| 563 | } | 563 | } |
| 564 | 564 | ||
| 565 | - public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId) throws InvalidArgumentException, ParseException, SipException, SdpException { | ||
| 566 | - | ||
| 567 | - // 兼容奇葩的海康这里使用的不是通道编号而是本平台编号 | ||
| 568 | -// if (channelId.equals(config.getId())) { | ||
| 569 | -// List<AudioBroadcastCatch> all = audioBroadcastManager.getAll(); | ||
| 570 | -// for (AudioBroadcastCatch audioBroadcastCatch : all) { | ||
| 571 | -// if (audioBroadcastCatch.getDeviceId().equals(requesterId)) { | ||
| 572 | -// channelId = audioBroadcastCatch.getChannelId(); | ||
| 573 | -// } | ||
| 574 | -// } | ||
| 575 | -// } | ||
| 576 | -// // 兼容失败 | ||
| 577 | -// if (channelId.equals(config.getId())) { | ||
| 578 | -// responseAck(evt, Response.BAD_REQUEST); | ||
| 579 | -// return; | ||
| 580 | -// } | 565 | + public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId1) throws InvalidArgumentException, ParseException, SipException, SdpException { |
| 566 | + | ||
| 581 | // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) | 567 | // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) |
| 582 | Device device = redisCatchStorage.getDevice(requesterId); | 568 | Device device = redisCatchStorage.getDevice(requesterId); |
| 583 | - | 569 | + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(requesterId, channelId1); |
| 570 | + if (audioBroadcastCatch == null) { | ||
| 571 | + logger.warn("来自设备的Invite请求非语音广播,已忽略"); | ||
| 572 | + responseAck(evt, Response.FORBIDDEN); | ||
| 573 | + return; | ||
| 574 | + } | ||
| 584 | Request request = evt.getRequest(); | 575 | Request request = evt.getRequest(); |
| 585 | if (device != null) { | 576 | if (device != null) { |
| 586 | logger.info("收到设备" + requesterId + "的语音广播Invite请求"); | 577 | logger.info("收到设备" + requesterId + "的语音广播Invite请求"); |
| @@ -606,7 +597,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | @@ -606,7 +597,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | ||
| 606 | 597 | ||
| 607 | // 查看是否支持PS 负载96 | 598 | // 查看是否支持PS 负载96 |
| 608 | int port = -1; | 599 | int port = -1; |
| 609 | - //boolean recvonly = false; | ||
| 610 | boolean mediaTransmissionTCP = false; | 600 | boolean mediaTransmissionTCP = false; |
| 611 | Boolean tcpActive = null; | 601 | Boolean tcpActive = null; |
| 612 | for (int i = 0; i < mediaDescriptions.size(); i++) { | 602 | for (int i = 0; i < mediaDescriptions.size(); i++) { |
| @@ -638,7 +628,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | @@ -638,7 +628,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | ||
| 638 | responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 | 628 | responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 |
| 639 | return; | 629 | return; |
| 640 | } | 630 | } |
| 641 | - String sessionName = sdp.getSessionName().getValue(); | ||
| 642 | String addressStr = sdp.getOrigin().getAddress(); | 631 | String addressStr = sdp.getOrigin().getAddress(); |
| 643 | logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", requesterId, addressStr, port, ssrc); | 632 | logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", requesterId, addressStr, port, ssrc); |
| 644 | 633 | ||
| @@ -649,20 +638,19 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | @@ -649,20 +638,19 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | ||
| 649 | return; | 638 | return; |
| 650 | } | 639 | } |
| 651 | SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, | 640 | SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, |
| 652 | - device.getDeviceId(), channelId, | 641 | + device.getDeviceId(), audioBroadcastCatch.getChannelId(), |
| 653 | mediaTransmissionTCP); | 642 | mediaTransmissionTCP); |
| 654 | - sendRtpItem.setTcp(mediaTransmissionTCP); | ||
| 655 | - if (tcpActive != null) { | ||
| 656 | - sendRtpItem.setTcpActive(tcpActive); | ||
| 657 | - } | ||
| 658 | if (sendRtpItem == null) { | 643 | if (sendRtpItem == null) { |
| 659 | logger.warn("服务器端口资源不足"); | 644 | logger.warn("服务器端口资源不足"); |
| 660 | responseAck(evt, Response.BUSY_HERE); | 645 | responseAck(evt, Response.BUSY_HERE); |
| 661 | return; | 646 | return; |
| 662 | } | 647 | } |
| 663 | - | 648 | + sendRtpItem.setTcp(mediaTransmissionTCP); |
| 649 | + if (tcpActive != null) { | ||
| 650 | + sendRtpItem.setTcpActive(tcpActive); | ||
| 651 | + } | ||
| 664 | String app = "broadcast"; | 652 | String app = "broadcast"; |
| 665 | - String stream = device.getDeviceId() + "_" + channelId; | 653 | + String stream = device.getDeviceId() + "_" + audioBroadcastCatch.getChannelId(); |
| 666 | 654 | ||
| 667 | CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); | 655 | CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); |
| 668 | sendRtpItem.setPlayType(InviteStreamType.PLAY); | 656 | sendRtpItem.setPlayType(InviteStreamType.PLAY); |
| @@ -685,12 +673,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | @@ -685,12 +673,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | ||
| 685 | subscribeKey.put("schema", "rtmp"); | 673 | subscribeKey.put("schema", "rtmp"); |
| 686 | subscribeKey.put("mediaServerId", mediaServerItem.getId()); | 674 | subscribeKey.put("mediaServerId", mediaServerItem.getId()); |
| 687 | String finalSsrc = ssrc; | 675 | String finalSsrc = ssrc; |
| 688 | - String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + channelId; | ||
| 689 | - | ||
| 690 | // 流已经存在时直接推流 | 676 | // 流已经存在时直接推流 |
| 691 | if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) { | 677 | if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) { |
| 692 | logger.info("发现已经在推流"); | 678 | logger.info("发现已经在推流"); |
| 693 | - dynamicTask.stop(waiteStreamTimeoutTaskKey); | ||
| 694 | sendRtpItem.setStatus(2); | 679 | sendRtpItem.setStatus(2); |
| 695 | redisCatchStorage.updateSendRTPSever(sendRtpItem); | 680 | redisCatchStorage.updateSendRTPSever(sendRtpItem); |
| 696 | StringBuffer content = new StringBuffer(200); | 681 | StringBuffer content = new StringBuffer(200); |
| @@ -711,6 +696,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | @@ -711,6 +696,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | ||
| 711 | parentPlatform.setServerGBId(device.getDeviceId()); | 696 | parentPlatform.setServerGBId(device.getDeviceId()); |
| 712 | try { | 697 | try { |
| 713 | responseSdpAck(evt, content.toString(), parentPlatform); | 698 | responseSdpAck(evt, content.toString(), parentPlatform); |
| 699 | + Dialog dialog = evt.getDialog(); | ||
| 700 | + audioBroadcastCatch.setDialog((SIPDialog) dialog); | ||
| 701 | + audioBroadcastCatch.setRequest((SIPRequest) request); | ||
| 702 | + audioBroadcastManager.update(audioBroadcastCatch); | ||
| 714 | } catch (SipException e) { | 703 | } catch (SipException e) { |
| 715 | throw new RuntimeException(e); | 704 | throw new RuntimeException(e); |
| 716 | } catch (InvalidArgumentException e) { | 705 | } catch (InvalidArgumentException e) { |
| @@ -721,20 +710,17 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | @@ -721,20 +710,17 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | ||
| 721 | }else { | 710 | }else { |
| 722 | // 流不存在时监听流上线 | 711 | // 流不存在时监听流上线 |
| 723 | // 设置等待推流的超时; 默认20s | 712 | // 设置等待推流的超时; 默认20s |
| 713 | + String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + audioBroadcastCatch.getChannelId(); | ||
| 724 | dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{ | 714 | dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{ |
| 725 | logger.info("等待推流超时: {}/{}", app, stream); | 715 | logger.info("等待推流超时: {}/{}", app, stream); |
| 726 | - if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { | ||
| 727 | - audioBroadcastManager.del(device.getDeviceId(), channelId); | ||
| 728 | - }else { | ||
| 729 | - // 兼容海康使用了错误的通道ID的情况 | ||
| 730 | - audioBroadcastManager.delByDeviceId(device.getDeviceId()); | ||
| 731 | - } | ||
| 732 | - | 716 | + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); |
| 733 | // 发送bye | 717 | // 发送bye |
| 734 | try { | 718 | try { |
| 735 | - cmder.streamByeCmd((SIPDialog)evt.getServerTransaction().getDialog(), (SIPRequest) evt.getRequest(), null); | 719 | + responseAck(evt, Response.BUSY_HERE); |
| 736 | } catch (SipException e) { | 720 | } catch (SipException e) { |
| 737 | throw new RuntimeException(e); | 721 | throw new RuntimeException(e); |
| 722 | + } catch (InvalidArgumentException e) { | ||
| 723 | + throw new RuntimeException(e); | ||
| 738 | } catch (ParseException e) { | 724 | } catch (ParseException e) { |
| 739 | throw new RuntimeException(e); | 725 | throw new RuntimeException(e); |
| 740 | } | 726 | } |
| @@ -743,10 +729,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | @@ -743,10 +729,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | ||
| 743 | subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, | 729 | subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, |
| 744 | (MediaServerItem mediaServerItemInUse, JSONObject json)->{ | 730 | (MediaServerItem mediaServerItemInUse, JSONObject json)->{ |
| 745 | sendRtpItem.setStatus(2); | 731 | sendRtpItem.setStatus(2); |
| 732 | + dynamicTask.stop(waiteStreamTimeoutTaskKey); | ||
| 746 | redisCatchStorage.updateSendRTPSever(sendRtpItem); | 733 | redisCatchStorage.updateSendRTPSever(sendRtpItem); |
| 747 | StringBuffer content = new StringBuffer(200); | 734 | StringBuffer content = new StringBuffer(200); |
| 748 | content.append("v=0\r\n"); | 735 | content.append("v=0\r\n"); |
| 749 | - content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); | 736 | + content.append("o="+ audioBroadcastCatch.getChannelId() +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); |
| 750 | content.append("s=Play\r\n"); | 737 | content.append("s=Play\r\n"); |
| 751 | content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); | 738 | content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); |
| 752 | content.append("t=0 0\r\n"); | 739 | content.append("t=0 0\r\n"); |
| @@ -771,8 +758,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | @@ -771,8 +758,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements | ||
| 771 | } | 758 | } |
| 772 | }); | 759 | }); |
| 773 | } | 760 | } |
| 774 | - String timeOutTaskKey = "audio-broadcast-" + device.getDeviceId() + channelId; | ||
| 775 | - dynamicTask.stop(timeOutTaskKey); | ||
| 776 | String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId(); | 761 | String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId(); |
| 777 | WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>(); | 762 | WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>(); |
| 778 | wvpResult.setCode(0); | 763 | wvpResult.setCode(0); |
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
| @@ -43,4 +43,5 @@ public interface IPlayService { | @@ -43,4 +43,5 @@ public interface IPlayService { | ||
| 43 | StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream); | 43 | StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream); |
| 44 | 44 | ||
| 45 | void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event); | 45 | void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event); |
| 46 | + void stopAudioBroadcast(String deviceId, String channelId); | ||
| 46 | } | 47 | } |
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
| @@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray; | @@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray; | ||
| 5 | import com.alibaba.fastjson.JSONObject; | 5 | import com.alibaba.fastjson.JSONObject; |
| 6 | import com.genersoft.iot.vmp.common.StreamInfo; | 6 | import com.genersoft.iot.vmp.common.StreamInfo; |
| 7 | import com.genersoft.iot.vmp.conf.DynamicTask; | 7 | import com.genersoft.iot.vmp.conf.DynamicTask; |
| 8 | +import com.genersoft.iot.vmp.conf.SipConfig; | ||
| 8 | import com.genersoft.iot.vmp.conf.UserSetting; | 9 | import com.genersoft.iot.vmp.conf.UserSetting; |
| 9 | import com.genersoft.iot.vmp.gb28181.bean.*; | 10 | import com.genersoft.iot.vmp.gb28181.bean.*; |
| 10 | import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; | 11 | import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; |
| @@ -44,9 +45,13 @@ import org.springframework.util.ResourceUtils; | @@ -44,9 +45,13 @@ import org.springframework.util.ResourceUtils; | ||
| 44 | import org.springframework.web.context.request.async.DeferredResult; | 45 | import org.springframework.web.context.request.async.DeferredResult; |
| 45 | 46 | ||
| 46 | import javax.sip.ResponseEvent; | 47 | import javax.sip.ResponseEvent; |
| 48 | +import javax.sip.SipException; | ||
| 47 | import java.io.FileNotFoundException; | 49 | import java.io.FileNotFoundException; |
| 48 | import java.math.BigDecimal; | 50 | import java.math.BigDecimal; |
| 51 | +import java.text.ParseException; | ||
| 49 | import java.util.*; | 52 | import java.util.*; |
| 53 | +import java.util.stream.Collectors; | ||
| 54 | +import java.util.stream.Stream; | ||
| 50 | 55 | ||
| 51 | @SuppressWarnings(value = {"rawtypes", "unchecked"}) | 56 | @SuppressWarnings(value = {"rawtypes", "unchecked"}) |
| 52 | @Service | 57 | @Service |
| @@ -94,6 +99,9 @@ public class PlayServiceImpl implements IPlayService { | @@ -94,6 +99,9 @@ public class PlayServiceImpl implements IPlayService { | ||
| 94 | private UserSetting userSetting; | 99 | private UserSetting userSetting; |
| 95 | 100 | ||
| 96 | @Autowired | 101 | @Autowired |
| 102 | + private SipConfig sipConfig; | ||
| 103 | + | ||
| 104 | + @Autowired | ||
| 97 | private DynamicTask dynamicTask; | 105 | private DynamicTask dynamicTask; |
| 98 | 106 | ||
| 99 | 107 | ||
| @@ -641,16 +649,13 @@ public class PlayServiceImpl implements IPlayService { | @@ -641,16 +649,13 @@ public class PlayServiceImpl implements IPlayService { | ||
| 641 | } | 649 | } |
| 642 | // 查询通道使用状态 | 650 | // 查询通道使用状态 |
| 643 | if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { | 651 | if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { |
| 644 | - logger.warn("语音广播已经开启: {}", channelId); | ||
| 645 | - event.call("语音广播已经开启"); | ||
| 646 | - return; | 652 | + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); |
| 653 | + if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { | ||
| 654 | + logger.warn("语音广播已经开启: {}", channelId); | ||
| 655 | + event.call("语音广播已经开启"); | ||
| 656 | + return; | ||
| 657 | + } | ||
| 647 | } | 658 | } |
| 648 | - String timeOutTaskKey = "audio-broadcast-" + device.getDeviceId() + channelId; | ||
| 649 | - dynamicTask.startDelay(timeOutTaskKey, ()->{ | ||
| 650 | - logger.error("语音广播发送超时: {}:{}", device.getDeviceId(), channelId); | ||
| 651 | - event.call("语音广播发送超时"); | ||
| 652 | - audioBroadcastManager.del(device.getDeviceId(), channelId); | ||
| 653 | - }, timeout * 1000); | ||
| 654 | 659 | ||
| 655 | // 发送通知 | 660 | // 发送通知 |
| 656 | cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { | 661 | cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { |
| @@ -658,11 +663,38 @@ public class PlayServiceImpl implements IPlayService { | @@ -658,11 +663,38 @@ public class PlayServiceImpl implements IPlayService { | ||
| 658 | AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready); | 663 | AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready); |
| 659 | audioBroadcastManager.add(audioBroadcastCatch); | 664 | audioBroadcastManager.add(audioBroadcastCatch); |
| 660 | }, eventResultForError -> { | 665 | }, eventResultForError -> { |
| 661 | - dynamicTask.stop(timeOutTaskKey); | ||
| 662 | // 发送失败 | 666 | // 发送失败 |
| 663 | logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg); | 667 | logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg); |
| 664 | event.call("语音广播发送失败"); | 668 | event.call("语音广播发送失败"); |
| 665 | - audioBroadcastManager.del(device.getDeviceId(), channelId); | 669 | + stopAudioBroadcast(device.getDeviceId(), channelId); |
| 666 | }); | 670 | }); |
| 667 | } | 671 | } |
| 672 | + | ||
| 673 | + @Override | ||
| 674 | + public void stopAudioBroadcast(String deviceId, String channelId){ | ||
| 675 | + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId); | ||
| 676 | + if (audioBroadcastCatch != null) { | ||
| 677 | + audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId()); | ||
| 678 | + } | ||
| 679 | + try { | ||
| 680 | + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, channelId, null, null); | ||
| 681 | + if (sendRtpItem != null) { | ||
| 682 | + redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); | ||
| 683 | + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); | ||
| 684 | + Map<String, Object> param = new HashMap<>(); | ||
| 685 | + param.put("vhost", "__defaultVhost__"); | ||
| 686 | + param.put("app", sendRtpItem.getApp()); | ||
| 687 | + param.put("stream", sendRtpItem.getStreamId()); | ||
| 688 | + zlmresTfulUtils.stopSendRtp(mediaInfo, param); | ||
| 689 | + } | ||
| 690 | + if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) { | ||
| 691 | + cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getRequest(), null); | ||
| 692 | + } | ||
| 693 | + } catch (SipException e) { | ||
| 694 | + throw new RuntimeException(e); | ||
| 695 | + } catch (ParseException e) { | ||
| 696 | + throw new RuntimeException(e); | ||
| 697 | + } | ||
| 698 | + | ||
| 699 | + } | ||
| 668 | } | 700 | } |
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
| @@ -319,6 +319,22 @@ public class PlayController { | @@ -319,6 +319,22 @@ public class PlayController { | ||
| 319 | return result; | 319 | return result; |
| 320 | } | 320 | } |
| 321 | 321 | ||
| 322 | + | ||
| 323 | + @ApiOperation("停止语音广播") | ||
| 324 | + @ApiImplicitParams({ | ||
| 325 | + @ApiImplicitParam(name = "deviceId", value = "设备Id", dataTypeClass = String.class), | ||
| 326 | + @ApiImplicitParam(name = "channelId", value = "通道Id", dataTypeClass = String.class), | ||
| 327 | + }) | ||
| 328 | + @GetMapping("/broadcast/stop/{deviceId}/{channelId}") | ||
| 329 | + @PostMapping("/broadcast/stop/{deviceId}/{channelId}") | ||
| 330 | + public WVPResult<String> stopBroadcastA(@PathVariable String deviceId, @PathVariable String channelId) { | ||
| 331 | + if (logger.isDebugEnabled()) { | ||
| 332 | + logger.debug("停止语音广播API调用"); | ||
| 333 | + } | ||
| 334 | + playService.stopAudioBroadcast(deviceId, channelId); | ||
| 335 | + return new WVPResult<>(0, "success", null); | ||
| 336 | + } | ||
| 337 | + | ||
| 322 | @ApiOperation("获取所有的ssrc") | 338 | @ApiOperation("获取所有的ssrc") |
| 323 | @GetMapping("/ssrc") | 339 | @GetMapping("/ssrc") |
| 324 | public WVPResult<JSONObject> getSsrc() { | 340 | public WVPResult<JSONObject> getSsrc() { |