Commit 5e73874880cdfd5b6b99147a0cdd8a6eabcfbf16

Authored by 648540858
1 parent 710600db

添加队列处理redis消息和sip消息,支持使用推流状态作为通道在线状态

Showing 19 changed files with 645 additions and 436 deletions
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
@@ -31,6 +31,8 @@ public class UserSetting { @@ -31,6 +31,8 @@ public class UserSetting {
31 31
32 private Boolean logInDatebase = Boolean.TRUE; 32 private Boolean logInDatebase = Boolean.TRUE;
33 33
  34 + private Boolean usePushingAsStatus = Boolean.TRUE;
  35 +
34 private String serverId = "000000"; 36 private String serverId = "000000";
35 37
36 private String thirdPartyGBIdReg = "[\\s\\S]*"; 38 private String thirdPartyGBIdReg = "[\\s\\S]*";
@@ -136,4 +138,12 @@ public class UserSetting { @@ -136,4 +138,12 @@ public class UserSetting {
136 public void setPlatformPlayTimeout(int platformPlayTimeout) { 138 public void setPlatformPlayTimeout(int platformPlayTimeout) {
137 this.platformPlayTimeout = platformPlayTimeout; 139 this.platformPlayTimeout = platformPlayTimeout;
138 } 140 }
  141 +
  142 + public Boolean isUsePushingAsStatus() {
  143 + return usePushingAsStatus;
  144 + }
  145 +
  146 + public void setUsePushingAsStatus(Boolean usePushingAsStatus) {
  147 + this.usePushingAsStatus = usePushingAsStatus;
  148 + }
139 } 149 }
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
@@ -3,7 +3,7 @@ package com.genersoft.iot.vmp.conf.redis; @@ -3,7 +3,7 @@ package com.genersoft.iot.vmp.conf.redis;
3 3
4 import com.alibaba.fastjson.parser.ParserConfig; 4 import com.alibaba.fastjson.parser.ParserConfig;
5 import com.genersoft.iot.vmp.common.VideoManagerConstants; 5 import com.genersoft.iot.vmp.common.VideoManagerConstants;
6 -import com.genersoft.iot.vmp.service.impl.*; 6 +import com.genersoft.iot.vmp.service.redisMsg.*;
7 import org.springframework.beans.factory.annotation.Autowired; 7 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.cache.annotation.CachingConfigurerSupport; 8 import org.springframework.cache.annotation.CachingConfigurerSupport;
9 import org.springframework.context.annotation.Bean; 9 import org.springframework.context.annotation.Bean;
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipMsgInfo.java 0 → 100644
  1 +package com.genersoft.iot.vmp.gb28181.bean;
  2 +
  3 +import org.dom4j.Element;
  4 +
  5 +import javax.sip.RequestEvent;
  6 +
  7 +public class SipMsgInfo {
  8 + private RequestEvent evt;
  9 + private Device device;
  10 + private ParentPlatform platform;
  11 + private Element rootElement;
  12 +
  13 + public SipMsgInfo(RequestEvent evt, Device device, Element rootElement) {
  14 + this.evt = evt;
  15 + this.device = device;
  16 + this.rootElement = rootElement;
  17 + }
  18 +
  19 + public SipMsgInfo(RequestEvent evt, ParentPlatform platform, Element rootElement) {
  20 + this.evt = evt;
  21 + this.platform = platform;
  22 + this.rootElement = rootElement;
  23 + }
  24 +
  25 + public RequestEvent getEvt() {
  26 + return evt;
  27 + }
  28 +
  29 + public void setEvt(RequestEvent evt) {
  30 + this.evt = evt;
  31 + }
  32 +
  33 + public Device getDevice() {
  34 + return device;
  35 + }
  36 +
  37 + public void setDevice(Device device) {
  38 + this.device = device;
  39 + }
  40 +
  41 + public ParentPlatform getPlatform() {
  42 + return platform;
  43 + }
  44 +
  45 + public void setPlatform(ParentPlatform platform) {
  46 + this.platform = platform;
  47 + }
  48 +
  49 + public Element getRootElement() {
  50 + return rootElement;
  51 + }
  52 +
  53 + public void setRootElement(Element rootElement) {
  54 + this.rootElement = rootElement;
  55 + }
  56 +}
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -14,18 +14,15 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; @@ -14,18 +14,15 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
14 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 14 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
15 import com.genersoft.iot.vmp.service.IMediaServerService; 15 import com.genersoft.iot.vmp.service.IMediaServerService;
16 import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; 16 import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
17 -import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; 17 +import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
18 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 18 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
19 import com.genersoft.iot.vmp.storager.IVideoManagerStorage; 19 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
20 -import com.genersoft.iot.vmp.utils.SerializeUtils;  
21 import org.slf4j.Logger; 20 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory; 21 import org.slf4j.LoggerFactory;
23 import org.springframework.beans.factory.InitializingBean; 22 import org.springframework.beans.factory.InitializingBean;
24 import org.springframework.beans.factory.annotation.Autowired; 23 import org.springframework.beans.factory.annotation.Autowired;
25 import org.springframework.stereotype.Component; 24 import org.springframework.stereotype.Component;
26 25
27 -import javax.sip.Dialog;  
28 -import javax.sip.DialogState;  
29 import javax.sip.RequestEvent; 26 import javax.sip.RequestEvent;
30 import javax.sip.address.SipURI; 27 import javax.sip.address.SipURI;
31 import javax.sip.header.CallIdHeader; 28 import javax.sip.header.CallIdHeader;
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -22,8 +22,8 @@ import com.genersoft.iot.vmp.service.IStreamProxyService; @@ -22,8 +22,8 @@ import com.genersoft.iot.vmp.service.IStreamProxyService;
22 import com.genersoft.iot.vmp.service.IStreamPushService; 22 import com.genersoft.iot.vmp.service.IStreamPushService;
23 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; 23 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
24 import com.genersoft.iot.vmp.service.bean.SSRCInfo; 24 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
25 -import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;  
26 -import com.genersoft.iot.vmp.service.impl.RedisPushStreamResponseListener; 25 +import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
  26 +import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
27 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 27 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
28 import com.genersoft.iot.vmp.storager.IVideoManagerStorage; 28 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
29 import com.genersoft.iot.vmp.utils.DateUtil; 29 import com.genersoft.iot.vmp.utils.DateUtil;
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
@@ -21,6 +21,8 @@ import org.slf4j.Logger; @@ -21,6 +21,8 @@ import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory; 21 import org.slf4j.LoggerFactory;
22 import org.springframework.beans.factory.InitializingBean; 22 import org.springframework.beans.factory.InitializingBean;
23 import org.springframework.beans.factory.annotation.Autowired; 23 import org.springframework.beans.factory.annotation.Autowired;
  24 +import org.springframework.beans.factory.annotation.Qualifier;
  25 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
24 import org.springframework.stereotype.Component; 26 import org.springframework.stereotype.Component;
25 import org.springframework.util.ObjectUtils; 27 import org.springframework.util.ObjectUtils;
26 import org.springframework.util.StringUtils; 28 import org.springframework.util.StringUtils;
@@ -31,6 +33,7 @@ import javax.sip.SipException; @@ -31,6 +33,7 @@ import javax.sip.SipException;
31 import javax.sip.message.Response; 33 import javax.sip.message.Response;
32 34
33 import java.text.ParseException; 35 import java.text.ParseException;
  36 +import java.util.concurrent.ConcurrentLinkedQueue;
34 37
35 import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.*; 38 import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.*;
36 39
@@ -67,6 +70,15 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme @@ -67,6 +70,15 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
67 @Autowired 70 @Autowired
68 private IDeviceChannelService deviceChannelService; 71 private IDeviceChannelService deviceChannelService;
69 72
  73 + private boolean taskQueueHandlerRun = false;
  74 +
  75 + private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
  76 +
  77 + @Qualifier("taskExecutor")
  78 + @Autowired
  79 + private ThreadPoolTaskExecutor taskExecutor;
  80 +
  81 +
70 @Override 82 @Override
71 public void afterPropertiesSet() throws Exception { 83 public void afterPropertiesSet() throws Exception {
72 notifyMessageHandler.addHandler(cmdType, this); 84 notifyMessageHandler.addHandler(cmdType, this);
@@ -75,114 +87,127 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme @@ -75,114 +87,127 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
75 @Override 87 @Override
76 public void handForDevice(RequestEvent evt, Device device, Element rootElement) { 88 public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
77 logger.info("[收到报警通知]设备:{}", device.getDeviceId()); 89 logger.info("[收到报警通知]设备:{}", device.getDeviceId());
78 - // 回复200 OK  
79 - try {  
80 - responseAck(getServerTransaction(evt), Response.OK);  
81 - } catch (SipException | InvalidArgumentException | ParseException e) {  
82 - logger.error("[收到报警通知], 回复200OK失败", e);  
83 - }  
84 -  
85 - Element deviceIdElement = rootElement.element("DeviceID");  
86 - String channelId = deviceIdElement.getText().toString();  
87 90
88 - DeviceAlarm deviceAlarm = new DeviceAlarm();  
89 - deviceAlarm.setCreateTime(DateUtil.getNow());  
90 - deviceAlarm.setDeviceId(device.getDeviceId());  
91 - deviceAlarm.setChannelId(channelId);  
92 - deviceAlarm.setAlarmPriority(getText(rootElement, "AlarmPriority"));  
93 - deviceAlarm.setAlarmMethod(getText(rootElement, "AlarmMethod"));  
94 - String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");  
95 - if (alarmTime == null) {  
96 - return;  
97 - }  
98 - deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));  
99 - String alarmDescription = getText(rootElement, "AlarmDescription");  
100 - if (alarmDescription == null) {  
101 - deviceAlarm.setAlarmDescription("");  
102 - } else {  
103 - deviceAlarm.setAlarmDescription(alarmDescription);  
104 - }  
105 - String longitude = getText(rootElement, "Longitude");  
106 - if (longitude != null && NumericUtil.isDouble(longitude)) {  
107 - deviceAlarm.setLongitude(Double.parseDouble(longitude));  
108 - } else {  
109 - deviceAlarm.setLongitude(0.00);  
110 - }  
111 - String latitude = getText(rootElement, "Latitude");  
112 - if (latitude != null && NumericUtil.isDouble(latitude)) {  
113 - deviceAlarm.setLatitude(Double.parseDouble(latitude));  
114 - } else {  
115 - deviceAlarm.setLatitude(0.00);  
116 - }  
117 -  
118 - if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) {  
119 - if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) {  
120 - MobilePosition mobilePosition = new MobilePosition();  
121 - mobilePosition.setCreateTime(DateUtil.getNow());  
122 - mobilePosition.setDeviceId(deviceAlarm.getDeviceId());  
123 - mobilePosition.setTime(deviceAlarm.getAlarmTime());  
124 - mobilePosition.setLongitude(deviceAlarm.getLongitude());  
125 - mobilePosition.setLatitude(deviceAlarm.getLatitude());  
126 - mobilePosition.setReportSource("GPS Alarm");  
127 -  
128 - // 更新device channel 的经纬度  
129 - DeviceChannel deviceChannel = new DeviceChannel();  
130 - deviceChannel.setDeviceId(device.getDeviceId());  
131 - deviceChannel.setChannelId(channelId);  
132 - deviceChannel.setLongitude(mobilePosition.getLongitude());  
133 - deviceChannel.setLatitude(mobilePosition.getLatitude());  
134 - deviceChannel.setGpsTime(mobilePosition.getTime());  
135 -  
136 - deviceChannel = deviceChannelService.updateGps(deviceChannel, device);  
137 -  
138 - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());  
139 - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());  
140 - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());  
141 - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());  
142 -  
143 - if (userSetting.getSavePositionHistory()) {  
144 - storager.insertMobilePosition(mobilePosition); 91 + taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
  92 + if (!taskQueueHandlerRun) {
  93 + taskQueueHandlerRun = true;
  94 + taskExecutor.execute(() -> {
  95 + while (!taskQueue.isEmpty()) {
  96 + SipMsgInfo sipMsgInfo = taskQueue.poll();
  97 + // 回复200 OK
  98 + try {
  99 + responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
  100 + } catch (SipException | InvalidArgumentException | ParseException e) {
  101 + logger.error("[收到报警通知], 回复200OK失败", e);
  102 + }
  103 +
  104 + Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID");
  105 + String channelId = deviceIdElement.getText().toString();
  106 +
  107 + DeviceAlarm deviceAlarm = new DeviceAlarm();
  108 + deviceAlarm.setCreateTime(DateUtil.getNow());
  109 + deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
  110 + deviceAlarm.setChannelId(channelId);
  111 + deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority"));
  112 + deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod"));
  113 + String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime");
  114 + if (alarmTime == null) {
  115 + return;
  116 + }
  117 + deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
  118 + String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription");
  119 + if (alarmDescription == null) {
  120 + deviceAlarm.setAlarmDescription("");
  121 + } else {
  122 + deviceAlarm.setAlarmDescription(alarmDescription);
  123 + }
  124 + String longitude = getText(sipMsgInfo.getRootElement(), "Longitude");
  125 + if (longitude != null && NumericUtil.isDouble(longitude)) {
  126 + deviceAlarm.setLongitude(Double.parseDouble(longitude));
  127 + } else {
  128 + deviceAlarm.setLongitude(0.00);
  129 + }
  130 + String latitude = getText(sipMsgInfo.getRootElement(), "Latitude");
  131 + if (latitude != null && NumericUtil.isDouble(latitude)) {
  132 + deviceAlarm.setLatitude(Double.parseDouble(latitude));
  133 + } else {
  134 + deviceAlarm.setLatitude(0.00);
  135 + }
  136 +
  137 + if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) {
  138 + if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) {
  139 + MobilePosition mobilePosition = new MobilePosition();
  140 + mobilePosition.setCreateTime(DateUtil.getNow());
  141 + mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
  142 + mobilePosition.setTime(deviceAlarm.getAlarmTime());
  143 + mobilePosition.setLongitude(deviceAlarm.getLongitude());
  144 + mobilePosition.setLatitude(deviceAlarm.getLatitude());
  145 + mobilePosition.setReportSource("GPS Alarm");
  146 +
  147 + // 更新device channel 的经纬度
  148 + DeviceChannel deviceChannel = new DeviceChannel();
  149 + deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
  150 + deviceChannel.setChannelId(channelId);
  151 + deviceChannel.setLongitude(mobilePosition.getLongitude());
  152 + deviceChannel.setLatitude(mobilePosition.getLatitude());
  153 + deviceChannel.setGpsTime(mobilePosition.getTime());
  154 +
  155 + deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice());
  156 +
  157 + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
  158 + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
  159 + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
  160 + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
  161 +
  162 + if (userSetting.getSavePositionHistory()) {
  163 + storager.insertMobilePosition(mobilePosition);
  164 + }
  165 + storager.updateChannelPosition(deviceChannel);
  166 +
  167 + // 发送redis消息。 通知位置信息的变化
  168 + JSONObject jsonObject = new JSONObject();
  169 + jsonObject.put("time", mobilePosition.getTime());
  170 + jsonObject.put("serial", deviceChannel.getDeviceId());
  171 + jsonObject.put("code", deviceChannel.getChannelId());
  172 + jsonObject.put("longitude", mobilePosition.getLongitude());
  173 + jsonObject.put("latitude", mobilePosition.getLatitude());
  174 + jsonObject.put("altitude", mobilePosition.getAltitude());
  175 + jsonObject.put("direction", mobilePosition.getDirection());
  176 + jsonObject.put("speed", mobilePosition.getSpeed());
  177 + redisCatchStorage.sendMobilePositionMsg(jsonObject);
  178 + }
  179 + }
  180 + if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) {
  181 + if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) {
  182 + deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType"));
  183 + }
  184 + }
  185 +
  186 + if ("7".equals(deviceAlarm.getAlarmMethod()) ) {
  187 + // 发送给平台的报警信息。 发送redis通知
  188 + AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage();
  189 + alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod()));
  190 + alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription());
  191 + alarmChannelMessage.setGbId(channelId);
  192 + redisCatchStorage.sendAlarmMsg(alarmChannelMessage);
  193 + return;
  194 + }
  195 +
  196 + logger.debug("存储报警信息、报警分类");
  197 + // 存储报警信息、报警分类
  198 + if (sipConfig.isAlarm()) {
  199 + deviceAlarmService.add(deviceAlarm);
  200 + }
  201 + logger.info("[收到报警通知]内容:{}", JSONObject.toJSON(deviceAlarm));
  202 + if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) {
  203 + publisher.deviceAlarmEventPublish(deviceAlarm);
  204 + }
