Commit 726963ba772dde3e9bfaf52c7c398c0050ca1859

Authored by 648540858
1 parent 7241e0d2

优化通道更新逻辑

Too many changes to show.

To preserve performance only 7 of 12 files are displayed.

src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java 0 → 100644
  1 +package com.genersoft.iot.vmp.gb28181.bean;
  2 +
  3 +import java.util.Date;
  4 +import java.util.List;
  5 +
  6 +public class CatalogData {
  7 + private int total;
  8 + private List<DeviceChannel> channelList;
  9 + private Date lastTime;
  10 + private Device device;
  11 +
  12 + public int getTotal() {
  13 + return total;
  14 + }
  15 +
  16 + public void setTotal(int total) {
  17 + this.total = total;
  18 + }
  19 +
  20 + public List<DeviceChannel> getChannelList() {
  21 + return channelList;
  22 + }
  23 +
  24 + public void setChannelList(List<DeviceChannel> channelList) {
  25 + this.channelList = channelList;
  26 + }
  27 +
  28 + public Date getLastTime() {
  29 + return lastTime;
  30 + }
  31 +
  32 + public void setLastTime(Date lastTime) {
  33 + this.lastTime = lastTime;
  34 + }
  35 +
  36 + public Device getDevice() {
  37 + return device;
  38 + }
  39 +
  40 + public void setDevice(Device device) {
  41 + this.device = device;
  42 + }
  43 +}
src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java 0 → 100644
  1 +package com.genersoft.iot.vmp.gb28181.session;
  2 +
  3 +import com.genersoft.iot.vmp.gb28181.bean.CatalogData;
  4 +import com.genersoft.iot.vmp.gb28181.bean.Device;
  5 +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
  6 +import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  7 +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
  8 +import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
  9 +import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  10 +import org.springframework.beans.factory.annotation.Autowired;
  11 +import org.springframework.scheduling.annotation.Scheduled;
  12 +import org.springframework.stereotype.Component;
  13 +
  14 +import java.util.*;
  15 +import java.util.concurrent.ConcurrentHashMap;
  16 +
  17 +@Component
  18 +public class CatalogDataCatch {
  19 +
  20 + public static Map<String, CatalogData> data = new ConcurrentHashMap<>();
  21 +
  22 + @Autowired
  23 + private DeferredResultHolder deferredResultHolder;
  24 +
  25 + @Autowired
  26 + private IVideoManagerStorager storager;
  27 +
  28 + public void put(String key, int total, Device device, List<DeviceChannel> deviceChannelList) {
  29 + CatalogData catalogData = data.get(key);
  30 + if (catalogData == null) {
  31 + catalogData = new CatalogData();
  32 + catalogData.setTotal(total);
  33 + catalogData.setDevice(device);
  34 + catalogData.setChannelList(new ArrayList<>());
  35 + data.put(key, catalogData);
  36 + }
  37 + catalogData.getChannelList().addAll(deviceChannelList);
  38 + catalogData.setLastTime(new Date(System.currentTimeMillis()));
  39 + }
  40 +
  41 + public List<DeviceChannel> get(String key) {
  42 + CatalogData catalogData = data.get(key);
  43 + if (catalogData == null) return null;
  44 + return catalogData.getChannelList();
  45 + }
  46 +
  47 + public void del(String key) {
  48 + data.remove(key);
  49 + }
  50 +
  51 + @Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
  52 + private void timerTask(){
  53 + Set<String> keys = data.keySet();
  54 + Calendar calendar = Calendar.getInstance();
  55 + calendar.setTime(new Date());
  56 + calendar.set(Calendar.SECOND, calendar.get(Calendar.SECOND) - 5);
  57 + for (String key : keys) {
  58 + CatalogData catalogData = data.get(key);
  59 + if (catalogData.getLastTime().before(calendar.getTime())) {
  60 +
  61 + storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
  62 + RequestMessage msg = new RequestMessage();
  63 + msg.setKey(key);
  64 + WVPResult<Object> result = new WVPResult<>();
  65 + result.setCode(0);
  66 + result.setMsg("更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条");
  67 + result.setData(catalogData.getDevice());
  68 + msg.setData(result);
  69 + deferredResultHolder.invokeAllResult(msg);
  70 + data.remove(key);
  71 + }
  72 + }
  73 + }
  74 +}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
