Commit 4677042b12417c2915ca929073224449f4365d36

Authored by 648540858
2 parents 00fb00a4 045f2334

Merge remote-tracking branch 'origin/master'

Showing 19 changed files with 183 additions and 84 deletions
... ... @@ -382,6 +382,44 @@
382 382 <skipTests>true</skipTests>
383 383 </configuration>
384 384 </plugin>
  385 +
  386 + <plugin>
  387 + <groupId>org.apache.maven.plugins</groupId>
  388 + <artifactId>maven-jar-plugin</artifactId>
  389 + <version>3.3.0</version>
  390 + <configuration>
  391 + <excludes>
  392 + <exclude>**/all-application.yml</exclude>
  393 + <exclude>**/application.yml</exclude>
  394 + <exclude>**/application-*.yml</exclude>
  395 + <exclude>**/local.jks</exclude>
  396 + </excludes>
  397 + </configuration>
  398 + </plugin>
  399 + <plugin>
  400 + <artifactId>maven-resources-plugin</artifactId>
  401 + <executions>
  402 + <execution> <!-- 复制配置文件 -->
  403 + <id>copy-resources</id>
  404 + <phase>package</phase>
  405 + <goals>
  406 + <goal>copy-resources</goal>
  407 + </goals>
  408 + <configuration>
  409 + <resources>
  410 + <resource>
  411 + <directory>src/main/resources</directory>
  412 + <includes>
  413 + <include>application.yml</include>
  414 + <include>application-*.yml</include>
  415 + </includes>
  416 + </resource>
  417 + </resources>
  418 + <outputDirectory>${project.build.directory}</outputDirectory>
  419 + </configuration>
  420 + </execution>
  421 + </executions>
  422 + </plugin>
385 423 </plugins>
386 424 <resources>
387 425 <resource>
... ...
src/main/java/com/genersoft/iot/vmp/conf/CivilCodeFileConf.java
1 1 package com.genersoft.iot.vmp.conf;
2 2  
3 3 import com.genersoft.iot.vmp.common.CivilCodePo;
4   -import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;
  4 +import com.genersoft.iot.vmp.utils.CivilCodeUtil;
5 5 import org.slf4j.Logger;
6 6 import org.slf4j.LoggerFactory;
7 7 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -17,7 +17,8 @@ import java.io.File;
17 17 import java.io.InputStream;
18 18 import java.io.InputStreamReader;
19 19 import java.nio.file.Files;
20   -import java.util.Map;
  20 +import java.util.ArrayList;
  21 +import java.util.List;
