Commit ea08969bfcff38b8195282c77495eb1f8bd8eb07

Authored by 648540858
1 parent 0e692512

使用zlm原生的rtp随机端口配置

src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -372,12 +372,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @@ -372,12 +372,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
372 } 372 }
373 } 373 }
374 if (playTransaction == null) { 374 if (playTransaction == null) {
375 - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true); 375 + String streamId = null;
376 if (mediaServerItem.isRtpEnable()) { 376 if (mediaServerItem.isRtpEnable()) {
377 - sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId));  
378 - }else {  
379 - sendRtpItem.setStreamId(ssrcInfo.getStream()); 377 + streamId = String.format("%s_%s", device.getDeviceId(), channelId);
380 } 378 }
  379 + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, true);
  380 + sendRtpItem.setStreamId(ssrcInfo.getStream());
381 // 写入redis, 超时时回复 381 // 写入redis, 超时时回复
382 redisCatchStorage.updateSendRTPSever(sendRtpItem); 382 redisCatchStorage.updateSendRTPSever(sendRtpItem);
383 playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg)->{ 383 playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg)->{
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -10,8 +10,7 @@ import org.springframework.beans.factory.annotation.Autowired; @@ -10,8 +10,7 @@ import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.stereotype.Component; 10 import org.springframework.stereotype.Component;
11 import org.springframework.util.StringUtils; 11 import org.springframework.util.StringUtils;
12 12
13 -import java.util.HashMap;  
14 -import java.util.Map; 13 +import java.util.*;
15 14
16 @Component 15 @Component
17 public class ZLMRTPServerFactory { 16 public class ZLMRTPServerFactory {
@@ -23,54 +22,80 @@ public class ZLMRTPServerFactory { @@ -23,54 +22,80 @@ public class ZLMRTPServerFactory {
23 22
24 private int[] portRangeArray = new int[2]; 23 private int[] portRangeArray = new int[2];
25 24
26 - public int createRTPServer(MediaServerItem mediaServerItem, String streamId) {  
27 - Map<String, Integer> currentStreams = new HashMap<>(); 25 + public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List<Integer> usedFreelist) {
  26 + if (endPort <= startPort) return -1;
  27 + if (usedFreelist == null) {
  28 + usedFreelist = new ArrayList<>();
  29 + }
28 JSONObject listRtpServerJsonResult = zlmresTfulUtils.listRtpServer(mediaServerItem); 30 JSONObject listRtpServerJsonResult = zlmresTfulUtils.listRtpServer(mediaServerItem);
29 if (listRtpServerJsonResult != null) { 31 if (listRtpServerJsonResult != null) {
30 JSONArray data = listRtpServerJsonResult.getJSONArray("data"); 32 JSONArray data = listRtpServerJsonResult.getJSONArray("data");
31 if (data != null) { 33 if (data != null) {
32 for (int i = 0; i < data.size(); i++) { 34 for (int i = 0; i < data.size(); i++) {
33 JSONObject dataItem = data.getJSONObject(i); 35 JSONObject dataItem = data.getJSONObject(i);
34 - currentStreams.put(dataItem.getString("stream_id"), dataItem.getInteger("port")); 36 + usedFreelist.add(dataItem.getInteger("port"));
35 } 37 }
36 } 38 }
37 } 39 }
38 - // 已经在推流  
39 - if (currentStreams.get(streamId) != null) {  
40 - Map<String, Object> closeRtpServerParam = new HashMap<>();  
41 - closeRtpServerParam.put("stream_id", streamId);  
42 - zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam);  
43 - currentStreams.remove(streamId);  
44 - }  
45 40
46 Map<String, Object> param = new HashMap<>(); 41 Map<String, Object> param = new HashMap<>();
47 int result = -1; 42 int result = -1;
48 - // 不设置推流端口端则使用随机端口  
49 - if (!StringUtils.isEmpty(mediaServerItem.getSendRtpPortRange())){  
50 - int newPort = getPortFromportRange(mediaServerItem);  
51 - param.put("port", newPort); 43 + // 设置推流端口
  44 + if (startPort%2 == 1) {
  45 + startPort ++;
  46 + }
  47 + boolean checkPort = false;
  48 + for (int i = startPort; i < endPort + 1; i+=2) {
  49 + if (!usedFreelist.contains(i)){
  50 + checkPort = true;
  51 + startPort = i;
  52 + break;
  53 + }
  54 + }
  55 + if (!checkPort) {
  56 + logger.warn("未找到节点{}上范围[{}-{}]的空闲端口", mediaServerItem.getId(), startPort, endPort);
  57 + return -1;
  58 + }
  59 + param.put("port", startPort);
  60 + String stream = UUID.randomUUID().toString();
  61 + param.put("enable_tcp", 1);
  62 + param.put("stream_id", stream);
  63 + param.put("port", 0);
  64 + JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
  65 +
  66 + if (openRtpServerResultJson != null) {
  67 + if (openRtpServerResultJson.getInteger("code") == 0) {
  68 + result= openRtpServerResultJson.getInteger("port");
  69 + Map<String, Object> closeRtpServerParam = new HashMap<>();
  70 + closeRtpServerParam.put("stream_id", stream);
  71 + zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam);
  72 + }else {
  73 + usedFreelist.add(startPort);
  74 + startPort +=2;
  75 + result = getFreePort(mediaServerItem, startPort, endPort,usedFreelist);
  76 + }
  77 + }else {
  78 + // 检查ZLM状态
  79 + logger.error("创建RTP Server 失败 {}: 请检查ZLM服务", param.get("port"));
52 } 80 }
  81 + return result;
  82 + }
  83 +
  84 + public int createRTPServer(MediaServerItem mediaServerItem, String streamId) {
  85 +
  86 + Map<String, Object> param = new HashMap<>();
  87 + int result = -1;
  88 + // 推流端口设置0则使用随机端口
53 param.put("enable_tcp", 1); 89 param.put("enable_tcp", 1);
54 param.put("stream_id", streamId); 90 param.put("stream_id", streamId);
  91 + param.put("port", 0);
55 JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param); 92 JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
56 93
57 if (openRtpServerResultJson != null) { 94 if (openRtpServerResultJson != null) {
58 - switch (openRtpServerResultJson.getInteger("code")){  
59 - case 0:  
60 - result= openRtpServerResultJson.getInteger("port");  
61 - break;  
62 - case -300: // id已经存在, 可能已经在其他端口推流  
63 - Map<String, Object> closeRtpServerParam = new HashMap<>();  
64 - closeRtpServerParam.put("stream_id", streamId);  
65 - zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam);  
66 - result = createRTPServer(mediaServerItem, streamId);;  
67 - break;  
68 - case -400: // 端口占用  
69 - result= createRTPServer(mediaServerItem, streamId);  
70 - break;  
71 - default:  
72 - logger.error("创建RTP Server 失败 {}: " + openRtpServerResultJson.getString("msg"), param.get("port"));  
73 - break; 95 + if (openRtpServerResultJson.getInteger("code") == 0) {
  96 + result= openRtpServerResultJson.getInteger("port");
  97 + }else {
  98 + logger.error("创建RTP Server 失败 {}: " + openRtpServerResultJson.getString("msg"), param.get("port"));
74 } 99 }
75 }else { 100 }else {
76 // 检查ZLM状态 101 // 检查ZLM状态
@@ -99,32 +124,32 @@ public class ZLMRTPServerFactory { @@ -99,32 +124,32 @@ public class ZLMRTPServerFactory {
99 return result; 124 return result;
100 } 125 }
101 126
102 - private int getPortFromportRange(MediaServerItem mediaServerItem) {  
103 - int currentPort = mediaServerItem.getCurrentPort();  
104 - if (currentPort == 0) {  
105 - String[] portRangeStrArray = mediaServerItem.getSendRtpPortRange().split(",");  
106 - if (portRangeStrArray.length != 2) {  
107 - portRangeArray[0] = 30000;  
108 - portRangeArray[1] = 30500;  
109 - }else {  
110 - portRangeArray[0] = Integer.parseInt(portRangeStrArray[0]);  
111 - portRangeArray[1] = Integer.parseInt(portRangeStrArray[1]);  
112 - }  
113 - }  
114 -  
115 - if (currentPort == 0 || currentPort++ > portRangeArray[1]) {  
116 - currentPort = portRangeArray[0];  
117 - mediaServerItem.setCurrentPort(currentPort);  
118 - return portRangeArray[0];  
119 - } else {  
120 - if (currentPort % 2 == 1) {  
121 - currentPort++;  
122 - }  
123 - currentPort++;  
124 - mediaServerItem.setCurrentPort(currentPort);  
125 - return currentPort;  
126 - }  
127 - } 127 +// private int getPortFromportRange(MediaServerItem mediaServerItem) {
  128 +// int currentPort = mediaServerItem.getCurrentPort();
  129 +// if (currentPort == 0) {
  130 +// String[] portRangeStrArray = mediaServerItem.getSendRtpPortRange().split(",");
  131 +// if (portRangeStrArray.length != 2) {
  132 +// portRangeArray[0] = 30000;
  133 +// portRangeArray[1] = 30500;
  134 +// }else {
  135 +// portRangeArray[0] = Integer.parseInt(portRangeStrArray[0]);
  136 +// portRangeArray[1] = Integer.parseInt(portRangeStrArray[1]);
  137 +// }
  138 +// }
  139 +//
  140 +// if (currentPort == 0 || currentPort++ > portRangeArray[1]) {
  141 +// currentPort = portRangeArray[0];
  142 +// mediaServerItem.setCurrentPort(currentPort);
  143 +// return portRangeArray[0];
  144 +// } else {
  145 +// if (currentPort % 2 == 1) {
  146 +// currentPort++;
  147 +// }
  148 +// currentPort++;
  149 +// mediaServerItem.setCurrentPort(currentPort);
  150 +// return currentPort;
  151 +// }
  152 +// }
128 153
129 /** 154 /**
130 * 创建一个国标推流 155 * 创建一个国标推流
@@ -139,13 +164,18 @@ public class ZLMRTPServerFactory { @@ -139,13 +164,18 @@ public class ZLMRTPServerFactory {
139 public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp){ 164 public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp){
140 165
141 // 使用RTPServer 功能找一个可用的端口 166 // 使用RTPServer 功能找一个可用的端口
142 - String playSsrc = serverItem.getSsrcConfig().getPlaySsrc();  
143 - int localPort = createRTPServer(serverItem, playSsrc);  
144 - if (localPort != -1) {  
145 - // TODO 高并发时可能因为未放入缓存而ssrc冲突  
146 - serverItem.getSsrcConfig().releaseSsrc(playSsrc);  
147 - closeRTPServer(serverItem, playSsrc); 167 + String sendRtpPortRange = serverItem.getSendRtpPortRange();
  168 + if (StringUtils.isEmpty(sendRtpPortRange)) {
  169 + return null;
  170 + }
  171 + String[] portRangeStrArray = serverItem.getSendRtpPortRange().split(",");
  172 + int localPort = -1;
  173 + if (portRangeStrArray.length != 2) {
  174 + localPort = getFreePort(serverItem, 30000, 30500, null);
148 }else { 175 }else {
  176 + localPort = getFreePort(serverItem, Integer.parseInt(portRangeStrArray[0]), Integer.parseInt(portRangeStrArray[1]), null);
  177 + }
  178 + if (localPort == -1) {
149 logger.error("没有可用的端口"); 179 logger.error("没有可用的端口");
150 return null; 180 return null;
151 } 181 }
@@ -174,13 +204,19 @@ public class ZLMRTPServerFactory { @@ -174,13 +204,19 @@ public class ZLMRTPServerFactory {
174 * @return SendRtpItem 204 * @return SendRtpItem
175 */ 205 */
176 public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp){ 206 public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp){
177 - String playSsrc = serverItem.getSsrcConfig().getPlaySsrc();  
178 - int localPort = createRTPServer(serverItem, playSsrc);  
179 - if (localPort != -1) {  
180 - // TODO 高并发时可能因为未放入缓存而ssrc冲突  
181 - serverItem.getSsrcConfig().releaseSsrc(ssrc);  
182 - closeRTPServer(serverItem, playSsrc); 207 + // 使用RTPServer 功能找一个可用的端口
  208 + String sendRtpPortRange = serverItem.getSendRtpPortRange();
  209 + if (StringUtils.isEmpty(sendRtpPortRange)) {
  210 + return null;
  211 + }
  212 + String[] portRangeStrArray = serverItem.getSendRtpPortRange().split(",");
  213 + int localPort = -1;
  214 + if (portRangeStrArray.length != 2) {
  215 + localPort = getFreePort(serverItem, 30000, 30500, null);
183 }else { 216 }else {
  217 + localPort = getFreePort(serverItem, Integer.parseInt(portRangeStrArray[0]), Integer.parseInt(portRangeStrArray[1]), null);
  218 + }
  219 + if (localPort == -1) {
184 logger.error("没有可用的端口"); 220 logger.error("没有可用的端口");
185 return null; 221 return null;
186 } 222 }
@@ -199,7 +235,7 @@ public class ZLMRTPServerFactory { @@ -199,7 +235,7 @@ public class ZLMRTPServerFactory {
199 } 235 }
200 236
201 /** 237 /**
202 - * 调用zlm RESTful API —— startSendRtp 238 + * 调用zlm RESTFUL API —— startSendRtp
203 */ 239 */
204 public JSONObject startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) { 240 public JSONObject startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) {
205 Boolean result = false; 241 Boolean result = false;
@@ -208,9 +244,9 @@ public class ZLMRTPServerFactory { @@ -208,9 +244,9 @@ public class ZLMRTPServerFactory {
208 logger.error("RTP推流失败: 请检查ZLM服务"); 244 logger.error("RTP推流失败: 请检查ZLM服务");
209 } else if (jsonObject.getInteger("code") == 0) { 245 } else if (jsonObject.getInteger("code") == 0) {
210 result= true; 246 result= true;
211 - logger.info("RTP推流[ {}/{} ]请求成功,本地推流端口:{}" ,param.get("app"), param.get("stream"), jsonObject.getString("local_port")); 247 + logger.info("RTP推流成功[ {}/{} ],本地推流端口:{}" ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"));
212 } else { 248 } else {
213 - logger.error("RTP推流失败: " + jsonObject.getString("msg")); 249 + logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param));
214 } 250 }
215 return jsonObject; 251 return jsonObject;
216 } 252 }
@@ -265,7 +301,7 @@ public class ZLMRTPServerFactory { @@ -265,7 +301,7 @@ public class ZLMRTPServerFactory {
265 result= true; 301 result= true;
266 logger.info("停止RTP推流成功"); 302 logger.info("停止RTP推流成功");
267 } else { 303 } else {
268 - logger.error("停止RTP推流失败: " + jsonObject.getString("msg")); 304 + logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param));
269 } 305 }
270 return result; 306 return result;
271 } 307 }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java
@@ -194,6 +194,9 @@ public class ZLMServerConfig { @@ -194,6 +194,9 @@ public class ZLMServerConfig {
194 @JSONField(name = "rtp_proxy.port") 194 @JSONField(name = "rtp_proxy.port")
195 private int rtpProxyPort; 195 private int rtpProxyPort;
196 196
  197 + @JSONField(name = "rtp_proxy.port_range")
  198 + private String portRange;
  199 +
197 @JSONField(name = "rtp_proxy.timeoutSec") 200 @JSONField(name = "rtp_proxy.timeoutSec")
198 private String rtpProxyTimeoutSec; 201 private String rtpProxyTimeoutSec;
199 202
@@ -802,4 +805,12 @@ public class ZLMServerConfig { @@ -802,4 +805,12 @@ public class ZLMServerConfig {
802 public void setHookAliveInterval(int hookAliveInterval) { 805 public void setHookAliveInterval(int hookAliveInterval) {
803 this.hookAliveInterval = hookAliveInterval; 806 this.hookAliveInterval = hookAliveInterval;
804 } 807 }
  808 +
  809 + public String getPortRange() {
  810 + return portRange;
  811 + }
  812 +
  813 + public void setPortRange(String portRange) {
  814 + this.portRange = portRange;
  815 + }
805 } 816 }
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java
@@ -91,7 +91,7 @@ public class MediaServerItem{ @@ -91,7 +91,7 @@ public class MediaServerItem{
91 streamNoneReaderDelayMS = zlmServerConfig.getGeneralStreamNoneReaderDelayMS(); 91 streamNoneReaderDelayMS = zlmServerConfig.getGeneralStreamNoneReaderDelayMS();
92 hookAliveInterval = zlmServerConfig.getHookAliveInterval(); 92 hookAliveInterval = zlmServerConfig.getHookAliveInterval();
93 rtpEnable = false; // 默认使用单端口;直到用户自己设置开启多端口 93 rtpEnable = false; // 默认使用单端口;直到用户自己设置开启多端口
94 - rtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号 94 + rtpPortRange = zlmServerConfig.getPortRange().replace("_",","); // 默认使用30000,30500作为级联时发送流的端口号
95 sendRtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号 95 sendRtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号
96 recordAssistPort = 0; // 默认关闭 96 recordAssistPort = 0; // 默认关闭
97 97
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -521,6 +521,9 @@ public class MediaServerServiceImpl implements IMediaServerService { @@ -521,6 +521,9 @@ public class MediaServerServiceImpl implements IMediaServerService {
521 // 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流, 521 // 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流,
522 // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项 522 // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项
523 param.put("general.wait_track_ready_ms", "3000" ); 523 param.put("general.wait_track_ready_ms", "3000" );
  524 + if (mediaServerItem.isRtpEnable() && !StringUtils.isEmpty(mediaServerItem.getRtpPortRange())) {
  525 + param.put("rtp_proxy.port_range", mediaServerItem.getRtpPortRange().replace(",", "-"));
  526 + }
524 527
525 JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param); 528 JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param);
526 529
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
@@ -109,7 +109,6 @@ public class PlayController { @@ -109,7 +109,6 @@ public class PlayController {
109 // 录像查询以channelId作为deviceId查询 109 // 录像查询以channelId作为deviceId查询
110 String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId; 110 String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId;
111 resultHolder.put(key, uuid, result); 111 resultHolder.put(key, uuid, result);
112 - Device device = storager.queryVideoDevice(deviceId);  
113 StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); 112 StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
114 if (streamInfo == null) { 113 if (streamInfo == null) {
115 RequestMessage msg = new RequestMessage(); 114 RequestMessage msg = new RequestMessage();
@@ -120,15 +119,14 @@ public class PlayController { @@ -120,15 +119,14 @@ public class PlayController {
120 storager.stopPlay(deviceId, channelId); 119 storager.stopPlay(deviceId, channelId);
121 return result; 120 return result;
122 } 121 }
123 - cmder.streamByeCmd(deviceId, channelId, streamInfo.getStream(), null, (event) -> { 122 + cmder.streamByeCmd(deviceId, channelId, streamInfo.getStream(), null, eventResult -> {
124 redisCatchStorage.stopPlay(streamInfo); 123 redisCatchStorage.stopPlay(streamInfo);
125 storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); 124 storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
126 - RequestMessage msg = new RequestMessage();  
127 - msg.setId(uuid);  
128 - msg.setKey(key);  
129 - //Response response = event.getResponse();  
130 - msg.setData(String.format("success"));  
131 - resultHolder.invokeAllResult(msg); 125 + RequestMessage msgForSuccess = new RequestMessage();
  126 + msgForSuccess.setId(uuid);
  127 + msgForSuccess.setKey(key);
  128 + msgForSuccess.setData(String.format("success"));
  129 + resultHolder.invokeAllResult(msgForSuccess);
132 }); 130 });
133 131
134 if (deviceId != null || channelId != null) { 132 if (deviceId != null || channelId != null) {