@@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
7 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; 7 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
8 import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; 8 import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
9 import com.genersoft.iot.vmp.gb28181.event.EventPublisher; 9 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  10 +import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch;
10 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; 11 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
11 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; 12 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
12 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; 13 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@@ -14,6 +15,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessag @@ -14,6 +15,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessag
14 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; 15 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
15 import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; 16 import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
16 import com.genersoft.iot.vmp.storager.IVideoManagerStorager; 17 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
  18 +import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
17 import org.dom4j.DocumentException; 19 import org.dom4j.DocumentException;
18 import org.dom4j.Element; 20 import org.dom4j.Element;
19 import org.slf4j.Logger; 21 import org.slf4j.Logger;
@@ -27,7 +29,9 @@ import javax.sip.RequestEvent; @@ -27,7 +29,9 @@ import javax.sip.RequestEvent;
27 import javax.sip.SipException; 29 import javax.sip.SipException;
28 import javax.sip.message.Response; 30 import javax.sip.message.Response;
29 import java.text.ParseException; 31 import java.text.ParseException;
  32 +import java.text.SimpleDateFormat;
30 import java.util.ArrayList; 33 import java.util.ArrayList;
  34 +import java.util.Date;
31 import java.util.Iterator; 35 import java.util.Iterator;
32 import java.util.List; 36 import java.util.List;
33 37
@@ -39,6 +43,8 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @@ -39,6 +43,8 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
39 private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class); 43 private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class);
40 private final String cmdType = "Catalog"; 44 private final String cmdType = "Catalog";
41 45
  46 + private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  47 +
42 @Autowired 48 @Autowired
43 private ResponseMessageHandler responseMessageHandler; 49 private ResponseMessageHandler responseMessageHandler;
44 50
@@ -49,6 +55,9 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @@ -49,6 +55,9 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
49 private DeferredResultHolder deferredResultHolder; 55 private DeferredResultHolder deferredResultHolder;
50 56
51 @Autowired 57 @Autowired
  58 + private CatalogDataCatch catalogDataCatch;
  59 +
  60 + @Autowired
