Commit 5e56a2bfd558e576f97095b065b099034bc7dd4f

Authored by guzijian
1 parent d81f3f3a

feat: 新增事件订阅

src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/AlarmSubscribeTask.java 0 → 100644
  1 +package com.genersoft.iot.vmp.gb28181.task.impl;
  2 +
  3 +import com.genersoft.iot.vmp.common.CommonCallback;
  4 +import com.genersoft.iot.vmp.conf.DynamicTask;
  5 +import com.genersoft.iot.vmp.gb28181.bean.Device;
  6 +import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
  7 +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
  8 +import gov.nist.javax.sip.message.SIPRequest;
  9 +import org.slf4j.Logger;
  10 +import org.slf4j.LoggerFactory;
  11 +
  12 +import javax.sip.InvalidArgumentException;
  13 +import javax.sip.SipException;
  14 +import java.text.ParseException;
  15 +
  16 +/**
  17 + * @author 20412
  18 + */
  19 +public class AlarmSubscribeTask implements ISubscribeTask {
  20 + private final Logger logger = LoggerFactory.getLogger(MobilePositionSubscribeTask.class);
  21 + private Device device;
  22 + private ISIPCommander sipCommander;
  23 +
  24 + private SIPRequest request;
  25 + private DynamicTask dynamicTask;
  26 + private String taskKey = "alarm-subscribe-timeout";
  27 +
  28 + public AlarmSubscribeTask(Device device, ISIPCommander sipCommander, DynamicTask dynamicTask) {
  29 + this.device = device;
  30 + this.sipCommander = sipCommander;
  31 + this.dynamicTask = dynamicTask;
  32 + }
  33 +
  34 + @Override
  35 + public void run() {
  36 + if (dynamicTask.get(taskKey) != null) {
  37 + dynamicTask.stop(taskKey);
  38 + }
  39 + SIPRequest sipRequest = null;
  40 + try {
  41 + // 事件订阅 相当一部分设备不支持事件订阅 如海康。。。 这一部分通过布防处理 因为有部分设备不支持布防,如华为
  42 + sipRequest = sipCommander.alarmSubscribe(device,1000,"0","9","0",null,null);
  43 + logger.info("[事件订阅]成功: {}", device.getDeviceId());
  44 + } catch (InvalidArgumentException | SipException | ParseException e) {
  45 + logger.error("[命令发送失败] 事件订阅: {}", e.getMessage());
  46 + }
  47 + if (sipRequest != null) {
  48 + this.request = sipRequest;
  49 + }
  50 +
  51 + }
  52 +
  53 + @Override
  54 + public void stop(CommonCallback<Boolean> callback) {
  55 + /**
  56 + * dialog 的各个状态
  57 + * EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息
  58 + * CONFIRMED-> Confirmed Dialog状态-已确认
  59 + * COMPLETED-> Completed Dialog状态-已完成
  60 + * TERMINATED-> Terminated Dialog状态-终止
  61 + */
  62 + if (dynamicTask.get(taskKey) != null) {
  63 + dynamicTask.stop(taskKey);
  64 + }
  65 + try {
  66 + // 当expires为0时取消订阅
  67 + sipCommander.alarmSubscribe(device,0,"0","9","0",null,null);
  68 + logger.info("[事件订阅]取消成功: {}", device.getDeviceId());
  69 + } catch (InvalidArgumentException | SipException | ParseException e) {
  70 + logger.error("[命令发送失败] 事件订阅取消失败: {}", e.getMessage());
  71 + }
  72 + device.setSubscribeCycleForAlarm(0);
  73 + }
  74 +}