145 } 205 }
146 - storager.updateChannelPosition(deviceChannel);  
147 -  
148 - // 发送redis消息。 通知位置信息的变化  
149 - JSONObject jsonObject = new JSONObject();  
150 - jsonObject.put("time", mobilePosition.getTime());  
151 - jsonObject.put("serial", deviceChannel.getDeviceId());  
152 - jsonObject.put("code", deviceChannel.getChannelId());  
153 - jsonObject.put("longitude", mobilePosition.getLongitude());  
154 - jsonObject.put("latitude", mobilePosition.getLatitude());  
155 - jsonObject.put("altitude", mobilePosition.getAltitude());  
156 - jsonObject.put("direction", mobilePosition.getDirection());  
157 - jsonObject.put("speed", mobilePosition.getSpeed());  
158 - redisCatchStorage.sendMobilePositionMsg(jsonObject);  
159 - }  
160 - }  
161 - if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) {  
162 - if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) {  
163 - deviceAlarm.setAlarmType(getText(rootElement.element("Info"), "AlarmType"));  
164 - } 206 + taskQueueHandlerRun = false;
  207 + });
165 } 208 }
166 209
167 - if ("7".equals(deviceAlarm.getAlarmMethod()) ) {  
168 - // 发送给平台的报警信息。 发送redis通知  
169 - AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage();  
170 - alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod()));  
171 - alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription());  
172 - alarmChannelMessage.setGbId(channelId);  
173 - redisCatchStorage.sendAlarmMsg(alarmChannelMessage);  
174 - return;  
175 - }  
176 210
177 - logger.debug("存储报警信息、报警分类");  
178 - // 存储报警信息、报警分类  
179 - if (sipConfig.isAlarm()) {  
180 - deviceAlarmService.add(deviceAlarm);  
181 - }  
182 - logger.info("[收到报警通知]内容:{}", JSONObject.toJSON(deviceAlarm));  
183 - if (redisCatchStorage.deviceIsOnline(device.getDeviceId())) {  
184 - publisher.deviceAlarmEventPublish(deviceAlarm);  
185 - }  
186 } 211 }
187 212
188 @Override 213 @Override
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
@@ -17,6 +17,8 @@ import org.slf4j.Logger; @@ -17,6 +17,8 @@ import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory; 17 import org.slf4j.LoggerFactory;
18 import org.springframework.beans.factory.InitializingBean; 18 import org.springframework.beans.factory.InitializingBean;
19 import org.springframework.beans.factory.annotation.Autowired; 19 import org.springframework.beans.factory.annotation.Autowired;
  20 +import org.springframework.beans.factory.annotation.Qualifier;
  21 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