52 private DeviceOffLineDetector offLineDetector; 61 private DeviceOffLineDetector offLineDetector;
53 62
54 @Autowired 63 @Autowired
@@ -69,6 +78,12 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @@ -69,6 +78,12 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
69 try { 78 try {
70 rootElement = getRootElement(evt, device.getCharset()); 79 rootElement = getRootElement(evt, device.getCharset());
71 Element deviceListElement = rootElement.element("DeviceList"); 80 Element deviceListElement = rootElement.element("DeviceList");
  81 + Element sumNumElement = rootElement.element("SumNum");
  82 + if (sumNumElement == null || deviceListElement == null) {
  83 + responseAck(evt, Response.BAD_REQUEST, "xml error");
  84 + return;
  85 + }
  86 + int sumNum = Integer.parseInt(sumNumElement.getText());
72 Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); 87 Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
73 if (deviceListIterator != null) { 88 if (deviceListIterator != null) {
74 List<DeviceChannel> channelList = new ArrayList<>(); 89 List<DeviceChannel> channelList = new ArrayList<>();
@@ -86,6 +101,10 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @@ -86,6 +101,10 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
86 String status = statusElement != null ? statusElement.getText().toString() : "ON"; 101 String status = statusElement != null ? statusElement.getText().toString() : "ON";
87 DeviceChannel deviceChannel = new DeviceChannel(); 102 DeviceChannel deviceChannel = new DeviceChannel();
88 deviceChannel.setName(channelName); 103 deviceChannel.setName(channelName);
  104 + deviceChannel.setDeviceId(device.getDeviceId());
  105 + String now = this.format.format(new Date(System.currentTimeMillis()));
  106 + deviceChannel.setCreateTime(now);
  107 + deviceChannel.setUpdateTime(now);
89 deviceChannel.setChannelId(channelDeviceId); 108 deviceChannel.setChannelId(channelDeviceId);
90 // ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR的兼容性处理 109 // ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR的兼容性处理
91 if (status.equals("ON") || status.equals("On") || status.equals("ONLINE")) { 110 if (status.equals("ON") || status.equals("On") || status.equals("ONLINE")) {
@@ -153,14 +172,28 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @@ -153,14 +172,28 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
153 deviceChannel.setPTZType(Integer.parseInt(getText(itemDevice, "PTZType"))); 172 deviceChannel.setPTZType(Integer.parseInt(getText(itemDevice, "PTZType")));
154 } 173 }
155 deviceChannel.setHasAudio(true); // 默认含有音频,播放时再检查是否有音频及是否AAC 174 deviceChannel.setHasAudio(true); // 默认含有音频,播放时再检查是否有音频及是否AAC
156 - // TODO 修改为批量插入  
157 channelList.add(deviceChannel); 175 channelList.add(deviceChannel);
158 } 176 }
159 - storager.updateChannels(device.getDeviceId(), channelList);  
160 - RequestMessage msg = new RequestMessage();  
161 - msg.setKey(key);  
162 - msg.setData(device);  
163 - deferredResultHolder.invokeAllResult(msg); 177 +
  178 + catalogDataCatch.put(key, sumNum, device, channelList);
  179 + if (catalogDataCatch.get(key).size() == sumNum) {
  180 + // 数据已经完整接收
  181 + boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key));
  182 + RequestMessage msg = new RequestMessage();
  183 + msg.setKey(key);
  184 + WVPResult<Object> result = new WVPResult<>();
  185 + result.setCode(0);
  186 + result.setData(device);
  187 + if (resetChannelsResult) {
  188 + result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条");
  189 + }else {
  190 + result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条");
  191 + }
  192 + msg.setData(result);
  193 + deferredResultHolder.invokeAllResult(msg);
  194 + catalogDataCatch.del(key);
  195 + }
  196 +
164 // 回复200 OK 197 // 回复200 OK
165 responseAck(evt, Response.OK); 198 responseAck(evt, Response.OK);
166 if (offLineDetector.isOnline(device.getDeviceId())) { 199 if (offLineDetector.isOnline(device.getDeviceId())) {
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -73,7 +73,6 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -73,7 +73,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
73 result.put(key, streamPushItem); 73 result.put(key, streamPushItem);
74 } 74 }
75 } 75 }
76 -  
77 } 76 }
78 77
79 return new ArrayList<>(result.values()); 78 return new ArrayList<>(result.values());
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java
@@ -55,7 +55,7 @@ public interface IVideoManagerStorager { @@ -55,7 +55,7 @@ public interface IVideoManagerStorager {
55 * @param deviceId 设备id 55 * @param deviceId 设备id
56 * @param channels 多个通道 56 * @param channels 多个通道
57 */ 57 */
58 - public void updateChannels(String deviceId, List<DeviceChannel> channels); 58 + public int updateChannels(String deviceId, List<DeviceChannel> channels);
59 59
60 /** 60 /**
61 * 开始播放 61 * 开始播放
@@ -425,4 +425,10 @@ public interface IVideoManagerStorager { @@ -425,4 +425,10 @@ public interface IVideoManagerStorager {
425 */ 425 */
426 StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId); 426 StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId);
427 427
  428 + /**
  429 + * catlog查询结束后完全重写通道信息
  430 + * @param deviceId
  431 + * @param deviceChannelList
  432 + */
  433 + boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList);
