Commit b05ddb2003bb8510694d433c33c6ae09ee94e140

Authored by 648540858
Committed by GitHub
2 parents cbd2bc8e 2c1dbe63

Merge pull request #64 from lawrencehj/wvp-28181-2.0

修正一处可能导致死循环的代码
src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java
@@ -64,6 +64,7 @@ public class PlatformKeepaliveExpireEventLister implements ApplicationListener<P @@ -64,6 +64,7 @@ public class PlatformKeepaliveExpireEventLister implements ApplicationListener<P
64 // 有3次未收到心跳回复, 设置平台状态为离线, 开始重新注册 64 // 有3次未收到心跳回复, 设置平台状态为离线, 开始重新注册
65 logger.warn("有3次未收到心跳回复,标记设置平台状态为离线, 并重新注册 平台国标ID:" + event.getPlatformGbID()); 65 logger.warn("有3次未收到心跳回复,标记设置平台状态为离线, 并重新注册 平台国标ID:" + event.getPlatformGbID());
66 publisher.platformNotRegisterEventPublish(event.getPlatformGbID()); 66 publisher.platformNotRegisterEventPublish(event.getPlatformGbID());
  67 + parentPlatformCatch.setKeepAliveReply(0);
67 }else { 68 }else {
68 // 再次发送心跳 69 // 再次发送心跳
69 String callId = sipCommanderForPlatform.keepalive(parentPlatform); 70 String callId = sipCommanderForPlatform.keepalive(parentPlatform);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
@@ -154,6 +154,8 @@ public class SIPProcessorFactory { @@ -154,6 +154,8 @@ public class SIPProcessorFactory {
154 } else if (Request.BYE.equals(method)) { 154 } else if (Request.BYE.equals(method)) {
155 ByeRequestProcessor processor = new ByeRequestProcessor(); 155 ByeRequestProcessor processor = new ByeRequestProcessor();
156 processor.setRequestEvent(evt); 156 processor.setRequestEvent(evt);
  157 + processor.setRedisCatchStorage(redisCatchStorage);
  158 + processor.setZlmrtpServerFactory(zlmrtpServerFactory);
157 return processor; 159 return processor;
158 } else if (Request.CANCEL.equals(method)) { 160 } else if (Request.CANCEL.equals(method)) {
159 CancelRequestProcessor processor = new CancelRequestProcessor(); 161 CancelRequestProcessor processor = new CancelRequestProcessor();
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -893,7 +893,7 @@ public class SIPCommander implements ISIPCommander { @@ -893,7 +893,7 @@ public class SIPCommander implements ISIPCommander {
893 catalogXml.append("</Query>\r\n"); 893 catalogXml.append("</Query>\r\n");
894 894
895 String tm = Long.toString(System.currentTimeMillis()); 895 String tm = Long.toString(System.currentTimeMillis());
896 - Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaDeviceInfoBranch", "FromDev" + tm, null); 896 + Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaDeviceInfo" + tm, "FromDev" + tm, null);
897 897
898 transmitRequest(device, request); 898 transmitRequest(device, request);
899 899
@@ -923,7 +923,7 @@ public class SIPCommander implements ISIPCommander { @@ -923,7 +923,7 @@ public class SIPCommander implements ISIPCommander {
923 catalogXml.append("</Query>\r\n"); 923 catalogXml.append("</Query>\r\n");
924 924
925 String tm = Long.toString(System.currentTimeMillis()); 925 String tm = Long.toString(System.currentTimeMillis());
926 - Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaCatalogBranch", "FromCat" + tm, null); 926 + Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaCatalog" + tm, "FromCat" + tm, null);
927 927
928 transmitRequest(device, request, errorEvent); 928 transmitRequest(device, request, errorEvent);
929 } catch (SipException | ParseException | InvalidArgumentException e) { 929 } catch (SipException | ParseException | InvalidArgumentException e) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -118,18 +118,18 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @@ -118,18 +118,18 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
118 try { 118 try {
119 119
120 StringBuffer keepaliveXml = new StringBuffer(200); 120 StringBuffer keepaliveXml = new StringBuffer(200);
121 - keepaliveXml.append("<?xml version=\"1.0\" encoding=\"GB2312\" ?>\r\n"); 121 + keepaliveXml.append("<?xml version=\"1.0\"?>\r\n");//" encoding=\"GB2312\"?>\r\n");
122 keepaliveXml.append("<Notify>\r\n"); 122 keepaliveXml.append("<Notify>\r\n");
123 keepaliveXml.append("<CmdType>Keepalive</CmdType>\r\n"); 123 keepaliveXml.append("<CmdType>Keepalive</CmdType>\r\n");
124 keepaliveXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n"); 124 keepaliveXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
125 - keepaliveXml.append("<DeviceID>" + parentPlatform.getServerGBId() + "</DeviceID>\r\n"); 125 + keepaliveXml.append("<DeviceID>" + parentPlatform.getDeviceGBId() + "</DeviceID>\r\n");
126 keepaliveXml.append("<Status>OK</Status>\r\n"); 126 keepaliveXml.append("<Status>OK</Status>\r\n");
127 keepaliveXml.append("</Notify>\r\n"); 127 keepaliveXml.append("</Notify>\r\n");
128 128
129 Request request = headerProviderPlarformProvider.createKeetpaliveMessageRequest( 129 Request request = headerProviderPlarformProvider.createKeetpaliveMessageRequest(
130 parentPlatform, 130 parentPlatform,
131 keepaliveXml.toString(), 131 keepaliveXml.toString(),
132 - UUID.randomUUID().toString().replace("-", ""), 132 + "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""),
133 UUID.randomUUID().toString().replace("-", ""), 133 UUID.randomUUID().toString().replace("-", ""),
134 null); 134 null);
135 transmitRequest(parentPlatform, request); 135 transmitRequest(parentPlatform, request);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java
@@ -22,13 +22,10 @@ import org.springframework.stereotype.Component; @@ -22,13 +22,10 @@ import org.springframework.stereotype.Component;
22 @Component 22 @Component
23 public class AckRequestProcessor extends SIPRequestAbstractProcessor { 23 public class AckRequestProcessor extends SIPRequestAbstractProcessor {
24 24
25 - //@Autowired  
26 private IRedisCatchStorage redisCatchStorage; 25 private IRedisCatchStorage redisCatchStorage;
27 26
28 - //@Autowired  
29 private ZLMRTPServerFactory zlmrtpServerFactory; 27 private ZLMRTPServerFactory zlmrtpServerFactory;
30 28
31 -  
32 /** 29 /**
33 * 处理 ACK请求 30 * 处理 ACK请求
34 * 31 *
@@ -49,6 +46,8 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor { @@ -49,6 +46,8 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor {
49 String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; 46 String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
50 String deviceId = sendRtpItem.getDeviceId(); 47 String deviceId = sendRtpItem.getDeviceId();
51 StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); 48 StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
  49 + sendRtpItem.setStreamId(streamInfo.getStreamId());
  50 + redisCatchStorage.updateSendRTPSever(sendRtpItem);
52 System.out.println(platformGbId); 51 System.out.println(platformGbId);
53 System.out.println(channelId); 52 System.out.println(channelId);
54 Map<String, Object> param = new HashMap<>(); 53 Map<String, Object> param = new HashMap<>();
@@ -68,11 +67,16 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor { @@ -68,11 +67,16 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor {
68 if (System.currentTimeMillis() - startTime < 30 * 1000) { 67 if (System.currentTimeMillis() - startTime < 30 * 1000) {
69 if (zlmrtpServerFactory.isRtpReady(streamInfo.getStreamId())) { 68 if (zlmrtpServerFactory.isRtpReady(streamInfo.getStreamId())) {
70 rtpPushed = true; 69 rtpPushed = true;
  70 + System.out.println("已获取设备推流,开始向上级推流");
71 zlmrtpServerFactory.startSendRtpStream(param); 71 zlmrtpServerFactory.startSendRtpStream(param);
72 } else { 72 } else {
  73 + System.out.println("等待设备推流.......");
73 Thread.sleep(2000); 74 Thread.sleep(2000);
74 continue; 75 continue;
75 } 76 }
  77 + } else {
  78 + rtpPushed = true;
  79 + System.out.println("设备推流超时,终止向上级推流");
76 } 80 }
77 } catch (InterruptedException e) { 81 } catch (InterruptedException e) {
78 e.printStackTrace(); 82 e.printStackTrace();
@@ -108,5 +112,4 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor { @@ -108,5 +112,4 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor {
108 public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) { 112 public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) {
109 this.zlmrtpServerFactory = zlmrtpServerFactory; 113 this.zlmrtpServerFactory = zlmrtpServerFactory;
110 } 114 }
111 -  
112 } 115 }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
1 package com.genersoft.iot.vmp.gb28181.transmit.request.impl; 1 package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
2 2
  3 +import javax.sip.Dialog;
  4 +import javax.sip.DialogState;
3 import javax.sip.InvalidArgumentException; 5 import javax.sip.InvalidArgumentException;
4 import javax.sip.RequestEvent; 6 import javax.sip.RequestEvent;
5 import javax.sip.SipException; 7 import javax.sip.SipException;
6 import javax.sip.message.Response; 8 import javax.sip.message.Response;
7 9
  10 +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
8 import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; 11 import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
  12 +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
  13 +import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
9 14
10 import java.text.ParseException; 15 import java.text.ParseException;
  16 +import java.util.HashMap;
  17 +import java.util.Map;
11 18
12 /** 19 /**
13 * @Description: BYE请求处理器 20 * @Description: BYE请求处理器
@@ -16,6 +23,10 @@ import java.text.ParseException; @@ -16,6 +23,10 @@ import java.text.ParseException;
16 */ 23 */
17 public class ByeRequestProcessor extends SIPRequestAbstractProcessor { 24 public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
18 25
  26 + private IRedisCatchStorage redisCatchStorage;
  27 +
  28 + private ZLMRTPServerFactory zlmrtpServerFactory;
  29 +
19 /** 30 /**
20 * 处理BYE请求 31 * 处理BYE请求
21 * @param evt 32 * @param evt
@@ -24,6 +35,22 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { @@ -24,6 +35,22 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
24 public void process(RequestEvent evt) { 35 public void process(RequestEvent evt) {
25 try { 36 try {
26 responseAck(evt); 37 responseAck(evt);
  38 + Dialog dialog = evt.getDialog();
  39 + if (dialog == null) return;
  40 + if (dialog.getState().equals(DialogState.TERMINATED)) {
  41 + String remoteUri = dialog.getRemoteParty().getURI().toString();
  42 + String localUri = dialog.getLocalParty().getURI().toString();
  43 + String platformGbId = remoteUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));
  44 + String channelId = localUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));
  45 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
  46 + String streamId = sendRtpItem.getStreamId();
  47 + Map<String, Object> param = new HashMap<>();
  48 + param.put("vhost","__defaultVhost__");
  49 + param.put("app","rtp");
  50 + param.put("stream",streamId);
  51 + System.out.println("停止向上级推流:" + streamId);
  52 + zlmrtpServerFactory.stopSendRtpStream(param);
  53 + }
27 } catch (SipException e) { 54 } catch (SipException e) {
28 e.printStackTrace(); 55 e.printStackTrace();
29 } catch (InvalidArgumentException e) { 56 } catch (InvalidArgumentException e) {
@@ -47,4 +74,19 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { @@ -47,4 +74,19 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
47 getServerTransaction(evt).sendResponse(response); 74 getServerTransaction(evt).sendResponse(response);
48 } 75 }
49 76
  77 + public IRedisCatchStorage getRedisCatchStorage() {
  78 + return redisCatchStorage;
  79 + }
  80 +
  81 + public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
  82 + this.redisCatchStorage = redisCatchStorage;
  83 + }
  84 +
  85 + public ZLMRTPServerFactory getZlmrtpServerFactory() {
  86 + return zlmrtpServerFactory;
  87 + }
  88 +
  89 + public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) {
  90 + this.zlmrtpServerFactory = zlmrtpServerFactory;
  91 + }
50 } 92 }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -123,4 +123,8 @@ public class ZLMRESTfulUtils { @@ -123,4 +123,8 @@ public class ZLMRESTfulUtils {
123 public JSONObject startSendRtp(Map<String, Object> param) { 123 public JSONObject startSendRtp(Map<String, Object> param) {
124 return sendPost("startSendRtp",param); 124 return sendPost("startSendRtp",param);
125 } 125 }
  126 +
  127 + public JSONObject stopSendRtp(Map<String, Object> param) {
  128 + return sendPost("stopSendRtp",param);
  129 + }
126 } 130 }
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -127,46 +127,46 @@ public class ZLMRTPServerFactory { @@ -127,46 +127,46 @@ public class ZLMRTPServerFactory {
127 } 127 }
128 128
129 /** 129 /**
130 - * 130 + * 调用zlm RESTful API —— startSendRtp
131 */ 131 */
132 public Boolean startSendRtpStream(Map<String, Object>param) { 132 public Boolean startSendRtpStream(Map<String, Object>param) {
133 Boolean result = false; 133 Boolean result = false;
134 JSONObject jsonObject = zlmresTfulUtils.startSendRtp(param); 134 JSONObject jsonObject = zlmresTfulUtils.startSendRtp(param);
135 System.out.println(jsonObject); 135 System.out.println(jsonObject);
136 - if (jsonObject != null) {  
137 - switch (jsonObject.getInteger("code")){  
138 - case 0:  
139 - result= true;  
140 - logger.error("RTP推流请求成功,本地推流端口:" + jsonObject.getString("local_port"));  
141 - break;  
142 - // case -300: // id已经存在  
143 - // result = false;  
144 - // break;  
145 - // case -400: // 端口占用  
146 - // result= false;  
147 - // break;  
148 - default:  
149 - logger.error("RTP推流失败: " + jsonObject.getString("msg"));  
150 - break;  
151 - }  
152 - }else {  
153 - // 检查ZLM状态 136 + if (jsonObject == null) {
154 logger.error("RTP推流失败: 请检查ZLM服务"); 137 logger.error("RTP推流失败: 请检查ZLM服务");
  138 + } else if (jsonObject.getInteger("code") == 0) {
  139 + result= true;
  140 + logger.error("RTP推流请求成功,本地推流端口:" + jsonObject.getString("local_port"));
  141 + } else {
  142 + logger.error("RTP推流失败: " + jsonObject.getString("msg"));
155 } 143 }
156 return result; 144 return result;
157 } 145 }
158 146
159 /** 147 /**
160 - * 148 + * 查询待转推的流是否就绪
161 */ 149 */
162 public Boolean isRtpReady(String streamId) { 150 public Boolean isRtpReady(String streamId) {
163 JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId); 151 JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId);
164 - if (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")) {  
165 - logger.info("设备RTP推流成功");  
166 - return true; 152 + return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
  153 + }
  154 +
  155 + /**
  156 + * 调用zlm RESTful API —— stopSendRtp
  157 + */
  158 + public Boolean stopSendRtpStream(Map<String, Object>param) {
  159 + Boolean result = false;
  160 + JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(param);
  161 + System.out.println(jsonObject);
  162 + if (jsonObject == null) {
  163 + logger.error("停止RTP推流失败: 请检查ZLM服务");
  164 + } else if (jsonObject.getInteger("code") == 0) {
  165 + result= true;
  166 + logger.error("停止RTP推流成功");
167 } else { 167 } else {
168 - logger.info("设备RTP推流未完成");  
169 - return false; 168 + logger.error("停止RTP推流失败: " + jsonObject.getString("msg"));
170 } 169 }
  170 + return result;
171 } 171 }
172 } 172 }