Commit 663130df4556c35b8b390a74df571af8185d974d

Authored by 648540858
1 parent a209d173

完善支持语音对讲talk

Showing 26 changed files with 458 additions and 342 deletions
src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java
1 1 package com.genersoft.iot.vmp.gb28181.bean;
2 2  
3 3  
  4 +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
4 5 import gov.nist.javax.sip.message.SIPResponse;
5 6  
6 7 /**
... ... @@ -10,10 +11,11 @@ import gov.nist.javax.sip.message.SIPResponse;
10 11 public class AudioBroadcastCatch {
11 12  
12 13  
13   - public AudioBroadcastCatch(String deviceId, String channelId, AudioBroadcastCatchStatus status) {
  14 + public AudioBroadcastCatch(String deviceId, String channelId, AudioBroadcastCatchStatus status, MediaServerItem mediaServerItem) {
14 15 this.deviceId = deviceId;
15 16 this.channelId = channelId;
16 17 this.status = status;
  18 + this.mediaServerItem = mediaServerItem;
17 19 }
18 20  
19 21 public AudioBroadcastCatch() {
... ... @@ -39,6 +41,8 @@ public class AudioBroadcastCatch {
39 41 */
40 42 private SipTransactionInfo sipTransactionInfo;
41 43  
  44 + private MediaServerItem mediaServerItem;
  45 +
42 46  
43 47 public String getDeviceId() {
44 48 return deviceId;
... ... @@ -75,4 +79,12 @@ public class AudioBroadcastCatch {
75 79 public void setSipTransactionInfoByRequset(SIPResponse response) {
76 80 this.sipTransactionInfo = new SipTransactionInfo(response, false);
77 81 }
  82 +
  83 + public MediaServerItem getMediaServerItem() {
  84 + return mediaServerItem;
  85 + }
  86 +
  87 + public void setMediaServerItem(MediaServerItem mediaServerItem) {
  88 + this.mediaServerItem = mediaServerItem;
  89 + }
78 90 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
... ... @@ -49,7 +49,7 @@ public class SendRtpItem {
49 49 /**
50 50 * 设备推流的streamId
51 51 */
52   - private String streamId;
  52 + private String stream;
53 53  
54 54 /**
55 55 * 是否为tcp
... ... @@ -117,6 +117,11 @@ public class SendRtpItem {
117 117 */
118 118 private InviteStreamType playType;
119 119  
  120 + /**
  121 + * 发流的同时收流
  122 + */
  123 + private String receiveStream;
  124 +
120 125 public String getIp() {
121 126 return ip;
122 127 }
... ... @@ -181,12 +186,12 @@ public class SendRtpItem {
181 186 this.app = app;
182 187 }
183 188  
184   - public String getStreamId() {
185   - return streamId;
  189 + public String getStream() {
  190 + return stream;
186 191 }
187 192  
188   - public void setStreamId(String streamId) {
189   - this.streamId = streamId;
  193 + public void setStream(String stream) {
  194 + this.stream = stream;
190 195 }
191 196  
192 197 public boolean isTcp() {
... ... @@ -292,4 +297,12 @@ public class SendRtpItem {
292 297 public void setRtcp(boolean rtcp) {
293 298 this.rtcp = rtcp;
294 299 }
  300 +
  301 + public String getReceiveStream() {
  302 + return receiveStream;
  303 + }
  304 +
  305 + public void setReceiveStream(String receiveStream) {
  306 + this.receiveStream = receiveStream;
  307 + }
295 308 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java
1 1 package com.genersoft.iot.vmp.gb28181.event;
2 2  
3   -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
4 3 import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent;
5 4 import gov.nist.javax.sip.message.SIPRequest;
6 5 import org.slf4j.Logger;
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java
... ... @@ -29,7 +29,8 @@ public class VideoStreamSessionManager {
29 29 play,
30 30 playback,
31 31 download,
32   - broadcast
  32 + broadcast,
  33 + talk
33 34 }
34 35  
35 36 /**
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java
... ... @@ -74,12 +74,12 @@ public class SipRunner implements CommandLineRunner {
74 74 if (sendRtpItems.size() > 0) {
75 75 for (SendRtpItem sendRtpItem : sendRtpItems) {
76 76 MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
77   - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(),sendRtpItem.getChannelId(), sendRtpItem.getCallId(),sendRtpItem.getStreamId());
  77 + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(),sendRtpItem.getChannelId(), sendRtpItem.getCallId(),sendRtpItem.getStream());
78 78 if (mediaServerItem != null) {
79 79 Map<String, Object> param = new HashMap<>();
80 80 param.put("vhost","__defaultVhost__");
81 81 param.put("app",sendRtpItem.getApp());
82   - param.put("stream",sendRtpItem.getStreamId());
  82 + param.put("stream",sendRtpItem.getStream());
83 83 param.put("ssrc",sendRtpItem.getSsrc());
84 84 JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaServerItem, param);
85 85 if (jsonObject != null && jsonObject.getInteger("code") == 0) {
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
... ... @@ -2,10 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
2 2  
3 3 import com.genersoft.iot.vmp.common.StreamInfo;
4 4 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
5   -import com.genersoft.iot.vmp.gb28181.bean.Device;
6   -import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
7   -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
8   -import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
  5 +import com.genersoft.iot.vmp.gb28181.bean.*;
9 6 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
10 7 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
11 8 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
... ... @@ -131,7 +128,7 @@ public interface ISIPCommander {
131 128 */
132 129 void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
133 130  
134   - void talkStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
  131 + void talkStreamCmd(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
135 132  
136 133  
137 134 void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException;
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
... ... @@ -32,7 +32,6 @@ import org.springframework.beans.factory.annotation.Autowired;
32 32 import org.springframework.context.annotation.DependsOn;
33 33 import org.springframework.stereotype.Component;
34 34 import org.springframework.util.ObjectUtils;
35   -import org.springframework.util.StringUtils;
36 35  
37 36 import javax.sip.InvalidArgumentException;
38 37 import javax.sip.ResponseEvent;
... ... @@ -584,9 +583,9 @@ public class SIPCommander implements ISIPCommander {
584 583 }
585 584  
586 585 @Override
587   - public void talkStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
  586 + public void talkStreamCmd(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
588 587  
589   - String stream = ssrcInfo.getStream();
  588 + String stream = sendRtpItem.getStream();
590 589  
591 590 if (device == null) {
592 591 return;
... ... @@ -597,7 +596,7 @@ public class SIPCommander implements ISIPCommander {
597 596 return;
598 597 }
599 598  
600   - logger.info("[语音对讲] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
  599 + logger.info("[语音对讲] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort());
601 600 HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
602 601 subscribe.addSubscribe(hookSubscribeForStreamChange, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
603 602 if (event != null) {
... ... @@ -622,24 +621,27 @@ public class SIPCommander implements ISIPCommander {
622 621 content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
623 622 content.append("t=0 0\r\n");
624 623  
625   - content.append("m=audio " + ssrcInfo.getPort() + " RTP/AVP 8\r\n");
  624 + content.append("m=audio " + sendRtpItem.getPort() + " TCP/RTP/AVP 8\r\n");
  625 + content.append("a=setup:passive\r\n");
  626 + content.append("a=connection:new\r\n");
626 627 content.append("a=sendrecv\r\n");
627 628 content.append("a=rtpmap:8 PCMA/8000\r\n");
628 629  
629   - content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc
  630 + content.append("y=" + sendRtpItem.getSsrc() + "\r\n");//ssrc
630 631 // f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
631 632 content.append("f=v/////a/1/8/1" + "\r\n");
632 633  
633   - Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(), callIdHeader);
  634 + Request request = headerProvider.createInviteRequest(device, channelId, content.toString(),
  635 + SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, sendRtpItem.getSsrc(), callIdHeader);
634 636 sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> {
635   - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
636   - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  637 + streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream());
  638 + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
637 639 errorEvent.response(e);
638 640 }), e -> {
639 641 // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
640 642 ResponseEvent responseEvent = (ResponseEvent) e.event;
641 643 SIPResponse response = (SIPResponse) responseEvent.getResponse();
642   - streamSession.put(device.getDeviceId(), channelId, "talk", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play);
  644 + streamSession.put(device.getDeviceId(), channelId, "talk", stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play);
643 645 okEvent.response(e);
644 646 });
645 647 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
... ... @@ -675,7 +675,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
675 675 MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
676 676 if (mediaServerItem != null) {
677 677 mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
678   - zlmrtpServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStreamId());
  678 + zlmrtpServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStream());
679 679 }
680 680 SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem);
681 681 if (byeRequest == null) {
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
... ... @@ -102,12 +102,12 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
102 102 }
103 103 String isUdp = sendRtpItem.isTcp() ? "0" : "1";
104 104 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
105   - logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(),
  105 + logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
106 106 sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
107 107 Map<String, Object> param = new HashMap<>(12);
108 108 param.put("vhost","__defaultVhost__");
109 109 param.put("app",sendRtpItem.getApp());
110   - param.put("stream",sendRtpItem.getStreamId());
  110 + param.put("stream",sendRtpItem.getStream());
111 111 param.put("ssrc", sendRtpItem.getSsrc());
112 112 param.put("src_port", sendRtpItem.getLocalPort());
113 113 param.put("pt", sendRtpItem.getPt());
... ... @@ -121,7 +121,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
121 121  
122 122 if (mediaInfo == null) {
123 123 RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
124   - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(),
  124 + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
125 125 sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
126 126 sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
127 127 redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
... ... @@ -97,7 +97,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
97 97  
98 98 if (sendRtpItem != null){
99 99 logger.info("[收到bye] {}/{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId());
100   - String streamId = sendRtpItem.getStreamId();
  100 + String streamId = sendRtpItem.getStream();
101 101 MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
102 102 if (mediaServerItem == null) {
103 103 return;
... ... @@ -105,7 +105,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
105 105  
106 106 Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), streamId);
107 107 if (!ready) {
108   - logger.info("[收到bye] 发现流{}/{}已经结束,不需处理", sendRtpItem.getApp(), sendRtpItem.getStreamId());
  108 + logger.info("[收到bye] 发现流{}/{}已经结束,不需处理", sendRtpItem.getApp(), sendRtpItem.getStream());
109 109 return;
110 110 }
111 111 Map<String, Object> param = new HashMap<>();
... ... @@ -113,7 +113,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
113 113 param.put("app",sendRtpItem.getApp());
114 114 param.put("stream",streamId);
115 115 param.put("ssrc",sendRtpItem.getSsrc());
116   - logger.info("[收到bye] 停止向上级推流:{}", streamId);
  116 + logger.info("[收到bye] 停止推流:{}", streamId);
117 117 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
118 118 redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null);
119 119 zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
... ... @@ -129,15 +129,14 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
129 129 try {
130 130 logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
131 131 cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
132   - } catch (InvalidArgumentException | ParseException | SipException |
133   - SsrcTransactionNotFoundException e) {
  132 + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
134 133 logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
135 134 }
136 135 }
137 136  
138 137 if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
139 138 MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
140   - sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
  139 + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
141 140 sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
142 141 redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
143 142 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
... ... @@ -478,7 +478,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
478 478 if ("Playback".equalsIgnoreCase(sessionName)) {
479 479 sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
480 480 SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, device.isSsrcCheck(), true);
481   - sendRtpItem.setStreamId(ssrcInfo.getStream());
  481 + sendRtpItem.setStream(ssrcInfo.getStream());
482 482 // 写入redis, 超时时回复
483 483 redisCatchStorage.updateSendRTPSever(sendRtpItem);
484 484 playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
... ... @@ -523,7 +523,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
523 523 }
524 524 SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false);
525 525 logger.info(JSONObject.toJSONString(ssrcInfo));
526   - sendRtpItem.setStreamId(ssrcInfo.getStream());
  526 + sendRtpItem.setStream(ssrcInfo.getStream());
527 527 sendRtpItem.setSsrc(ssrc.equals(ssrcDefault) ? ssrcInfo.getSsrc() : ssrc);
528 528  
529 529 // 写入redis, 超时时回复
... ... @@ -533,12 +533,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
533 533 redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), finalChannelId, callIdHeader.getCallId(), null);
534 534 });
535 535 } else {
536   - sendRtpItem.setStreamId(playTransaction.getStream());
  536 + sendRtpItem.setStream(playTransaction.getStream());
537 537 // 写入redis, 超时时回复
538 538 redisCatchStorage.updateSendRTPSever(sendRtpItem);
539 539 JSONObject jsonObject = new JSONObject();
540 540 jsonObject.put("app", sendRtpItem.getApp());
541   - jsonObject.put("stream", sendRtpItem.getStreamId());
  541 + jsonObject.put("stream", sendRtpItem.getStream());
542 542 hookEvent.response(mediaServerItem, jsonObject);
543 543 }
544 544 }
... ... @@ -986,9 +986,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
986 986 logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}, {}", requesterId, addressStr, port, ssrc,
987 987 mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP");
988 988  
989   - MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device);
  989 + MediaServerItem mediaServerItem = audioBroadcastCatch.getMediaServerItem();