21 22  
22 23 /**
23 24 * 启动时读取行政区划表
... ... @@ -28,8 +29,6 @@ public class CivilCodeFileConf implements CommandLineRunner {
28 29  
29 30 private final static Logger logger = LoggerFactory.getLogger(CivilCodeFileConf.class);
30 31  
31   - private final Map<String, CivilCodePo> civilCodeMap= new ConcurrentHashMap<>();
32   -
33 32 @Autowired
34 33 @Lazy
35 34 private UserSetting userSetting;
... ... @@ -62,6 +61,7 @@ public class CivilCodeFileConf implements CommandLineRunner {
62 61 BufferedReader inputStreamReader = new BufferedReader(new InputStreamReader(inputStream));
63 62 int index = -1;
64 63 String line;
  64 + List<CivilCodePo> civilCodePoList = new ArrayList<>();
65 65 while ((line = inputStreamReader.readLine()) != null) {
66 66 index ++;
67 67 if (index == 0) {
... ... @@ -69,36 +69,15 @@ public class CivilCodeFileConf implements CommandLineRunner {
69 69 }
70 70 String[] infoArray = line.split(",");
71 71 CivilCodePo civilCodePo = CivilCodePo.getInstance(infoArray);
72   - civilCodeMap.put(civilCodePo.getCode(), civilCodePo);
  72 + civilCodePoList.add(civilCodePo);
73 73 }
  74 + CivilCodeUtil.INSTANCE.add(civilCodePoList);
74 75 inputStreamReader.close();
75 76 inputStream.close();
76   - if (civilCodeMap.size() == 0) {
  77 + if (civilCodePoList.isEmpty()) {
77 78 logger.warn("[行政区划] 文件内容为空,可能造成目录刷新结果不完整");
78 79 }else {
79   - logger.info("[行政区划] 加载成功,共加载数据{}条", civilCodeMap.size());
80   - }
81   - }
82   -
83   - public CivilCodePo getParentCode(String code) {
84   - if (code.length() > 8) {
85   - return null;
86   - }
87   - if (code.length() == 8) {
88   - String parentCode = code.substring(0, 6);
89   - return civilCodeMap.get(parentCode);
90   - }else {
91   - CivilCodePo civilCodePo = civilCodeMap.get(code);
92   - if (civilCodePo == null){
93   - return null;
94   - }
95   - String parentCode = civilCodePo.getParentCode();
96   - if (parentCode == null) {
97   - return null;
98   - }
99   - return civilCodeMap.get(parentCode);
  80 + logger.info("[行政区划] 加载成功,共加载数据{}条", civilCodePoList.size());
100 81 }
101   -
102 82 }
103   -
104 83 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
... ... @@ -498,6 +498,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
498 498 String endTimeStr = DateUtil.urlFormatter.format(end);
499 499 String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr;
500 500 SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0,false, false, device.getStreamModeForParam());
  501 + sendRtpItem.setStream(stream);
501 502 // 写入redis, 超时时回复
502 503 redisCatchStorage.updateSendRTPSever(sendRtpItem);
503 504 playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
... ... @@ -1006,7 +1007,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
1006 1007 Media media = mediaDescription.getMedia();
1007 1008  
1008 1009 Vector mediaFormats = media.getMediaFormats(false);
1009   - if (mediaFormats.contains("8")) {
  1010 +// if (mediaFormats.contains("8")) {
1010 1011 port = media.getMediaPort();
1011 1012 String protocol = media.getProtocol();
1012 1013 // 区分TCP发流还是udp, 当前默认udp
... ... @@ -1022,7 +1023,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
1022 1023 }
1023 1024 }
1024 1025 break;
1025   - }
  1026 +// }
1026 1027 }
1027 1028 if (port == -1) {
1028 1029 logger.info("不支持的媒体格式,返回415");
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java
... ... @@ -108,7 +108,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
108 108 }else {
109 109 event = eventElement.getText().toUpperCase();
110 110 }
111   - DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf);
  111 + DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
112 112 if (channel == null) {
113 113 logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
114 114 continue;
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java
1 1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
2 2  
3   -import com.genersoft.iot.vmp.common.VideoManagerConstants;
4 3 import com.genersoft.iot.vmp.conf.DynamicTask;
5 4 import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
6 5 import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatchStatus;
7 6 import com.genersoft.iot.vmp.gb28181.bean.Device;
8 7 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
9 8 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
10   -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
11 9 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
12 10 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
13 11 import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
14   -import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
15 12 import com.genersoft.iot.vmp.service.IPlayService;
16 13 import gov.nist.javax.sip.message.SIPRequest;
17 14 import org.dom4j.Element;
... ... @@ -77,15 +74,6 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i
77 74 AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId);
78 75 audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite);
79 76 audioBroadcastManager.update(audioBroadcastCatch);
80   - // 等待invite消息, 超时则结束
81   - String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId();
82   - if (!SipUtils.isFrontEnd(device.getDeviceId())) {
83   - key += audioBroadcastCatch.getChannelId();
84   - }
85   - dynamicTask.startDelay(key, ()->{
86   - logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId);
87   - playService.stopAudioBroadcast(device.getDeviceId(), channelId);
88   - }, 2000);
89 77 }else {
90 78 playService.stopAudioBroadcast(device.getDeviceId(), channelId);
91 79 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
1 1 package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
2 2  
3   -import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
4 3 import com.genersoft.iot.vmp.conf.SipConfig;
5 4 import com.genersoft.iot.vmp.gb28181.bean.*;
6 5 import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch;
... ... @@ -58,9 +57,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
58 57 private ThreadPoolTaskExecutor taskExecutor;
59 58  
60 59 @Autowired
61   - private CivilCodeFileConf civilCodeFileConf;
62   -
63   - @Autowired
64 60 private SipConfig sipConfig;
65 61 private AtomicBoolean processing = new AtomicBoolean(false);
66 62  
... ... @@ -118,7 +114,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
118 114 if (channelDeviceElement == null) {
119 115 continue;
120 116 }
121   - DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, null, civilCodeFileConf);
  117 + DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, null);
122 118 if (channel == null) {
123 119 logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
124 120 continue;
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java
... ... @@ -3,10 +3,10 @@ package com.genersoft.iot.vmp.gb28181.utils;
3 3 import com.alibaba.fastjson2.JSONArray;
4 4 import com.alibaba.fastjson2.JSONObject;
5 5 import com.genersoft.iot.vmp.common.CivilCodePo;
6   -import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
7 6 import com.genersoft.iot.vmp.gb28181.bean.Device;
8 7 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
9 8 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  9 +import com.genersoft.iot.vmp.utils.CivilCodeUtil;
10 10 import com.genersoft.iot.vmp.utils.DateUtil;
11 11 import org.apache.commons.lang3.StringUtils;
12 12 import org.apache.commons.lang3.math.NumberUtils;
... ... @@ -240,7 +240,7 @@ public class XmlUtil {
240 240 CivilCode, BusinessGroup,VirtualOrganization,Other
241 241 }
242 242  
243   - public static DeviceChannel channelContentHandler(Element itemDevice, Device device, String event, CivilCodeFileConf civilCodeFileConf){
  243 + public static DeviceChannel channelContentHandler(Element itemDevice, Device device, String event){
244 244 DeviceChannel deviceChannel = new DeviceChannel();
245 245 deviceChannel.setDeviceId(device.getDeviceId());
246 246 Element channdelIdElement = itemDevice.element("DeviceID");
... ... @@ -267,7 +267,7 @@ public class XmlUtil {
267 267 }
268 268 if(channelId.length() <= 8) {
269 269 deviceChannel.setHasAudio(false);
270   - CivilCodePo parentCode = civilCodeFileConf.getParentCode(channelId);
  270 + CivilCodePo parentCode = CivilCodeUtil.INSTANCE.getParentCode(channelId);
271 271 if (parentCode != null) {
272 272 deviceChannel.setParentId(parentCode.getCode());
273 273 deviceChannel.setCivilCode(parentCode.getCode());
... ...
src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java
... ... @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
7 7 import com.github.pagehelper.PageInfo;
8 8  
9 9 import java.util.List;
  10 +import java.util.Map;
10 11  
11 12 /**
12 13 * 级联国标平台关联流业务接口
... ... @@ -71,4 +72,7 @@ public interface IGbStreamService {
71 72 void delAllPlatformInfo(String platformId, String catalogId);
72 73  
73 74 List<GbStream> getGbChannelWithGbid(String gbId);
  75 +
  76 + Map<String, GbStream> getAllGBId();
  77 +
74 78 }
... ...
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
... ... @@ -115,4 +115,7 @@ public interface IStreamPushService {
115 115 */
116 116 ResourceBaseInfo getOverview();
117 117  
  118 + Map<String, StreamPushItem> getAllAppAndStreamMap();
  119 +
  120 +