0 \ No newline at end of file 75 \ No newline at end of file
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
@@ -336,7 +336,7 @@ public interface ISIPCommander { @@ -336,7 +336,7 @@ public interface ISIPCommander {
336 * @param endTime 报警发生终止时间(可选) 336 * @param endTime 报警发生终止时间(可选)
337 * @return true = 命令发送成功 337 * @return true = 命令发送成功
338 */ 338 */
339 - void alarmSubscribe(Device device, int expires, String startPriority, String endPriority, String alarmMethod, String startTime, String endTime) throws InvalidArgumentException, SipException, ParseException; 339 + SIPRequest alarmSubscribe(Device device, int expires, String startPriority, String endPriority, String alarmMethod, String startTime, String endTime) throws InvalidArgumentException, SipException, ParseException;
340 340
341 /** 341 /**
342 * 订阅、取消订阅目录信息 342 * 订阅、取消订阅目录信息
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -1279,7 +1279,7 @@ public class SIPCommander implements ISIPCommander { @@ -1279,7 +1279,7 @@ public class SIPCommander implements ISIPCommander {
1279 * @return true = 命令发送成功 1279 * @return true = 命令发送成功
1280 */ 1280 */
1281 @Override 1281 @Override
1282 - public void alarmSubscribe(Device device, int expires, String startPriority, String endPriority, String alarmMethod, String startTime, String endTime) throws InvalidArgumentException, SipException, ParseException { 1282 + public SIPRequest alarmSubscribe(Device device, int expires, String startPriority, String endPriority, String alarmMethod, String startTime, String endTime) throws InvalidArgumentException, SipException, ParseException {
1283 1283
1284 StringBuffer cmdXml = new StringBuffer(200); 1284 StringBuffer cmdXml = new StringBuffer(200);
1285 String charset = device.getCharset(); 1285 String charset = device.getCharset();
@@ -1307,9 +1307,9 @@ public class SIPCommander implements ISIPCommander { @@ -1307,9 +1307,9 @@ public class SIPCommander implements ISIPCommander {
1307 1307
1308 1308
1309 1309
1310 - Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), null, expires, "presence",sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); 1310 + SIPRequest request = (SIPRequest)headerProvider.createSubscribeRequest(device, cmdXml.toString(), null, expires, "presence",sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
1311 sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request); 1311 sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request);
1312 - 1312 + return request;
1313 } 1313 }
1314 1314
1315 @Override 1315 @Override
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -222,16 +222,16 @@ public class ZLMHttpHookListener { @@ -222,16 +222,16 @@ public class ZLMHttpHookListener {
222 logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)"); 222 logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)");
223 return new HookResultForOnPublish(401, "Unauthorized"); 223 return new HookResultForOnPublish(401, "Unauthorized");
224 } 224 }
  225 + // 鉴权配置
225 // 推流自定义播放鉴权码 226 // 推流自定义播放鉴权码
226 - String callId = paramMap.get("callId");  
227 // 鉴权配置 227 // 鉴权配置
228 - boolean hasAuthority = userService.checkPushAuthority(callId, sign); 228 + boolean hasAuthority = userService.checkPushAuthority(null, sign);
229 if (!hasAuthority) { 229 if (!hasAuthority) {
230 - logger.info("推流鉴权失败: sign 无权限: callId={}. sign={}", callId, sign); 230 + logger.info("推流鉴权失败: sign 无权限: sign={}", sign);
231 return new HookResultForOnPublish(401, "Unauthorized"); 231 return new HookResultForOnPublish(401, "Unauthorized");
232 } 232 }
233 StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); 233 StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
234 - streamAuthorityInfo.setCallId(callId); 234 + streamAuthorityInfo.setCallId(sign);
235 streamAuthorityInfo.setSign(sign); 235 streamAuthorityInfo.setSign(sign);
236 // 鉴权通过 236 // 鉴权通过
237 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); 237 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
@@ -169,4 +169,7 @@ public interface IDeviceService { @@ -169,4 +169,7 @@ public interface IDeviceService {
169 * 获取所有设备 169 * 获取所有设备
170 */ 170 */
171 List<Device> getAll(); 171 List<Device> getAll();
  172 +
  173 + boolean addAlarmDirectorySubscribe(Device device);
  174 + boolean removeAlarmDirectorySubscription(Device device);
172 } 175 }
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*; @@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
9 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; 9 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
10 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; 10 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
11 import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; 11 import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
  12 +import com.genersoft.iot.vmp.gb28181.task.impl.AlarmSubscribeTask;