20 import org.springframework.stereotype.Component; 22 import org.springframework.stereotype.Component;
21 import org.springframework.util.ObjectUtils; 23 import org.springframework.util.ObjectUtils;
22 import org.springframework.util.StringUtils; 24 import org.springframework.util.StringUtils;
@@ -26,6 +28,7 @@ import javax.sip.RequestEvent; @@ -26,6 +28,7 @@ import javax.sip.RequestEvent;
26 import javax.sip.SipException; 28 import javax.sip.SipException;
27 import javax.sip.message.Response; 29 import javax.sip.message.Response;
28 import java.text.ParseException; 30 import java.text.ParseException;
  31 +import java.util.concurrent.ConcurrentLinkedQueue;
29 32
30 import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; 33 import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
31 34
@@ -53,6 +56,14 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen @@ -53,6 +56,14 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
53 @Autowired 56 @Autowired
54 private IDeviceChannelService deviceChannelService; 57 private IDeviceChannelService deviceChannelService;
55 58
  59 + private boolean taskQueueHandlerRun = false;
  60 +
  61 + private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
  62 +
  63 + @Qualifier("taskExecutor")
  64 + @Autowired
  65 + private ThreadPoolTaskExecutor taskExecutor;
  66 +
56 @Override 67 @Override
57 public void afterPropertiesSet() throws Exception { 68 public void afterPropertiesSet() throws Exception {
58 notifyMessageHandler.addHandler(cmdType, this); 69 notifyMessageHandler.addHandler(cmdType, this);
@@ -61,78 +72,91 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen @@ -61,78 +72,91 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
61 @Override 72 @Override
62 public void handForDevice(RequestEvent evt, Device device, Element rootElement) { 73 public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
63 74
64 - try {  
65 - rootElement = getRootElement(evt, device.getCharset());  
66 - if (rootElement == null) {  
67 - logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", evt.getRequest());  
68 - responseAck(getServerTransaction(evt), Response.BAD_REQUEST);  
69 - return;  
70 - }  
71 - MobilePosition mobilePosition = new MobilePosition();  
72 - mobilePosition.setCreateTime(DateUtil.getNow());  
73 - if (!ObjectUtils.isEmpty(device.getName())) {  
74 - mobilePosition.setDeviceName(device.getName());  
75 - }  
76 - mobilePosition.setDeviceId(device.getDeviceId());  
77 - mobilePosition.setChannelId(getText(rootElement, "DeviceID"));  
78 - mobilePosition.setTime(getText(rootElement, "Time"));  
79 - mobilePosition.setLongitude(Double.parseDouble(getText(rootElement, "Longitude")));  
80 - mobilePosition.setLatitude(Double.parseDouble(getText(rootElement, "Latitude")));  
81 - if (NumericUtil.isDouble(getText(rootElement, "Speed"))) {  
82 - mobilePosition.setSpeed(Double.parseDouble(getText(rootElement, "Speed")));  
83 - } else {  
84 - mobilePosition.setSpeed(0.0);  
85 - }  
86 - if (NumericUtil.isDouble(getText(rootElement, "Direction"))) {  
87 - mobilePosition.setDirection(Double.parseDouble(getText(rootElement, "Direction")));  
88 - } else {  
89 - mobilePosition.setDirection(0.0);  
90 - }  
91 - if (NumericUtil.isDouble(getText(rootElement, "Altitude"))) {  
92 - mobilePosition.setAltitude(Double.parseDouble(getText(rootElement, "Altitude")));  
93 - } else {  
94 - mobilePosition.setAltitude(0.0);  
95 - }  
96 - mobilePosition.setReportSource("Mobile Position");  
97 -  
98 -  
99 - // 更新device channel 的经纬度  
100 - DeviceChannel deviceChannel = new DeviceChannel();  
101 - deviceChannel.setDeviceId(device.getDeviceId());  
102 - deviceChannel.setChannelId(mobilePosition.getChannelId());  
103 - deviceChannel.setLongitude(mobilePosition.getLongitude());  
104 - deviceChannel.setLatitude(mobilePosition.getLatitude());  
105 - deviceChannel.setGpsTime(mobilePosition.getTime());  
106 -  
107 - deviceChannel = deviceChannelService.updateGps(deviceChannel, device);  
108 -  
109 - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());  
110 - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());  
111 - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());  
112 - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());  
113 -  
114 - if (userSetting.getSavePositionHistory()) {  
115 - storager.insertMobilePosition(mobilePosition);  
116 - }  
117 - storager.updateChannelPosition(deviceChannel);  
118 - //回复 200 OK  
119 - responseAck(getServerTransaction(evt), Response.OK);  
120 -  
121 - // 发送redis消息。 通知位置信息的变化  
122 - JSONObject jsonObject = new JSONObject();  
123 - jsonObject.put("time", mobilePosition.getTime());  
124 - jsonObject.put("serial", deviceChannel.getDeviceId());  
125 - jsonObject.put("code", deviceChannel.getChannelId());  
126 - jsonObject.put("longitude", mobilePosition.getLongitude());  
127 - jsonObject.put("latitude", mobilePosition.getLatitude());  
128 - jsonObject.put("altitude", mobilePosition.getAltitude());  
129 - jsonObject.put("direction", mobilePosition.getDirection());  
130 - jsonObject.put("speed", mobilePosition.getSpeed());  
131 - redisCatchStorage.sendMobilePositionMsg(jsonObject);  
132 -  
133 - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {  
134 - e.printStackTrace(); 75 + taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
  76 + if (!taskQueueHandlerRun) {
  77 + taskQueueHandlerRun = true;
  78 + taskExecutor.execute(() -> {
  79 + while (!taskQueue.isEmpty()) {
  80 + SipMsgInfo sipMsgInfo = taskQueue.poll();
  81 + try {
  82 + Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset());
  83 + if (rootElementAfterCharset == null) {
  84 + logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest());
  85 + responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST);
  86 + return;
  87 + }
  88 + MobilePosition mobilePosition = new MobilePosition();
  89 + mobilePosition.setCreateTime(DateUtil.getNow());
  90 + if (!ObjectUtils.isEmpty(sipMsgInfo.getDevice().getName())) {
  91 + mobilePosition.setDeviceName(sipMsgInfo.getDevice().getName());
  92 + }
  93 + mobilePosition.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
  94 + mobilePosition.setChannelId(getText(rootElementAfterCharset, "DeviceID"));
  95 + mobilePosition.setTime(getText(rootElementAfterCharset, "Time"));
  96 + mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude")));
  97 + mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude")));
  98 + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) {
  99 + mobilePosition.setSpeed(Double.parseDouble(getText(rootElementAfterCharset, "Speed")));
  100 + } else {
  101 + mobilePosition.setSpeed(0.0);
  102 + }
  103 + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Direction"))) {
  104 + mobilePosition.setDirection(Double.parseDouble(getText(rootElementAfterCharset, "Direction")));
  105 + } else {
  106 + mobilePosition.setDirection(0.0);
  107 + }
  108 + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Altitude"))) {
  109 + mobilePosition.setAltitude(Double.parseDouble(getText(rootElementAfterCharset, "Altitude")));
  110 + } else {
  111 + mobilePosition.setAltitude(0.0);
  112 + }
  113 + mobilePosition.setReportSource("Mobile Position");
  114 +
  115 +
  116 + // 更新device channel 的经纬度
  117 + DeviceChannel deviceChannel = new DeviceChannel();
  118 + deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
  119 + deviceChannel.setChannelId(mobilePosition.getChannelId());
  120 + deviceChannel.setLongitude(mobilePosition.getLongitude());
  121 + deviceChannel.setLatitude(mobilePosition.getLatitude());
  122 + deviceChannel.setGpsTime(mobilePosition.getTime());
  123 +
  124 + deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice());
  125 +
  126 + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84());
  127 + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84());
  128 + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
  129 + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
  130 +
  131 + if (userSetting.getSavePositionHistory()) {
  132 + storager.insertMobilePosition(mobilePosition);
  133 + }
  134 + storager.updateChannelPosition(deviceChannel);
  135 + //回复 200 OK
  136 + responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
  137 +
  138 + // 发送redis消息。 通知位置信息的变化
  139 + JSONObject jsonObject = new JSONObject();
  140 + jsonObject.put("time", mobilePosition.getTime());
  141 + jsonObject.put("serial", deviceChannel.getDeviceId());
  142 + jsonObject.put("code", deviceChannel.getChannelId());
  143 + jsonObject.put("longitude", mobilePosition.getLongitude());
  144 + jsonObject.put("latitude", mobilePosition.getLatitude());
  145 + jsonObject.put("altitude", mobilePosition.getAltitude());
  146 + jsonObject.put("direction", mobilePosition.getDirection());
  147 + jsonObject.put("speed", mobilePosition.getSpeed());
  148 + redisCatchStorage.sendMobilePositionMsg(jsonObject);
  149 +
  150 + } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
  151 + e.printStackTrace();
  152 + }
  153 +
  154 + }
  155 + taskQueueHandlerRun = false;
  156 + });