990 990 if (mediaServerItem == null) {
991   - logger.warn("未找到用的zlm");
  991 + logger.warn("未找到语音喊话使用的zlm");
992 992 try {
993 993 responseAck(request, Response.BUSY_HERE);
994 994 } catch (SipException | InvalidArgumentException | ParseException e) {
... ... @@ -1022,7 +1022,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
1022 1022 sendRtpItem.setPlatformId(requesterId);
1023 1023 sendRtpItem.setStatus(1);
1024 1024 sendRtpItem.setApp(app);
1025   - sendRtpItem.setStreamId(stream);
  1025 + sendRtpItem.setStream(stream);
1026 1026 sendRtpItem.setPt(8);
1027 1027 sendRtpItem.setUsePs(false);
1028 1028 sendRtpItem.setRtcp(false);
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java
... ... @@ -102,7 +102,7 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I
102 102 String contentSubType = header.getContentSubType();
103 103 if ("Application".equalsIgnoreCase(contentType) && "MANSRTSP".equalsIgnoreCase(contentSubType)) {
104 104 SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
105   - String streamId = sendRtpItem.getStreamId();
  105 + String streamId = sendRtpItem.getStream();
106 106 StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
107 107 if (null == streamInfo) {
108 108 responseAck(request, Response.NOT_FOUND, "stream " + streamId + " not found");
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java
... ... @@ -90,7 +90,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
90 90  
91 91 try {
92 92 cmder.streamByeCmd(device, ssrcTransaction.getChannelId(), null, callIdHeader.getCallId());
93   - } catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | SipException e) {
  93 + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
94 94 logger.error("[录像流]推送完毕,收到关流通知, 发送BYE失败 {}", e.getMessage());
95 95 }
96 96  
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java
... ... @@ -122,7 +122,7 @@ public class SipUtils {
122 122 }
123 123  
124 124 public static String getNewCallId() {
125   - return (int) Math.floor(Math.random() * 10000) + "";
  125 + return (int) Math.floor(Math.random() * 1000000000) + "";
126 126 }
127 127  
128 128 public static int getTypeCodeFromGbCode(String deviceId) {
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
... ... @@ -9,9 +9,9 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
9 9 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
10 10 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
11 11 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
12   -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
13 12 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
14 13 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
  14 +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
15 15 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
16 16 import com.genersoft.iot.vmp.media.zlm.dto.HookType;
17 17 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
... ... @@ -274,12 +274,12 @@ public class ZLMHttpHookListener {
274 274 logger.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
275 275 }
276 276  
277   -
  277 + MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
278 278 JSONObject json = (JSONObject) JSON.toJSON(param);
279 279 taskExecutor.execute(() -> {
280 280 ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
281 281 if (subscribe != null) {
282   - MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
  282 +
283 283 if (mediaInfo != null) {
284 284 subscribe.response(mediaInfo, json);
285 285 }
... ... @@ -343,7 +343,7 @@ public class ZLMHttpHookListener {
343 343 }
344 344 // 开启语音对讲通道
345 345 try {
346   - playService.audioBroadcastCmd(device, channelId, 60, (msg)->{
  346 + playService.audioBroadcastCmd(device, channelId, mediaInfo, 60, (msg)->{
347 347 logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
348 348 });
349 349 } catch (InvalidArgumentException | ParseException | SipException e) {
... ... @@ -360,62 +360,30 @@ public class ZLMHttpHookListener {
360 360 }
361 361 }else if ("talk".equals(param.getApp())){
362 362 // 语音对讲推流 stream需要满足格式deviceId_channelId
363   - if (param.isRegist() && param.getStream().indexOf("_") > 0) {
364   - String[] streamArray = param.getStream().split("_");
365   - if (streamArray.length == 2) {
366   - String deviceId = streamArray[0];
367   - String channelId = streamArray[1];
368   - Device device = deviceService.getDevice(deviceId);
369   - if (device != null) {
370   - DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
371   - if (deviceChannel != null) {
372   - if (audioBroadcastManager.exit(deviceId, channelId)) {
373   - // 直接推流
374   - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, param.getStream(), null);
375   - if (sendRtpItem == null) {
376   - // TODO 可能数据错误,重新开启语音通道
377   - }else {
378   - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
379   - logger.info("rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc());
380   - Map<String, Object> sendParam = new HashMap<>(12);
381   - sendParam.put("vhost","__defaultVhost__");
382   - sendParam.put("app",sendRtpItem.getApp());
383   - sendParam.put("stream",sendRtpItem.getStreamId());
384   - sendParam.put("ssrc", sendRtpItem.getSsrc());
385   - sendParam.put("src_port", sendRtpItem.getLocalPort());
386   - sendParam.put("pt", sendRtpItem.getPt());
387   - sendParam.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
388   - sendParam.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
389   -
390   - JSONObject jsonObject;
391   - if (sendRtpItem.isTcpActive()) {
392   - jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, sendParam);
393   - } else {
394   - sendParam.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
395   - sendParam.put("dst_url", sendRtpItem.getIp());
396   - sendParam.put("dst_port", sendRtpItem.getPort());
397   - jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, sendParam);
398   - }
399   - if (jsonObject != null && jsonObject.getInteger("code") == 0) {
400   - logger.info("[语音对讲] 自动推流成功, device: {}, channel: {}", deviceId, channelId);
401   - }
402   - }
403   - }else {
404   - // 开启语音对讲通道
405   - MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
406   - playService.talk(mediaServerItem, device, channelId, (mediaServer, jsonObject)->{
407   - System.out.println("开始推流");
408   - }, eventResult -> {
409   - System.out.println(eventResult.msg);
410   - }, ()->{
411   - System.out.println("超时");
412   - });
413   - }
414   -
415   - }
416   - }
417   - }
418   - }
  363 + if (param.getStream().indexOf("_") > 0) {
  364 + String[] streamArray = param.getStream().split("_");
  365 + if (streamArray.length == 2) {
  366 + String deviceId = streamArray[0];
  367 + String channelId = streamArray[1];
  368 + Device device = deviceService.getDevice(deviceId);
  369 + if (device != null) {
  370 + if (param.isRegist()) {
  371 + if (audioBroadcastManager.exit(deviceId, channelId)) {
  372 + playService.stopAudioBroadcast(deviceId, channelId);
  373 + }
  374 + // 开启语音对讲通道
  375 + playService.talkCmd(device, channelId, mediaInfo, param.getStream(), (msg)->{
  376 + logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
  377 + });
  378 + }else {
  379 + // 流注销
  380 + playService.stopTalk(device, channelId, param.isRegist());
  381 + }
  382 + } else{
  383 + logger.info("[语音对讲] 未找到设备:{}", deviceId);
  384 + }
  385 + }
  386 + }
419 387  
420 388 }else{
421 389 if (!"rtp".equals(param.getApp())){
... ... @@ -475,16 +443,21 @@ public class ZLMHttpHookListener {
475 443 ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
476 444 Device device = deviceService.getDevice(platformId);
477 445  
478   - try {
  446 +
479 447 if (platform != null) {
480   - commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
  448 + try {
  449 + commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
  450 + } catch (SipException | InvalidArgumentException | ParseException e) {
  451 + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
  452 + }
481 453 } else {
482   - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
  454 + try {
  455 + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
  456 + } catch (SipException | InvalidArgumentException | ParseException |
  457 + SsrcTransactionNotFoundException e) {
  458 + logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
  459 + }
483 460 }
484   - } catch (SipException | InvalidArgumentException | ParseException |
485   - SsrcTransactionNotFoundException e) {
486   - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
487   - }
488 461 }
489 462 }
490 463 }
... ... @@ -527,7 +500,7 @@ public class ZLMHttpHookListener {
527 500 logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
528 501 }
529 502 redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
530   - sendRtpItem.getCallId(), sendRtpItem.getStreamId());
  503 + sendRtpItem.getCallId(), sendRtpItem.getStream());
531 504 }
532 505 }
533 506 }
... ... @@ -556,8 +529,7 @@ public class ZLMHttpHookListener {
556 529 try {
557 530 cmder.streamByeCmd(device, streamInfoForPlayBackCatch.getChannelId(),
558 531 streamInfoForPlayBackCatch.getStream(), null);
559   - } catch (InvalidArgumentException | ParseException | SipException |
560   - SsrcTransactionNotFoundException e) {
  532 + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
561 533 logger.error("[无人观看]回放, 发送BYE失败 {}", e.getMessage());
562 534 }
563 535 }
... ... @@ -573,6 +545,13 @@ public class ZLMHttpHookListener {
573 545 ret.put("close", false);
574 546 return ret;
575 547 }
  548 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, param.getStream(), null);
  549 + if ("talk".equals(sendRtpItem.getApp())){
  550 + ret.put("close", false);
  551 + return ret;
  552 + }
  553 + }else if ("talk".equals(param.getApp()) || "broadcast".equals(param.getApp())){
  554 + ret.put("close", false);
576 555 } else {
577 556 // 非国标流 推流/拉流代理
578 557 // 拉流代理
... ... @@ -734,7 +713,7 @@ public class ZLMHttpHookListener {
734 713 logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
735 714 }
736 715 redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
737   - sendRtpItem.getCallId(), sendRtpItem.getStreamId());
  716 + sendRtpItem.getCallId(), sendRtpItem.getStream());