428 } 434 }
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
@@ -133,7 +133,7 @@ public interface DeviceChannelMapper { @@ -133,7 +133,7 @@ public interface DeviceChannelMapper {
133 "'${item.streamId}', ${item.longitude}, ${item.latitude},'${item.createTime}', '${item.updateTime}')" + 133 "'${item.streamId}', ${item.longitude}, ${item.latitude},'${item.createTime}', '${item.updateTime}')" +
134 "</foreach> " + 134 "</foreach> " +
135 "</script>") 135 "</script>")
136 - void batchAdd(List<DeviceChannel> addChannels); 136 + int batchAdd(List<DeviceChannel> addChannels);
137 137
138 @Update({"<script>" + 138 @Update({"<script>" +
139 "<foreach collection='updateChannels' item='item' separator=';'>" + 139 "<foreach collection='updateChannels' item='item' separator=';'>" +
@@ -167,7 +167,7 @@ public interface DeviceChannelMapper { @@ -167,7 +167,7 @@ public interface DeviceChannelMapper {
167 "WHERE deviceId=#{item.deviceId} AND channelId=#{item.channelId}"+ 167 "WHERE deviceId=#{item.deviceId} AND channelId=#{item.channelId}"+
168 "</foreach>" + 168 "</foreach>" +
169 "</script>"}) 169 "</script>"})
170 - void batchUpdate(List<DeviceChannel> updateChannels); 170 + int batchUpdate(List<DeviceChannel> updateChannels);
171 171
172 @Select(value = {" <script>" + 172 @Select(value = {" <script>" +
173 "SELECT * FROM ( "+ 173 "SELECT * FROM ( "+
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
@@ -156,7 +156,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -156,7 +156,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
156 } 156 }
157 157
158 @Override 158 @Override
159 - public void updateChannels(String deviceId, List<DeviceChannel> channels) { 159 + public int updateChannels(String deviceId, List<DeviceChannel> channels) {
160 List<DeviceChannel> addChannels = new ArrayList<>(); 160 List<DeviceChannel> addChannels = new ArrayList<>();
161 List<DeviceChannel> updateChannels = new ArrayList<>(); 161 List<DeviceChannel> updateChannels = new ArrayList<>();
162 HashMap<String, DeviceChannel> channelsInStore = new HashMap<>(); 162 HashMap<String, DeviceChannel> channelsInStore = new HashMap<>();
@@ -210,13 +210,47 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -210,13 +210,47 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
210 if (i + limitCount > updateChannels.size()) { 210 if (i + limitCount > updateChannels.size()) {
211 toIndex = updateChannels.size(); 211 toIndex = updateChannels.size();
212 } 212 }
213 - deviceChannelMapper.batchAdd(updateChannels.subList(i, toIndex)); 213 + deviceChannelMapper.batchUpdate(updateChannels.subList(i, toIndex));
214 } 214 }
215 }else { 215 }else {
216 deviceChannelMapper.batchUpdate(updateChannels); 216 deviceChannelMapper.batchUpdate(updateChannels);
217 } 217 }
218 } 218 }
219 } 219 }
  220 + return addChannels.size() + updateChannels.size();
  221 + }
  222 +
  223 + @Override
  224 + public boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList) {
  225 + TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  226 + try {
  227 + int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
  228 + int limitCount = 300;
  229 + boolean result = cleanChannelsResult <0;
  230 + if (!result && deviceChannelList.size() > 0) {
  231 + if (deviceChannelList.size() > limitCount) {
  232 + for (int i = 0; i < deviceChannelList.size(); i += limitCount) {
  233 + int toIndex = i + limitCount;
  234 + if (i + limitCount > deviceChannelList.size()) {
  235 + toIndex = deviceChannelList.size();
  236 + }
  237 + result = result || deviceChannelMapper.batchAdd(deviceChannelList.subList(i, toIndex)) < 0;
  238 + }
  239 + }else {
  240 + result = result || deviceChannelMapper.batchAdd(deviceChannelList) < 0;
  241 + }
  242 + }
  243 + if (result) {
  244 + //事务回滚
  245 + dataSourceTransactionManager.rollback(transactionStatus);
  246 + }
  247 + dataSourceTransactionManager.commit(transactionStatus); //手动提交
  248 + return true;
  249 + }catch (Exception e) {
  250 + dataSourceTransactionManager.rollback(transactionStatus);
  251 + return false;
  252 + }
  253 +
220 } 254 }
221 255
222 @Override 256 @Override
@@ -711,7 +745,6 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @@ -711,7 +745,6 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
711 if (streamProxyItems == null) { 745 if (streamProxyItems == null) {
712 platformGbStreamMapper.add(streamPushItem); 746 platformGbStreamMapper.add(streamPushItem);
713 } 747 }
714 -  
715 } 748 }
716 } 749 }
717 } 750 }