Commit d67f0a1ea403fbebfff70e7eaf4fe4870be2a1b8

Authored by 648540858
1 parent 874a5738

解决使用redis集群时获取发送端口失败的问题

src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java
@@ -30,7 +30,7 @@ public class SendRtpPortManager { @@ -30,7 +30,7 @@ public class SendRtpPortManager {
30 30
31 private final String KEY = "VM_MEDIA_SEND_RTP_PORT_"; 31 private final String KEY = "VM_MEDIA_SEND_RTP_PORT_";
32 32
33 - public int getNextPort(MediaServerItem mediaServer) { 33 + public synchronized int getNextPort(MediaServerItem mediaServer) {
34 if (mediaServer == null) { 34 if (mediaServer == null) {
35 logger.warn("[发送端口管理] 参数错误,mediaServer为NULL"); 35 logger.warn("[发送端口管理] 参数错误,mediaServer为NULL");
36 return -1; 36 return -1;
@@ -50,17 +50,15 @@ public class SendRtpPortManager { @@ -50,17 +50,15 @@ public class SendRtpPortManager {
50 String sendRtpPortRange = mediaServer.getSendRtpPortRange(); 50 String sendRtpPortRange = mediaServer.getSendRtpPortRange();
51 int startPort; 51 int startPort;
52 int endPort; 52 int endPort;
53 - if (sendRtpPortRange == null) {  
54 - logger.warn("{}未设置发送端口默认值,自动使用40000-50000作为端口范围", mediaServer.getId()); 53 + if (sendRtpPortRange != null) {
55 String[] portArray = sendRtpPortRange.split(","); 54 String[] portArray = sendRtpPortRange.split(",");
56 if (portArray.length != 2 || !NumberUtils.isParsable(portArray[0]) || !NumberUtils.isParsable(portArray[1])) { 55 if (portArray.length != 2 || !NumberUtils.isParsable(portArray[0]) || !NumberUtils.isParsable(portArray[1])) {
57 - logger.warn("{}发送端口配置格式错误,自动使用40000-50000作为端口范围", mediaServer.getId()); 56 + logger.warn("{}发送端口配置格式错误,自动使用50000-60000作为端口范围", mediaServer.getId());
58 startPort = 50000; 57 startPort = 50000;
59 endPort = 60000; 58 endPort = 60000;
60 }else { 59 }else {
61 -  
62 if ( Integer.parseInt(portArray[1]) - Integer.parseInt(portArray[0]) < 1) { 60 if ( Integer.parseInt(portArray[1]) - Integer.parseInt(portArray[0]) < 1) {
63 - logger.warn("{}发送端口配置错误,结束端口至少比开始端口大一,自动使用40000-50000作为端口范围", mediaServer.getId()); 61 + logger.warn("{}发送端口配置错误,结束端口至少比开始端口大一,自动使用50000-60000作为端口范围", mediaServer.getId());
64 startPort = 50000; 62 startPort = 50000;
65 endPort = 60000; 63 endPort = 60000;
66 }else { 64 }else {
@@ -69,6 +67,7 @@ public class SendRtpPortManager { @@ -69,6 +67,7 @@ public class SendRtpPortManager {
69 } 67 }
70 } 68 }
71 }else { 69 }else {
  70 + logger.warn("{}未设置发送端口默认值,自动使用50000-60000作为端口范围", mediaServer.getId());
72 startPort = 50000; 71 startPort = 50000;
73 endPort = 60000; 72 endPort = 60000;
74 } 73 }
@@ -76,10 +75,35 @@ public class SendRtpPortManager { @@ -76,10 +75,35 @@ public class SendRtpPortManager {
76 logger.warn("{}获取redis连接信息失败", mediaServer.getId()); 75 logger.warn("{}获取redis连接信息失败", mediaServer.getId());
77 return -1; 76 return -1;
78 } 77 }
  78 +// RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory());
  79 +// return redisAtomicInteger.getAndUpdate((current)->{
  80 +// return getPort(current, startPort, endPort, checkPort-> !sendRtpItemMap.containsKey(checkPort));
  81 +// });
  82 + return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
  83 + }
  84 +
  85 + private synchronized int getSendPort(int startPort, int endPort, String sendIndexKey, Map<Integer, SendRtpItem> sendRtpItemMap){
79 RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory()); 86 RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory());
80 - return redisAtomicInteger.getAndUpdate((current)->{  
81 - return getPort(current, startPort, endPort, checkPort-> !sendRtpItemMap.containsKey(checkPort));  
82 - }); 87 + if (redisAtomicInteger.get() < startPort) {
  88 + redisAtomicInteger.set(startPort);
  89 + return startPort;
  90 + }else {
  91 + int port = redisAtomicInteger.getAndIncrement();
  92 + if (port > endPort) {
  93 + redisAtomicInteger.set(startPort);
  94 + if (sendRtpItemMap.containsKey(startPort)) {
  95 + return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
  96 + }else {
  97 + return startPort;
  98 + }
  99 + }
  100 + if (sendRtpItemMap.containsKey(port)) {
  101 + return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
  102 + }else {
  103 + return port;
  104 + }
  105 + }
  106 +
83 } 107 }
84 108
85 interface CheckPortCallback{ 109 interface CheckPortCallback{
src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java
@@ -16,7 +16,6 @@ import com.genersoft.iot.vmp.service.IMediaServerService; @@ -16,7 +16,6 @@ import com.genersoft.iot.vmp.service.IMediaServerService;
16 import com.genersoft.iot.vmp.utils.redis.RedisUtil; 16 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
17 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; 17 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
18 import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo; 18 import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
19 -import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;  
20 import io.swagger.v3.oas.annotations.Operation; 19 import io.swagger.v3.oas.annotations.Operation;
21 import io.swagger.v3.oas.annotations.Parameter; 20 import io.swagger.v3.oas.annotations.Parameter;
22 import io.swagger.v3.oas.annotations.tags.Tag; 21 import io.swagger.v3.oas.annotations.tags.Tag;
@@ -200,9 +199,9 @@ public class PsController { @@ -200,9 +199,9 @@ public class PsController {
200 callId); 199 callId);
201 MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); 200 MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
202 String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId; 201 String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId;
203 - OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); 202 + OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key);
204 if (sendInfo == null) { 203 if (sendInfo == null) {
205 - sendInfo = new OtherRtpSendInfo(); 204 + sendInfo = new OtherPsSendInfo();
206 } 205 }
207 sendInfo.setPushApp(app); 206 sendInfo.setPushApp(app);
208 sendInfo.setPushStream(stream); 207 sendInfo.setPushStream(stream);
@@ -221,7 +220,7 @@ public class PsController { @@ -221,7 +220,7 @@ public class PsController {
221 param.put("dst_port", dstPort); 220 param.put("dst_port", dstPort);
222 String is_Udp = isUdp ? "1" : "0"; 221 String is_Udp = isUdp ? "1" : "0";
223 param.put("is_udp", is_Udp); 222 param.put("is_udp", is_Udp);
224 - param.put("src_port", sendInfo.getSendLocalPortForAudio()); 223 + param.put("src_port", sendInfo.getSendLocalPort());
225 param.put("use_ps", "0"); 224 param.put("use_ps", "0");
226 param.put("only_audio", "1"); 225 param.put("only_audio", "1");
227 226
@@ -248,7 +247,7 @@ public class PsController { @@ -248,7 +247,7 @@ public class PsController {
248 }, 10000); 247 }, 10000);
249 248
250 // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 249 // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
251 - OtherRtpSendInfo finalSendInfo = sendInfo; 250 + OtherPsSendInfo finalSendInfo = sendInfo;
252 hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); 251 hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
253 hookSubscribe.addSubscribe(hookSubscribeForStreamChange, 252 hookSubscribe.addSubscribe(hookSubscribeForStreamChange,
254 (mediaServerItemInUse, response)->{ 253 (mediaServerItemInUse, response)->{
@@ -280,7 +279,7 @@ public class PsController { @@ -280,7 +279,7 @@ public class PsController {
280 public void closeSendRTP(String callId) { 279 public void closeSendRTP(String callId) {
281 logger.info("[第三方PS服务对接->关闭发送流] callId->{}", callId); 280 logger.info("[第三方PS服务对接->关闭发送流] callId->{}", callId);
282 String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId; 281 String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId;
283 - OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); 282 + OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key);
284 if (sendInfo == null){ 283 if (sendInfo == null){
285 throw new ControllerException(ErrorCode.ERROR100.getCode(), "未开启发流"); 284 throw new ControllerException(ErrorCode.ERROR100.getCode(), "未开启发流");
286 } 285 }
@@ -300,4 +299,24 @@ public class PsController { @@ -300,4 +299,24 @@ public class PsController {
300 redisTemplate.delete(key); 299 redisTemplate.delete(key);
301 } 300 }
302 301
  302 +
  303 + @GetMapping(value = "/getTestPort")
  304 + @ResponseBody
  305 + public int getTestPort() {
  306 + MediaServerItem defaultMediaServer = mediaServerService.getDefaultMediaServer();
  307 +
  308 +// for (int i = 0; i <300; i++) {
  309 +// new Thread(() -> {
  310 +// int nextPort = sendRtpPortManager.getNextPort(defaultMediaServer);
  311 +// try {
  312 +// Thread.sleep((int)Math.random()*10);
  313 +// } catch (InterruptedException e) {
  314 +// throw new RuntimeException(e);
  315 +// }
  316 +// System.out.println(nextPort);
  317 +// }).start();
  318 +// }
  319 +
  320 + return sendRtpPortManager.getNextPort(defaultMediaServer);
  321 + }
303 } 322 }