738 717 }
739 718 }
740 719 });
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
... ... @@ -291,6 +291,10 @@ public class ZLMRESTfulUtils {
291 291 return sendPost(mediaServerItem, "startSendRtpPassive",param, null);
292 292 }
293 293  
  294 + public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Map<String, Object> param, RequestCallback callback) {
  295 + return sendPost(mediaServerItem, "startSendRtpPassive",param, callback);
  296 + }
  297 +
294 298 public JSONObject stopSendRtp(MediaServerItem mediaServerItem, Map<String, Object> param) {
295 299 return sendPost(mediaServerItem, "stopSendRtp",param, null);
296 300 }
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
... ... @@ -229,7 +229,7 @@ public class ZLMRTPServerFactory {
229 229 sendRtpItem.setPort(port);
230 230 sendRtpItem.setSsrc(ssrc);
231 231 sendRtpItem.setApp(app);
232   - sendRtpItem.setStreamId(stream);
  232 + sendRtpItem.setStream(stream);
233 233 sendRtpItem.setPlatformId(platformId);
234 234 sendRtpItem.setChannelId(channelId);
235 235 sendRtpItem.setTcp(tcp);
... ... @@ -290,6 +290,10 @@ public class ZLMRTPServerFactory {
290 290 return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param);
291 291 }
292 292  
  293 + public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Map<String, Object>param, ZLMRESTfulUtils.RequestCallback callback) {
  294 + return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param, callback);
  295 + }
  296 +
