Commit 6106bda1510518b083a970b7c642342c55933273

Authored by 648540858
2 parents 5e34039d d67f0a1e

Merge branch '2.6.8' into wvp-28181-2.0

src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java
... ... @@ -30,7 +30,7 @@ public class SendRtpPortManager {
30 30  
31 31 private final String KEY = "VM_MEDIA_SEND_RTP_PORT_";
32 32  
33   - public int getNextPort(MediaServerItem mediaServer) {
  33 + public synchronized int getNextPort(MediaServerItem mediaServer) {
34 34 if (mediaServer == null) {
35 35 logger.warn("[发送端口管理] 参数错误,mediaServer为NULL");
36 36 return -1;
... ... @@ -50,17 +50,15 @@ public class SendRtpPortManager {
50 50 String sendRtpPortRange = mediaServer.getSendRtpPortRange();
51 51 int startPort;
52 52 int endPort;
53   - if (sendRtpPortRange == null) {
54   - logger.warn("{}未设置发送端口默认值,自动使用40000-50000作为端口范围", mediaServer.getId());
  53 + if (sendRtpPortRange != null) {
55 54 String[] portArray = sendRtpPortRange.split(",");
56 55 if (portArray.length != 2 || !NumberUtils.isParsable(portArray[0]) || !NumberUtils.isParsable(portArray[1])) {
57   - logger.warn("{}发送端口配置格式错误,自动使用40000-50000作为端口范围", mediaServer.getId());
  56 + logger.warn("{}发送端口配置格式错误,自动使用50000-60000作为端口范围", mediaServer.getId());
58 57 startPort = 50000;
59 58 endPort = 60000;
60 59 }else {
61   -
62 60 if ( Integer.parseInt(portArray[1]) - Integer.parseInt(portArray[0]) < 1) {
63   - logger.warn("{}发送端口配置错误,结束端口至少比开始端口大一,自动使用40000-50000作为端口范围", mediaServer.getId());
  61 + logger.warn("{}发送端口配置错误,结束端口至少比开始端口大一,自动使用50000-60000作为端口范围", mediaServer.getId());
64 62 startPort = 50000;
65 63 endPort = 60000;
66 64 }else {
... ... @@ -69,6 +67,7 @@ public class SendRtpPortManager {
69 67 }
70 68 }
71 69 }else {
  70 + logger.warn("{}未设置发送端口默认值,自动使用50000-60000作为端口范围", mediaServer.getId());
72 71 startPort = 50000;
73 72 endPort = 60000;
74 73 }
... ... @@ -76,10 +75,35 @@ public class SendRtpPortManager {
76 75 logger.warn("{}获取redis连接信息失败", mediaServer.getId());
77 76 return -1;
78 77 }
  78 +// RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory());
  79 +// return redisAtomicInteger.getAndUpdate((current)->{
  80 +// return getPort(current, startPort, endPort, checkPort-> !sendRtpItemMap.containsKey(checkPort));
  81 +// });
  82 + return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
  83 + }
  84 +
  85 + private synchronized int getSendPort(int startPort, int endPort, String sendIndexKey, Map<Integer, SendRtpItem> sendRtpItemMap){
79 86 RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory());
80   - return redisAtomicInteger.getAndUpdate((current)->{
81   - return getPort(current, startPort, endPort, checkPort-> !sendRtpItemMap.containsKey(checkPort));
82   - });
  87 + if (redisAtomicInteger.get() < startPort) {
  88 + redisAtomicInteger.set(startPort);
  89 + return startPort;
  90 + }else {
  91 + int port = redisAtomicInteger.getAndIncrement();
  92 + if (port > endPort) {
  93 + redisAtomicInteger.set(startPort);
  94 + if (sendRtpItemMap.containsKey(startPort)) {
  95 + return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
  96 + }else {
  97 + return startPort;
  98 + }
  99 + }
  100 + if (sendRtpItemMap.containsKey(port)) {
  101 + return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
  102 + }else {
  103 + return port;
  104 + }
  105 + }
  106 +
83 107 }
84 108  
85 109 interface CheckPortCallback{
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java
... ... @@ -17,7 +17,6 @@ import com.genersoft.iot.vmp.service.IMediaServerService;
17 17 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
18 18 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
19 19 import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
20   -import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
21 20 import io.swagger.v3.oas.annotations.Operation;
22 21 import io.swagger.v3.oas.annotations.Parameter;
23 22 import io.swagger.v3.oas.annotations.tags.Tag;
... ... @@ -202,9 +201,9 @@ public class PsController {
202 201 callId);
203 202 MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
204 203 String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId;
205   - OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
  204 + OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key);
206 205 if (sendInfo == null) {
207   - sendInfo = new OtherRtpSendInfo();
  206 + sendInfo = new OtherPsSendInfo();
208 207 }
209 208 sendInfo.setPushApp(app);
210 209 sendInfo.setPushStream(stream);
... ... @@ -223,7 +222,7 @@ public class PsController {
223 222 param.put("dst_port", dstPort);
224 223 String is_Udp = isUdp ? "1" : "0";
225 224 param.put("is_udp", is_Udp);
226   - param.put("src_port", sendInfo.getSendLocalPortForAudio());
  225 + param.put("src_port", sendInfo.getSendLocalPort());
227 226 param.put("use_ps", "0");
228 227 param.put("only_audio", "1");
229 228  
... ... @@ -250,7 +249,7 @@ public class PsController {
250 249 }, 10000);
251 250  
252 251 // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
253   - OtherRtpSendInfo finalSendInfo = sendInfo;
  252 + OtherPsSendInfo finalSendInfo = sendInfo;
254 253 hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
255 254 hookSubscribe.addSubscribe(hookSubscribeForStreamChange,
256 255 (mediaServerItemInUse, response)->{
... ... @@ -282,7 +281,7 @@ public class PsController {
282 281 public void closeSendRTP(String callId) {
283 282 logger.info("[第三方PS服务对接->关闭发送流] callId->{}", callId);
284 283 String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId;
285   - OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
  284 + OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key);
286 285 if (sendInfo == null){
287 286 throw new ControllerException(ErrorCode.ERROR100.getCode(), "未开启发流");
288 287 }
... ... @@ -302,4 +301,24 @@ public class PsController {
302 301 redisTemplate.delete(key);
303 302 }
304 303  
  304 +
  305 + @GetMapping(value = "/getTestPort")
  306 + @ResponseBody
  307 + public int getTestPort() {
  308 + MediaServerItem defaultMediaServer = mediaServerService.getDefaultMediaServer();
  309 +
  310 +// for (int i = 0; i <300; i++) {
  311 +// new Thread(() -> {
  312 +// int nextPort = sendRtpPortManager.getNextPort(defaultMediaServer);
  313 +// try {
  314 +// Thread.sleep((int)Math.random()*10);
  315 +// } catch (InterruptedException e) {
  316 +// throw new RuntimeException(e);
  317 +// }
  318 +// System.out.println(nextPort);
  319 +// }).start();
  320 +// }
  321 +
  322 + return sendRtpPortManager.getNextPort(defaultMediaServer);
  323 + }
305 324 }
... ...