135 } 157 }
  158 +
  159 +
136 } 160 }
137 161
138 @Override 162 @Override
src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
1 package com.genersoft.iot.vmp.service.impl; 1 package com.genersoft.iot.vmp.service.impl;
2 2
3 import com.genersoft.iot.vmp.conf.DynamicTask; 3 import com.genersoft.iot.vmp.conf.DynamicTask;
  4 +import com.genersoft.iot.vmp.conf.UserSetting;
4 import com.genersoft.iot.vmp.gb28181.bean.*; 5 import com.genersoft.iot.vmp.gb28181.bean.*;
5 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; 6 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
6 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; 7 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
@@ -59,6 +60,9 @@ public class PlatformServiceImpl implements IPlatformService { @@ -59,6 +60,9 @@ public class PlatformServiceImpl implements IPlatformService {
59 @Autowired 60 @Autowired
60 private GbStreamMapper gbStreamMapper; 61 private GbStreamMapper gbStreamMapper;
61 62
  63 + @Autowired
  64 + private UserSetting userSetting;
  65 +
62 66
63 67
64 @Override 68 @Override
@@ -241,7 +245,7 @@ public class PlatformServiceImpl implements IPlatformService { @@ -241,7 +245,7 @@ public class PlatformServiceImpl implements IPlatformService {
241 if (subscribe != null) { 245 if (subscribe != null) {
242 246
243 // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持 247 // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
244 - List<DeviceChannel> gbStreams = gbStreamMapper.queryGbStreamListInPlatform(platform.getServerGBId()); 248 + List<DeviceChannel> gbStreams = gbStreamMapper.queryGbStreamListInPlatform(platform.getServerGBId(), userSetting.isUsePushingAsStatus());
245 if (gbStreams.size() == 0) { 249 if (gbStreams.size() == 0) {
246 return; 250 return;
247 } 251 }
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -131,23 +131,6 @@ public class PlayServiceImpl implements IPlayService { @@ -131,23 +131,6 @@ public class PlayServiceImpl implements IPlayService {
131 StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); 131 StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
132 playResult.setDevice(device); 132 playResult.setDevice(device);
133 133
134 - result.onCompletion(()->{  
135 - // 点播结束时调用截图接口  
136 - taskExecutor.execute(()->{  
137 - // TODO 应该在上流时调用更好,结束也可能是错误结束  
138 - String path = "snap";  
139 - String fileName = deviceId + "_" + channelId + ".jpg";  
140 - WVPResult wvpResult = (WVPResult)result.getResult();  
141 - if (Objects.requireNonNull(wvpResult).getCode() == 0) {  
142 - StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();  
143 - MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());  
144 - String streamUrl = streamInfoForSuccess.getFmp4();  
145 - // 请求截图  
146 - logger.info("[请求截图]: " + fileName);  
147 - zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);  
148 - }  
149 - });  
150 - });  
151 if (streamInfo != null) { 134 if (streamInfo != null) {
152 String streamId = streamInfo.getStream(); 135 String streamId = streamInfo.getStream();
153 if (streamId == null) { 136 if (streamId == null) {
@@ -209,6 +192,21 @@ public class PlayServiceImpl implements IPlayService { @@ -209,6 +192,21 @@ public class PlayServiceImpl implements IPlayService {
209 SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); 192 SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
210 logger.info(JSONObject.toJSONString(ssrcInfo)); 193 logger.info(JSONObject.toJSONString(ssrcInfo));
211 play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{ 194 play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
  195 + // 点播结束时调用截图接口
  196 + taskExecutor.execute(()->{
  197 + // TODO 应该在上流时调用更好,结束也可能是错误结束
  198 + String path = "snap";
  199 + String fileName = deviceId + "_" + channelId + ".jpg";
  200 + WVPResult wvpResult = (WVPResult)result.getResult();
  201 + if (Objects.requireNonNull(wvpResult).getCode() == 0) {
  202 + StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
  203 + MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
  204 + String streamUrl = streamInfoForSuccess.getFmp4();
  205 + // 请求截图
  206 + logger.info("[请求截图]: " + fileName);
  207 + zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
  208 + }
  209 + });
212 if (hookEvent != null) { 210 if (hookEvent != null) {
213 hookEvent.response(mediaServerItem, response); 211 hookEvent.response(mediaServerItem, response);
214 } 212 }
src/main/java/com/genersoft/iot/vmp/service/impl/RedisAlarmMsgListener.java renamed to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
1 -package com.genersoft.iot.vmp.service.impl; 1 +package com.genersoft.iot.vmp.service.redisMsg;
2 2
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 import com.genersoft.iot.vmp.gb28181.bean.*; 4 import com.genersoft.iot.vmp.gb28181.bean.*;
5 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; 5 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
6 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; 6 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
7 -import com.genersoft.iot.vmp.service.IPlatformChannelService;  
8 -import com.genersoft.iot.vmp.storager.IRedisCatchStorage;  
9 import com.genersoft.iot.vmp.storager.IVideoManagerStorage; 7 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
10 import com.genersoft.iot.vmp.utils.DateUtil; 8 import com.genersoft.iot.vmp.utils.DateUtil;
11 import org.slf4j.Logger; 9 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory; 10 import org.slf4j.LoggerFactory;
13 import org.springframework.beans.factory.annotation.Autowired; 11 import org.springframework.beans.factory.annotation.Autowired;
  12 +import org.springframework.beans.factory.annotation.Qualifier;
14 import org.springframework.data.redis.connection.Message; 13 import org.springframework.data.redis.connection.Message;
15 import org.springframework.data.redis.connection.MessageListener; 14 import org.springframework.data.redis.connection.MessageListener;
  15 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
16 import org.springframework.stereotype.Component; 16 import org.springframework.stereotype.Component;
17 import org.springframework.util.ObjectUtils; 17 import org.springframework.util.ObjectUtils;
18 18
19 import java.util.List; 19 import java.util.List;
  20 +import java.util.concurrent.ConcurrentLinkedQueue;
20 21
21 22
22 @Component 23 @Component
@@ -33,45 +34,67 @@ public class RedisAlarmMsgListener implements MessageListener { @@ -33,45 +34,67 @@ public class RedisAlarmMsgListener implements MessageListener {
33 @Autowired 34 @Autowired
34 private IVideoManagerStorage storage; 35 private IVideoManagerStorage storage;
35 36
  37 + private boolean taskQueueHandlerRun = false;
  38 +
  39 + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
  40 +
  41 + @Qualifier("taskExecutor")
  42 + @Autowired
  43 + private ThreadPoolTaskExecutor taskExecutor;
  44 +
36 @Override 45 @Override
37 public void onMessage(Message message, byte[] bytes) { 46 public void onMessage(Message message, byte[] bytes) {
38 logger.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody())); 47 logger.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody()));
39 - AlarmChannelMessage alarmChannelMessage = JSON.parseObject(message.getBody(), AlarmChannelMessage.class);  
40 - if (alarmChannelMessage == null) {  
41 - logger.warn("[REDIS的ALARM通知]消息解析失败");  
42 - return;  
43 - }  
44 - String gbId = alarmChannelMessage.getGbId();  
45 -  
46 - DeviceAlarm deviceAlarm = new DeviceAlarm();  
47 - deviceAlarm.setCreateTime(DateUtil.getNow());  
48 - deviceAlarm.setChannelId(gbId);  
49 - deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());  
50 - deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());  
51 - deviceAlarm.setAlarmPriority("1");  
52 - deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601());  
53 - deviceAlarm.setAlarmType("1");  
54 - deviceAlarm.setLongitude(0);  
55 - deviceAlarm.setLatitude(0);  
56 -  
57 - if (ObjectUtils.isEmpty(gbId)) {  
58 - // 发送给所有的上级  
59 - List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true);  
60 - if (parentPlatforms.size() > 0) {  
61 - for (ParentPlatform parentPlatform : parentPlatforms) {  
62 - commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); 48 +
  49 + taskQueue.offer(message);
  50 + if (!taskQueueHandlerRun) {
  51 + taskQueueHandlerRun = true;
  52 + taskExecutor.execute(() -> {
  53 + while (!taskQueue.isEmpty()) {
  54 + Message msg = taskQueue.poll();
  55 +
  56 + AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
  57 + if (alarmChannelMessage == null) {
  58 + logger.warn("[REDIS的ALARM通知]消息解析失败");
  59 + return;
  60 + }
  61 + String gbId = alarmChannelMessage.getGbId();
  62 +
  63 + DeviceAlarm deviceAlarm = new DeviceAlarm();
  64 + deviceAlarm.setCreateTime(DateUtil.getNow());
  65 + deviceAlarm.setChannelId(gbId);
  66 + deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());
  67 + deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
  68 + deviceAlarm.setAlarmPriority("1");
  69 + deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601());
  70 + deviceAlarm.setAlarmType("1");
  71 + deviceAlarm.setLongitude(0);
  72 + deviceAlarm.setLatitude(0);
  73 +
  74 + if (ObjectUtils.isEmpty(gbId)) {
  75 + // 发送给所有的上级
  76 + List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true);
  77 + if (parentPlatforms.size() > 0) {
  78 + for (ParentPlatform parentPlatform : parentPlatforms) {
  79 + commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
  80 + }
  81 + }
  82 + }else {
  83 + Device device = storage.queryVideoDevice(gbId);
  84 + ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId);
  85 + if (device != null && platform == null) {
  86 + commander.sendAlarmMessage(device, deviceAlarm);
  87 + }else if (device == null && platform != null){
  88 + commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
  89 + }else {
  90 + logger.warn("无法确定" + gbId + "是平台还是设备");
  91 + }
  92 + }
63 } 93 }
64 - }  
65 - }else {  
66 - Device device = storage.queryVideoDevice(gbId);  
67 - ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId);  
68 - if (device != null && platform == null) {  
69 - commander.sendAlarmMessage(device, deviceAlarm);  
70 - }else if (device == null && platform != null){  
71 - commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);  
72 - }else {  
73 - logger.warn("无法确定" + gbId + "是平台还是设备");  
74 - } 94 + taskQueueHandlerRun = false;
  95 + });
75 } 96 }
  97 +
  98 +
