Commit 17ea37506eb736077f911c6d76e041db04e01b7e

Authored by 648540858
1 parent a3d87102

修复端口分配的并发问题

src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java
@@ -3,12 +3,14 @@ package com.genersoft.iot.vmp.media.zlm; @@ -3,12 +3,14 @@ package com.genersoft.iot.vmp.media.zlm;
3 import com.genersoft.iot.vmp.common.VideoManagerConstants; 3 import com.genersoft.iot.vmp.common.VideoManagerConstants;
4 import com.genersoft.iot.vmp.conf.UserSetting; 4 import com.genersoft.iot.vmp.conf.UserSetting;
5 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; 5 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
6 -import com.genersoft.iot.vmp.media.zlm.dto.MediaSendRtpPortInfo; 6 +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
7 import com.genersoft.iot.vmp.utils.redis.RedisUtil; 7 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  8 +import org.apache.commons.lang3.math.NumberUtils;
8 import org.slf4j.Logger; 9 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory; 10 import org.slf4j.LoggerFactory;
10 import org.springframework.beans.factory.annotation.Autowired; 11 import org.springframework.beans.factory.annotation.Autowired;
11 import org.springframework.data.redis.core.RedisTemplate; 12 import org.springframework.data.redis.core.RedisTemplate;
  13 +import org.springframework.data.redis.support.atomic.RedisAtomicInteger;