293 297 /**
294 298 * 查询待转推的流是否就绪
295 299 */
... ... @@ -343,7 +347,7 @@ public class ZLMRTPServerFactory {
343 347 result= true;
344 348 logger.info("[停止RTP推流] 成功");
345 349 } else {
346   - logger.error("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"), JSON.toJSON(param), jsonObject);
  350 + logger.warn("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"), JSON.toJSON(param), jsonObject);
347 351 }
348 352 return result;
349 353 }
... ...
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
... ... @@ -11,10 +11,8 @@ import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
11 11 import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
12 12 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
13 13 import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
14   -import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
15   -import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  14 +import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioEvent;
16 15 import gov.nist.javax.sip.message.SIPResponse;
17   -import org.springframework.web.context.request.async.DeferredResult;
18 16  
19 17 import javax.sip.InvalidArgumentException;
20 18 import javax.sip.SipException;
... ... @@ -29,10 +27,6 @@ public interface IPlayService {
29 27  
30 28 void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId);
31 29  
32   - void talk(MediaServerItem mediaServerItem, Device device, String channelId,
33   - ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
34   - Runnable timeoutCallback);
35   -
36 30 void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
37 31 ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
38 32 InviteTimeOutCallback timeoutCallback);
... ... @@ -62,7 +56,7 @@ public interface IPlayService {
62 56 AudioBroadcastResult audioBroadcast(Device device, String channelId);
63 57 void stopAudioBroadcast(String deviceId, String channelId);
64 58  
65   - void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException;
  59 + void audioBroadcastCmd(Device device, String channelId, MediaServerItem mediaServerItem, int timeout, AudioEvent event) throws InvalidArgumentException, ParseException, SipException;
66 60  
67 61 void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException;
68 62  
... ... @@ -72,4 +66,8 @@ public interface IPlayService {
72 66  
73 67 void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
74 68 JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader);
  69 +
  70 + void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioEvent event);
  71 +
  72 + void stopTalk(Device device, String channelId, Boolean streamIsReady);
75 73 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
... ... @@ -202,7 +202,7 @@ public class DeviceServiceImpl implements IDeviceService {
202 202 Map<String, Object> param = new HashMap<>();
203 203 param.put("vhost", "__defaultVhost__");
204 204 param.put("app", sendRtpItem.getApp());
205   - param.put("stream", sendRtpItem.getStreamId());
  205 + param.put("stream", sendRtpItem.getStream());
206 206 zlmresTfulUtils.stopSendRtp(mediaInfo, param);
207 207 }
208 208  
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
... ... @@ -253,7 +253,7 @@ public class PlatformServiceImpl implements IPlatformService {
253 253 Map<String, Object> param = new HashMap<>(3);
254 254 param.put("vhost", "__defaultVhost__");
255 255 param.put("app", sendRtpItem.getApp());
256   - param.put("stream", sendRtpItem.getStreamId());
  256 + param.put("stream", sendRtpItem.getStream());
257 257 zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
258 258 }
259 259 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
... ... @@ -41,7 +41,7 @@ import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
41 41 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
42 42 import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
43 43 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
44   -import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
  44 +import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioEvent;
