Commit a9bdb0a706e10d2dffb50ae5a8086dd744bbd976

Authored by 648540858
1 parent f8f65d47

优化语音广播流程

src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -4,9 +4,8 @@ import com.alibaba.fastjson.JSON; @@ -4,9 +4,8 @@ import com.alibaba.fastjson.JSON;
4 import com.alibaba.fastjson.JSONObject; 4 import com.alibaba.fastjson.JSONObject;
5 import com.genersoft.iot.vmp.common.StreamInfo; 5 import com.genersoft.iot.vmp.common.StreamInfo;
6 import com.genersoft.iot.vmp.conf.DynamicTask; 6 import com.genersoft.iot.vmp.conf.DynamicTask;
7 -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;  
8 -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;  
9 -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; 7 +import com.genersoft.iot.vmp.gb28181.bean.*;
  8 +import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
10 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; 9 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
11 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; 10 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
12 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; 11 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@@ -82,6 +81,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @@ -82,6 +81,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
82 @Autowired 81 @Autowired
83 private ISIPCommanderForPlatform commanderForPlatform; 82 private ISIPCommanderForPlatform commanderForPlatform;
84 83
  84 + @Autowired
  85 + private AudioBroadcastManager audioBroadcastManager;
  86 +
85 87
86 /** 88 /**
87 * 处理 ACK请求 89 * 处理 ACK请求
@@ -122,6 +124,13 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @@ -122,6 +124,13 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
122 if (jsonObject == null) { 124 if (jsonObject == null) {
123 logger.error("RTP推流失败: 请检查ZLM服务"); 125 logger.error("RTP推流失败: 请检查ZLM服务");
124 } else if (jsonObject.getInteger("code") == 0) { 126 } else if (jsonObject.getInteger("code") == 0) {
  127 + if (sendRtpItem.isOnlyAudio()) {
  128 + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
  129 + audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok);
  130 + audioBroadcastCatch.setDialog((SIPDialog) evt.getDialog());
  131 + audioBroadcastCatch.setRequest((SIPRequest) evt.getRequest());
  132 + audioBroadcastManager.update(audioBroadcastCatch);
  133 + }
125 logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); 134 logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
126 } else { 135 } else {
127 logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param)); 136 logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param));
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -91,7 +91,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @@ -91,7 +91,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
91 if (dialog.getState().equals(DialogState.TERMINATED)) { 91 if (dialog.getState().equals(DialogState.TERMINATED)) {
92 String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); 92 String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
93 String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); 93 String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
94 - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); 94 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, null, null, callIdHeader.getCallId());
95 logger.info("收到bye, [{}/{}]", platformGbId, channelId); 95 logger.info("收到bye, [{}/{}]", platformGbId, channelId);
96 if (sendRtpItem != null){ 96 if (sendRtpItem != null){
97 String streamId = sendRtpItem.getStreamId(); 97 String streamId = sendRtpItem.getStreamId();
@@ -103,15 +103,15 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @@ -103,15 +103,15 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
103 logger.info("收到bye:停止向上级推流:" + streamId); 103 logger.info("收到bye:停止向上级推流:" + streamId);
104 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); 104 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
105 zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); 105 zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
106 - redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null); 106 + redisCatchStorage.deleteSendRTPServer(platformGbId, sendRtpItem.getChannelId(), callIdHeader.getCallId(), null);
107 int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); 107 int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
108 if (totalReaderCount <= 0) { 108 if (totalReaderCount <= 0) {
109 logger.info("收到bye: {} 无其它观看者,通知设备停止推流", streamId); 109 logger.info("收到bye: {} 无其它观看者,通知设备停止推流", streamId);
110 if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { 110 if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
111 - cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null); 111 + cmder.streamByeCmd(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId(), streamId, null);
112 } 112 }
113 if (sendRtpItem.isOnlyAudio()) { 113 if (sendRtpItem.isOnlyAudio()) {
114 - playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), channelId); 114 + playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
115 } 115 }
116 if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { 116 if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
117 MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); 117 MessageForPushChannel messageForPushChannel = new MessageForPushChannel();
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -713,6 +713,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -713,6 +713,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
713 String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + audioBroadcastCatch.getChannelId(); 713 String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + audioBroadcastCatch.getChannelId();
714 dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{ 714 dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{
715 logger.info("等待推流超时: {}/{}", app, stream); 715 logger.info("等待推流超时: {}/{}", app, stream);
  716 + subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
716 playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); 717 playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
717 // 发送bye 718 // 发送bye
718 try { 719 try {
@@ -728,35 +729,42 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -728,35 +729,42 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
728 729
729 subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, 730 subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
730 (MediaServerItem mediaServerItemInUse, JSONObject json)->{ 731 (MediaServerItem mediaServerItemInUse, JSONObject json)->{
731 - sendRtpItem.setStatus(2);  
732 - dynamicTask.stop(waiteStreamTimeoutTaskKey);  
733 - redisCatchStorage.updateSendRTPSever(sendRtpItem);  
734 - StringBuffer content = new StringBuffer(200);  
735 - content.append("v=0\r\n");  
736 - content.append("o="+ audioBroadcastCatch.getChannelId() +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");  
737 - content.append("s=Play\r\n");  
738 - content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");  
739 - content.append("t=0 0\r\n");  
740 - content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");  
741 - content.append("a=sendonly\r\n");  
742 - content.append("a=rtpmap:8 PCMA/8000\r\n");  
743 - content.append("y="+ finalSsrc + "\r\n");  
744 - content.append("f=v/////a/1/8/1\r\n");  
745 -  
746 - ParentPlatform parentPlatform = new ParentPlatform();  
747 - parentPlatform.setServerIP(device.getIp());  
748 - parentPlatform.setServerPort(device.getPort());  
749 - parentPlatform.setServerGBId(device.getDeviceId());  
750 - try {  
751 - responseSdpAck(evt, content.toString(), parentPlatform);  
752 - } catch (SipException e) {  
753 - throw new RuntimeException(e);  
754 - } catch (InvalidArgumentException e) {  
755 - throw new RuntimeException(e);  
756 - } catch (ParseException e) {  
757 - throw new RuntimeException(e);  
758 - }  
759 - }); 732 + logger.info("收到语音对讲推流");
  733 + try {
  734 + sendRtpItem.setStatus(2);
  735 + redisCatchStorage.updateSendRTPSever(sendRtpItem);
  736 + StringBuffer content = new StringBuffer(200);
  737 + content.append("v=0\r\n");
  738 + content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion() + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
  739 + content.append("s=Play\r\n");
  740 + content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
  741 + content.append("t=0 0\r\n");
  742 + content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
  743 + content.append("a=sendonly\r\n");
  744 + content.append("a=rtpmap:8 PCMA/8000\r\n");
  745 + content.append("y="+ finalSsrc + "\r\n");
  746 + content.append("f=v/////a/1/8/1\r\n");
  747 +
  748 + ParentPlatform parentPlatform = new ParentPlatform();
  749 + parentPlatform.setServerIP(device.getIp());
  750 + parentPlatform.setServerPort(device.getPort());
  751 + parentPlatform.setServerGBId(device.getDeviceId());
  752 +
  753 + responseSdpAck(evt, content.toString(), parentPlatform);
  754 + Dialog dialog = evt.getDialog();
  755 + audioBroadcastCatch.setDialog((SIPDialog) dialog);
  756 + audioBroadcastCatch.setRequest((SIPRequest) request);
  757 + audioBroadcastManager.update(audioBroadcastCatch);
  758 + } catch (SipException e) {
  759 + throw new RuntimeException(e);
  760 + } catch (InvalidArgumentException e) {
  761 + throw new RuntimeException(e);
  762 + } catch (ParseException e) {
  763 + throw new RuntimeException(e);
  764 + } catch (SdpParseException e) {
  765 + throw new RuntimeException(e);
  766 + }
  767 + });
760 } 768 }
761 String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId(); 769 String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId();
762 WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>(); 770 WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>();
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -675,26 +675,27 @@ public class PlayServiceImpl implements IPlayService { @@ -675,26 +675,27 @@ public class PlayServiceImpl implements IPlayService {
675 AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId); 675 AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId);
676 if (audioBroadcastCatch != null) { 676 if (audioBroadcastCatch != null) {
677 audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId()); 677 audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId());
678 - }  
679 - try {  
680 - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, channelId, null, null);  
681 - if (sendRtpItem != null) {  
682 - redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);  
683 - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());  
684 - Map<String, Object> param = new HashMap<>();  
685 - param.put("vhost", "__defaultVhost__");  
686 - param.put("app", sendRtpItem.getApp());  
687 - param.put("stream", sendRtpItem.getStreamId());  
688 - zlmresTfulUtils.stopSendRtp(mediaInfo, param);  
689 - }  
690 - if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) {  
691 - cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getRequest(), null); 678 + try {
  679 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
  680 + if (sendRtpItem != null) {
  681 + redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
  682 + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
  683 + Map<String, Object> param = new HashMap<>();
  684 + param.put("vhost", "__defaultVhost__");
  685 + param.put("app", sendRtpItem.getApp());
  686 + param.put("stream", sendRtpItem.getStreamId());
  687 + zlmresTfulUtils.stopSendRtp(mediaInfo, param);
  688 + }
  689 + if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) {
  690 + cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getRequest(), null);
  691 + }
  692 + } catch (SipException e) {
  693 + throw new RuntimeException(e);
  694 + } catch (ParseException e) {
  695 + throw new RuntimeException(e);
692 } 696 }
693 - } catch (SipException e) {  
694 - throw new RuntimeException(e);  
695 - } catch (ParseException e) {  
696 - throw new RuntimeException(e);  
697 } 697 }
698 698
  699 +
699 } 700 }
700 } 701 }