12 import org.springframework.stereotype.Component; 14 import org.springframework.stereotype.Component;
13 15
14 import java.util.HashMap; 16 import java.util.HashMap;
@@ -26,23 +28,14 @@ public class SendRtpPortManager { @@ -26,23 +28,14 @@ public class SendRtpPortManager {
26 @Autowired 28 @Autowired
27 private RedisTemplate<Object, Object> redisTemplate; 29 private RedisTemplate<Object, Object> redisTemplate;
28 30
29 - private final String KEY = "VM_MEDIA_SEND_RTP_PORT_RANGE_"; 31 + private final String KEY = "VM_MEDIA_SEND_RTP_PORT_";
30 32
31 -  
32 - public void initServerPort(String mediaServerId, int startPort, int endPort){  
33 - String key = KEY + userSetting.getServerId() + "_" + mediaServerId;  
34 - MediaSendRtpPortInfo mediaSendRtpPortInfo = new MediaSendRtpPortInfo(startPort, endPort, mediaServerId);  
35 - redisTemplate.opsForValue().set(key, mediaSendRtpPortInfo);  
36 - }  
37 -  
38 - public int getNextPort(String mediaServerId) {  
39 - String sendIndexKey = KEY + userSetting.getServerId() + "_" + mediaServerId;  
40 - MediaSendRtpPortInfo mediaSendRtpPortInfo = (MediaSendRtpPortInfo)redisTemplate.opsForValue().get(sendIndexKey);  
41 - if (mediaSendRtpPortInfo == null) {  
42 - logger.warn("[发送端口管理] 获取{}的发送端口时未找到端口信息", mediaSendRtpPortInfo);  
43 - return 0; 33 + public int getNextPort(MediaServerItem mediaServer) {
  34 + if (mediaServer == null) {
  35 + logger.warn("[发送端口管理] 参数错误,mediaServer为NULL");
  36 + return -1;
44 } 37 }
45 - 38 + String sendIndexKey = KEY + userSetting.getServerId() + "_" + mediaServer.getId();
46 String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX 39 String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
47 + userSetting.getServerId() + "_*"; 40 + userSetting.getServerId() + "_*";
48 List<Object> queryResult = RedisUtil.scan(redisTemplate, key); 41 List<Object> queryResult = RedisUtil.scan(redisTemplate, key);
@@ -54,14 +47,39 @@ public class SendRtpPortManager { @@ -54,14 +47,39 @@ public class SendRtpPortManager {
54 sendRtpItemMap.put(sendRtpItem.getLocalPort(), sendRtpItem); 47 sendRtpItemMap.put(sendRtpItem.getLocalPort(), sendRtpItem);
55 } 48 }
56 } 49 }
  50 + String sendRtpPortRange = mediaServer.getSendRtpPortRange();
  51 + int startPort;
  52 + int endPort;
  53 + if (sendRtpPortRange == null) {
  54 + logger.warn("{}未设置发送端口默认值,自动使用40000-50000作为端口范围", mediaServer.getId());
  55 + String[] portArray = sendRtpPortRange.split(",");
  56 + if (portArray.length != 2 || !NumberUtils.isParsable(portArray[0]) || !NumberUtils.isParsable(portArray[1])) {
  57 + logger.warn("{}发送端口配置格式错误,自动使用40000-50000作为端口范围", mediaServer.getId());
  58 + startPort = 50000;
  59 + endPort = 60000;
  60 + }else {
57 61
58 - int port = getPort(mediaSendRtpPortInfo.getCurrent(),  
59 - mediaSendRtpPortInfo.getStart(),  
60 - mediaSendRtpPortInfo.getEnd(), checkPort -> sendRtpItemMap.get(checkPort) == null);  
61 -  
62 - mediaSendRtpPortInfo.setCurrent(port);  
63 - redisTemplate.opsForValue().set(sendIndexKey, mediaSendRtpPortInfo);  
64 - return port; 62 + if ( Integer.parseInt(portArray[1]) - Integer.parseInt(portArray[0]) < 1) {
  63 + logger.warn("{}发送端口配置错误,结束端口至少比开始端口大一,自动使用40000-50000作为端口范围", mediaServer.getId());
  64 + startPort = 50000;
  65 + endPort = 60000;
  66 + }else {
  67 + startPort = Integer.parseInt(portArray[0]);
  68 + endPort = Integer.parseInt(portArray[1]);
  69 + }
  70 + }
  71 + }else {
  72 + startPort = 50000;
  73 + endPort = 60000;
  74 + }
  75 + if (redisTemplate == null || redisTemplate.getConnectionFactory() == null) {
  76 + logger.warn("{}获取redis连接信息失败", mediaServer.getId());
  77 + return -1;
  78 + }
  79 + RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory());
  80 + return redisAtomicInteger.getAndUpdate((current)->{
  81 + return getPort(current, startPort, endPort, checkPort-> !sendRtpItemMap.containsKey(checkPort));
  82 + });
65 } 83 }
66 84
67 interface CheckPortCallback{ 85 interface CheckPortCallback{
@@ -69,22 +87,25 @@ public class SendRtpPortManager { @@ -69,22 +87,25 @@ public class SendRtpPortManager {
69 } 87 }
70 88
71 private int getPort(int current, int start, int end, CheckPortCallback checkPortCallback) { 89 private int getPort(int current, int start, int end, CheckPortCallback checkPortCallback) {
72 - int port;  
73 - if (current %2 != 0) {  
74 - port = current + 1;  
75 - }else {  
76 - port = current + 2;  
77 - }  
78 - if (port > end) {  
79 - if (start %2 != 0) {  
80 - port = start + 1; 90 + if (current <= 0) {
  91 + if (start%2 == 0) {
  92 + current = start;
81 }else { 93 }else {
82 - port = start; 94 + current = start + 1;
  95 + }
  96 + }else {
  97 + current += 2;
  98 + if (current > end) {
  99 + if (start%2 == 0) {
  100 + current = start;
  101 + }else {
  102 + current = start + 1;
  103 + }
83 } 104 }
84 } 105 }
85 - if (!checkPortCallback.check(port)) {  
86 - return getPort(port, start, end, checkPortCallback); 106 + if (!checkPortCallback.check(current)) {
  107 + return getPort(current + 2, start, end, checkPortCallback);
87 } 108 }
88 - return port; 109 + return current;
89 } 110 }
90 } 111 }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -167,7 +167,7 @@ public class ZLMRTPServerFactory { @@ -167,7 +167,7 @@ public class ZLMRTPServerFactory {
167 int localPort = 0; 167 int localPort = 0;
168 if (userSetting.getGbSendStreamStrict()) { 168 if (userSetting.getGbSendStreamStrict()) {
169 if (userSetting.getGbSendStreamStrict()) { 169 if (userSetting.getGbSendStreamStrict()) {
170 - localPort = sendRtpPortManager.getNextPort(serverItem.getId()); 170 + localPort = sendRtpPortManager.getNextPort(serverItem);
171 if (localPort == 0) { 171 if (localPort == 0) {
172 return null; 172 return null;
173 } 173 }
@@ -204,7 +204,7 @@ public class ZLMRTPServerFactory { @@ -204,7 +204,7 @@ public class ZLMRTPServerFactory {
204 // 默认为随机端口 204 // 默认为随机端口
205 int localPort = 0; 205 int localPort = 0;
206 if (userSetting.getGbSendStreamStrict()) { 206 if (userSetting.getGbSendStreamStrict()) {
207 - localPort = sendRtpPortManager.getNextPort(serverItem.getId()); 207 + localPort = sendRtpPortManager.getNextPort(serverItem);
208 if (localPort == 0) { 208 if (localPort == 0) {
209 return null; 209 return null;
210 } 210 }
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -116,34 +116,6 @@ public class MediaServerServiceImpl implements IMediaServerService { @@ -116,34 +116,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
116 if (ssrcFactory.hasMediaServerSSRC(mediaServerItem.getId())) { 116 if (ssrcFactory.hasMediaServerSSRC(mediaServerItem.getId())) {
117 ssrcFactory.initMediaServerSSRC(mediaServerItem.getId(), null); 117 ssrcFactory.initMediaServerSSRC(mediaServerItem.getId(), null);
118 } 118 }
119 - if (userSetting.getGbSendStreamStrict()) {  
120 - int startPort = 50000;  
121 - int endPort = 60000;  
122 - String sendRtpPortRange = mediaServerItem.getSendRtpPortRange();  
123 - if (sendRtpPortRange == null) {  
124 - logger.warn("[zlm] ] 未配置发流端口范围,默认使用50000到60000");  
125 - }else {  
126 - String[] sendRtpPortRangeArray = sendRtpPortRange.trim().split(",");  
127 - if (sendRtpPortRangeArray.length != 2) {  
128 - logger.warn("[zlm] ] 发流端口范围错误,默认使用50000到60000");  
129 - }else {  
130 - try {  
131 - startPort = Integer.parseInt(sendRtpPortRangeArray[0]);  
132 - endPort = Integer.parseInt(sendRtpPortRangeArray[1]);  
133 - if (endPort <= startPort) {  
134 - logger.warn("[zlm] ] 发流端口范围错误,结束端口应大于开始端口,使用默认端口");  
135 - startPort = 50000;  
136 - endPort = 60000;  
137 - }  
138 -  
139 - }catch (NumberFormatException e) {  
140 - logger.warn("[zlm] ] 发流端口范围错误,默认使用50000到60000");  
141 - }  
142 - }  
143 - }  
144 - logger.info("[[zlm] ] 配置发流端口范围,{}-{}", startPort, endPort);  
145 - sendRtpPortManager.initServerPort(mediaServerItem.getId(), startPort, endPort);  
146 - }  
147 // 查询redis是否存在此mediaServer 119 // 查询redis是否存在此mediaServer
148 String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(); 120 String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId();
149 Boolean hasKey = redisTemplate.hasKey(key); 121 Boolean hasKey = redisTemplate.hasKey(key);
src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java
@@ -140,8 +140,8 @@ public class RtpController { @@ -140,8 +140,8 @@ public class RtpController {
140 if (isSend != null && isSend) { 140 if (isSend != null && isSend) {
141 String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; 141 String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId;
142 // 预创建发流信息 142 // 预创建发流信息
143 - int portForVideo = sendRtpPortManager.getNextPort(mediaServerItem.getId());  
144 - int portForAudio = sendRtpPortManager.getNextPort(mediaServerItem.getId()); 143 + int portForVideo = sendRtpPortManager.getNextPort(mediaServerItem);
  144 + int portForAudio = sendRtpPortManager.getNextPort(mediaServerItem);
145 145
146 otherRtpSendInfo.setSendLocalIp(mediaServerItem.getSdpIp()); 146 otherRtpSendInfo.setSendLocalIp(mediaServerItem.getSdpIp());
147 otherRtpSendInfo.setSendLocalPortForVideo(portForVideo); 147 otherRtpSendInfo.setSendLocalPortForVideo(portForVideo);