12 import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; 13 import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
13 import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; 14 import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
14 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; 15 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
@@ -532,6 +533,23 @@ public class DeviceServiceImpl implements IDeviceService { @@ -532,6 +533,23 @@ public class DeviceServiceImpl implements IDeviceService {
532 if (!ObjectUtils.isEmpty(device.getStreamMode())) { 533 if (!ObjectUtils.isEmpty(device.getStreamMode())) {
533 deviceInStore.setStreamMode(device.getStreamMode()); 534 deviceInStore.setStreamMode(device.getStreamMode());
534 } 535 }
  536 + // 事件订阅相关的信息
  537 + if (deviceInStore.getSubscribeCycleForAlarm() != device.getSubscribeCycleForAlarm()) {
  538 + if (device.getSubscribeCycleForAlarm() > 0) {
  539 + // 若已开启订阅,但订阅周期不同,则先取消
  540 + if (deviceInStore.getSubscribeCycleForAlarm() != 0) {
  541 + removeAlarmDirectorySubscription(deviceInStore);
  542 + }
  543 + // 开启订阅
  544 + deviceInStore.setSubscribeCycleForAlarm(device.getSubscribeCycleForAlarm());
  545 + addAlarmDirectorySubscribe(deviceInStore);
  546 + } else if (device.getSubscribeCycleForAlarm() == 0) {
  547 + // 取消订阅
  548 + deviceInStore.setSubscribeCycleForAlarm(device.getSubscribeCycleForAlarm());
  549 + removeAlarmDirectorySubscription(deviceInStore);
  550 + }
  551 + }
  552 +
535 // 目录订阅相关的信息 553 // 目录订阅相关的信息
536 if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) { 554 if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
537 if (device.getSubscribeCycleForCatalog() > 0) { 555 if (device.getSubscribeCycleForCatalog() > 0) {
@@ -637,5 +655,39 @@ public class DeviceServiceImpl implements IDeviceService { @@ -637,5 +655,39 @@ public class DeviceServiceImpl implements IDeviceService {
637 return deviceMapper.getAll(); 655 return deviceMapper.getAll();
638 } 656 }
639 657
  658 + @Override
  659 + public boolean addAlarmDirectorySubscribe(Device device) {
  660 + if (device == null || device.getSubscribeCycleForAlarm() < 0) {
  661 + return false;
  662 + }
  663 + logger.info("[添加事件订阅] 设备{}", device.getDeviceId());
  664 + // 添加目录订阅
  665 + AlarmSubscribeTask alarmSubscribeTask = new AlarmSubscribeTask(device, sipCommander, dynamicTask);
  666 + // 刷新订阅
  667 + int subscribeCycleForAlarm = Math.max(device.getSubscribeCycleForAlarm(), 30);
  668 + // 设置最小值为30
  669 + dynamicTask.startCron(device.getDeviceId() + "alarm", alarmSubscribeTask, (subscribeCycleForAlarm - 1) * 1000);
  670 + return true;
  671 + }
  672 +
  673 +
  674 + @Override
  675 + public boolean removeAlarmDirectorySubscription(Device device) {
  676 + if (device == null || device.getSubscribeCycleForAlarm() < 0) {
  677 + return false;
  678 + }
  679 + logger.info("[移除事件订阅]: {}", device.getDeviceId());
  680 + String taskKey = device.getDeviceId() + "alarm";
  681 + if (device.isOnLine()) {
  682 + Runnable runnable = dynamicTask.get(taskKey);
  683 + if (runnable instanceof ISubscribeTask) {
  684 + ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
  685 + subscribeTask.stop(null);
  686 + }
  687 + }
  688 + dynamicTask.stop(taskKey);
  689 + return true;
  690 + }
  691 +
640 692
641 } 693 }