45 45 import gov.nist.javax.sip.message.SIPResponse;
46 46 import org.slf4j.Logger;
47 47 import org.slf4j.LoggerFactory;
... ... @@ -134,8 +134,8 @@ public class PlayServiceImpl implements IPlayService {
134 134  
135 135 @Override
136 136 public void play(MediaServerItem mediaServerItem, String deviceId, String channelId,
137   - ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
138   - Runnable timeoutCallback) {
  137 + ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
  138 + Runnable timeoutCallback) {
139 139 if (mediaServerItem == null) {
140 140 throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
141 141 }
... ... @@ -243,194 +243,148 @@ public class PlayServiceImpl implements IPlayService {
243 243 }
244 244 }
245 245  
246   - @Override
247   - public void talk(MediaServerItem mediaServerItem, Device device, String channelId,
248   - ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
249   - Runnable timeoutCallback) {
250   - String streamId = null;
251   - if (mediaServerItem.isRtpEnable()) {
252   - streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  246 + private void talk(MediaServerItem mediaServerItem, Device device, String channelId, String stream,
  247 + ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
  248 + Runnable timeoutCallback, AudioEvent audioEvent) {
  249 +
  250 + String playSsrc = mediaServerItem.getSsrcConfig().getPlaySsrc();
  251 + if (playSsrc == null) {
  252 + audioEvent.call("ssrc已经用尽");
  253 + return;
  254 + }
  255 + SendRtpItem sendRtpItem = new SendRtpItem();
  256 + sendRtpItem.setApp("talk");
  257 + sendRtpItem.setStream(stream);
  258 + sendRtpItem.setSsrc(playSsrc);
  259 + sendRtpItem.setDeviceId(device.getDeviceId());
  260 + sendRtpItem.setPlatformId(device.getDeviceId());
  261 + sendRtpItem.setChannelId(channelId);
  262 + sendRtpItem.setRtcp(false);
  263 + sendRtpItem.setMediaServerId(mediaServerItem.getId());
  264 + sendRtpItem.setOnlyAudio(true);
  265 + sendRtpItem.setPlayType(InviteStreamType.TALK);
  266 + sendRtpItem.setPt(8);
  267 + sendRtpItem.setStatus(1);
  268 + sendRtpItem.setTcpActive(false);
  269 + sendRtpItem.setTcp(true);
  270 + sendRtpItem.setUsePs(false);
  271 + sendRtpItem.setReceiveStream(stream);
  272 +
  273 +
  274 + int port = zlmrtpServerFactory.keepPort(mediaServerItem, playSsrc);
  275 + //端口获取失败的ssrcInfo 没有必要发送点播指令
  276 + if (port <= 0) {
  277 + logger.info("[语音对讲] 端口分配异常,deviceId={},channelId={}", device.getDeviceId(), channelId);
  278 + audioEvent.call("端口分配异常");
  279 + return;
253 280 }
254   - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
255   - logger.info("[对讲开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
  281 + sendRtpItem.setLocalPort(port);
  282 + sendRtpItem.setPort(port);
  283 + logger.info("[语音对讲]开始 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sendRtpItem.getLocalPort(), device.getStreamMode(), sendRtpItem.getSsrc(), false);
256 284 // 超时处理
257 285 String timeOutTaskKey = UUID.randomUUID().toString();
258   - SSRCInfo finalSsrcInfo = ssrcInfo;
259   - System.out.println("设置超时任务: " + timeOutTaskKey);
260 286 dynamicTask.startDelay(timeOutTaskKey, () -> {
261 287  
262   - logger.info("[对讲超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc());
  288 + logger.info("[语音对讲] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, sendRtpItem.getPort(), sendRtpItem.getSsrc());
263 289 timeoutCallback.run();
264 290 // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
265 291 try {
266   - cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null);
267   - } catch (InvalidArgumentException | ParseException | SipException e) {
268   - logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage());
269   - } catch (SsrcTransactionNotFoundException e) {
  292 + cmder.streamByeCmd(device, channelId, sendRtpItem.getStream(), null);
  293 + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
  294 + logger.error("[语音对讲]超时, 发送BYE失败 {}", e.getMessage());
  295 + } finally {
270 296 timeoutCallback.run();
271   - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
272   - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
273   - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  297 + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
  298 + streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream());
274 299 }
275 300 }, userSetting.getPlayTimeout());
276   - final String ssrc = ssrcInfo.getSsrc();
277   - final String stream = ssrcInfo.getStream();
278   - //端口获取失败的ssrcInfo 没有必要发送点播指令
279   - if (ssrcInfo.getPort() <= 0) {
280   - logger.info("[对讲] 端口分配异常,deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
281   - return;
282   - }
283 301  
284 302 String callId = SipUtils.getNewCallId();
285   - boolean pushing = false;
286   - // 查看设备是否已经在推流
287   -// MediaItem mediaItem = zlmrtpServerFactory.getMediaInfo(mediaServerItem, "rtp",ssrcInfo.getStream());
288   -// if (mediaItem != null) {
289   -// SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem,
290   -// mediaItem.getOriginSock().getPeer_ip(), mediaItem.getOriginSock().getPeer_port(), ssrcInfo.getSsrc(), device.getDeviceId(),
291   -// device.getDeviceId(), channelId,
292   -// false);
293   -//
294   -// sendRtpItem.setTcpActive(false);
295   -// sendRtpItem.setCallId(callId);
296   -// sendRtpItem.setPlayType(InviteStreamType.TALK);
297   -// sendRtpItem.setStatus(1);
298   -// sendRtpItem.setIp(mediaItem.getOriginSock().getPeer_ip());
299   -// sendRtpItem.setPort(mediaItem.getOriginSock().getPeer_port());
300   -// sendRtpItem.setTcpActive(false);
301   -// sendRtpItem.setStreamId(ssrcInfo.getStream());
302   -// sendRtpItem.setApp("1000");
303   -// sendRtpItem.setStreamId("1000");
304   -// sendRtpItem.setSsrc(ssrc);
305   -// sendRtpItem.setOnlyAudio(true);
306   -// redisCatchStorage.updateSendRTPSever(sendRtpItem);
307   -//
308   -// Map<String, Object> param = new HashMap<>(12);
309   -// param.put("vhost","__defaultVhost__");
310   -// param.put("app",sendRtpItem.getApp());
311   -// param.put("stream",sendRtpItem.getStreamId());
312   -// param.put("ssrc", sendRtpItem.getSsrc());
313   -// param.put("dst_url", sendRtpItem.getIp());
314   -// param.put("dst_port", sendRtpItem.getPort());
315   -// param.put("src_port", sendRtpItem.getLocalPort());
316   -// param.put("pt", sendRtpItem.getPt());
317   -// param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
318   -// param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
319   -// param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
320   -// JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItem, param);
321   -// System.out.println(2222);
322   -// System.out.println(jsonObject);
323   -// }else {
324   - try {
325   - cmder.talkStreamCmd(mediaServerItem, ssrcInfo, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
326   - logger.info("[对讲] 流已生成, 开始推流: " + response.toJSONString());
327   - dynamicTask.stop(timeOutTaskKey);
328   - // TODO 暂不做处理
329   - }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> {
330   - logger.info("[对讲] 设备开始推流: " + json.toJSONString());
331   - dynamicTask.stop(timeOutTaskKey);
332   - // 获取远程IP端口 作为回复语音流的地址
333   - String ip = json.getString("ip");
334   - Integer port = json.getInteger("port");
335   - logger.info("[设备开始推流]{}/{}, 来自ip:{}, 端口:{}", device.getDeviceId(), channelId, ip, port);
336   - // 查看平台推流是否就绪
337   -// Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemInuse, "talk", stream);
338   -// if (!ready) {
339   -// try {
340   -// cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null);
341   -// } catch (InvalidArgumentException | ParseException | SipException e) {
342   -// logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage());
343   -// } catch (SsrcTransactionNotFoundException e) {
344   -// timeoutCallback.run();
345   -// mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
346   -// mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
347   -// streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
348   -// }
349   -// }else {
350   -// try {
351   -// Thread.sleep(1000);
352   -// } catch (InterruptedException e) {
353   -// throw new RuntimeException(e);
354   -// }
355   - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, ip, port, ssrcInfo.getSsrc(), device.getDeviceId(),
356   - device.getDeviceId(), channelId,
357   - false, false);
358   -
359   -
360   -// if (sendRtpItem.getLocalPort() == 0) {
361   -// logger.warn("服务器端口资源不足");
362   -// try {
363   -// cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null);
364   -// } catch (InvalidArgumentException | ParseException | SipException e) {
365   -// logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage());
366   -// } catch (SsrcTransactionNotFoundException e) {
367   -// timeoutCallback.run();
368   -// mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
369   -// mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
370   -// streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
371   -// }
372   -// return;
373   -// }
374   - sendRtpItem.setTcpActive(false);
375   - sendRtpItem.setCallId(callId);
376   - sendRtpItem.setPlayType(InviteStreamType.TALK);
377   - sendRtpItem.setStatus(1);
378   - sendRtpItem.setIp(ip);
379   - sendRtpItem.setPort(port);
380   - sendRtpItem.setTcpActive(false);
381   - sendRtpItem.setApp("1000");
382   - sendRtpItem.setStreamId("1000");
383   - sendRtpItem.setSsrc(ssrc);
384   - sendRtpItem.setOnlyAudio(true);
385   - sendRtpItem.setRtcp(false);
386   - redisCatchStorage.updateSendRTPSever(sendRtpItem);
387 303  
388   - Map<String, Object> param = new HashMap<>(12);
389   - param.put("vhost","__defaultVhost__");
390   - param.put("app",sendRtpItem.getApp());
391   - param.put("stream",sendRtpItem.getStreamId());
392   - param.put("ssrc", sendRtpItem.getSsrc());
393   - param.put("dst_url", sendRtpItem.getIp());
394   - param.put("dst_port", sendRtpItem.getPort());
395   - param.put("src_port", sendRtpItem.getLocalPort());
396   - param.put("pt", sendRtpItem.getPt());
397   - param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
398   - param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
399   - param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
400   - JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItemInuse, param);
401   - System.out.println(11111);
402   - System.out.println(sendRtpItem.getIp() + ":" + sendRtpItem.getPort());
403   -// System.out.println(jsonObject);
404   -// }
  304 + zlmrtpServerFactory.releasePort(mediaServerItem, playSsrc);
  305 + Map<String, Object> param = new HashMap<>(12);
  306 + param.put("vhost","__defaultVhost__");
  307 + param.put("app", sendRtpItem.getApp());
  308 + param.put("stream", sendRtpItem.getStream());
  309 + param.put("ssrc", sendRtpItem.getSsrc());
  310 + param.put("src_port", sendRtpItem.getLocalPort());
  311 + param.put("pt", sendRtpItem.getPt());
  312 + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
  313 + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
  314 + param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
  315 + param.put("recv_stream_id", sendRtpItem.getReceiveStream());
  316 + param.put("close_delay_ms", userSetting.getPlayTimeout() * 1000);
  317 +
  318 + zlmrtpServerFactory.startSendRtpPassive(mediaServerItem, param, jsonObject -> {
  319 + if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
  320 + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
  321 + logger.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
  322 + audioEvent.call("失败, " + jsonObject.getString("msg"));
  323 + // 查看是否已经建立了通道,存在则发送bye
  324 + stopTalk(device, channelId);
  325 + }
  326 + });
405 327  
406   - }, (event) -> {
407 328  
408   - }, (event) -> {
409   - dynamicTask.stop(timeOutTaskKey);
410   - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
411   - // 释放ssrc
412   - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  329 + // 查看设备是否已经在推流
  330 + try {
  331 + cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
  332 + logger.info("[语音对讲] 流已生成, 开始推流: " + response.toJSONString());
  333 + dynamicTask.stop(timeOutTaskKey);
  334 + // TODO 暂不做处理
  335 + }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> {
  336 + logger.info("[语音对讲] 设备开始推流: " + json.toJSONString());
  337 + dynamicTask.stop(timeOutTaskKey);
413 338  
414   - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
415   - errorEvent.response(event);
416   - });
417   - } catch (InvalidArgumentException | SipException | ParseException e) {
  339 + }, (event) -> {
  340 + dynamicTask.stop(timeOutTaskKey);
  341 +
  342 + if (event.event instanceof ResponseEvent) {
  343 + ResponseEvent responseEvent = (ResponseEvent) event.event;
  344 + if (responseEvent.getResponse() instanceof SIPResponse) {
  345 + SIPResponse response = (SIPResponse) responseEvent.getResponse();
  346 + sendRtpItem.setFromTag(response.getFromTag());
  347 + sendRtpItem.setToTag(response.getToTag());
  348 + sendRtpItem.setCallId(response.getCallIdHeader().getCallId());
  349 + redisCatchStorage.updateSendRTPSever(sendRtpItem);
  350 +
  351 + streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(),
  352 + sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(),
  353 + response, VideoStreamSessionManager.SessionType.talk);
  354 + } else {
  355 + logger.error("[语音对讲]收到的消息错误,response不是SIPResponse");
  356 + }
  357 + } else {
  358 + logger.error("[语音对讲]收到的消息错误,event不是ResponseEvent");
  359 + }
418 360  
419   - logger.error("[命令发送失败] 对讲消息: {}", e.getMessage());
  361 + }, (event) -> {
420 362 dynamicTask.stop(timeOutTaskKey);
421   - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
  363 + mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream());
422 364 // 释放ssrc
423   - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  365 + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
  366 + streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream());
  367 + errorEvent.response(event);
  368 + });
  369 + } catch (InvalidArgumentException | SipException | ParseException e) {
424 370  
425   - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
426   - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null));
427   - eventResult.msg = "命令发送失败";
428   - errorEvent.response(eventResult);
429   - }
  371 + logger.error("[命令发送失败] 对讲消息: {}", e.getMessage());
  372 + dynamicTask.stop(timeOutTaskKey);
  373 + mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream());
  374 + // 释放ssrc
  375 + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
  376 +
  377 + streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream());
  378 + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null));
  379 + eventResult.msg = "命令发送失败";
  380 + errorEvent.response(eventResult);
  381 + }