76 } 99 }
77 } 100 }
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java renamed to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
1 -package com.genersoft.iot.vmp.service.impl; 1 +package com.genersoft.iot.vmp.service.redisMsg;
2 2
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 import com.alibaba.fastjson.JSONObject; 4 import com.alibaba.fastjson.JSONObject;
@@ -19,8 +19,10 @@ import com.genersoft.iot.vmp.vmanager.bean.WVPResult; @@ -19,8 +19,10 @@ import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
19 import org.slf4j.Logger; 19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory; 20 import org.slf4j.LoggerFactory;
21 import org.springframework.beans.factory.annotation.Autowired; 21 import org.springframework.beans.factory.annotation.Autowired;
  22 +import org.springframework.beans.factory.annotation.Qualifier;
22 import org.springframework.data.redis.connection.Message; 23 import org.springframework.data.redis.connection.Message;
23 import org.springframework.data.redis.connection.MessageListener; 24 import org.springframework.data.redis.connection.MessageListener;
  25 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
24 import org.springframework.stereotype.Component; 26 import org.springframework.stereotype.Component;
25 27
26 import java.text.ParseException; 28 import java.text.ParseException;
@@ -28,6 +30,7 @@ import java.util.HashMap; @@ -28,6 +30,7 @@ import java.util.HashMap;
28 import java.util.Map; 30 import java.util.Map;
29 import java.util.UUID; 31 import java.util.UUID;
30 import java.util.concurrent.ConcurrentHashMap; 32 import java.util.concurrent.ConcurrentHashMap;
  33 +import java.util.concurrent.ConcurrentLinkedQueue;