118 121 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
... ... @@ -575,8 +575,8 @@ public class DeviceServiceImpl implements IDeviceService {
575 575  
576 576 }else if (device.getSubscribeCycleForMobilePosition() == 0) {
577 577 // 取消订阅
578   - deviceInStore.setSubscribeCycleForCatalog(0);
579   - removeCatalogSubscribe(deviceInStore, null);
  578 + deviceInStore.setSubscribeCycleForMobilePosition(0);
  579 + removeMobilePositionSubscribe(deviceInStore, null);
580 580 }
581 581 }
582 582 if (deviceInStore.getGeoCoordSys() != null) {
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java
... ... @@ -19,11 +19,11 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager;
19 19 import org.springframework.stereotype.Service;
20 20 import org.springframework.transaction.TransactionDefinition;
21 21 import org.springframework.transaction.TransactionStatus;
22   -import org.springframework.transaction.annotation.Transactional;
23 22 import org.springframework.util.ObjectUtils;
24 23  
25 24 import java.util.ArrayList;
26 25 import java.util.List;
  26 +import java.util.Map;
27 27  
28 28 @Service
29 29 @DS("master")
... ... @@ -268,4 +268,9 @@ public class GbStreamServiceImpl implements IGbStreamService {
268 268 public List<GbStream> getGbChannelWithGbid(String gbId) {
269 269 return gbStreamMapper.selectByGBId(gbId);
270 270 }
  271 +
  272 + @Override
  273 + public Map<String, GbStream> getAllGBId() {
  274 + return gbStreamMapper.getAllGBId();
  275 + }
271 276 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
... ... @@ -3,10 +3,7 @@ package com.genersoft.iot.vmp.service.impl;
3 3 import com.alibaba.fastjson2.JSONArray;
4 4 import com.alibaba.fastjson2.JSONObject;
5 5 import com.baomidou.dynamic.datasource.annotation.DS;
6   -import com.genersoft.iot.vmp.common.InviteInfo;
7   -import com.genersoft.iot.vmp.common.InviteSessionStatus;
8   -import com.genersoft.iot.vmp.common.InviteSessionType;
9   -import com.genersoft.iot.vmp.common.StreamInfo;
  6 +import com.genersoft.iot.vmp.common.*;
10 7 import com.genersoft.iot.vmp.conf.DynamicTask;
11 8 import com.genersoft.iot.vmp.conf.SipConfig;
12 9 import com.genersoft.iot.vmp.conf.UserSetting;
... ... @@ -18,19 +15,13 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
18 15 import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
19 16 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
20 17 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
21   -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
22   -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
23 18 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
24   -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
  19 +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
25 20 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
  21 +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
26 22 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
27 23 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
28 24 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
29   -import com.genersoft.iot.vmp.media.zlm.dto.*;
30   -import com.genersoft.iot.vmp.media.zlm.*;
31   -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
32   -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
33   -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
34 25 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
35 26 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRecordMp4;
36 27 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
... ... @@ -40,15 +31,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
40 31 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
41 32 import com.genersoft.iot.vmp.service.*;
42 33 import com.genersoft.iot.vmp.service.bean.*;
43   -import com.genersoft.iot.vmp.service.bean.ErrorCallback;
44   -import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
45   -import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
46   -import com.genersoft.iot.vmp.service.bean.SSRCInfo;
47 34 import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
48   -import com.genersoft.iot.vmp.service.bean.DownloadFileInfo;
49   -import com.genersoft.iot.vmp.service.bean.ErrorCallback;
50   -import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
51   -import com.genersoft.iot.vmp.service.bean.SSRCInfo;
52 35 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
53 36 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
54 37 import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
... ... @@ -1179,6 +1162,15 @@ public class PlayServiceImpl implements IPlayService {
1179 1162 // 发送成功
1180 1163 AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, mediaServerItem, app, stream, event, AudioBroadcastCatchStatus.Ready, isFromPlatform);
1181 1164 audioBroadcastManager.update(audioBroadcastCatch);
  1165 + // 等待invite消息, 超时则结束
  1166 + String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId();
  1167 + if (!SipUtils.isFrontEnd(device.getDeviceId())) {
  1168 + key += audioBroadcastCatch.getChannelId();
  1169 + }
  1170 + dynamicTask.startDelay(key, ()->{
  1171 + logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId);
  1172 + stopAudioBroadcast(device.getDeviceId(), channelId);
  1173 + }, 2000);
1182 1174 }, eventResultForError -> {
1183 1175 // 发送失败
1184 1176 logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
... ... @@ -548,4 +548,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
548 548  
549 549 return new ResourceBaseInfo(total, online);
550 550 }
  551 +
  552 + @Override
  553 + public Map<String, StreamPushItem> getAllAppAndStreamMap() {
  554 + return streamPushMapper.getAllAppAndStreamMap();
  555 + }