430 382 // }
431 383  
432 384 }
433 385  
  386 +
  387 +
434 388 @Override
435 389 public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
436 390 ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
... ... @@ -446,7 +400,8 @@ public class PlayServiceImpl implements IPlayService {
446 400 // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
447 401 try {
448 402 cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
449   - } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
  403 + } catch (InvalidArgumentException | ParseException | SipException |
  404 + SsrcTransactionNotFoundException e) {
450 405 logger.error("[点播超时], 发送BYE失败 {}", e.getMessage());
451 406 } finally {
452 407 timeoutCallback.run(1, "收流超时");
... ... @@ -483,7 +438,7 @@ public class PlayServiceImpl implements IPlayService {
483 438 onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId);
484 439 hookEvent.response(mediaServerItemInuse, response);
485 440 logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
486   - String streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.flv", mediaServerItemInuse.getHttpPort(), "rtp", ssrcInfo.getStream());
  441 + String streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.flv", mediaServerItemInuse.getHttpPort(), "rtp", ssrcInfo.getStream());
487 442 String path = "snap";
488 443 String fileName = device.getDeviceId() + "_" + channelId + ".jpg";
489 444 // 请求截图
... ... @@ -652,8 +607,8 @@ public class PlayServiceImpl implements IPlayService {
652 607  
653 608 @Override
654 609 public void playBack(String deviceId, String channelId, String startTime,
655   - String endTime, InviteStreamCallback inviteStreamCallback,
656   - PlayBackCallback callback) {
  610 + String endTime, InviteStreamCallback inviteStreamCallback,
  611 + PlayBackCallback callback) {
657 612 Device device = storager.queryVideoDevice(deviceId);
658 613 if (device == null) {
659 614 return;
... ... @@ -666,9 +621,9 @@ public class PlayServiceImpl implements IPlayService {
666 621  
667 622 @Override
668 623 public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
669   - String deviceId, String channelId, String startTime,
670   - String endTime, InviteStreamCallback infoCallBack,
671   - PlayBackCallback playBackCallback) {
  624 + String deviceId, String channelId, String startTime,
  625 + String endTime, InviteStreamCallback infoCallBack,
  626 + PlayBackCallback playBackCallback) {
672 627 if (mediaServerItem == null || ssrcInfo == null) {
673 628 return;
674 629 }
... ... @@ -792,7 +747,6 @@ public class PlayServiceImpl implements IPlayService {
792 747 }
793 748  
794 749  
795   -
796 750 @Override
797 751 public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback playBackCallback) {
798 752 Device device = storager.queryVideoDevice(deviceId);
... ... @@ -977,7 +931,7 @@ public class PlayServiceImpl implements IPlayService {
977 931 cmder.streamByeCmd(device, ssrcTransaction.getChannelId(),
978 932 ssrcTransaction.getStream(), null);
979 933 } catch (InvalidArgumentException | ParseException | SipException |
980   - SsrcTransactionNotFoundException e) {
  934 + SsrcTransactionNotFoundException e) {
981 935 logger.error("[zlm离线]为正在使用此zlm的设备, 发送BYE失败 {}", e.getMessage());
982 936 }
983 937 }
... ... @@ -987,6 +941,7 @@ public class PlayServiceImpl implements IPlayService {
987 941  
988 942 @Override
989 943 public AudioBroadcastResult audioBroadcast(Device device, String channelId) {
  944 + // TODO 必须多端口模式才支持语音喊话鹤语音对讲
990 945 if (device == null || channelId == null) {
991 946 return null;
992 947 }
... ... @@ -1005,13 +960,13 @@ public class PlayServiceImpl implements IPlayService {
1005 960 AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult();
1006 961 audioBroadcastResult.setApp(app);
1007 962 audioBroadcastResult.setStream(stream);
1008   - audioBroadcastResult.setStreamInfo(new StreamContent(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null,false)));
  963 + audioBroadcastResult.setStreamInfo(new StreamContent(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false)));
1009 964 audioBroadcastResult.setCodec("G.711");
1010 965 return audioBroadcastResult;
1011 966 }
1012 967  
1013 968 @Override
1014   - public void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException {
  969 + public void audioBroadcastCmd(Device device, String channelId, MediaServerItem mediaServerItem, int timeout, AudioEvent event) throws InvalidArgumentException, ParseException, SipException {
1015 970 if (device == null || channelId == null) {
1016 971 return;
1017 972 }
... ... @@ -1027,8 +982,8 @@ public class PlayServiceImpl implements IPlayService {
1027 982 SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
1028 983 if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
1029 984 // 查询流是否存在,不存在则认为是异常状态
1030   - MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
1031   - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStreamId());
  985 + MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
  986 + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
1032 987 if (streamReady) {
1033 988 logger.warn("语音广播已经开启: {}", channelId);
1034 989 event.call("语音广播已经开启");
... ... @@ -1038,11 +993,23 @@ public class PlayServiceImpl implements IPlayService {
1038 993 }
1039 994 }
1040 995 }
  996 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
  997 + if (sendRtpItem != null) {
  998 + MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
  999 + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
  1000 + if (streamReady) {
  1001 + logger.warn("[语音对讲] 进行中: {}", channelId);
  1002 + event.call("语音对讲进行中");
  1003 + return;
  1004 + } else {
  1005 + stopTalk(device, channelId);
  1006 + }
  1007 + }
1041 1008  
1042 1009 // 发送通知
1043 1010 cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> {
1044 1011 // 发送成功
1045   - AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready);
  1012 + AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready, mediaServerItem);
1046 1013 audioBroadcastManager.update(audioBroadcastCatch);
1047 1014 }, eventResultForError -> {
1048 1015 // 发送失败
... ... @@ -1053,19 +1020,18 @@ public class PlayServiceImpl implements IPlayService {
1053 1020 }
1054 1021  
1055 1022  
1056   -
1057 1023 @Override
1058 1024 public void stopAudioBroadcast(String deviceId, String channelId) {
1059 1025 List<AudioBroadcastCatch> audioBroadcastCatchList = new ArrayList<>();
1060 1026 if (channelId == null) {
1061 1027 audioBroadcastCatchList.addAll(audioBroadcastManager.get(deviceId));
1062   - }else {
  1028 + } else {
1063 1029 audioBroadcastCatchList.add(audioBroadcastManager.get(deviceId, channelId));
1064 1030 }
1065 1031 if (audioBroadcastCatchList.size() > 0) {
1066 1032 for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatchList) {
1067 1033 Device device = deviceService.getDevice(deviceId);
1068   - if (device == null || audioBroadcastCatch == null ) {
  1034 + if (device == null || audioBroadcastCatch == null) {
1069 1035 return;
1070 1036 }
1071 1037 SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
... ... @@ -1075,7 +1041,7 @@ public class PlayServiceImpl implements IPlayService {
1075 1041 Map<String, Object> param = new HashMap<>();
1076 1042 param.put("vhost", "__defaultVhost__");
1077 1043 param.put("app", sendRtpItem.getApp());
1078   - param.put("stream", sendRtpItem.getStreamId());
  1044 + param.put("stream", sendRtpItem.getStream());
1079 1045 zlmresTfulUtils.stopSendRtp(mediaInfo, param);
1080 1046 try {
1081 1047 cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
... ... @@ -1199,12 +1165,12 @@ public class PlayServiceImpl implements IPlayService {
1199 1165  
1200 1166 String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
1201 1167 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
1202   - logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(),
  1168 + logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
1203 1169 sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
1204 1170 Map<String, Object> param = new HashMap<>(12);
1205   - param.put("vhost","__defaultVhost__");
1206   - param.put("app",sendRtpItem.getApp());
1207   - param.put("stream",sendRtpItem.getStreamId());
  1171 + param.put("vhost", "__defaultVhost__");
  1172 + param.put("app", sendRtpItem.getApp());
  1173 + param.put("stream", sendRtpItem.getStream());
1208 1174 param.put("ssrc", sendRtpItem.getSsrc());
1209 1175 param.put("src_port", sendRtpItem.getLocalPort());
1210 1176 param.put("pt", sendRtpItem.getPt());
... ... @@ -1213,12 +1179,12 @@ public class PlayServiceImpl implements IPlayService {
1213 1179 param.put("is_udp", is_Udp);
1214 1180 if (!sendRtpItem.isTcp()) {
1215 1181 // udp模式下开启rtcp保活
1216   - param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
  1182 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0");
1217 1183 }
1218 1184  
1219 1185 if (mediaInfo == null) {
1220 1186 RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
1221   - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(),
  1187 + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
1222 1188 sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
1223 1189 sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
1224 1190 redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
... ... @@ -1233,16 +1199,16 @@ public class PlayServiceImpl implements IPlayService {
1233 1199 if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) {
1234 1200 if (sendRtpItem.isTcpActive()) {
1235 1201 startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
1236   - }else {
  1202 + } else {
1237 1203 param.put("dst_url", sendRtpItem.getIp());
1238 1204 param.put("dst_port", sendRtpItem.getPort());
1239 1205 startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
1240 1206 }
1241 1207 }
1242   - }else {
  1208 + } else {
1243 1209 if (sendRtpItem.isTcpActive()) {
1244 1210 startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
1245   - }else {
  1211 + } else {
1246 1212 param.put("dst_url", sendRtpItem.getIp());
1247 1213 param.put("dst_port", sendRtpItem.getPort());
1248 1214 startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
... ... @@ -1260,10 +1226,10 @@ public class PlayServiceImpl implements IPlayService {
1260 1226 if (jsonObject == null) {
1261 1227 logger.error("RTP推流失败: 请检查ZLM服务");
1262 1228 } else if (jsonObject.getInteger("code") == 0) {
1263   - logger.info("调用ZLM推流接口, 结果: {}", jsonObject);
1264   - logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
  1229 + logger.info("调用ZLM推流接口, 结果: {}", jsonObject);
  1230 + logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
1265 1231 } else {
1266   - logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
  1232 + logger.error("RTP推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param));
1267 1233 if (sendRtpItem.isOnlyAudio()) {
1268 1234 Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
1269 1235 AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
... ... @@ -1275,7 +1241,7 @@ public class PlayServiceImpl implements IPlayService {
1275 1241 logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage());
1276 1242 }
1277 1243 }
1278   - }else {
  1244 + } else {
1279 1245 // 向上级平台
1280 1246 try {
1281 1247 commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
... ... @@ -1285,4 +1251,105 @@ public class PlayServiceImpl implements IPlayService {
1285 1251 }
1286 1252 }
1287 1253 }
  1254 +
  1255 + @Override
  1256 + public void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioEvent event) {
  1257 + if (device == null || channelId == null) {
  1258 + return;
  1259 + }
  1260 + // TODO 必须多端口模式才支持语音喊话鹤语音对讲
  1261 + logger.info("[语音对讲] device: {}, channel: {}", device.getDeviceId(), channelId);
  1262 + DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId);
  1263 + if (deviceChannel == null) {
  1264 + logger.warn("开启语音对讲的时候未找到通道: {}", channelId);
  1265 + event.call("开启语音对讲的时候未找到通道");
  1266 + return;
  1267 + }
  1268 + // 查询通道使用状态
  1269 + if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
  1270 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
  1271 + if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
  1272 + // 查询流是否存在,不存在则认为是异常状态
  1273 + MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
  1274 + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
  1275 + if (streamReady) {
  1276 + logger.warn("[语音对讲] 正在语音广播,无法开启语音通话: {}", channelId);
  1277 + event.call("正在语音广播");
  1278 + return;
  1279 + } else {
  1280 + stopAudioBroadcast(device.getDeviceId(), channelId);
  1281 + }
  1282 + }
  1283 + }
  1284 +
  1285 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, stream, null);
  1286 + if (sendRtpItem != null) {
  1287 + MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
  1288 + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
  1289 + if (streamReady) {
  1290 + logger.warn("[语音对讲] 进行中: {}", channelId);
  1291 + event.call("语音对讲进行中");
  1292 + return;
  1293 + } else {
  1294 + stopTalk(device, channelId);
  1295 + }
  1296 + }
  1297 +
  1298 + talk(mediaServerItem, device, channelId, stream, (MediaServerItem mediaServerItem1, JSONObject response) -> {
  1299 + logger.info("[语音对讲] 收到设备发来的流");
  1300 + }, eventResult -> {
  1301 + logger.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg);
  1302 + event.call("失败,错误码 " + eventResult.statusCode + ", " + eventResult.msg);
  1303 + }, () -> {
  1304 + logger.warn("[语音对讲] 失败,{}/{} 超时", device.getDeviceId(), channelId);
  1305 + event.call("失败,超时 ");
  1306 + stopTalk(device, channelId);
  1307 + }, errorMsg -> {
  1308 + logger.warn("[语音对讲] 失败,{}/{} {}", device.getDeviceId(), channelId, errorMsg);
  1309 + event.call(errorMsg);
  1310 + stopTalk(device, channelId);
  1311 + });
  1312 + }
  1313 +
  1314 + private void stopTalk(Device device, String channelId) {
  1315 + stopTalk(device, channelId, null);
  1316 + }
  1317 +
  1318 + @Override
  1319 + public void stopTalk(Device device, String channelId, Boolean streamIsReady) {
  1320 + logger.info("[语音对讲] 停止, {}/{}", device.getDeviceId(), channelId);
  1321 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
  1322 + if (sendRtpItem == null) {
  1323 + logger.info("[语音对讲] 停止失败, 未找到发送信息,可能已经停止");
  1324 + return;
  1325 + }
  1326 + // 停止向设备推流
  1327 + String mediaServerId = sendRtpItem.getMediaServerId();
  1328 + if (mediaServerId == null) {
  1329 + return;
  1330 + }
  1331 +
  1332 + MediaServerItem mediaServer = mediaServerService.getOne(mediaServerId);
  1333 +
  1334 + if (streamIsReady == null || streamIsReady) {
  1335 + Map<String, Object> param = new HashMap<>();
  1336 + param.put("vhost", "__defaultVhost__");
  1337 + param.put("app", sendRtpItem.getApp());
  1338 + param.put("stream", sendRtpItem.getStream());
  1339 + param.put("ssrc", sendRtpItem.getSsrc());
  1340 + zlmrtpServerFactory.stopSendRtpStream(mediaServer, param);
  1341 + }
  1342 +
  1343 + mediaServer.getSsrcConfig().releaseSsrc(sendRtpItem.getSsrc());
  1344 +
  1345 + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, null, sendRtpItem.getStream());
  1346 + if (ssrcTransaction != null) {
  1347 + try {
  1348 + cmder.streamByeCmd(device, channelId, sendRtpItem.getStream(), null);
  1349 + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
  1350 + logger.info("[语音对讲] 停止消息发送失败,可能已经停止");
  1351 + }
  1352 + }
  1353 + redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId,null, null);
  1354 + }
1288 1355 }
... ...
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
... ... @@ -378,7 +378,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
378 378 + sendRtpItem.getMediaServerId() + "_"
379 379 + sendRtpItem.getPlatformId() + "_"
380 380 + sendRtpItem.getChannelId() + "_"
381   - + sendRtpItem.getStreamId() + "_"
  381 + + sendRtpItem.getStream() + "_"
382 382 + sendRtpItem.getCallId();
383 383 RedisUtil.set(key, sendRtpItem);
384 384 }
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamContent.java
1 1 package com.genersoft.iot.vmp.vmanager.bean;
2 2  
3 3 import com.genersoft.iot.vmp.common.StreamInfo;
  4 +import io.swagger.v3.oas.annotations.media.Schema;
4 5  
  6 +@Schema(description = "流信息")
5 7 public class StreamContent {
6 8  
  9 + @Schema(description = "应用名")
7 10 private String app;
  11 +
  12 + @Schema(description = "流ID")
8 13 private String stream;
9 14  
  15 + @Schema(description = "IP")
10 16 private String ip;
11 17  
  18 + @Schema(description = "HTTP-FLV流地址")
12 19 private String flv;
13 20  
  21 + @Schema(description = "HTTPS-FLV流地址")
14 22 private String https_flv;
  23 +
  24 + @Schema(description = "Websocket-FLV流地址")
15 25 private String ws_flv;
  26 +
  27 + @Schema(description = "Websockets-FLV流地址")
16 28 private String wss_flv;
  29 +
  30 + @Schema(description = "HTTP-FMP4流地址")
17 31 private String fmp4;
  32 +
  33 + @Schema(description = "HTTPS-FMP4流地址")
18 34 private String https_fmp4;
  35 +
  36 + @Schema(description = "Websocket-FMP4流地址")
19 37 private String ws_fmp4;
  38 +
  39 + @Schema(description = "Websockets-FMP4流地址")
20 40 private String wss_fmp4;
  41 +
  42 + @Schema(description = "HLS流地址")
21 43 private String hls;
  44 +
  45 + @Schema(description = "HTTPS-HLS流地址")
22 46 private String https_hls;
  47 +
  48 + @Schema(description = "Websocket-HLS流地址")
23 49 private String ws_hls;
  50 +
  51 + @Schema(description = "Websockets-HLS流地址")
24 52 private String wss_hls;
  53 +
  54 + @Schema(description = "HTTP-TS流地址")
25 55 private String ts;
  56 +
  57 + @Schema(description = "HTTPS-TS流地址")
26 58 private String https_ts;
  59 +
  60 + @Schema(description = "Websocket-TS流地址")
27 61 private String ws_ts;
  62 +
  63 + @Schema(description = "Websockets-TS流地址")
28 64 private String wss_ts;
  65 +
  66 + @Schema(description = "RTMP流地址")
29 67 private String rtmp;
  68 +
  69 + @Schema(description = "RTMPS流地址")
30 70 private String rtmps;
  71 +
  72 + @Schema(description = "RTSP流地址")
31 73 private String rtsp;
  74 +
  75 + @Schema(description = "RTSPS流地址")
32 76 private String rtsps;
  77 +
  78 + @Schema(description = "RTC流地址")
33 79 private String rtc;
34 80  
  81 + @Schema(description = "RTCS流地址")
35 82 private String rtcs;
  83 +
  84 + @Schema(description = "流媒体ID")
36 85 private String mediaServerId;
  86 +
  87 + @Schema(description = "流编码信息")
37 88 private Object tracks;
38 89  
  90 + @Schema(description = "开始时间")
39 91 private String startTime;
40 92  
  93 + @Schema(description = "结束时间")
41 94 private String endTime;
42 95  
43 96 private double progress;
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
... ... @@ -19,11 +19,7 @@ import com.genersoft.iot.vmp.service.IMediaService;
19 19 import com.genersoft.iot.vmp.service.IPlayService;
20 20 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
21 21 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
22   -import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
23   -import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
24   -import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
25   -import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
26   -import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  22 +import com.genersoft.iot.vmp.vmanager.bean.*;
27 23 import io.swagger.v3.oas.annotations.Operation;
28 24 import io.swagger.v3.oas.annotations.Parameter;
29 25 import io.swagger.v3.oas.annotations.tags.Tag;
... ... @@ -269,14 +265,6 @@ public class PlayController {
269 265  
270 266 }
271 267  
272   - @GetMapping("/1111")
273   - public void broadcastApi1() {
274   - MediaServerItem defaultMediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
275   - Device device = storager.queryVideoDevice("34020000001320090001");
276   - playService.talk(defaultMediaServer, device, "34020000001370000001", null, null, null);
277   -
278   - }
279   -
280 268  
281 269 @Operation(summary = "停止语音广播")
282 270 @Parameter(name = "deviceId", description = "设备Id", required = true)
... ... @@ -289,7 +277,7 @@ public class PlayController {
289 277 }
290 278 // try {
291 279 // playService.stopAudioBroadcast(deviceId, channelId);
292   -// } catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | SipException e) {
  280 +// } catch (InvalidArgumentException | ParseException | SipException e) {
293 281 // logger.error("[命令发送失败] 停止语音: {}", e.getMessage());
294 282 // throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
295 283 // }
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/bean/AudioBroadcastEvent.java renamed to src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/bean/AudioEvent.java
... ... @@ -4,6 +4,6 @@ package com.genersoft.iot.vmp.vmanager.gb28181.play.bean;
4 4 /**
5 5 * @author lin
6 6 */
7   -public interface AudioBroadcastEvent {
  7 +public interface AudioEvent {
8 8 void call(String msg);
9 9 }
... ...
src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java
... ... @@ -185,7 +185,7 @@ public class ApiStreamController {
185 185 }
186 186 try {
187 187 cmder.streamByeCmd(device, code, streamInfo.getStream(), null);
188   - } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
  188 + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
189 189 JSONObject result = new JSONObject();
190 190 result.put("error","发送BYE失败:" + e.getMessage());
191 191 return result;
... ...