Commit db2ccfedfa17eb3cb5ca73ac3b6bc4b5a05d4148

Authored by 648540858
1 parent cf1696e0

优化notify性能,增加notify超出处理能力时直接回复错误码,不做处理。

doc/_content/ability/_media/img_16.png

41.5 KB | W: | H:

112 KB | W: | H:

  • 2-up
  • Swipe
  • Onion skin
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
@@ -62,6 +62,8 @@ public class UserSetting { @@ -62,6 +62,8 @@ public class UserSetting {
62 62
63 private List<String> allowedOrigins = new ArrayList<>(); 63 private List<String> allowedOrigins = new ArrayList<>();
64 64
  65 + private int maxNotifyCountQueue = 10000;
  66 +
65 public Boolean getSavePositionHistory() { 67 public Boolean getSavePositionHistory() {
66 return savePositionHistory; 68 return savePositionHistory;
67 } 69 }
@@ -257,4 +259,12 @@ public class UserSetting { @@ -257,4 +259,12 @@ public class UserSetting {
257 public void setRecordPath(String recordPath) { 259 public void setRecordPath(String recordPath) {
258 this.recordPath = recordPath; 260 this.recordPath = recordPath;
259 } 261 }
  262 +
  263 + public int getMaxNotifyCountQueue() {
  264 + return maxNotifyCountQueue;
  265 + }
  266 +
  267 + public void setMaxNotifyCountQueue(int maxNotifyCountQueue) {
  268 + this.maxNotifyCountQueue = maxNotifyCountQueue;
  269 + }
260 } 270 }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java 0 → 100644
  1 +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
  2 +
  3 +import com.genersoft.iot.vmp.conf.DynamicTask;
  4 +import com.genersoft.iot.vmp.conf.UserSetting;
  5 +import com.genersoft.iot.vmp.gb28181.bean.Device;
  6 +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
  7 +import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  8 +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  9 +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
  10 +import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
  11 +import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
  12 +import com.genersoft.iot.vmp.service.IDeviceChannelService;
  13 +import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  14 +import org.dom4j.DocumentException;
  15 +import org.dom4j.Element;
  16 +import org.slf4j.Logger;
  17 +import org.slf4j.LoggerFactory;
  18 +import org.springframework.beans.factory.annotation.Autowired;
  19 +import org.springframework.stereotype.Component;
  20 +
  21 +import javax.sip.RequestEvent;
  22 +import javax.sip.header.FromHeader;
  23 +import java.util.*;
  24 +import java.util.concurrent.ConcurrentHashMap;
  25 +import java.util.concurrent.CopyOnWriteArrayList;
  26 +
  27 +/**
  28 + * SIP命令类型: NOTIFY请求中的目录请求处理
  29 + */
  30 +@Component
  31 +public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent {
  32 +
  33 +
  34 + private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class);
  35 +
  36 + private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>();
  37 + private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>();
  38 + private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
  39 +
  40 + private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
  41 + private final List<DeviceChannel> deleteChannelList = new CopyOnWriteArrayList<>();
  42 +
  43 +
  44 + @Autowired
  45 + private UserSetting userSetting;
  46 +
  47 + @Autowired
  48 + private EventPublisher eventPublisher;
  49 +
  50 + @Autowired
  51 + private IRedisCatchStorage redisCatchStorage;
  52 +
  53 + @Autowired
  54 + private IDeviceChannelService deviceChannelService;
  55 +
  56 + @Autowired
  57 + private DynamicTask dynamicTask;
  58 +
  59 + private final static String talkKey = "notify-request-for-catalog-task";
  60 +
  61 + public void process(RequestEvent evt) {
  62 + try {
  63 + long start = System.currentTimeMillis();
  64 + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
  65 + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
  66 +
  67 + Device device = redisCatchStorage.getDevice(deviceId);
  68 + if (device == null || device.getOnline() == 0) {
  69 + logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));
  70 + return;
  71 + }
  72 + Element rootElement = getRootElement(evt, device.getCharset());
  73 + if (rootElement == null) {
  74 + logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
  75 + return;
  76 + }
  77 + Element deviceListElement = rootElement.element("DeviceList");
  78 + if (deviceListElement == null) {
  79 + return;
  80 + }
  81 + Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
  82 + if (deviceListIterator != null) {
  83 +
  84 + // 遍历DeviceList
  85 + while (deviceListIterator.hasNext()) {
  86 + Element itemDevice = deviceListIterator.next();
  87 + Element channelDeviceElement = itemDevice.element("DeviceID");
  88 + if (channelDeviceElement == null) {
  89 + continue;
  90 + }
  91 + Element eventElement = itemDevice.element("Event");
  92 + String event;
  93 + if (eventElement == null) {
  94 + logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));
  95 + event = CatalogEvent.ADD;
  96 + }else {
  97 + event = eventElement.getText().toUpperCase();
  98 + }
  99 + DeviceChannel channel = XmlUtil.channelContentHander(itemDevice, device, event);
  100 +
  101 + channel.setDeviceId(device.getDeviceId());
  102 + logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
  103 + switch (event) {
  104 + case CatalogEvent.ON:
  105 + // 上线
  106 + logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
  107 + updateChannelOnlineList.add(channel);
  108 + if (updateChannelOnlineList.size() > 300) {
  109 + executeSaveForOnline();
  110 + }
  111 + break;
  112 + case CatalogEvent.OFF :
  113 + // 离线
  114 + logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
  115 + if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
  116 + updateChannelOfflineList.add(channel);
  117 + if (updateChannelOfflineList.size() > 300) {
  118 + executeSaveForOffline();
  119 + }
  120 + }else {
  121 + logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
  122 + }
  123 + break;
  124 + case CatalogEvent.VLOST:
  125 + // 视频丢失
  126 + logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
  127 + if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
  128 + updateChannelOfflineList.add(channel);
  129 + if (updateChannelOfflineList.size() > 300) {
  130 + executeSaveForOffline();
  131 + }
  132 + }else {
  133 + logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
  134 + }
  135 + break;
  136 + case CatalogEvent.DEFECT:
  137 + // 故障
  138 + logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
  139 + if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
  140 + updateChannelOfflineList.add(channel);
  141 + if (updateChannelOfflineList.size() > 300) {
  142 + executeSaveForOffline();
  143 + }
  144 + }else {
  145 + logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
  146 + }
  147 + break;
  148 + case CatalogEvent.ADD:
  149 + // 增加
  150 + logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
  151 + // 判断此通道是否存在
  152 + DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
  153 + if (deviceChannel != null) {
  154 + channel.setId(deviceChannel.getId());
  155 + updateChannelMap.put(channel.getChannelId(), channel);
  156 + if (updateChannelMap.keySet().size() > 300) {
  157 + executeSaveForUpdate();
  158 + }
  159 + }else {
  160 + addChannelMap.put(channel.getChannelId(), channel);
  161 + if (addChannelMap.keySet().size() > 300) {
  162 + executeSaveForAdd();
  163 + }
  164 + }
  165 +
  166 + break;
  167 + case CatalogEvent.DEL:
  168 + // 删除
  169 + logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
  170 + deleteChannelList.add(channel);
  171 + if (deleteChannelList.size() > 300) {
  172 + executeSaveForDelete();
  173 + }
  174 + break;
  175 + case CatalogEvent.UPDATE:
  176 + // 更新
  177 + logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
  178 + // 判断此通道是否存在
  179 + DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
  180 + if (deviceChannelForUpdate != null) {
  181 + channel.setId(deviceChannelForUpdate.getId());
  182 + updateChannelMap.put(channel.getChannelId(), channel);
  183 + if (updateChannelMap.keySet().size() > 300) {
  184 + executeSaveForUpdate();
  185 + }
  186 + }else {
  187 + addChannelMap.put(channel.getChannelId(), channel);
  188 + if (addChannelMap.keySet().size() > 300) {
  189 + executeSaveForAdd();
  190 + }
  191 + }
  192 + break;
  193 + default:
  194 + logger.warn("[ NotifyCatalog ] event not found : {}", event );
  195 +
  196 + }
  197 + // 转发变化信息
  198 + eventPublisher.catalogEventPublish(null, channel, event);
  199 +
  200 + if (updateChannelMap.keySet().size() > 0
  201 + || addChannelMap.keySet().size() > 0
  202 + || updateChannelOnlineList.size() > 0
  203 + || updateChannelOfflineList.size() > 0
  204 + || deleteChannelList.size() > 0) {
  205 +
  206 + if (!dynamicTask.contains(talkKey)) {
  207 + dynamicTask.startDelay(talkKey, this::executeSave, 1000);
  208 + }
  209 + }
  210 + }
  211 + }
  212 + } catch (DocumentException e) {
  213 + logger.error("未处理的异常 ", e);
  214 + }
  215 + }
  216 +
  217 + private void executeSave(){
  218 + System.out.println("定时存储数据");
  219 + executeSaveForUpdate();
  220 + executeSaveForDelete();
  221 + executeSaveForOnline();
  222 + executeSaveForOffline();
  223 + dynamicTask.stop(talkKey);
  224 + }
  225 +
  226 + private void executeSaveForUpdate(){
  227 + if (updateChannelMap.values().size() > 0) {
  228 + ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
  229 + updateChannelMap.clear();
  230 + deviceChannelService.batchUpdateChannel(deviceChannels);
  231 + }
  232 +
  233 + }
  234 +
  235 + private void executeSaveForAdd(){
  236 + if (addChannelMap.values().size() > 0) {
  237 + ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values());
  238 + addChannelMap.clear();
  239 + deviceChannelService.batchAddChannel(deviceChannels);
  240 + }
  241 + }
  242 +
  243 + private void executeSaveForDelete(){
  244 + if (deleteChannelList.size() > 0) {
  245 + deviceChannelService.deleteChannels(deleteChannelList);
  246 + deleteChannelList.clear();
  247 + }
  248 + }
  249 +
  250 + private void executeSaveForOnline(){
  251 + if (updateChannelOnlineList.size() > 0) {
  252 + deviceChannelService.channelsOnline(updateChannelOnlineList);
  253 + updateChannelOnlineList.clear();
  254 + }
  255 + }
  256 +
  257 + private void executeSaveForOffline(){
  258 + if (updateChannelOfflineList.size() > 0) {
  259 + deviceChannelService.channelsOffline(updateChannelOfflineList);
  260 + updateChannelOfflineList.clear();
  261 + }
  262 + }
  263 +
  264 +}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -76,12 +76,17 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @@ -76,12 +76,17 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