551 556 }
... ...
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
... ... @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.redisMsg;
2 2  
3 3 import com.alibaba.fastjson2.JSON;
4 4 import com.alibaba.fastjson2.JSONObject;
  5 +import com.genersoft.iot.vmp.gb28181.bean.GbStream;
5 6 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
6 7 import com.genersoft.iot.vmp.service.IGbStreamService;
7 8 import com.genersoft.iot.vmp.service.IMediaServerService;
... ... @@ -19,6 +20,7 @@ import org.springframework.stereotype.Component;
19 20 import javax.annotation.Resource;
20 21 import java.util.ArrayList;
21 22 import java.util.List;
  23 +import java.util.Map;
22 24 import java.util.concurrent.ConcurrentLinkedQueue;
23 25  
24 26 /**
... ... @@ -57,7 +59,8 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
57 59 try {
58 60 List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
59 61 //查询全部的app+stream 用于判断是添加还是修改
60   - List<String> allAppAndStream = streamPushService.getAllAppAndStream();
  62 + Map<String, StreamPushItem> allAppAndStream = streamPushService.getAllAppAndStreamMap();
  63 + Map<String, GbStream> allGBId = gbStreamService.getAllGBId();
61 64  
62 65 /**
63 66 * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表
... ... @@ -67,9 +70,15 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
67 70 for (StreamPushItem streamPushItem : streamPushItems) {
68 71 String app = streamPushItem.getApp();
69 72 String stream = streamPushItem.getStream();
70   - boolean contains = allAppAndStream.contains(app + stream);
  73 + boolean contains = allAppAndStream.containsKey(app + stream);
71 74 //不存在就添加
72 75 if (!contains) {
  76 + if (allGBId.containsKey(streamPushItem.getGbId())) {
  77 + GbStream gbStream = allGBId.get(streamPushItem.getGbId());
  78 + logger.warn("[REDIS消息-推流设备列表更新-INSERT] 国标编号重复: {}, 已分配给{}/{}",
  79 + streamPushItem.getGbId(), gbStream.getApp(), gbStream.getStream());
  80 + continue;
  81 + }
73 82 streamPushItem.setStreamType("push");
74 83 streamPushItem.setCreateTime(DateUtil.getNow());
75 84 streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
... ... @@ -77,25 +86,31 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
77 86 streamPushItem.setOriginTypeStr("rtsp_push");
78 87 streamPushItem.setTotalReaderCount("0");
79 88 streamPushItemForSave.add(streamPushItem);
  89 + allGBId.put(streamPushItem.getGbId(), streamPushItem);
80 90 } else {
  91 + if (allGBId.containsKey(streamPushItem.getGbId())) {
  92 + GbStream gbStream = allGBId.get(streamPushItem.getGbId());
  93 + logger.warn("[REDIS消息-推流设备列表更新-UPDATE] 国标编号重复: {}, 已分配给{}/{}",
  94 + streamPushItem.getGbId(), gbStream.getApp(), gbStream.getStream());
  95 + continue;
  96 + }
81 97 //存在就只修改 name和gbId
82 98 streamPushItemForUpdate.add(streamPushItem);
83 99 }
84 100 }
85   - if (streamPushItemForSave.size() > 0) {
86   -
  101 + if (!streamPushItemForSave.isEmpty()) {
87 102 logger.info("添加{}条",streamPushItemForSave.size());
88 103 logger.info(JSONObject.toJSONString(streamPushItemForSave));
89 104 streamPushService.batchAdd(streamPushItemForSave);
90 105  
91 106 }
92   - if(streamPushItemForUpdate.size()>0){
  107 + if(!streamPushItemForUpdate.isEmpty()){
93 108 logger.info("修改{}条",streamPushItemForUpdate.size());
94 109 logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
95 110 gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
96 111 }
97 112 }catch (Exception e) {
98   - logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
  113 + logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", new String(message.getBody()));
99 114 logger.error("[REDIS消息-推流设备列表更新] 异常内容: ", e);
100 115 }
101 116 }
... ...
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
... ... @@ -10,6 +10,7 @@ import org.apache.ibatis.annotations.Param;
10 10 import org.springframework.stereotype.Repository;
11 11  
12 12 import java.util.List;
  13 +import java.util.Map;
13 14  
14 15 @Mapper
15 16 @Repository
... ... @@ -170,4 +171,7 @@ public interface GbStreamMapper {
170 171 @Select("SELECT status FROM wvp_stream_push WHERE app=#{app} AND stream=#{stream}")
171 172 Boolean selectStatusForPush(@Param("app") String app, @Param("stream") String stream);
172 173  
  174 + @MapKey("gbId")
  175 + @Select("SELECT * from wvp_gb_stream")
  176 + Map<String, GbStream> getAllGBId();
173 177 }
... ...
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
... ... @@ -7,6 +7,7 @@ import org.apache.ibatis.annotations.*;
7 7 import org.springframework.stereotype.Repository;
8 8  
9 9 import java.util.List;
  10 +import java.util.Map;
10 11  
11 12 @Mapper
12 13 @Repository
... ... @@ -195,4 +196,12 @@ public interface StreamPushMapper {
195 196 "</foreach>" +
196 197 "</script>")
197 198 List<StreamPushItem> getListIn(List<StreamPushItem> streamPushItems);
  199 +
  200 + @MapKey("vhost")
  201 + @Select("SELECT CONCAT(wsp.app, wsp.stream) as vhost, wsp.app, wsp.stream, wgs.gb_id, wgs.name " +
  202 + " from wvp_stream_push wsp " +
  203 + " left join wvp_gb_stream wgs on wgs.app = wsp.app and wgs.stream = wsp.stream")
  204 + Map<String, StreamPushItem> getAllAppAndStreamMap();
  205 +
  206 +
198 207 }
... ...
src/main/java/com/genersoft/iot/vmp/utils/CivilCodeUtil.java 0 → 100644
  1 +package com.genersoft.iot.vmp.utils;
  2 +
  3 +import com.genersoft.iot.vmp.common.CivilCodePo;
  4 +import org.slf4j.Logger;
  5 +import org.slf4j.LoggerFactory;
  6 +
  7 +import java.util.List;
  8 +import java.util.Map;
  9 +import java.util.concurrent.ConcurrentHashMap;
  10 +
  11 +public enum CivilCodeUtil {
  12 +
  13 + INSTANCE;
  14 + private final static Logger log = LoggerFactory.getLogger(CivilCodeUtil.class);
  15 +
  16 + // 用与消息的缓存
  17 + private final Map<String, CivilCodePo> civilCodeMap = new ConcurrentHashMap<>();
  18 +
  19 + CivilCodeUtil() {
  20 + }
  21 +
  22 + public void add(List<CivilCodePo> civilCodePoList) {
  23 + if (!civilCodePoList.isEmpty()) {
  24 + for (CivilCodePo civilCodePo : civilCodePoList) {
  25 + civilCodeMap.put(civilCodePo.getCode(), civilCodePo);
  26 + }
  27 + }
  28 + }
  29 +
  30 + public CivilCodePo getParentCode(String code) {
  31 + if (code.length() > 8) {
  32 + return null;
  33 + }
  34 + if (code.length() == 8) {
  35 + String parentCode = code.substring(0, 6);
  36 + return civilCodeMap.get(parentCode);
  37 + }else {
  38 + CivilCodePo civilCodePo = civilCodeMap.get(code);
  39 + if (civilCodePo == null){
  40 + return null;
  41 + }
  42 + String parentCode = civilCodePo.getParentCode();
  43 + if (parentCode == null) {
  44 + return null;
  45 + }
  46 + return civilCodeMap.get(parentCode);
  47 + }
  48 +
  49 + }
  50 +}
... ...
src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java
... ... @@ -36,6 +36,11 @@ public class DateUtil {
36 36 private static final String ISO8601_ZONE_PATTERN = "yyyy-MM-dd'T'HH:mm:ssXXX";
37 37  
38 38 /**
  39 + * 兼容的时间格式 iso8601时间格式带毫秒
  40 + */
  41 + private static final String ISO8601_MILLISECOND_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS";
  42 +
  43 + /**
39 44 * wvp内部统一时间格式
40 45 */
41 46 public static final String PATTERN = "yyyy-MM-dd HH:mm:ss";
... ... @@ -55,6 +60,8 @@ public class DateUtil {
55 60 public static final DateTimeFormatter formatterCompatibleISO8601 = DateTimeFormatter.ofPattern(ISO8601_COMPATIBLE_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr));
56 61 public static final DateTimeFormatter formatterISO8601 = DateTimeFormatter.ofPattern(ISO8601_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr));
57 62 public static final DateTimeFormatter formatterZoneISO8601 = DateTimeFormatter.ofPattern(ISO8601_ZONE_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr));
  63 + public static final DateTimeFormatter formatterMillisecondISO8601 = DateTimeFormatter.ofPattern(ISO8601_MILLISECOND_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr));
  64 +