31 34
32 35
33 /** 36 /**
@@ -85,6 +88,14 @@ public class RedisGbPlayMsgListener implements MessageListener { @@ -85,6 +88,14 @@ public class RedisGbPlayMsgListener implements MessageListener {
85 @Autowired 88 @Autowired
86 private ZlmHttpHookSubscribe subscribe; 89 private ZlmHttpHookSubscribe subscribe;
87 90
  91 + private boolean taskQueueHandlerRun = false;
  92 +
  93 + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
  94 +
  95 + @Qualifier("taskExecutor")
  96 + @Autowired
  97 + private ThreadPoolTaskExecutor taskExecutor;
  98 +
88 99
89 public interface PlayMsgCallback{ 100 public interface PlayMsgCallback{
90 void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException; 101 void handler(ResponseSendItemMsg responseSendItemMsg) throws ParseException;
@@ -100,94 +111,107 @@ public class RedisGbPlayMsgListener implements MessageListener { @@ -100,94 +111,107 @@ public class RedisGbPlayMsgListener implements MessageListener {
100 111
101 @Override 112 @Override
102 public void onMessage(Message message, byte[] bytes) { 113 public void onMessage(Message message, byte[] bytes) {
103 - JSONObject msgJSON = JSON.parseObject(message.getBody(), JSONObject.class);  
104 - WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class);  
105 - if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {  
106 - return;  
107 - }  
108 - if (WvpRedisMsg.isRequest(wvpRedisMsg)) {  
109 - logger.info("[收到REDIS通知] 请求: {}", new String(message.getBody()));  
110 -  
111 - switch (wvpRedisMsg.getCmd()){  
112 - case WvpRedisMsgCmd.GET_SEND_ITEM:  
113 - RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class);  
114 - requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());  
115 - break;  
116 - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:  
117 - RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);;  
118 - requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());  
119 - break;  
120 - default:  
121 - break;  
122 - }  
123 114
124 - }else {  
125 - logger.info("[收到REDIS通知] 回复: {}", new String(message.getBody()));  
126 - switch (wvpRedisMsg.getCmd()){  
127 - case WvpRedisMsgCmd.GET_SEND_ITEM:  
128 -  
129 - WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);  
130 -  
131 - String key = wvpRedisMsg.getSerial();  
132 - switch (content.getCode()) {  
133 - case 0:  
134 - ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class);  
135 - PlayMsgCallback playMsgCallback = callbacks.get(key);  
136 - if (playMsgCallback != null) {  
137 - callbacksForError.remove(key);  
138 - try {  
139 - playMsgCallback.handler(responseSendItemMsg);  
140 - } catch (ParseException e) {  
141 - throw new RuntimeException(e);  
142 - }  
143 - }  
144 - break;  
145 - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:  
146 - case ERROR_CODE_OFFLINE:  
147 - case ERROR_CODE_TIMEOUT:  
148 - PlayMsgErrorCallback errorCallback = callbacksForError.get(key);  
149 - if (errorCallback != null) {  
150 - callbacks.remove(key);  
151 - errorCallback.handler(content);  
152 - }  
153 - break;  
154 - default:  
155 - break; 115 + taskQueue.offer(message);
  116 + if (!taskQueueHandlerRun) {
  117 + taskQueueHandlerRun = true;
  118 + taskExecutor.execute(() -> {
  119 + while (!taskQueue.isEmpty()) {
  120 + Message msg = taskQueue.poll();
  121 + JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class);
  122 + WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class);
  123 + if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
  124 + return;
156 } 125 }
157 - break;  
158 - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:  
159 - WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);  
160 - String serial = wvpRedisMsg.getSerial();  
161 - switch (wvpResult.getCode()) {  
162 - case 0:  
163 - JSONObject jsonObject = (JSONObject)wvpResult.getData();  
164 - PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);  
165 - if (playMsgCallback != null) {  
166 - callbacksForError.remove(serial);  
167 - playMsgCallback.handler(jsonObject);  
168 - }  
169 - break;  
170 - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:  
171 - case ERROR_CODE_OFFLINE:  
172 - case ERROR_CODE_TIMEOUT:  
173 - PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);  
174 - if (errorCallback != null) {  
175 - callbacks.remove(serial);  
176 - errorCallback.handler(wvpResult);  
177 - }  
178 - break;  
179 - default:  
180 - break; 126 + if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
  127 + logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody()));
  128 +
  129 + switch (wvpRedisMsg.getCmd()){
  130 + case WvpRedisMsgCmd.GET_SEND_ITEM:
  131 + RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class);
  132 + requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
  133 + break;
  134 + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
  135 + RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);;
  136 + requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
  137 + break;
  138 + default:
  139 + break;
  140 + }
  141 +
  142 + }else {
  143 + logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody()));
  144 + switch (wvpRedisMsg.getCmd()){
  145 + case WvpRedisMsgCmd.GET_SEND_ITEM:
  146 +
  147 + WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);
  148 +
  149 + String key = wvpRedisMsg.getSerial();
  150 + switch (content.getCode()) {
  151 + case 0:
  152 + ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class);
  153 + PlayMsgCallback playMsgCallback = callbacks.get(key);
  154 + if (playMsgCallback != null) {
  155 + callbacksForError.remove(key);
  156 + try {
  157 + playMsgCallback.handler(responseSendItemMsg);
  158 + } catch (ParseException e) {
  159 + throw new RuntimeException(e);
  160 + }
  161 + }
  162 + break;
  163 + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
  164 + case ERROR_CODE_OFFLINE:
  165 + case ERROR_CODE_TIMEOUT:
  166 + PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
  167 + if (errorCallback != null) {
  168 + callbacks.remove(key);
  169 + errorCallback.handler(content);
  170 + }
  171 + break;
  172 + default:
  173 + break;
  174 + }
  175 + break;
  176 + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
  177 + WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class);
  178 + String serial = wvpRedisMsg.getSerial();
  179 + switch (wvpResult.getCode()) {
  180 + case 0:
  181 + JSONObject jsonObject = (JSONObject)wvpResult.getData();
  182 + PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
  183 + if (playMsgCallback != null) {
  184 + callbacksForError.remove(serial);
  185 + playMsgCallback.handler(jsonObject);
  186 + }
  187 + break;
  188 + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
  189 + case ERROR_CODE_OFFLINE:
  190 + case ERROR_CODE_TIMEOUT:
  191 + PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
  192 + if (errorCallback != null) {
  193 + callbacks.remove(serial);
  194 + errorCallback.handler(wvpResult);
  195 + }
  196 + break;
  197 + default:
  198 + break;
  199 + }
  200 + break;
  201 + default:
  202 + break;
  203 + }
181 } 204 }
182 - break;  
183 - default:  
184 - break;  
185 - } 205 + }
  206 + taskQueueHandlerRun = false;
  207 + });
186 } 208 }
187 209
188 210
189 211
190 212
  213 +
  214 +
191 } 215 }
192 216
193 /** 217 /**
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java renamed to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
1 -package com.genersoft.iot.vmp.service.impl; 1 +package com.genersoft.iot.vmp.service.redisMsg;
2 2
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 -import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;  
5 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; 4 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
6 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 5 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
7 import com.genersoft.iot.vmp.storager.IVideoManagerStorage; 6 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java renamed to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
1 -package com.genersoft.iot.vmp.service.impl; 1 +package com.genersoft.iot.vmp.service.redisMsg;
2 2
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 -import com.alibaba.fastjson.JSONObject;  
5 -import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;  
6 -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;  
7 -import com.genersoft.iot.vmp.service.IGbStreamService;  
8 -import com.genersoft.iot.vmp.service.IMediaServerService;  
9 -import com.genersoft.iot.vmp.service.IStreamPushService; 4 +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
10 import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; 5 import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
11 -import com.genersoft.iot.vmp.utils.DateUtil;  
12 import org.slf4j.Logger; 6 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory; 7 import org.slf4j.LoggerFactory;
  8 +import org.springframework.beans.factory.annotation.Autowired;
  9 +import org.springframework.beans.factory.annotation.Qualifier;
14 import org.springframework.data.redis.connection.Message; 10 import org.springframework.data.redis.connection.Message;
15 import org.springframework.data.redis.connection.MessageListener; 11 import org.springframework.data.redis.connection.MessageListener;
  12 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
16 import org.springframework.stereotype.Component; 13 import org.springframework.stereotype.Component;
17 import org.springframework.util.ObjectUtils; 14 import org.springframework.util.ObjectUtils;
18 15
19 -import javax.annotation.Resource;  
20 -import java.util.ArrayList;  
21 -import java.util.List;  
22 import java.util.Map; 16 import java.util.Map;
23 import java.util.concurrent.ConcurrentHashMap; 17 import java.util.concurrent.ConcurrentHashMap;
  18 +import java.util.concurrent.ConcurrentLinkedQueue;
24 19
25 /** 20 /**
26 * 接收redis返回的推流结果 21 * 接收redis返回的推流结果
@@ -31,6 +26,15 @@ public class RedisPushStreamResponseListener implements MessageListener { @@ -31,6 +26,15 @@ public class RedisPushStreamResponseListener implements MessageListener {
31 26
32 private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class); 27 private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
33 28
  29 + private boolean taskQueueHandlerRun = false;
  30 +
  31 + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
  32 +
  33 + @Qualifier("taskExecutor")
  34 + @Autowired
  35 + private ThreadPoolTaskExecutor taskExecutor;
  36 +
  37 +
34 private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>(); 38 private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
35 39
36 public interface PushStreamResponseEvent{ 40 public interface PushStreamResponseEvent{
@@ -39,16 +43,25 @@ public class RedisPushStreamResponseListener implements MessageListener { @@ -39,16 +43,25 @@ public class RedisPushStreamResponseListener implements MessageListener {
39 43
40 @Override 44 @Override
41 public void onMessage(Message message, byte[] bytes) { 45 public void onMessage(Message message, byte[] bytes) {
42 - //  
43 logger.warn("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); 46 logger.warn("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
44 - MessageForPushChannelResponse response = JSON.parseObject(new String(message.getBody()), MessageForPushChannelResponse.class);  
45 - if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){  
46 - logger.info("[REDIS消息-请求推流结果]:参数不全");  
47 - return;  
48 - }  
49 - // 查看正在等待的invite消息  
50 - if (responseEvents.get(response.getApp() + response.getStream()) != null) {  
51 - responseEvents.get(response.getApp() + response.getStream()).run(response); 47 + taskQueue.offer(message);
  48 + if (!taskQueueHandlerRun) {
  49 + taskQueueHandlerRun = true;
  50 + taskExecutor.execute(() -> {
  51 + while (!taskQueue.isEmpty()) {
  52 + Message msg = taskQueue.poll();
  53 + MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
  54 + if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
  55 + logger.info("[REDIS消息-请求推流结果]:参数不全");
  56 + return;
  57 + }
  58 + // 查看正在等待的invite消息
  59 + if (responseEvents.get(response.getApp() + response.getStream()) != null) {
  60 + responseEvents.get(response.getApp() + response.getStream()).run(response);
  61 + }
  62 + }
  63 + taskQueueHandlerRun = false;
  64 + });
52 } 65 }
53 } 66 }
54 67
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java renamed to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
1 -package com.genersoft.iot.vmp.service.impl; 1 +package com.genersoft.iot.vmp.service.redisMsg;
2 2
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 import com.alibaba.fastjson.JSONObject; 4 import com.alibaba.fastjson.JSONObject;
@@ -6,15 +6,20 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; @@ -6,15 +6,20 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
6 import com.genersoft.iot.vmp.service.IGbStreamService; 6 import com.genersoft.iot.vmp.service.IGbStreamService;
7 import com.genersoft.iot.vmp.service.IMediaServerService; 7 import com.genersoft.iot.vmp.service.IMediaServerService;
8 import com.genersoft.iot.vmp.service.IStreamPushService; 8 import com.genersoft.iot.vmp.service.IStreamPushService;
  9 +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
9 import com.genersoft.iot.vmp.utils.DateUtil; 10 import com.genersoft.iot.vmp.utils.DateUtil;
10 import org.slf4j.Logger; 11 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory; 12 import org.slf4j.LoggerFactory;
  13 +import org.springframework.beans.factory.annotation.Autowired;
  14 +import org.springframework.beans.factory.annotation.Qualifier;
12 import org.springframework.data.redis.connection.Message; 15 import org.springframework.data.redis.connection.Message;
13 import org.springframework.data.redis.connection.MessageListener; 16 import org.springframework.data.redis.connection.MessageListener;
  17 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
14 import org.springframework.stereotype.Component; 18 import org.springframework.stereotype.Component;
15 19
16 import javax.annotation.Resource; 20 import javax.annotation.Resource;
17 import java.util.*; 21 import java.util.*;
  22 +import java.util.concurrent.ConcurrentLinkedQueue;
18 23
19 /** 24 /**
20 * @Auther: JiangFeng 25 * @Auther: JiangFeng
@@ -33,49 +38,66 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { @@ -33,49 +38,66 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
33 @Resource 38 @Resource
34 private IGbStreamService gbStreamService; 39 private IGbStreamService gbStreamService;
35 40
  41 + private boolean taskQueueHandlerRun = false;
  42 +
  43 + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
  44 +
  45 + @Qualifier("taskExecutor")
  46 + @Autowired
  47 + private ThreadPoolTaskExecutor taskExecutor;
  48 +
36 @Override 49 @Override
37 public void onMessage(Message message, byte[] bytes) { 50 public void onMessage(Message message, byte[] bytes) {
38 - //  
39 - logger.warn("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody()));  
40 - List<StreamPushItem> streamPushItems = JSON.parseArray(new String(message.getBody()), StreamPushItem.class);  
41 - //查询全部的app+stream 用于判断是添加还是修改  
42 - List<String> allAppAndStream = streamPushService.getAllAppAndStream(); 51 + logger.info("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody()));
43 52
44 - /**  
45 - * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表  
46 - */  
47 - List<StreamPushItem> streamPushItemForSave = new ArrayList<>();  
48 - List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();  
49 - for (StreamPushItem streamPushItem : streamPushItems) {  
50 - String app = streamPushItem.getApp();  
51 - String stream = streamPushItem.getStream();  
52 - boolean contains = allAppAndStream.contains(app + stream);  
53 - //不存在就添加  
54 - if (!contains) {  
55 - streamPushItem.setStreamType("push");  
56 - streamPushItem.setCreateTime(DateUtil.getNow());  
57 - streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());  
58 - streamPushItem.setOriginType(2);  
59 - streamPushItem.setOriginTypeStr("rtsp_push");  
60 - streamPushItem.setTotalReaderCount("0");  
61 - streamPushItemForSave.add(streamPushItem);  
62 - } else {  
63 - //存在就只修改 name和gbId  
64 - streamPushItemForUpdate.add(streamPushItem);  
65 - }  
66 - }  
67 - if (streamPushItemForSave.size() > 0) { 53 + taskQueue.offer(message);
  54 + if (!taskQueueHandlerRun) {
  55 + taskQueueHandlerRun = true;
  56 + taskExecutor.execute(() -> {
  57 + while (!taskQueue.isEmpty()) {
  58 + Message msg = taskQueue.poll();
  59 + List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
  60 + //查询全部的app+stream 用于判断是添加还是修改
  61 + List<String> allAppAndStream = streamPushService.getAllAppAndStream();
68 62
69 - logger.info("添加{}条",streamPushItemForSave.size());  
70 - logger.info(JSONObject.toJSONString(streamPushItemForSave));  
71 - streamPushService.batchAdd(streamPushItemForSave); 63 + /**
  64 + * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表
  65 + */
  66 + List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
  67 + List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
  68 + for (StreamPushItem streamPushItem : streamPushItems) {
  69 + String app = streamPushItem.getApp();
  70 + String stream = streamPushItem.getStream();
  71 + boolean contains = allAppAndStream.contains(app + stream);
  72 + //不存在就添加
  73 + if (!contains) {
  74 + streamPushItem.setStreamType("push");
  75 + streamPushItem.setCreateTime(DateUtil.getNow());
  76 + streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
  77 + streamPushItem.setOriginType(2);
  78 + streamPushItem.setOriginTypeStr("rtsp_push");
  79 + streamPushItem.setTotalReaderCount("0");
  80 + streamPushItemForSave.add(streamPushItem);
  81 + } else {
  82 + //存在就只修改 name和gbId
  83 + streamPushItemForUpdate.add(streamPushItem);
  84 + }
  85 + }
  86 + if (streamPushItemForSave.size() > 0) {
72 87
73 - }  
74 - if(streamPushItemForUpdate.size()>0){  
75 - logger.info("修改{}条",streamPushItemForUpdate.size());  
76 - logger.info(JSONObject.toJSONString(streamPushItemForUpdate));  
77 - gbStreamService.updateGbIdOrName(streamPushItemForUpdate);  
78 - } 88 + logger.info("添加{}条",streamPushItemForSave.size());
  89 + logger.info(JSONObject.toJSONString(streamPushItemForSave));
  90 + streamPushService.batchAdd(streamPushItemForSave);
79 91
  92 + }
  93 + if(streamPushItemForUpdate.size()>0){
  94 + logger.info("修改{}条",streamPushItemForUpdate.size());
  95 + logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
  96 + gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
  97 + }
  98 + }
  99 + taskQueueHandlerRun = false;
  100 + });
  101 + }
