Commit 8cf5b65e3882580be5a9fa815d476fb0da2da6b7

Authored by 648540858
1 parent 8942ab01

优化国标级联注册机制

src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
@@ -45,6 +45,7 @@ public class DynamicTask { @@ -45,6 +45,7 @@ public class DynamicTask {
45 * @return 45 * @return
46 */ 46 */
47 public void startCron(String key, Runnable task, int cycleForCatalog) { 47 public void startCron(String key, Runnable task, int cycleForCatalog) {
  48 + System.out.println(cycleForCatalog);
48 ScheduledFuture<?> future = futureMap.get(key); 49 ScheduledFuture<?> future = futureMap.get(key);
49 if (future != null) { 50 if (future != null) {
50 if (future.isCancelled()) { 51 if (future.isCancelled()) {
src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java
@@ -56,7 +56,7 @@ public class SipPlatformRunner implements CommandLineRunner { @@ -56,7 +56,7 @@ public class SipPlatformRunner implements CommandLineRunner {
56 } 56 }
57 57
58 // 设置所有平台离线 58 // 设置所有平台离线
59 - platformService.offline(parentPlatform, true); 59 + platformService.offline(parentPlatform, false);
60 } 60 }
61 } 61 }
62 } 62 }
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
@@ -68,6 +68,10 @@ public class UserSetting { @@ -68,6 +68,10 @@ public class UserSetting {
68 68
69 private int maxNotifyCountQueue = 10000; 69 private int maxNotifyCountQueue = 10000;
70 70
  71 + private int registerAgainAfterTime = 60;
  72 +
  73 + private boolean registerKeepIntDialog = false;
  74 +
71 public Boolean getSavePositionHistory() { 75 public Boolean getSavePositionHistory() {
72 return savePositionHistory; 76 return savePositionHistory;
73 } 77 }
@@ -287,4 +291,20 @@ public class UserSetting { @@ -287,4 +291,20 @@ public class UserSetting {
287 public void setCivilCodeFile(String civilCodeFile) { 291 public void setCivilCodeFile(String civilCodeFile) {
288 this.civilCodeFile = civilCodeFile; 292 this.civilCodeFile = civilCodeFile;
289 } 293 }
  294 +
  295 + public int getRegisterAgainAfterTime() {
  296 + return registerAgainAfterTime;
  297 + }
  298 +
  299 + public void setRegisterAgainAfterTime(int registerAgainAfterTime) {
  300 + this.registerAgainAfterTime = registerAgainAfterTime;
  301 + }
  302 +
  303 + public boolean isRegisterKeepIntDialog() {
  304 + return registerKeepIntDialog;
  305 + }
  306 +
  307 + public void setRegisterKeepIntDialog(boolean registerKeepIntDialog) {
  308 + this.registerKeepIntDialog = registerKeepIntDialog;
  309 + }
290 } 310 }
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -34,6 +34,8 @@ import java.util.Map; @@ -34,6 +34,8 @@ import java.util.Map;
34 public class PlatformServiceImpl implements IPlatformService { 34 public class PlatformServiceImpl implements IPlatformService {
35 35
36 private final static String REGISTER_KEY_PREFIX = "platform_register_"; 36 private final static String REGISTER_KEY_PREFIX = "platform_register_";
  37 +
  38 + private final static String REGISTER_FAIL_AGAIN_KEY_PREFIX = "platform_register_fail_again_";
37 private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_"; 39 private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_";
38 40
39 private final static Logger logger = LoggerFactory.getLogger(PlatformServiceImpl.class); 41 private final static Logger logger = LoggerFactory.getLogger(PlatformServiceImpl.class);
@@ -140,12 +142,11 @@ public class PlatformServiceImpl implements IPlatformService { @@ -140,12 +142,11 @@ public class PlatformServiceImpl implements IPlatformService {
140 // 注销旧的 142 // 注销旧的
141 try { 143 try {
142 if (parentPlatformOld.isStatus()) { 144 if (parentPlatformOld.isStatus()) {
143 - logger.info("保存平台{}时发现平台在线,发送注销命令", parentPlatformOld.getServerGBId()); 145 + logger.info("保存平台{}时发现平台在线,发送注销命令", parentPlatformOld.getServerGBId());
144 commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> { 146 commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> {
145 logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId()); 147 logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId());
146 }); 148 });
147 } 149 }
148 -  
149 } catch (InvalidArgumentException | ParseException | SipException e) { 150 } catch (InvalidArgumentException | ParseException | SipException e) {
150 logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage()); 151 logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage());
151 } 152 }
@@ -178,9 +179,6 @@ public class PlatformServiceImpl implements IPlatformService { @@ -178,9 +179,6 @@ public class PlatformServiceImpl implements IPlatformService {
178 logger.error("[命令发送失败] 国标级联: {}", e.getMessage()); 179 logger.error("[命令发送失败] 国标级联: {}", e.getMessage());
179 } 180 }
180 } 181 }
181 - // 重新开启定时注册, 使用续订消息  
182 - // 重新开始心跳保活  
183 -  
184 182
185 return false; 183 return false;
186 } 184 }
@@ -189,6 +187,9 @@ public class PlatformServiceImpl implements IPlatformService { @@ -189,6 +187,9 @@ public class PlatformServiceImpl implements IPlatformService {
189 @Override 187 @Override
190 public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) { 188 public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) {
191 logger.info("[国标级联]:{}, 平台上线", parentPlatform.getServerGBId()); 189 logger.info("[国标级联]:{}, 平台上线", parentPlatform.getServerGBId());
  190 + final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
  191 + dynamicTask.stop(registerFailAgainTaskKey);
  192 +
192 platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true); 193 platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true);
193 ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); 194 ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
194 if (parentPlatformCatch == null) { 195 if (parentPlatformCatch == null) {
@@ -229,15 +230,9 @@ public class PlatformServiceImpl implements IPlatformService { @@ -229,15 +230,9 @@ public class PlatformServiceImpl implements IPlatformService {
229 // 此时是第三次心跳超时, 平台离线 230 // 此时是第三次心跳超时, 平台离线
230 if (platformCatch.getKeepAliveReply() == 2) { 231 if (platformCatch.getKeepAliveReply() == 2) {
231 // 设置平台离线,并重新注册 232 // 设置平台离线,并重新注册
232 - logger.info("[国标级联] {},三次心跳超时后再次发起注册", parentPlatform.getServerGBId());  
233 - try {  
234 - commanderForPlatform.register(parentPlatform, eventResult1 -> {  
235 - logger.info("[国标级联] {},三次心跳超时后再次发起注册仍然失败,开始定时发起注册,间隔为1分钟", parentPlatform.getServerGBId());  
236 - offline(parentPlatform, false);  
237 - }, null);  
238 - } catch (InvalidArgumentException | ParseException | SipException e) {  
239 - logger.error("[命令发送失败] 国标级联 注册: {}", e.getMessage());  
240 - } 233 + logger.info("[国标级联] 三次心跳超时, 平台{}({})离线", parentPlatform.getName(), parentPlatform.getServerGBId());
  234 + offline(parentPlatform, false);
  235 +
241 } 236 }
242 237
243 }else { 238 }else {
@@ -263,21 +258,22 @@ public class PlatformServiceImpl implements IPlatformService { @@ -263,21 +258,22 @@ public class PlatformServiceImpl implements IPlatformService {
263 258
264 private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){ 259 private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){
265 try { 260 try {
266 - // 设置超时重发, 后续从底层支持消息重发  
267 - String key = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId() + "_timeout";  
268 - if (dynamicTask.isAlive(key)) {  
269 - return; 261 + // 不在同一个会话中续订则每次全新注册
  262 + if (!userSetting.isRegisterKeepIntDialog()) {
  263 + sipTransactionInfo = null;
  264 + }
  265 +
  266 + if (sipTransactionInfo == null) {
  267 + logger.info("[国标级联] 平台:{}注册即将到期,开始重新注册", parentPlatform.getServerGBId());
  268 + }else {
  269 + logger.info("[国标级联] 平台:{}注册即将到期,开始续订", parentPlatform.getServerGBId());
270 } 270 }
271 - dynamicTask.startDelay(key, ()->{  
272 - registerTask(parentPlatform, sipTransactionInfo);  
273 - }, 1000);  
274 - logger.info("[国标级联] 平台:{}注册即将到期,开始续订", parentPlatform.getServerGBId()); 271 +
275 commanderForPlatform.register(parentPlatform, sipTransactionInfo, eventResult -> { 272 commanderForPlatform.register(parentPlatform, sipTransactionInfo, eventResult -> {
276 - dynamicTask.stop(key); 273 + logger.info("[国标级联] 平台:{}注册失败,{}:{}", parentPlatform.getServerGBId(),
  274 + eventResult.statusCode, eventResult.msg);
277 offline(parentPlatform, false); 275 offline(parentPlatform, false);
278 - },eventResult -> {  
279 - dynamicTask.stop(key);  
280 - }); 276 + }, null);
281 } catch (InvalidArgumentException | ParseException | SipException e) { 277 } catch (InvalidArgumentException | ParseException | SipException e) {
282 logger.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage()); 278 logger.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage());
283 } 279 }
@@ -298,24 +294,35 @@ public class PlatformServiceImpl implements IPlatformService { @@ -298,24 +294,35 @@ public class PlatformServiceImpl implements IPlatformService {
298 // 停止所有推流 294 // 停止所有推流
299 logger.info("[平台离线] {}, 停止所有推流", parentPlatform.getServerGBId()); 295 logger.info("[平台离线] {}, 停止所有推流", parentPlatform.getServerGBId());
300 stopAllPush(parentPlatform.getServerGBId()); 296 stopAllPush(parentPlatform.getServerGBId());
301 - if (stopRegister) {  
302 - // 清除注册定时  
303 - logger.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId());  
304 - final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();  
305 - if (dynamicTask.contains(registerTaskKey)) {  
306 - dynamicTask.stop(registerTaskKey);  
307 - } 297 +
  298 + // 清除注册定时
  299 + logger.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId());
  300 + final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
  301 + if (dynamicTask.contains(registerTaskKey)) {
  302 + dynamicTask.stop(registerTaskKey);
308 } 303 }
309 // 清除心跳定时 304 // 清除心跳定时
310 logger.info("[平台离线] {}, 停止定时发送心跳任务", parentPlatform.getServerGBId()); 305 logger.info("[平台离线] {}, 停止定时发送心跳任务", parentPlatform.getServerGBId());
311 final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId(); 306 final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
312 if (dynamicTask.contains(keepaliveTaskKey)) { 307 if (dynamicTask.contains(keepaliveTaskKey)) {
313 - // 添加心跳任务 308 + // 清除心跳任务
314 dynamicTask.stop(keepaliveTaskKey); 309 dynamicTask.stop(keepaliveTaskKey);
315 } 310 }
316 // 停止目录订阅回复 311 // 停止目录订阅回复
317 logger.info("[平台离线] {}, 停止订阅回复", parentPlatform.getServerGBId()); 312 logger.info("[平台离线] {}, 停止订阅回复", parentPlatform.getServerGBId());
318 subscribeHolder.removeAllSubscribe(parentPlatform.getServerGBId()); 313 subscribeHolder.removeAllSubscribe(parentPlatform.getServerGBId());
  314 + // 发起定时自动重新注册
  315 + if (!stopRegister) {
  316 + // 设置为60秒自动尝试重新注册
  317 + final String registerFailAgainTaskKey = REGISTER_FAIL_AGAIN_KEY_PREFIX + parentPlatform.getServerGBId();
  318 + ParentPlatform platform = platformMapper.getParentPlatById(parentPlatform.getId());
  319 + if (platform.isEnable()) {
  320 + dynamicTask.startCron(registerFailAgainTaskKey,
  321 + ()-> registerTask(platform, null),
  322 + userSetting.getRegisterAgainAfterTime() * 1000);
  323 + }
  324 +
  325 + }
319 } 326 }
320 327
321 private void stopAllPush(String platformId) { 328 private void stopAllPush(String platformId) {
src/main/resources/all-application.yml
@@ -185,6 +185,10 @@ user-settings: @@ -185,6 +185,10 @@ user-settings:
185 max-notify-count-queue: 10000 185 max-notify-count-queue: 10000
186 # 设备/通道状态变化时发送消息 186 # 设备/通道状态变化时发送消息
187 device-status-notify: false 187 device-status-notify: false
  188 + # 国标级联离线后多久重试一次注册
  189 + register-again-after-time: 60
  190 + # 国标续订方式,true为续订,每次注册在同一个会话里,false为重新注册,每次使用新的会话
  191 + register-keep-int-dialog: false
188 # 跨域配置,配置你访问前端页面的地址即可, 可以配置多个 192 # 跨域配置,配置你访问前端页面的地址即可, 可以配置多个
189 allowed-origins: 193 allowed-origins:
190 - http://localhost:8008 194 - http://localhost:8008