76 @Autowired 76 @Autowired
77 private IDeviceChannelService deviceChannelService; 77 private IDeviceChannelService deviceChannelService;
78 78
  79 + @Autowired
  80 + private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
  81 +
79 private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); 82 private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
80 83
81 @Qualifier("taskExecutor") 84 @Qualifier("taskExecutor")
82 @Autowired 85 @Autowired
83 private ThreadPoolTaskExecutor taskExecutor; 86 private ThreadPoolTaskExecutor taskExecutor;
84 87
  88 + private int maxQueueCount = 30000;
  89 +
85 @Override 90 @Override
86 public void afterPropertiesSet() throws Exception { 91 public void afterPropertiesSet() throws Exception {
87 // 添加消息处理的订阅 92 // 添加消息处理的订阅
@@ -91,7 +96,15 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @@ -91,7 +96,15 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
91 @Override 96 @Override
92 public void process(RequestEvent evt) { 97 public void process(RequestEvent evt) {
93 try { 98 try {
94 - responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); 99 +
  100 + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
  101 + responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
  102 + logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
  103 + return;
  104 + }else {
  105 + responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null);
  106 + }
  107 +
95 }catch (SipException | InvalidArgumentException | ParseException e) { 108 }catch (SipException | InvalidArgumentException | ParseException e) {
96 logger.error("未处理的异常 ", e); 109 logger.error("未处理的异常 ", e);
97 } 110 }
@@ -103,6 +116,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @@ -103,6 +116,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
103 while (!taskQueue.isEmpty()) { 116 while (!taskQueue.isEmpty()) {
104 try { 117 try {
105 HandlerCatchData take = taskQueue.poll(); 118 HandlerCatchData take = taskQueue.poll();
  119 + if (take == null) {
  120 + continue;
  121 + }
106 Element rootElement = getRootElement(take.getEvt()); 122 Element rootElement = getRootElement(take.getEvt());
107 if (rootElement == null) { 123 if (rootElement == null) {
108 logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest()); 124 logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
@@ -112,7 +128,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @@ -112,7 +128,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
112 128
113 if (CmdType.CATALOG.equals(cmd)) { 129 if (CmdType.CATALOG.equals(cmd)) {
114 logger.info("接收到Catalog通知"); 130 logger.info("接收到Catalog通知");
115 - processNotifyCatalogList(take.getEvt()); 131 +// processNotifyCatalogList(take.getEvt());
  132 + notifyRequestForCatalogProcessor.process(take.getEvt());
116 } else if (CmdType.ALARM.equals(cmd)) { 133 } else if (CmdType.ALARM.equals(cmd)) {
117 logger.info("接收到Alarm通知"); 134 logger.info("接收到Alarm通知");
118 processNotifyAlarm(take.getEvt()); 135 processNotifyAlarm(take.getEvt());
@@ -132,7 +149,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @@ -132,7 +149,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
132 149
133 /** 150 /**
134 * 处理MobilePosition移动位置Notify 151 * 处理MobilePosition移动位置Notify
135 - * 152 + *
136 * @param evt 153 * @param evt
137 */ 154 */
138 private void processNotifyMobilePosition(RequestEvent evt) { 155 private void processNotifyMobilePosition(RequestEvent evt) {
@@ -236,7 +253,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @@ -236,7 +253,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
236 253
237 /*** 254 /***
238 * 处理alarm设备报警Notify 255 * 处理alarm设备报警Notify
239 - * 256 + *
240 * @param evt 257 * @param evt
241 */ 258 */
242 private void processNotifyAlarm(RequestEvent evt) { 259 private void processNotifyAlarm(RequestEvent evt) {
@@ -346,7 +363,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @@ -346,7 +363,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
346 363
347 /*** 364 /***
348 * 处理catalog设备目录列表Notify 365 * 处理catalog设备目录列表Notify
349 - * 366 + *
350 * @param evt 367 * @param evt
351 */ 368 */
352 private void processNotifyCatalogList(RequestEvent evt) { 369 private void processNotifyCatalogList(RequestEvent evt) {
src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java
@@ -56,4 +56,35 @@ public interface IDeviceChannelService { @@ -56,4 +56,35 @@ public interface IDeviceChannelService {
56 * 查询通道所属的设备 56 * 查询通道所属的设备
57 */ 57 */
58 List<Device> getDeviceByChannelId(String channelId); 58 List<Device> getDeviceByChannelId(String channelId);
  59 +
  60 + /**
  61 + * 批量删除通道
  62 + * @param deleteChannelList 待删除的通道列表
  63 + */
  64 + int deleteChannels(List<DeviceChannel> deleteChannelList);
  65 +
  66 + /**
  67 + * 批量上线
  68 + */
  69 + int channelsOnline(List<DeviceChannel> channels);
  70 +
  71 + /**
  72 + * 批量下线
  73 + */
  74 + int channelsOffline(List<DeviceChannel> channels);
  75 +
  76 + /**
  77 + * 获取一个通道
  78 + */
  79 + DeviceChannel getOne(String deviceId, String channelId);
  80 +
  81 + /**
  82 + * 直接批量更新通道
  83 + */
  84 + void batchUpdateChannel(List<DeviceChannel> channels);
  85 +
  86 + /**
  87 + * 直接批量添加
  88 + */
  89 + void batchAddChannel(List<DeviceChannel> deviceChannels);
59 } 90 }
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java
@@ -209,6 +209,47 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @@ -209,6 +209,47 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
209 209
210 @Override 210 @Override
211 public List<Device> getDeviceByChannelId(String channelId) { 211 public List<Device> getDeviceByChannelId(String channelId) {
  212 +
212 return channelMapper.getDeviceByChannelId(channelId); 213 return channelMapper.getDeviceByChannelId(channelId);
213 } 214 }
  215 +
  216 + @Override
  217 + public int deleteChannels(List<DeviceChannel> deleteChannelList) {
  218 + return channelMapper.batchDel(deleteChannelList);
  219 + }
  220 +
  221 + @Override
  222 + public int channelsOnline(List<DeviceChannel> channels) {
  223 + return channelMapper.batchOnline(channels);
  224 + }
  225 +
  226 + @Override
  227 + public int channelsOffline(List<DeviceChannel> channels) {
  228 + return channelMapper.batchOffline(channels);
  229 + }
  230 +
  231 + @Override
  232 + public DeviceChannel getOne(String deviceId, String channelId){
  233 + return channelMapper.queryChannel(deviceId, channelId);
  234 + }
  235 +
  236 + @Override
  237 + public void batchUpdateChannel(List<DeviceChannel> channels) {
  238 + channelMapper.batchUpdate(channels);
  239 + for (DeviceChannel channel : channels) {
  240 + if (channel.getParentId() != null) {
  241 + channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId());
  242 + }
  243 + }
  244 + }
  245 +
  246 + @Override
  247 + public void batchAddChannel(List<DeviceChannel> channels) {
  248 + channelMapper.batchAdd(channels);
  249 + for (DeviceChannel channel : channels) {
  250 + if (channel.getParentId() != null) {
  251 + channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId());
  252 + }
  253 + }
  254 + }
214 } 255 }
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -644,4 +644,6 @@ public class DeviceServiceImpl implements IDeviceService { @@ -644,4 +644,6 @@ public class DeviceServiceImpl implements IDeviceService {
644 public List<Device> getAll() { 644 public List<Device> getAll() {
645 return deviceMapper.getAll(); 645 return deviceMapper.getAll();
646 } 646 }
  647 +
  648 +
647 } 649 }
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -197,6 +197,60 @@ public interface DeviceChannelMapper { @@ -197,6 +197,60 @@ public interface DeviceChannelMapper {
197 @Update(value = {"UPDATE device_channel SET status=0 WHERE deviceId=#{deviceId}"}) 197 @Update(value = {"UPDATE device_channel SET status=0 WHERE deviceId=#{deviceId}"})
198 void offlineByDeviceId(String deviceId); 198 void offlineByDeviceId(String deviceId);
199 199
  200 +// @Insert("<script> " +
  201 +// "insert into device_channel " +
  202 +// "(channelId, deviceId, name, manufacture, model, owner, civilCode, block, subCount, " +
  203 +// " address, parental, parentId, safetyWay, registerWay, certNum, certifiable, errCode, secrecy, " +
  204 +// " ipAddress, port, password, PTZType, status, streamId, longitude, latitude, longitudeGcj02, latitudeGcj02, " +
  205 +// " longitudeWgs84, latitudeWgs84, hasAudio, createTime, updateTime, businessGroupId, gpsTime) " +
  206 +// "values " +
  207 +// "<foreach collection='addChannels' index='index' item='item' separator=','> " +
  208 +// "(#{item.channelId}, #{item.deviceId}, #{item.name}, #{item.manufacture}, #{item.model}, " +
  209 +// "#{item.owner}, #{item.civilCode}, #{item.block},#{item.subCount}," +
  210 +// "#{item.address}, #{item.parental}, #{item.parentId}, #{item.safetyWay}, #{item.registerWay}, " +
  211 +// "#{item.certNum}, #{item.certifiable}, #{item.errCode}, #{item.secrecy}, " +
  212 +// "#{item.ipAddress}, #{item.port}, #{item.password}, #{item.PTZType}, #{item.status}, " +
  213 +// "#{item.streamId}, #{item.longitude}, #{item.latitude},#{item.longitudeGcj02}, " +
  214 +// "#{item.latitudeGcj02},#{item.longitudeWgs84}, #{item.latitudeWgs84}, #{item.hasAudio}, now(), now(), " +
  215 +// "#{item.businessGroupId}, #{item.gpsTime}) " +
  216 +// "</foreach> " +
  217 +// "ON DUPLICATE KEY UPDATE " +
  218 +// "updateTime=VALUES(updateTime), " +
  219 +// "name=VALUES(name), " +
  220 +// "manufacture=VALUES(manufacture), " +
  221 +// "model=VALUES(model), " +
  222 +// "owner=VALUES(owner), " +
  223 +// "civilCode=VALUES(civilCode), " +
  224 +// "block=VALUES(block), " +
  225 +// "subCount=VALUES(subCount), " +
  226 +// "address=VALUES(address), " +
  227 +// "parental=VALUES(parental), " +
  228 +// "parentId=VALUES(parentId), " +
  229 +// "safetyWay=VALUES(safetyWay), " +
  230 +// "registerWay=VALUES(registerWay), " +
  231 +// "certNum=VALUES(certNum), " +
  232 +// "certifiable=VALUES(certifiable), " +
  233 +// "errCode=VALUES(errCode), " +
  234 +// "secrecy=VALUES(secrecy), " +
  235 +// "ipAddress=VALUES(ipAddress), " +
  236 +// "port=VALUES(port), " +
  237 +// "password=VALUES(password), " +
  238 +// "PTZType=VALUES(PTZType), " +
  239 +// "status=VALUES(status), " +
  240 +// "streamId=VALUES(streamId), " +
  241 +// "longitude=VALUES(longitude), " +
  242 +// "latitude=VALUES(latitude), " +
  243 +// "longitudeGcj02=VALUES(longitudeGcj02), " +
  244 +// "latitudeGcj02=VALUES(latitudeGcj02), " +
  245 +// "longitudeWgs84=VALUES(longitudeWgs84), " +
  246 +// "latitudeWgs84=VALUES(latitudeWgs84), " +
  247 +// "hasAudio=VALUES(hasAudio), " +
  248 +// "businessGroupId=VALUES(businessGroupId), " +
  249 +// "gpsTime=VALUES(gpsTime)" +
  250 +// "</script>")
  251 +// int batchAdd(List<DeviceChannel> addChannels);
  252 +
  253 +
200 @Insert("<script> " + 254 @Insert("<script> " +
201 "insert into device_channel " + 255 "insert into device_channel " +
202 "(channelId, deviceId, name, manufacture, model, owner, civilCode, block, subCount, " + 256 "(channelId, deviceId, name, manufacture, model, owner, civilCode, block, subCount, " +
@@ -214,39 +268,6 @@ public interface DeviceChannelMapper { @@ -214,39 +268,6 @@ public interface DeviceChannelMapper {
214 "#{item.latitudeGcj02},#{item.longitudeWgs84}, #{item.latitudeWgs84}, #{item.hasAudio}, now(), now(), " + 268 "#{item.latitudeGcj02},#{item.longitudeWgs84}, #{item.latitudeWgs84}, #{item.hasAudio}, now(), now(), " +
215 "#{item.businessGroupId}, #{item.gpsTime}) " + 269 "#{item.businessGroupId}, #{item.gpsTime}) " +
216 "</foreach> " + 270 "</foreach> " +
217 - "ON DUPLICATE KEY UPDATE " +  
218 - "updateTime=VALUES(updateTime), " +  
219 - "name=VALUES(name), " +  
220 - "manufacture=VALUES(manufacture), " +  
221 - "model=VALUES(model), " +  
222 - "owner=VALUES(owner), " +  
223 - "civilCode=VALUES(civilCode), " +  
224 - "block=VALUES(block), " +  
225 - "subCount=VALUES(subCount), " +  
226 - "address=VALUES(address), " +  
227 - "parental=VALUES(parental), " +  
228 - "parentId=VALUES(parentId), " +  
229 - "safetyWay=VALUES(safetyWay), " +  
230 - "registerWay=VALUES(registerWay), " +  
231 - "certNum=VALUES(certNum), " +  
232 - "certifiable=VALUES(certifiable), " +  
233 - "errCode=VALUES(errCode), " +  
234 - "secrecy=VALUES(secrecy), " +  
235 - "ipAddress=VALUES(ipAddress), " +  
236 - "port=VALUES(port), " +  
237 - "password=VALUES(password), " +  
238 - "PTZType=VALUES(PTZType), " +  
239 - "status=VALUES(status), " +  
240 - "streamId=VALUES(streamId), " +  
241 - "longitude=VALUES(longitude), " +  
242 - "latitude=VALUES(latitude), " +  
243 - "longitudeGcj02=VALUES(longitudeGcj02), " +  
244 - "latitudeGcj02=VALUES(latitudeGcj02), " +  
245 - "longitudeWgs84=VALUES(longitudeWgs84), " +  
246 - "latitudeWgs84=VALUES(latitudeWgs84), " +  
247 - "hasAudio=VALUES(hasAudio), " +  
248 - "businessGroupId=VALUES(businessGroupId), " +  
249 - "gpsTime=VALUES(gpsTime)" +  
250 "</script>") 271 "</script>")
251 int batchAdd(List<DeviceChannel> addChannels); 272 int batchAdd(List<DeviceChannel> addChannels);
252 273
@@ -264,7 +285,7 @@ public interface DeviceChannelMapper { @@ -264,7 +285,7 @@ public interface DeviceChannelMapper {
264 "<if test='item.owner != null'>, owner=#{item.owner}</if>" + 285 "<if test='item.owner != null'>, owner=#{item.owner}</if>" +
265 "<if test='item.civilCode != null'>, civilCode=#{item.civilCode}</if>" + 286 "<if test='item.civilCode != null'>, civilCode=#{item.civilCode}</if>" +
266 "<if test='item.block != null'>, block=#{item.block}</if>" + 287 "<if test='item.block != null'>, block=#{item.block}</if>" +
267 - "<if test='item.subCount != null'>, block=#{item.subCount}</if>" + 288 + "<if test='item.subCount != null'>, subCount=#{item.subCount}</if>" +
268 "<if test='item.address != null'>, address=#{item.address}</if>" + 289 "<if test='item.address != null'>, address=#{item.address}</if>" +
269 "<if test='item.parental != null'>, parental=#{item.parental}</if>" + 290 "<if test='item.parental != null'>, parental=#{item.parental}</if>" +
270 "<if test='item.parentId != null'>, parentId=#{item.parentId}</if>" + 291 "<if test='item.parentId != null'>, parentId=#{item.parentId}</if>" +
@@ -289,7 +310,8 @@ public interface DeviceChannelMapper { @@ -289,7 +310,8 @@ public interface DeviceChannelMapper {
289 "<if test='item.latitudeWgs84 != null'>, latitudeWgs84=#{item.latitudeWgs84}</if>" + 310 "<if test='item.latitudeWgs84 != null'>, latitudeWgs84=#{item.latitudeWgs84}</if>" +
290 "<if test='item.businessGroupId != null'>, businessGroupId=#{item.businessGroupId}</if>" + 311 "<if test='item.businessGroupId != null'>, businessGroupId=#{item.businessGroupId}</if>" +
291 "<if test='item.gpsTime != null'>, gpsTime=#{item.gpsTime}</if>" + 312 "<if test='item.gpsTime != null'>, gpsTime=#{item.gpsTime}</if>" +
292 - "WHERE deviceId=#{item.deviceId} AND channelId=#{item.channelId}"+ 313 + "<if test='item.id > 0'>WHERE id=#{item.id}</if>" +
  314 + "<if test='item.id == 0'>WHERE deviceId=#{item.deviceId} AND channelId=#{item.channelId}</if>" +
293 "</foreach>" + 315 "</foreach>" +
294 "</script>"}) 316 "</script>"})
295 int batchUpdate(List<DeviceChannel> updateChannels); 317 int batchUpdate(List<DeviceChannel> updateChannels);
@@ -403,4 +425,26 @@ public interface DeviceChannelMapper { @@ -403,4 +425,26 @@ public interface DeviceChannelMapper {
403 425
404 @Select("select de.* from device de left join device_channel dc on de.deviceId = dc.deviceId where dc.channelId=#{channelId}") 426 @Select("select de.* from device de left join device_channel dc on de.deviceId = dc.deviceId where dc.channelId=#{channelId}")
405 List<Device> getDeviceByChannelId(String channelId); 427 List<Device> getDeviceByChannelId(String channelId);
  428 +
  429 +
  430 + @Delete({"<script>" +
  431 + "<foreach collection='deleteChannelList' item='item' separator=';'>" +
  432 + "DELETE FROM device_channel WHERE deviceId=#{item.deviceId} AND channelId=#{item.channelId}" +
  433 + "</foreach>" +
  434 + "</script>"})
  435 + int batchDel(List<DeviceChannel> deleteChannelList);
  436 +
  437 + @Update({"<script>" +
  438 + "<foreach collection='channels' item='item' separator=';'>" +
  439 + "UPDATE device_channel SET status=1 WHERE deviceId=#{item.deviceId} AND channelId=#{item.channelId}" +
  440 + "</foreach>" +
  441 + "</script>"})
  442 + int batchOnline(List<DeviceChannel> channels);
  443 +
  444 + @Update({"<script>" +
  445 + "<foreach collection='channels' item='item' separator=';'>" +
  446 + "UPDATE device_channel SET status=0 WHERE deviceId=#{item.deviceId} AND channelId=#{item.channelId}" +
  447 + "</foreach>" +
  448 + "</script>"})
  449 + int batchOffline(List<DeviceChannel> channels);
406 } 450 }
src/main/resources/all-application.yml
@@ -178,6 +178,8 @@ user-settings: @@ -178,6 +178,8 @@ user-settings:
178 send-to-platforms-when-id-lost: true 178 send-to-platforms-when-id-lost: true
179 # 保持通道状态,不接受notify通道状态变化, 兼容海康平台发送错误消息 179 # 保持通道状态,不接受notify通道状态变化, 兼容海康平台发送错误消息
180 refuse-channel-status-channel-form-notify: false 180 refuse-channel-status-channel-form-notify: false
  181 + # 设置notify缓存队列最大长度,超过此长度的数据将返回486 BUSY_HERE,消息丢弃, 默认10000
  182 + max-notify-count-queue: 10000
181 # 跨域配置,配置你访问前端页面的地址即可, 可以配置多个 183 # 跨域配置,配置你访问前端页面的地址即可, 可以配置多个
182 allowed-origins: 184 allowed-origins:
183 - http://localhost:8008 185 - http://localhost:8008