80 } 102 }
81 } 103 }
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java renamed to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
1 -package com.genersoft.iot.vmp.service.impl; 1 +package com.genersoft.iot.vmp.service.redisMsg;
2 2
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 -import com.alibaba.fastjson.JSONObject;  
5 import com.genersoft.iot.vmp.common.VideoManagerConstants; 4 import com.genersoft.iot.vmp.common.VideoManagerConstants;
6 import com.genersoft.iot.vmp.conf.DynamicTask; 5 import com.genersoft.iot.vmp.conf.DynamicTask;
7 -import com.genersoft.iot.vmp.conf.UserSetting;  
8 -import com.genersoft.iot.vmp.gb28181.bean.GbStream;  
9 -import com.genersoft.iot.vmp.gb28181.event.EventPublisher;  
10 -import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;  
11 -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;  
12 -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;  
13 -import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;  
14 -import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;  
15 -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;  
16 import com.genersoft.iot.vmp.service.IStreamPushService; 6 import com.genersoft.iot.vmp.service.IStreamPushService;
17 -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;  
18 import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto; 7 import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto;
19 -import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;  
20 import com.genersoft.iot.vmp.storager.IRedisCatchStorage; 8 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
21 -import com.genersoft.iot.vmp.storager.IVideoManagerStorage;  
22 import org.slf4j.Logger; 9 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory; 10 import org.slf4j.LoggerFactory;
24 import org.springframework.beans.factory.annotation.Autowired; 11 import org.springframework.beans.factory.annotation.Autowired;
@@ -30,8 +17,6 @@ import org.springframework.data.redis.connection.MessageListener; @@ -30,8 +17,6 @@ import org.springframework.data.redis.connection.MessageListener;
30 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; 17 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
31 import org.springframework.stereotype.Component; 18 import org.springframework.stereotype.Component;
32 19
33 -import java.util.ArrayList;  
34 -import java.util.List;  
35 import java.util.concurrent.ConcurrentLinkedQueue; 20 import java.util.concurrent.ConcurrentLinkedQueue;
36 21
37 22
@@ -65,7 +50,6 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic @@ -65,7 +50,6 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
65 50
66 @Override 51 @Override
67 public void onMessage(Message message, byte[] bytes) { 52 public void onMessage(Message message, byte[] bytes) {
68 - // TODO 增加队列  
69 logger.warn("[REDIS消息-推流设备状态变化]: {}", new String(message.getBody())); 53 logger.warn("[REDIS消息-推流设备状态变化]: {}", new String(message.getBody()));
70 taskQueue.offer(message); 54 taskQueue.offer(message);
71 55
src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java renamed to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
1 -package com.genersoft.iot.vmp.service.impl; 1 +package com.genersoft.iot.vmp.service.redisMsg;
2 2
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 import com.alibaba.fastjson.JSONObject; 4 import com.alibaba.fastjson.JSONObject;
5 import com.genersoft.iot.vmp.conf.UserSetting; 5 import com.genersoft.iot.vmp.conf.UserSetting;
6 6
7 -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;  
8 -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;  
9 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; 7 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
10 import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; 8 import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
11 -import com.genersoft.iot.vmp.storager.IVideoManagerStorage;  
12 import org.slf4j.Logger; 9 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory; 10 import org.slf4j.LoggerFactory;
14 import org.springframework.beans.factory.annotation.Autowired; 11 import org.springframework.beans.factory.annotation.Autowired;
  12 +import org.springframework.beans.factory.annotation.Qualifier;
15 import org.springframework.data.redis.connection.Message; 13 import org.springframework.data.redis.connection.Message;
16 import org.springframework.data.redis.connection.MessageListener; 14 import org.springframework.data.redis.connection.MessageListener;
  15 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
17 import org.springframework.stereotype.Component; 16 import org.springframework.stereotype.Component;
18 17
  18 +import java.util.concurrent.ConcurrentLinkedQueue;
  19 +
19 20
20 /** 21 /**
21 * 接收其他wvp发送流变化通知 22 * 接收其他wvp发送流变化通知
@@ -32,41 +33,59 @@ public class RedisStreamMsgListener implements MessageListener { @@ -32,41 +33,59 @@ public class RedisStreamMsgListener implements MessageListener {
32 @Autowired 33 @Autowired
33 private ZLMMediaListManager zlmMediaListManager; 34 private ZLMMediaListManager zlmMediaListManager;
34 35
  36 + private boolean taskQueueHandlerRun = false;
  37 +
  38 + private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
  39 +
  40 + @Qualifier("taskExecutor")
  41 + @Autowired
  42 + private ThreadPoolTaskExecutor taskExecutor;
  43 +
35 @Override 44 @Override
36 public void onMessage(Message message, byte[] bytes) { 45 public void onMessage(Message message, byte[] bytes) {
37 46
38 - JSONObject steamMsgJson = JSON.parseObject(message.getBody(), JSONObject.class);  
39 - if (steamMsgJson == null) {  
40 - logger.warn("[收到redis 流变化]消息解析失败");  
41 - return;  
42 - }  
43 - String serverId = steamMsgJson.getString("serverId"); 47 + taskQueue.offer(message);
  48 + if (!taskQueueHandlerRun) {
  49 + taskQueueHandlerRun = true;
  50 + taskExecutor.execute(() -> {
  51 + while (!taskQueue.isEmpty()) {
  52 + Message msg = taskQueue.poll();
  53 + JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
  54 + if (steamMsgJson == null) {
  55 + logger.warn("[收到redis 流变化]消息解析失败");
  56 + return;
  57 + }
  58 + String serverId = steamMsgJson.getString("serverId");
44 59
45 - if (userSetting.getServerId().equals(serverId)) {  
46 - // 自己发送的消息忽略即可  
47 - return;  
48 - }  
49 - logger.info("[收到redis 流变化]: {}", new String(message.getBody()));  
50 - String app = steamMsgJson.getString("app");  
51 - String stream = steamMsgJson.getString("stream");  
52 - boolean register = steamMsgJson.getBoolean("register");  
53 - String mediaServerId = steamMsgJson.getString("mediaServerId");  
54 - MediaItem mediaItem = new MediaItem();  
55 - mediaItem.setSeverId(serverId);  
56 - mediaItem.setApp(app);  
57 - mediaItem.setStream(stream);  
58 - mediaItem.setRegist(register);  
59 - mediaItem.setMediaServerId(mediaServerId);  
60 - mediaItem.setCreateStamp(System.currentTimeMillis()/1000);  
61 - mediaItem.setAliveSecond(0L);  
62 - mediaItem.setTotalReaderCount("0");  
63 - mediaItem.setOriginType(0);  
64 - mediaItem.setOriginTypeStr("0");  
65 - mediaItem.setOriginTypeStr("unknown");  
66 - if (register) {  
67 - zlmMediaListManager.addPush(mediaItem);  
68 - }else {  
69 - zlmMediaListManager.removeMedia(app, stream); 60 + if (userSetting.getServerId().equals(serverId)) {
  61 + // 自己发送的消息忽略即可
  62 + return;
  63 + }
  64 + logger.info("[收到redis 流变化]: {}", new String(message.getBody()));
  65 + String app = steamMsgJson.getString("app");
  66 + String stream = steamMsgJson.getString("stream");
  67 + boolean register = steamMsgJson.getBoolean("register");
  68 + String mediaServerId = steamMsgJson.getString("mediaServerId");
  69 + MediaItem mediaItem = new MediaItem();
  70 + mediaItem.setSeverId(serverId);
  71 + mediaItem.setApp(app);
  72 + mediaItem.setStream(stream);
  73 + mediaItem.setRegist(register);
  74 + mediaItem.setMediaServerId(mediaServerId);
  75 + mediaItem.setCreateStamp(System.currentTimeMillis()/1000);
  76 + mediaItem.setAliveSecond(0L);
  77 + mediaItem.setTotalReaderCount("0");
  78 + mediaItem.setOriginType(0);
  79 + mediaItem.setOriginTypeStr("0");
  80 + mediaItem.setOriginTypeStr("unknown");
  81 + if (register) {
  82 + zlmMediaListManager.addPush(mediaItem);
  83 + }else {
  84 + zlmMediaListManager.removeMedia(app, stream);
  85 + }
  86 + }
  87 + taskQueueHandlerRun = false;
  88 + });
70 } 89 }
71 } 90 }
72 } 91 }
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
@@ -75,18 +75,23 @@ public interface GbStreamMapper { @@ -75,18 +75,23 @@ public interface GbStreamMapper {
75 "WHERE gs.gbId = '${gbId}' AND pgs.platformId = '${platformId}'") 75 "WHERE gs.gbId = '${gbId}' AND pgs.platformId = '${platformId}'")
76 GbStream queryStreamInPlatform(String platformId, String gbId); 76 GbStream queryStreamInPlatform(String platformId, String gbId);
77 77
78 - @Select("select gt.gbId as channelId, gt.name, 'wvp-pro' as manufacture, st.status, gt.longitude, gt.latitude, pc.id as parentId," + 78 + @Select("<script> "+
  79 + "select gt.gbId as channelId, gt.name, 'wvp-pro' as manufacture, st.status, gt.longitude, gt.latitude, pc.id as parentId," +
79 " '1' as registerWay, pc.civilCode, 'live' as model, 'wvp-pro' as owner, '0' as parental,'0' as secrecy" + 80 " '1' as registerWay, pc.civilCode, 'live' as model, 'wvp-pro' as owner, '0' as parental,'0' as secrecy" +
80 " from gb_stream gt " + 81 " from gb_stream gt " +
81 " left join (" + 82 " left join (" +
82 - " select sp.status, sp.app, sp.stream from stream_push sp" + 83 + " select " +
  84 + " <if test='usPushingAsStatus != true'> sp.status as status, </if>" +
  85 + " <if test='usPushingAsStatus == true'> sp.pushIng as status, </if>" +
  86 + "sp.app, sp.stream from stream_push sp" +
83 " union all" + 87 " union all" +
84 " select spxy.status, spxy.app, spxy.stream from stream_proxy spxy" + 88 " select spxy.status, spxy.app, spxy.stream from stream_proxy spxy" +
85 " ) st on st.app = gt.app and st.stream = gt.stream" + 89 " ) st on st.app = gt.app and st.stream = gt.stream" +
86 " left join platform_gb_stream pgs on gt.gbStreamId = pgs.gbStreamId" + 90 " left join platform_gb_stream pgs on gt.gbStreamId = pgs.gbStreamId" +
87 " left join platform_catalog pc on pgs.catalogId = pc.id and pgs.platformId = pc.platformId" + 91 " left join platform_catalog pc on pgs.catalogId = pc.id and pgs.platformId = pc.platformId" +
88 - " where pgs.platformId=#{platformId}")  
89 - List<DeviceChannel> queryGbStreamListInPlatform(String platformId); 92 + " where pgs.platformId=#{platformId}" +
  93 + "</script>")
  94 + List<DeviceChannel> queryGbStreamListInPlatform(String platformId, boolean usPushingAsStatus);
90 95
91 96
92 @Select("SELECT gs.* FROM gb_stream gs LEFT JOIN platform_gb_stream pgs " + 97 @Select("SELECT gs.* FROM gb_stream gs LEFT JOIN platform_gb_stream pgs " +
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.storager.impl; @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.storager.impl;
2 2
3 import com.genersoft.iot.vmp.common.StreamInfo; 3 import com.genersoft.iot.vmp.common.StreamInfo;
4 import com.genersoft.iot.vmp.conf.SipConfig; 4 import com.genersoft.iot.vmp.conf.SipConfig;
  5 +import com.genersoft.iot.vmp.conf.UserSetting;
5 import com.genersoft.iot.vmp.gb28181.bean.*; 6 import com.genersoft.iot.vmp.gb28181.bean.*;
6 import com.genersoft.iot.vmp.gb28181.event.EventPublisher; 7 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
7 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; 8 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
@@ -84,6 +85,9 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { @@ -84,6 +85,9 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
84 private GbStreamMapper gbStreamMapper; 85 private GbStreamMapper gbStreamMapper;
85 86
86 @Autowired 87 @Autowired
  88 + private UserSetting userSetting;
  89 +
  90 + @Autowired
87 private PlatformCatalogMapper catalogMapper; 91 private PlatformCatalogMapper catalogMapper;
88 92
89 @Autowired 93 @Autowired
@@ -614,7 +618,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { @@ -614,7 +618,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
614 */ 618 */
615 @Override 619 @Override
616 public List<DeviceChannel> queryGbStreamListInPlatform(String platformId) { 620 public List<DeviceChannel> queryGbStreamListInPlatform(String platformId) {
617 - return gbStreamMapper.queryGbStreamListInPlatform(platformId); 621 + return gbStreamMapper.queryGbStreamListInPlatform(platformId, userSetting.isUsePushingAsStatus());
618 } 622 }
619 623
620 /** 624 /**
src/main/resources/all-application.yml
@@ -188,6 +188,8 @@ user-settings: @@ -188,6 +188,8 @@ user-settings:
188 record-sip: true 188 record-sip: true
189 # 是否将日志存储进数据库 189 # 是否将日志存储进数据库
190 logInDatebase: true 190 logInDatebase: true
  191 + # 使用推流状态作为推流通道状态
  192 + use-pushing-as-status: true
191 193
192 # 关闭在线文档(生产环境建议关闭) 194 # 关闭在线文档(生产环境建议关闭)
193 springdoc: 195 springdoc: