Commit 6b8ecd1f9d2abe1e6ac0af858487755a58a2643a

Authored by 648540858
1 parent 23710f1c

优化级联注册稳定性

src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
@@ -127,11 +127,15 @@ public class DynamicTask { @@ -127,11 +127,15 @@ public class DynamicTask {
127 public void execute(){ 127 public void execute(){
128 if (futureMap.size() > 0) { 128 if (futureMap.size() > 0) {
129 for (String key : futureMap.keySet()) { 129 for (String key : futureMap.keySet()) {
130 - if (futureMap.get(key).isDone()) { 130 + if (futureMap.get(key).isDone() || futureMap.get(key).isCancelled()) {
131 futureMap.remove(key); 131 futureMap.remove(key);
132 runnableMap.remove(key); 132 runnableMap.remove(key);
133 } 133 }
134 } 134 }
135 } 135 }
136 } 136 }
  137 +
  138 + public boolean isAlive(String key) {
  139 + return futureMap.get(key) != null && !futureMap.get(key).isDone() && !futureMap.get(key).isCancelled();
  140 + }
137 } 141 }
src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java
@@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.conf; @@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.conf;
2 2
3 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; 3 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
4 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; 4 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
5 -import com.genersoft.iot.vmp.gb28181.event.EventPublisher;  
6 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; 5 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
7 import com.genersoft.iot.vmp.service.IPlatformService; 6 import com.genersoft.iot.vmp.service.IPlatformService;
8 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 7 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -47,7 +46,7 @@ public class SipPlatformRunner implements CommandLineRunner { @@ -47,7 +46,7 @@ public class SipPlatformRunner implements CommandLineRunner {
47 parentPlatformCatch.setId(parentPlatform.getServerGBId()); 46 parentPlatformCatch.setId(parentPlatform.getServerGBId());
48 redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); 47 redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
49 // 设置所有平台离线 48 // 设置所有平台离线
50 - platformService.offline(parentPlatform); 49 + platformService.offline(parentPlatform, true);
51 // 取消订阅 50 // 取消订阅
52 sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{ 51 sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{
53 platformService.login(parentPlatform); 52 platformService.login(parentPlatform);
src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java
@@ -5,11 +5,9 @@ import org.slf4j.Logger; @@ -5,11 +5,9 @@ import org.slf4j.Logger;
5 import org.slf4j.LoggerFactory; 5 import org.slf4j.LoggerFactory;
6 import org.springframework.context.ApplicationListener; 6 import org.springframework.context.ApplicationListener;
7 import org.springframework.stereotype.Component; 7 import org.springframework.stereotype.Component;
8 -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;  
9 8
10 -import java.util.HashMap;  
11 -import java.util.Hashtable;  
12 import java.util.Map; 9 import java.util.Map;
  10 +import java.util.concurrent.ConcurrentHashMap;
13 11
14 /** 12 /**
15 * @description: 录像查询结束事件 13 * @description: 录像查询结束事件
@@ -22,13 +20,12 @@ public class RecordEndEventListener implements ApplicationListener<RecordEndEven @@ -22,13 +20,12 @@ public class RecordEndEventListener implements ApplicationListener<RecordEndEven
22 20
23 private final static Logger logger = LoggerFactory.getLogger(RecordEndEventListener.class); 21 private final static Logger logger = LoggerFactory.getLogger(RecordEndEventListener.class);
24 22
25 - private static Map<String, SseEmitter> sseEmitters = new Hashtable<>();  
26 -  
27 public interface RecordEndEventHandler{ 23 public interface RecordEndEventHandler{
28 void handler(RecordInfo recordInfo); 24 void handler(RecordInfo recordInfo);
29 } 25 }
30 26
31 - private Map<String, RecordEndEventHandler> handlerMap = new HashMap<>(); 27 + private Map<String, RecordEndEventHandler> handlerMap = new ConcurrentHashMap<>();
  28 +
32 @Override 29 @Override
33 public void onApplicationEvent(RecordEndEvent event) { 30 public void onApplicationEvent(RecordEndEvent event) {
34 logger.info("录像查询完成事件触发,deviceId:{}, channelId: {}, 录像数量{}条", event.getRecordInfo().getDeviceId(), 31 logger.info("录像查询完成事件触发,deviceId:{}, channelId: {}, 录像数量{}条", event.getRecordInfo().getDeviceId(),
@@ -38,7 +35,6 @@ public class RecordEndEventListener implements ApplicationListener&lt;RecordEndEven @@ -38,7 +35,6 @@ public class RecordEndEventListener implements ApplicationListener&lt;RecordEndEven
38 recordEndEventHandler.handler(event.getRecordInfo()); 35 recordEndEventHandler.handler(event.getRecordInfo());
39 } 36 }
40 } 37 }
41 -  
42 } 38 }
43 39
44 public void addEndEventHandler(String device, String channelId, RecordEndEventHandler recordEndEventHandler) { 40 public void addEndEventHandler(String device, String channelId, RecordEndEventHandler recordEndEventHandler) {
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
@@ -100,7 +100,7 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { @@ -100,7 +100,7 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
100 if (platformRegisterInfo.isRegister()) { 100 if (platformRegisterInfo.isRegister()) {
101 platformService.online(parentPlatform); 101 platformService.online(parentPlatform);
102 }else { 102 }else {
103 - platformService.offline(parentPlatform); 103 + platformService.offline(parentPlatform, false);
104 } 104 }
105 105
106 // 注册/注销成功移除缓存的信息 106 // 注册/注销成功移除缓存的信息
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -169,7 +169,6 @@ public class ZLMRESTfulUtils { @@ -169,7 +169,6 @@ public class ZLMRESTfulUtils {
169 .build(); 169 .build();
170 Response response = client.newCall(request).execute(); 170 Response response = client.newCall(request).execute();
171 if (response.isSuccessful()) { 171 if (response.isSuccessful()) {
172 - logger.info("response body contentType: " + Objects.requireNonNull(response.body()).contentType());  
173 if (targetPath != null) { 172 if (targetPath != null) {
174 File snapFolder = new File(targetPath); 173 File snapFolder = new File(targetPath);
175 if (!snapFolder.exists()) { 174 if (!snapFolder.exists()) {
src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java
1 package com.genersoft.iot.vmp.service; 1 package com.genersoft.iot.vmp.service;
2 2
3 -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;  
4 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; 3 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
5 -import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;  
6 -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;  
7 import com.github.pagehelper.PageInfo; 4 import com.github.pagehelper.PageInfo;
8 5
9 -import java.util.List;  
10 -  
11 /** 6 /**
12 * 国标平台的业务类 7 * 国标平台的业务类
13 * @author lin 8 * @author lin
@@ -40,7 +35,7 @@ public interface IPlatformService { @@ -40,7 +35,7 @@ public interface IPlatformService {
40 * 平台离线 35 * 平台离线
41 * @param parentPlatform 平台信息 36 * @param parentPlatform 平台信息
42 */ 37 */
43 - void offline(ParentPlatform parentPlatform); 38 + void offline(ParentPlatform parentPlatform, boolean stopRegisterTask);
44 39
45 /** 40 /**
46 * 向上级平台发起注册 41 * 向上级平台发起注册
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -22,7 +22,6 @@ import org.springframework.stereotype.Service; @@ -22,7 +22,6 @@ import org.springframework.stereotype.Service;
22 22
23 import javax.sip.InvalidArgumentException; 23 import javax.sip.InvalidArgumentException;
24 import javax.sip.SipException; 24 import javax.sip.SipException;
25 -import javax.sip.TimeoutEvent;  
26 import java.text.ParseException; 25 import java.text.ParseException;
27 import java.util.HashMap; 26 import java.util.HashMap;
28 import java.util.List; 27 import java.util.List;
@@ -131,20 +130,23 @@ public class PlatformServiceImpl implements IPlatformService { @@ -131,20 +130,23 @@ public class PlatformServiceImpl implements IPlatformService {
131 } 130 }
132 131
133 final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); 132 final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
134 - if (dynamicTask.contains(registerTaskKey)) {  
135 - dynamicTask.stop(registerTaskKey);  
136 - }  
137 - // 添加注册任务  
138 - dynamicTask.startDelay(registerTaskKey, 133 + if (!dynamicTask.isAlive(registerTaskKey)) {
  134 + // 添加注册任务
  135 + dynamicTask.startCron(registerTaskKey,
139 // 注册失败(注册成功时由程序直接调用了online方法) 136 // 注册失败(注册成功时由程序直接调用了online方法)
140 ()-> { 137 ()-> {
141 try { 138 try {
142 - commanderForPlatform.register(parentPlatform, eventResult -> offline(parentPlatform),null); 139 + logger.info("[国标级联] 平台:{}注册即将到期,重新注册", parentPlatform.getServerGBId());
  140 + commanderForPlatform.register(parentPlatform, eventResult -> {
  141 + offline(parentPlatform, false);
  142 + },null);
143 } catch (InvalidArgumentException | ParseException | SipException e) { 143 } catch (InvalidArgumentException | ParseException | SipException e) {
144 logger.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage()); 144 logger.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage());
145 } 145 }
146 }, 146 },
147 (parentPlatform.getExpires() - 10) *1000); 147 (parentPlatform.getExpires() - 10) *1000);
  148 + }
  149 +
148 150
149 final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId(); 151 final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
150 if (!dynamicTask.contains(keepaliveTaskKey)) { 152 if (!dynamicTask.contains(keepaliveTaskKey)) {
@@ -160,16 +162,11 @@ public class PlatformServiceImpl implements IPlatformService { @@ -160,16 +162,11 @@ public class PlatformServiceImpl implements IPlatformService {
160 // 此时是第三次心跳超时, 平台离线 162 // 此时是第三次心跳超时, 平台离线
161 if (platformCatch.getKeepAliveReply() == 2) { 163 if (platformCatch.getKeepAliveReply() == 2) {
162 // 设置平台离线,并重新注册 164 // 设置平台离线,并重新注册
163 - offline(parentPlatform);  
164 logger.info("[国标级联] {},三次心跳超时后再次发起注册", parentPlatform.getServerGBId()); 165 logger.info("[国标级联] {},三次心跳超时后再次发起注册", parentPlatform.getServerGBId());
165 try { 166 try {
166 commanderForPlatform.register(parentPlatform, eventResult1 -> { 167 commanderForPlatform.register(parentPlatform, eventResult1 -> {
167 logger.info("[国标级联] {},三次心跳超时后再次发起注册仍然失败,开始定时发起注册,间隔为1分钟", parentPlatform.getServerGBId()); 168 logger.info("[国标级联] {},三次心跳超时后再次发起注册仍然失败,开始定时发起注册,间隔为1分钟", parentPlatform.getServerGBId());
168 - // 添加注册任务  
169 - dynamicTask.startCron(registerTaskKey,  
170 - // 注册失败(注册成功时由程序直接调用了online方法)  
171 - ()->logger.info("[国标级联] {},平台离线后持续发起注册,失败", parentPlatform.getServerGBId()),  
172 - 60*1000); 169 + offline(parentPlatform, false);
173 }, null); 170 }, null);
174 } catch (InvalidArgumentException | ParseException | SipException e) { 171 } catch (InvalidArgumentException | ParseException | SipException e) {
175 logger.error("[命令发送失败] 国标级联 注册: {}", e.getMessage()); 172 logger.error("[命令发送失败] 国标级联 注册: {}", e.getMessage());
@@ -198,7 +195,7 @@ public class PlatformServiceImpl implements IPlatformService { @@ -198,7 +195,7 @@ public class PlatformServiceImpl implements IPlatformService {
198 } 195 }
199 196
200 @Override 197 @Override
201 - public void offline(ParentPlatform parentPlatform) { 198 + public void offline(ParentPlatform parentPlatform, boolean stopRegister) {
202 logger.info("[平台离线]:{}", parentPlatform.getServerGBId()); 199 logger.info("[平台离线]:{}", parentPlatform.getServerGBId());
203 ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); 200 ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
204 parentPlatformCatch.setKeepAliveReply(0); 201 parentPlatformCatch.setKeepAliveReply(0);
@@ -212,11 +209,13 @@ public class PlatformServiceImpl implements IPlatformService { @@ -212,11 +209,13 @@ public class PlatformServiceImpl implements IPlatformService {
212 // 停止所有推流 209 // 停止所有推流
213 logger.info("[平台离线] {}, 停止所有推流", parentPlatform.getServerGBId()); 210 logger.info("[平台离线] {}, 停止所有推流", parentPlatform.getServerGBId());
214 stopAllPush(parentPlatform.getServerGBId()); 211 stopAllPush(parentPlatform.getServerGBId());
215 - // 清除注册定时  
216 - logger.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId());  
217 - final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();  
218 - if (dynamicTask.contains(registerTaskKey)) {  
219 - dynamicTask.stop(registerTaskKey); 212 + if (stopRegister) {
  213 + // 清除注册定时
  214 + logger.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId());
  215 + final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
  216 + if (dynamicTask.contains(registerTaskKey)) {
  217 + dynamicTask.stop(registerTaskKey);
  218 + }
220 } 219 }
221 // 清除心跳定时 220 // 清除心跳定时
222 logger.info("[平台离线] {}, 停止定时发送心跳任务", parentPlatform.getServerGBId()); 221 logger.info("[平台离线] {}, 停止定时发送心跳任务", parentPlatform.getServerGBId());