58 65 public static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr));
59 66 public static final DateTimeFormatter DateFormatter = DateTimeFormatter.ofPattern(date_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr));
60 67 public static final DateTimeFormatter urlFormatter = DateTimeFormatter.ofPattern(URL_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr));
... ... @@ -70,6 +77,8 @@ public class DateUtil {
70 77 return formatter.format(formatterCompatibleISO8601.parse(formatTime));
71 78 } else if (verification(formatTime, formatterZoneISO8601)) {
72 79 return formatter.format(formatterZoneISO8601.parse(formatTime));
  80 + } else if (verification(formatTime, formatterMillisecondISO8601)) {
  81 + return formatter.format(formatterMillisecondISO8601.parse(formatTime));
73 82 }
74 83 return formatter.format(formatterISO8601.parse(formatTime));
75 84 }
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java
... ... @@ -3,10 +3,9 @@ package com.genersoft.iot.vmp.vmanager.gb28181.gbStream;
3 3 import com.genersoft.iot.vmp.conf.exception.ControllerException;
4 4 import com.genersoft.iot.vmp.conf.security.JwtUtils;
5 5 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
6   -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
7 6 import com.genersoft.iot.vmp.service.IGbStreamService;
8 7 import com.genersoft.iot.vmp.service.IPlatformService;
9   -import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  8 +import com.genersoft.iot.vmp.service.IStreamPushService;
10 9 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
11 10 import com.genersoft.iot.vmp.vmanager.gb28181.gbStream.bean.GbStreamParam;
12 11 import com.github.pagehelper.PageInfo;
... ... @@ -20,7 +19,6 @@ import org.springframework.beans.factory.annotation.Autowired;
20 19 import org.springframework.util.ObjectUtils;
21 20 import org.springframework.web.bind.annotation.*;
22 21  
23   -import java.util.ArrayList;
24 22 import java.util.List;
25 23  
26 24 @Tag(name = "视频流关联到级联平台")
... ... @@ -35,6 +33,9 @@ public class GbStreamController {
35 33 private IGbStreamService gbStreamService;
36 34  
37 35 @Autowired
  36 + private IStreamPushService service;
  37 +
  38 + @Autowired
38 39 private IPlatformService platformService;
39 40  
40 41  
... ...