Commit 14151c07e3d04509d1c8c9ebecde5d9997046efd
1 parent
f89491ad
优化队列处理逻辑
Showing
15 changed files
with
467 additions
and
554 deletions
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
| ... | ... | @@ -29,8 +29,6 @@ public class SipLayer implements CommandLineRunner { |
| 29 | 29 | @Autowired |
| 30 | 30 | private ISIPProcessorObserver sipProcessorObserver; |
| 31 | 31 | |
| 32 | - | |
| 33 | - | |
| 34 | 32 | private final Map<String, SipProviderImpl> tcpSipProviderMap = new ConcurrentHashMap<>(); |
| 35 | 33 | private final Map<String, SipProviderImpl> udpSipProviderMap = new ConcurrentHashMap<>(); |
| 36 | 34 | ... | ... |
src/main/java/com/genersoft/iot/vmp/gb28181/auth/DigestServerAuthenticationHelper.java
| ... | ... | @@ -25,10 +25,9 @@ |
| 25 | 25 | */ |
| 26 | 26 | package com.genersoft.iot.vmp.gb28181.auth; |
| 27 | 27 | |
| 28 | -import java.security.MessageDigest; | |
| 29 | -import java.security.NoSuchAlgorithmException; | |
| 30 | -import java.time.Instant; | |
| 31 | -import java.util.Random; | |
| 28 | +import gov.nist.core.InternalErrorHandler; | |
| 29 | +import org.slf4j.Logger; | |
| 30 | +import org.slf4j.LoggerFactory; | |
| 32 | 31 | |
| 33 | 32 | import javax.sip.address.URI; |
| 34 | 33 | import javax.sip.header.AuthorizationHeader; |
| ... | ... | @@ -36,10 +35,10 @@ import javax.sip.header.HeaderFactory; |
| 36 | 35 | import javax.sip.header.WWWAuthenticateHeader; |
| 37 | 36 | import javax.sip.message.Request; |
| 38 | 37 | import javax.sip.message.Response; |
| 39 | - | |
| 40 | -import gov.nist.core.InternalErrorHandler; | |
| 41 | -import org.slf4j.Logger; | |
| 42 | -import org.slf4j.LoggerFactory; | |
| 38 | +import java.security.MessageDigest; | |
| 39 | +import java.security.NoSuchAlgorithmException; | |
| 40 | +import java.time.Instant; | |
| 41 | +import java.util.Random; | |
| 43 | 42 | |
| 44 | 43 | /** |
| 45 | 44 | * Implements the HTTP digest authentication method server side functionality. |
| ... | ... | @@ -201,12 +200,13 @@ public class DigestServerAuthenticationHelper { |
| 201 | 200 | // String ncStr = new DecimalFormat("00000000").format(Integer.parseInt(nc + "", 16)); |
| 202 | 201 | |
| 203 | 202 | String A1 = username + ":" + realm + ":" + pass; |
| 203 | + | |
| 204 | 204 | String A2 = request.getMethod().toUpperCase() + ":" + uri.toString(); |
| 205 | + | |
| 205 | 206 | byte mdbytes[] = messageDigest.digest(A1.getBytes()); |
| 206 | 207 | String HA1 = toHexString(mdbytes); |
| 207 | 208 | logger.debug("A1: " + A1); |
| 208 | 209 | logger.debug("A2: " + A2); |
| 209 | - | |
| 210 | 210 | mdbytes = messageDigest.digest(A2.getBytes()); |
| 211 | 211 | String HA2 = toHexString(mdbytes); |
| 212 | 212 | logger.debug("HA1: " + HA1); |
| ... | ... | @@ -238,58 +238,4 @@ public class DigestServerAuthenticationHelper { |
| 238 | 238 | |
| 239 | 239 | } |
| 240 | 240 | |
| 241 | -// public static void main(String[] args) throws NoSuchAlgorithmException { | |
| 242 | -// String realm = "3402000000"; | |
| 243 | -// String username = "44010000001180008012"; | |
| 244 | - | |
| 245 | - | |
| 246 | -// String nonce = "07cab60999fbf643264ace27d3b7de8b"; | |
| 247 | -// String uri = "sip:34020000002000000001@3402000000"; | |
| 248 | -// // qop 保护质量 包含auth(默认的)和auth-int(增加了报文完整性检测)两种策略 | |
| 249 | -// String qop = "auth"; | |
| 250 | - | |
| 251 | -// // 客户端随机数,这是一个不透明的字符串值,由客户端提供,并且客户端和服务器都会使用,以避免用明文文本。 | |
| 252 | -// // 这使得双方都可以查验对方的身份,并对消息的完整性提供一些保护 | |
| 253 | -// //String cNonce = authHeader.getCNonce(); | |
| 254 | - | |
| 255 | -// // nonce计数器,是一个16进制的数值,表示同一nonce下客户端发送出请求的数量 | |
| 256 | -// int nc = 1; | |
| 257 | -// String ncStr = new DecimalFormat("00000000").format(nc); | |
| 258 | -// // String ncStr = new DecimalFormat("00000000").format(Integer.parseInt(nc + "", 16)); | |
| 259 | -// MessageDigest messageDigest = MessageDigest.getInstance(DEFAULT_ALGORITHM); | |
| 260 | -// String A1 = username + ":" + realm + ":" + "12345678"; | |
| 261 | -// String A2 = "REGISTER" + ":" + uri; | |
| 262 | -// byte mdbytes[] = messageDigest.digest(A1.getBytes()); | |
| 263 | -// String HA1 = toHexString(mdbytes); | |
| 264 | -// System.out.println("A1: " + A1); | |
| 265 | -// System.out.println("A2: " + A2); | |
| 266 | - | |
| 267 | -// mdbytes = messageDigest.digest(A2.getBytes()); | |
| 268 | -// String HA2 = toHexString(mdbytes); | |
| 269 | -// System.out.println("HA1: " + HA1); | |
| 270 | -// System.out.println("HA2: " + HA2); | |
| 271 | -// String cnonce = "0a4f113b"; | |
| 272 | -// System.out.println("nonce: " + nonce); | |
| 273 | -// System.out.println("nc: " + ncStr); | |
| 274 | -// System.out.println("cnonce: " + cnonce); | |
| 275 | -// System.out.println("qop: " + qop); | |
| 276 | -// String KD = HA1 + ":" + nonce; | |
| 277 | - | |
| 278 | -// if (qop != null && qop.equals("auth") ) { | |
| 279 | -// if (nc != -1) { | |
| 280 | -// KD += ":" + ncStr; | |
| 281 | -// } | |
| 282 | -// if (cnonce != null) { | |
| 283 | -// KD += ":" + cnonce; | |
| 284 | -// } | |
| 285 | -// KD += ":" + qop; | |
| 286 | -// } | |
| 287 | -// KD += ":" + HA2; | |
| 288 | -// System.out.println("KD: " + KD); | |
| 289 | -// mdbytes = messageDigest.digest(KD.getBytes()); | |
| 290 | -// String mdString = toHexString(mdbytes); | |
| 291 | -// System.out.println("mdString: " + mdString); | |
| 292 | -// String response = "4f0507d4b87cdecff04bdaf4c96348f0"; | |
| 293 | -// System.out.println("response: " + response); | |
| 294 | -// } | |
| 295 | 241 | } | ... | ... |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
| 1 | 1 | package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; |
| 2 | 2 | |
| 3 | 3 | import com.alibaba.fastjson2.JSON; |
| 4 | -import com.alibaba.fastjson2.JSONObject; | |
| 5 | 4 | import com.genersoft.iot.vmp.gb28181.SipLayer; |
| 6 | 5 | import com.genersoft.iot.vmp.gb28181.bean.*; |
| 7 | 6 | import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; |
| ... | ... | @@ -9,13 +8,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; |
| 9 | 8 | import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; |
| 10 | 9 | import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; |
| 11 | 10 | import com.genersoft.iot.vmp.gb28181.utils.SipUtils; |
| 12 | -import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; | |
| 13 | -import com.genersoft.iot.vmp.utils.DateUtil; | |
| 14 | 11 | import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; |
| 15 | 12 | import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; |
| 16 | 13 | import com.genersoft.iot.vmp.service.IMediaServerService; |
| 17 | 14 | import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; |
| 18 | 15 | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
| 16 | +import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; | |
| 17 | +import com.genersoft.iot.vmp.utils.DateUtil; | |
| 19 | 18 | import gov.nist.javax.sip.message.MessageFactoryImpl; |
| 20 | 19 | import gov.nist.javax.sip.message.SIPRequest; |
| 21 | 20 | import org.slf4j.Logger; |
| ... | ... | @@ -26,8 +25,10 @@ import org.springframework.lang.Nullable; |
| 26 | 25 | import org.springframework.stereotype.Component; |
| 27 | 26 | import org.springframework.util.ObjectUtils; |
| 28 | 27 | |
| 29 | -import javax.sip.*; | |
| 30 | -import javax.sip.header.*; | |
| 28 | +import javax.sip.InvalidArgumentException; | |
| 29 | +import javax.sip.SipException; | |
| 30 | +import javax.sip.header.CallIdHeader; | |
| 31 | +import javax.sip.header.WWWAuthenticateHeader; | |
| 31 | 32 | import javax.sip.message.Request; |
| 32 | 33 | import java.text.ParseException; |
| 33 | 34 | import java.util.ArrayList; | ... | ... |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
| ... | ... | @@ -11,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; |
| 11 | 11 | import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; |
| 12 | 12 | import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; |
| 13 | 13 | import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; |
| 14 | -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; | |
| 15 | 14 | import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; |
| 16 | 15 | import com.genersoft.iot.vmp.gb28181.utils.SipUtils; |
| 17 | 16 | import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; |
| ... | ... | @@ -31,7 +30,6 @@ import org.springframework.beans.factory.annotation.Qualifier; |
| 31 | 30 | import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
| 32 | 31 | import org.springframework.stereotype.Component; |
| 33 | 32 | import org.springframework.util.ObjectUtils; |
| 34 | -import org.springframework.util.StringUtils; | |
| 35 | 33 | |
| 36 | 34 | import javax.sip.InvalidArgumentException; |
| 37 | 35 | import javax.sip.RequestEvent; |
| ... | ... | @@ -77,8 +75,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements |
| 77 | 75 | @Autowired |
| 78 | 76 | private IDeviceChannelService deviceChannelService; |
| 79 | 77 | |
| 80 | - private boolean taskQueueHandlerRun = false; | |
| 81 | - | |
| 82 | 78 | private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); |
| 83 | 79 | |
| 84 | 80 | @Qualifier("taskExecutor") |
| ... | ... | @@ -98,9 +94,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements |
| 98 | 94 | }catch (SipException | InvalidArgumentException | ParseException e) { |
| 99 | 95 | e.printStackTrace(); |
| 100 | 96 | } |
| 97 | + boolean runed = !taskQueue.isEmpty(); | |
| 101 | 98 | taskQueue.offer(new HandlerCatchData(evt, null, null)); |
| 102 | - if (!taskQueueHandlerRun) { | |
| 103 | - taskQueueHandlerRun = true; | |
| 99 | + if (!runed) { | |
| 104 | 100 | taskExecutor.execute(()-> { |
| 105 | 101 | while (!taskQueue.isEmpty()) { |
| 106 | 102 | try { |
| ... | ... | @@ -128,7 +124,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements |
| 128 | 124 | logger.error("处理NOTIFY消息时错误", e); |
| 129 | 125 | } |
| 130 | 126 | } |
| 131 | - taskQueueHandlerRun = false; | |
| 132 | 127 | }); |
| 133 | 128 | } |
| 134 | 129 | } | ... | ... |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
| ... | ... | @@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher; |
| 9 | 9 | import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; |
| 10 | 10 | import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; |
| 11 | 11 | import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; |
| 12 | -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; | |
| 13 | 12 | import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; |
| 14 | 13 | import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; |
| 15 | 14 | import com.genersoft.iot.vmp.service.IDeviceAlarmService; |
| ... | ... | @@ -27,17 +26,15 @@ import org.springframework.beans.factory.annotation.Qualifier; |
| 27 | 26 | import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
| 28 | 27 | import org.springframework.stereotype.Component; |
| 29 | 28 | import org.springframework.util.ObjectUtils; |
| 30 | -import org.springframework.util.StringUtils; | |
| 31 | 29 | |
| 32 | 30 | import javax.sip.InvalidArgumentException; |
| 33 | 31 | import javax.sip.RequestEvent; |
| 34 | 32 | import javax.sip.SipException; |
| 35 | 33 | import javax.sip.message.Response; |
| 36 | - | |
| 37 | 34 | import java.text.ParseException; |
| 38 | 35 | import java.util.concurrent.ConcurrentLinkedQueue; |
| 39 | 36 | |
| 40 | -import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.*; | |
| 37 | +import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; | |
| 41 | 38 | |
| 42 | 39 | /** |
| 43 | 40 | * 报警事件的处理,参考:9.4 |
| ... | ... | @@ -72,8 +69,6 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme |
| 72 | 69 | @Autowired |
| 73 | 70 | private IDeviceChannelService deviceChannelService; |
| 74 | 71 | |
| 75 | - private boolean taskQueueHandlerRun = false; | |
| 76 | - | |
| 77 | 72 | private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>(); |
| 78 | 73 | |
| 79 | 74 | @Qualifier("taskExecutor") |
| ... | ... | @@ -89,128 +84,128 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme |
| 89 | 84 | @Override |
| 90 | 85 | public void handForDevice(RequestEvent evt, Device device, Element rootElement) { |
| 91 | 86 | logger.info("[收到报警通知]设备:{}", device.getDeviceId()); |
| 92 | - | |
| 87 | + boolean isEmpty = taskQueue.isEmpty(); | |
| 93 | 88 | taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); |
| 94 | - if (!taskQueueHandlerRun) { | |
| 95 | - taskQueueHandlerRun = true; | |
| 89 | + // 回复200 OK | |
| 90 | + try { | |
| 91 | + responseAck((SIPRequest) evt.getRequest(), Response.OK); | |
| 92 | + } catch (SipException | InvalidArgumentException | ParseException e) { | |
| 93 | + logger.error("[命令发送失败] 报警通知回复: {}", e.getMessage()); | |
| 94 | + } | |
| 95 | + if (isEmpty) { | |
| 96 | 96 | taskExecutor.execute(() -> { |
| 97 | 97 | logger.info("[处理报警通知]待处理数量:{}", taskQueue.size() ); |
| 98 | 98 | while (!taskQueue.isEmpty()) { |
| 99 | - SipMsgInfo sipMsgInfo = taskQueue.poll(); | |
| 100 | - // 回复200 OK | |
| 101 | 99 | try { |
| 102 | - responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.OK); | |
| 103 | - } catch (SipException | InvalidArgumentException | ParseException e) { | |
| 104 | - logger.error("[处理报警通知], 回复200OK失败", e); | |
| 105 | - } | |
| 106 | - | |
| 107 | - Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID"); | |
| 108 | - String channelId = deviceIdElement.getText().toString(); | |
| 109 | - | |
| 110 | - DeviceAlarm deviceAlarm = new DeviceAlarm(); | |
| 111 | - deviceAlarm.setCreateTime(DateUtil.getNow()); | |
| 112 | - deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); | |
| 113 | - deviceAlarm.setChannelId(channelId); | |
| 114 | - deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority")); | |
| 115 | - deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod")); | |
| 116 | - String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime"); | |
| 117 | - if (alarmTime == null) { | |
| 118 | - continue; | |
| 119 | - } | |
| 120 | - deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); | |
| 121 | - String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription"); | |
| 122 | - if (alarmDescription == null) { | |
| 123 | - deviceAlarm.setAlarmDescription(""); | |
| 124 | - } else { | |
| 125 | - deviceAlarm.setAlarmDescription(alarmDescription); | |
| 126 | - } | |
| 127 | - String longitude = getText(sipMsgInfo.getRootElement(), "Longitude"); | |
| 128 | - if (longitude != null && NumericUtil.isDouble(longitude)) { | |
| 129 | - deviceAlarm.setLongitude(Double.parseDouble(longitude)); | |
| 130 | - } else { | |
| 131 | - deviceAlarm.setLongitude(0.00); | |
| 132 | - } | |
| 133 | - String latitude = getText(sipMsgInfo.getRootElement(), "Latitude"); | |
| 134 | - if (latitude != null && NumericUtil.isDouble(latitude)) { | |
| 135 | - deviceAlarm.setLatitude(Double.parseDouble(latitude)); | |
| 136 | - } else { | |
| 137 | - deviceAlarm.setLatitude(0.00); | |
| 138 | - } | |
| 100 | + SipMsgInfo sipMsgInfo = taskQueue.poll(); | |
| 101 | + | |
| 102 | + Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID"); | |
| 103 | + String channelId = deviceIdElement.getText().toString(); | |
| 104 | + | |
| 105 | + DeviceAlarm deviceAlarm = new DeviceAlarm(); | |
| 106 | + deviceAlarm.setCreateTime(DateUtil.getNow()); | |
| 107 | + deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); | |
| 108 | + deviceAlarm.setChannelId(channelId); | |
| 109 | + deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority")); | |
| 110 | + deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod")); | |
| 111 | + String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime"); | |
| 112 | + if (alarmTime == null) { | |
| 113 | + continue; | |
| 114 | + } | |
| 115 | + deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); | |
| 116 | + String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription"); | |
| 117 | + if (alarmDescription == null) { | |
| 118 | + deviceAlarm.setAlarmDescription(""); | |
| 119 | + } else { | |
| 120 | + deviceAlarm.setAlarmDescription(alarmDescription); | |
| 121 | + } | |
| 122 | + String longitude = getText(sipMsgInfo.getRootElement(), "Longitude"); | |
| 123 | + if (longitude != null && NumericUtil.isDouble(longitude)) { | |
| 124 | + deviceAlarm.setLongitude(Double.parseDouble(longitude)); | |
| 125 | + } else { | |
| 126 | + deviceAlarm.setLongitude(0.00); | |
| 127 | + } | |
| 128 | + String latitude = getText(sipMsgInfo.getRootElement(), "Latitude"); | |
| 129 | + if (latitude != null && NumericUtil.isDouble(latitude)) { | |
| 130 | + deviceAlarm.setLatitude(Double.parseDouble(latitude)); | |
| 131 | + } else { | |
| 132 | + deviceAlarm.setLatitude(0.00); | |
| 133 | + } | |
| 139 | 134 | |
| 140 | - if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) { | |
| 141 | - if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) { | |
| 142 | - MobilePosition mobilePosition = new MobilePosition(); | |
| 143 | - mobilePosition.setCreateTime(DateUtil.getNow()); | |
| 144 | - mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); | |
| 145 | - mobilePosition.setTime(deviceAlarm.getAlarmTime()); | |
| 146 | - mobilePosition.setLongitude(deviceAlarm.getLongitude()); | |
| 147 | - mobilePosition.setLatitude(deviceAlarm.getLatitude()); | |
| 148 | - mobilePosition.setReportSource("GPS Alarm"); | |
| 149 | - | |
| 150 | - // 更新device channel 的经纬度 | |
| 151 | - DeviceChannel deviceChannel = new DeviceChannel(); | |
| 152 | - deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); | |
| 153 | - deviceChannel.setChannelId(channelId); | |
| 154 | - deviceChannel.setLongitude(mobilePosition.getLongitude()); | |
| 155 | - deviceChannel.setLatitude(mobilePosition.getLatitude()); | |
| 156 | - deviceChannel.setGpsTime(mobilePosition.getTime()); | |
| 157 | - | |
| 158 | - deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice()); | |
| 159 | - | |
| 160 | - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); | |
| 161 | - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); | |
| 162 | - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); | |
| 163 | - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); | |
| 164 | - | |
| 165 | - if (userSetting.getSavePositionHistory()) { | |
| 166 | - storager.insertMobilePosition(mobilePosition); | |
| 135 | + if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) { | |
| 136 | + if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) { | |
| 137 | + MobilePosition mobilePosition = new MobilePosition(); | |
| 138 | + mobilePosition.setCreateTime(DateUtil.getNow()); | |
| 139 | + mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); | |
| 140 | + mobilePosition.setTime(deviceAlarm.getAlarmTime()); | |
| 141 | + mobilePosition.setLongitude(deviceAlarm.getLongitude()); | |
| 142 | + mobilePosition.setLatitude(deviceAlarm.getLatitude()); | |
| 143 | + mobilePosition.setReportSource("GPS Alarm"); | |
| 144 | + | |
| 145 | + // 更新device channel 的经纬度 | |
| 146 | + DeviceChannel deviceChannel = new DeviceChannel(); | |
| 147 | + deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); | |
| 148 | + deviceChannel.setChannelId(channelId); | |
| 149 | + deviceChannel.setLongitude(mobilePosition.getLongitude()); | |
| 150 | + deviceChannel.setLatitude(mobilePosition.getLatitude()); | |
| 151 | + deviceChannel.setGpsTime(mobilePosition.getTime()); | |
| 152 | + | |
| 153 | + deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice()); | |
| 154 | + | |
| 155 | + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); | |
| 156 | + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); | |
| 157 | + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); | |
| 158 | + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); | |
| 159 | + | |
| 160 | + if (userSetting.getSavePositionHistory()) { | |
| 161 | + storager.insertMobilePosition(mobilePosition); | |
| 162 | + } | |
| 163 | + storager.updateChannelPosition(deviceChannel); | |
| 164 | + | |
| 165 | + // 发送redis消息。 通知位置信息的变化 | |
| 166 | + JSONObject jsonObject = new JSONObject(); | |
| 167 | + jsonObject.put("time", mobilePosition.getTime()); | |
| 168 | + jsonObject.put("serial", deviceChannel.getDeviceId()); | |
| 169 | + jsonObject.put("code", deviceChannel.getChannelId()); | |
| 170 | + jsonObject.put("longitude", mobilePosition.getLongitude()); | |
| 171 | + jsonObject.put("latitude", mobilePosition.getLatitude()); | |
| 172 | + jsonObject.put("altitude", mobilePosition.getAltitude()); | |
| 173 | + jsonObject.put("direction", mobilePosition.getDirection()); | |
| 174 | + jsonObject.put("speed", mobilePosition.getSpeed()); | |
| 175 | + redisCatchStorage.sendMobilePositionMsg(jsonObject); | |
| 167 | 176 | } |
| 168 | - storager.updateChannelPosition(deviceChannel); | |
| 169 | - | |
| 170 | - // 发送redis消息。 通知位置信息的变化 | |
| 171 | - JSONObject jsonObject = new JSONObject(); | |
| 172 | - jsonObject.put("time", mobilePosition.getTime()); | |
| 173 | - jsonObject.put("serial", deviceChannel.getDeviceId()); | |
| 174 | - jsonObject.put("code", deviceChannel.getChannelId()); | |
| 175 | - jsonObject.put("longitude", mobilePosition.getLongitude()); | |
| 176 | - jsonObject.put("latitude", mobilePosition.getLatitude()); | |
| 177 | - jsonObject.put("altitude", mobilePosition.getAltitude()); | |
| 178 | - jsonObject.put("direction", mobilePosition.getDirection()); | |
| 179 | - jsonObject.put("speed", mobilePosition.getSpeed()); | |
| 180 | - redisCatchStorage.sendMobilePositionMsg(jsonObject); | |
| 181 | 177 | } |
| 182 | - } | |
| 183 | - if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) { | |
| 184 | - if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) { | |
| 185 | - deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType")); | |
| 178 | + if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) { | |
| 179 | + if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) { | |
| 180 | + deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType")); | |
| 181 | + } | |
| 182 | + } | |
| 183 | + logger.info("[收到报警通知]内容:{}", JSON.toJSONString(deviceAlarm)); | |
| 184 | + if ("7".equals(deviceAlarm.getAlarmMethod()) ) { | |
| 185 | + // 发送给平台的报警信息。 发送redis通知 | |
| 186 | + AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage(); | |
| 187 | + alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod())); | |
| 188 | + alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription()); | |
| 189 | + alarmChannelMessage.setGbId(channelId); | |
| 190 | + redisCatchStorage.sendAlarmMsg(alarmChannelMessage); | |
| 191 | + continue; | |
| 186 | 192 | } |
| 187 | - } | |
| 188 | - logger.info("[收到报警通知]内容:{}", JSON.toJSONString(deviceAlarm)); | |
| 189 | - if ("7".equals(deviceAlarm.getAlarmMethod()) ) { | |
| 190 | - // 发送给平台的报警信息。 发送redis通知 | |
| 191 | - AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage(); | |
| 192 | - alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod())); | |
| 193 | - alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription()); | |
| 194 | - alarmChannelMessage.setGbId(channelId); | |
| 195 | - redisCatchStorage.sendAlarmMsg(alarmChannelMessage); | |
| 196 | - continue; | |
| 197 | - } | |
| 198 | 193 | |
| 199 | - logger.debug("存储报警信息、报警分类"); | |
| 200 | - // 存储报警信息、报警分类 | |
| 201 | - if (sipConfig.isAlarm()) { | |
| 202 | - deviceAlarmService.add(deviceAlarm); | |
| 203 | - } | |
| 194 | + logger.debug("存储报警信息、报警分类"); | |
| 195 | + // 存储报警信息、报警分类 | |
| 196 | + if (sipConfig.isAlarm()) { | |
| 197 | + deviceAlarmService.add(deviceAlarm); | |
| 198 | + } | |
| 204 | 199 | |
| 205 | - if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) { | |
| 206 | - publisher.deviceAlarmEventPublish(deviceAlarm); | |
| 200 | + if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) { | |
| 201 | + publisher.deviceAlarmEventPublish(deviceAlarm); | |
| 202 | + } | |
| 203 | + }catch (Exception e) { | |
| 204 | + logger.warn("[收到报警通知] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest()); | |
| 207 | 205 | } |
| 208 | 206 | } |
| 209 | - taskQueueHandlerRun = false; | |
| 210 | 207 | }); |
| 211 | 208 | } |
| 212 | - | |
| 213 | - | |
| 214 | 209 | } |
| 215 | 210 | |
| 216 | 211 | @Override | ... | ... |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java
| ... | ... | @@ -22,7 +22,6 @@ import org.springframework.beans.factory.annotation.Qualifier; |
| 22 | 22 | import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
| 23 | 23 | import org.springframework.stereotype.Component; |
| 24 | 24 | import org.springframework.util.ObjectUtils; |
| 25 | -import org.springframework.util.StringUtils; | |
| 26 | 25 | |
| 27 | 26 | import javax.sip.InvalidArgumentException; |
| 28 | 27 | import javax.sip.RequestEvent; |
| ... | ... | @@ -57,8 +56,6 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen |
| 57 | 56 | @Autowired |
| 58 | 57 | private IDeviceChannelService deviceChannelService; |
| 59 | 58 | |
| 60 | - private boolean taskQueueHandlerRun = false; | |
| 61 | - | |
| 62 | 59 | private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>(); |
| 63 | 60 | |
| 64 | 61 | @Qualifier("taskExecutor") |
| ... | ... | @@ -73,21 +70,22 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen |
| 73 | 70 | @Override |
| 74 | 71 | public void handForDevice(RequestEvent evt, Device device, Element rootElement) { |
| 75 | 72 | |
| 73 | + boolean isEmpty = taskQueue.isEmpty(); | |
| 76 | 74 | taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); |
| 77 | - if (!taskQueueHandlerRun) { | |
| 78 | - taskQueueHandlerRun = true; | |
| 75 | + // 回复200 OK | |
| 76 | + try { | |
| 77 | + responseAck((SIPRequest) evt.getRequest(), Response.OK); | |
| 78 | + } catch (SipException | InvalidArgumentException | ParseException e) { | |
| 79 | + logger.error("[命令发送失败] 移动位置通知回复: {}", e.getMessage()); | |
| 80 | + } | |
| 81 | + if (isEmpty) { | |
| 79 | 82 | taskExecutor.execute(() -> { |
| 80 | 83 | while (!taskQueue.isEmpty()) { |
| 81 | 84 | SipMsgInfo sipMsgInfo = taskQueue.poll(); |
| 82 | 85 | try { |
| 83 | 86 | Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset()); |
| 84 | 87 | if (rootElementAfterCharset == null) { |
| 85 | - try { | |
| 86 | - logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest()); | |
| 87 | - responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.BAD_REQUEST); | |
| 88 | - } catch (SipException | InvalidArgumentException | ParseException e) { | |
| 89 | - logger.error("[命令发送失败] 移动设备位置数据通知 内容为空: {}", e.getMessage()); | |
| 90 | - } | |
| 88 | + logger.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId()); | |
| 91 | 89 | continue; |
| 92 | 90 | } |
| 93 | 91 | MobilePosition mobilePosition = new MobilePosition(); |
| ... | ... | @@ -137,12 +135,6 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen |
| 137 | 135 | storager.insertMobilePosition(mobilePosition); |
| 138 | 136 | } |
| 139 | 137 | storager.updateChannelPosition(deviceChannel); |
| 140 | - //回复 200 OK | |
| 141 | - try { | |
| 142 | - responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.OK); | |
| 143 | - } catch (SipException | InvalidArgumentException | ParseException e) { | |
| 144 | - logger.error("[命令发送失败] 移动设备位置数据回复200: {}", e.getMessage()); | |
| 145 | - } | |
| 146 | 138 | |
| 147 | 139 | // 发送redis消息。 通知位置信息的变化 |
| 148 | 140 | JSONObject jsonObject = new JSONObject(); |
| ... | ... | @@ -158,14 +150,12 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen |
| 158 | 150 | |
| 159 | 151 | } catch (DocumentException e) { |
| 160 | 152 | e.printStackTrace(); |
| 153 | + } catch (Exception e) { | |
| 154 | + logger.warn("[移动位置通知] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest()); | |
| 161 | 155 | } |
| 162 | - | |
| 163 | 156 | } |
| 164 | - taskQueueHandlerRun = false; | |
| 165 | 157 | }); |
| 166 | 158 | } |
| 167 | - | |
| 168 | - | |
| 169 | 159 | } |
| 170 | 160 | |
| 171 | 161 | @Override | ... | ... |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
| 1 | 1 | package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; |
| 2 | 2 | |
| 3 | -import com.genersoft.iot.vmp.conf.SipConfig; | |
| 4 | -import com.genersoft.iot.vmp.conf.UserSetting; | |
| 5 | 3 | import com.genersoft.iot.vmp.gb28181.bean.*; |
| 6 | -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; | |
| 7 | 4 | import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch; |
| 8 | -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; | |
| 9 | 5 | import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; |
| 10 | 6 | import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; |
| 11 | 7 | import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; |
| 12 | -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; | |
| 13 | -import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; | |
| 14 | 8 | import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; |
| 15 | -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; | |
| 16 | 9 | import com.genersoft.iot.vmp.storager.IVideoManagerStorage; |
| 17 | 10 | import gov.nist.javax.sip.message.SIPRequest; |
| 18 | 11 | import org.dom4j.DocumentException; |
| ... | ... | @@ -24,7 +17,6 @@ import org.springframework.beans.factory.annotation.Autowired; |
| 24 | 17 | import org.springframework.beans.factory.annotation.Qualifier; |
| 25 | 18 | import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
| 26 | 19 | import org.springframework.stereotype.Component; |
| 27 | -import org.springframework.util.StringUtils; | |
| 28 | 20 | |
| 29 | 21 | import javax.sip.InvalidArgumentException; |
| 30 | 22 | import javax.sip.RequestEvent; |
| ... | ... | @@ -45,12 +37,10 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp |
| 45 | 37 | private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class); |
| 46 | 38 | private final String cmdType = "Catalog"; |
| 47 | 39 | |
| 48 | - private boolean taskQueueHandlerRun = false; | |
| 49 | - | |
| 50 | 40 | @Autowired |
| 51 | 41 | private ResponseMessageHandler responseMessageHandler; |
| 52 | 42 | |
| 53 | - private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); | |
| 43 | + private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); | |
| 54 | 44 | |
| 55 | 45 | @Autowired |
| 56 | 46 | private IVideoManagerStorage storager; |
| ... | ... | @@ -69,6 +59,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp |
| 69 | 59 | |
| 70 | 60 | @Override |
| 71 | 61 | public void handForDevice(RequestEvent evt, Device device, Element element) { |
| 62 | + boolean isEmpty = taskQueue.isEmpty(); | |
| 72 | 63 | taskQueue.offer(new HandlerCatchData(evt, device, element)); |
| 73 | 64 | // 回复200 OK |
| 74 | 65 | try { |
| ... | ... | @@ -76,67 +67,71 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp |
| 76 | 67 | } catch (SipException | InvalidArgumentException | ParseException e) { |
| 77 | 68 | logger.error("[命令发送失败] 目录查询回复: {}", e.getMessage()); |
| 78 | 69 | } |
| 79 | - if (!taskQueueHandlerRun) { | |
| 80 | - taskQueueHandlerRun = true; | |
| 70 | + // 如果不为空则说明已经开启消息处理 | |
| 71 | + if (isEmpty) { | |
| 81 | 72 | taskExecutor.execute(() -> { |
| 82 | 73 | while (!taskQueue.isEmpty()) { |
| 83 | - HandlerCatchData take = taskQueue.poll(); | |
| 84 | - Element rootElement = null; | |
| 74 | + // 全局异常捕获,保证下一条可以得到处理 | |
| 85 | 75 | try { |
| 86 | - rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset()); | |
| 87 | - } catch (DocumentException e) { | |
| 88 | - logger.error("[xml解析] 失败: ", e); | |
| 89 | - continue; | |
| 90 | - } | |
| 91 | - if (rootElement == null) { | |
| 92 | - logger.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest()); | |
| 93 | - continue; | |
| 94 | - } | |
| 95 | - Element deviceListElement = rootElement.element("DeviceList"); | |
| 96 | - Element sumNumElement = rootElement.element("SumNum"); | |
| 97 | - Element snElement = rootElement.element("SN"); | |
| 98 | - int sumNum = Integer.parseInt(sumNumElement.getText()); | |
| 99 | - | |
| 100 | - if (sumNum == 0) { | |
| 101 | - logger.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId()); | |
| 102 | - // 数据已经完整接收 | |
| 103 | - storager.cleanChannelsForDevice(take.getDevice().getDeviceId()); | |
| 104 | - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); | |
| 105 | - } else { | |
| 106 | - Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); | |
| 107 | - if (deviceListIterator != null) { | |
| 108 | - List<DeviceChannel> channelList = new ArrayList<>(); | |
| 109 | - // 遍历DeviceList | |
| 110 | - while (deviceListIterator.hasNext()) { | |
| 111 | - Element itemDevice = deviceListIterator.next(); | |
| 112 | - Element channelDeviceElement = itemDevice.element("DeviceID"); | |
| 113 | - if (channelDeviceElement == null) { | |
| 114 | - continue; | |
| 76 | + HandlerCatchData take = taskQueue.poll(); | |
| 77 | + Element rootElement = null; | |
| 78 | + try { | |
| 79 | + rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset()); | |
| 80 | + } catch (DocumentException e) { | |
| 81 | + logger.error("[xml解析] 失败: ", e); | |
| 82 | + continue; | |
| 83 | + } | |
| 84 | + if (rootElement == null) { | |
| 85 | + logger.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest()); | |
| 86 | + continue; | |
| 87 | + } | |
| 88 | + Element deviceListElement = rootElement.element("DeviceList"); | |
| 89 | + Element sumNumElement = rootElement.element("SumNum"); | |
| 90 | + Element snElement = rootElement.element("SN"); | |
| 91 | + int sumNum = Integer.parseInt(sumNumElement.getText()); | |
| 92 | + | |
| 93 | + if (sumNum == 0) { | |
| 94 | + logger.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId()); | |
| 95 | + // 数据已经完整接收 | |
| 96 | + storager.cleanChannelsForDevice(take.getDevice().getDeviceId()); | |
| 97 | + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); | |
| 98 | + } else { | |
| 99 | + Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); | |
| 100 | + if (deviceListIterator != null) { | |
| 101 | + List<DeviceChannel> channelList = new ArrayList<>(); | |
| 102 | + // 遍历DeviceList | |
| 103 | + while (deviceListIterator.hasNext()) { | |
| 104 | + Element itemDevice = deviceListIterator.next(); | |
| 105 | + Element channelDeviceElement = itemDevice.element("DeviceID"); | |
| 106 | + if (channelDeviceElement == null) { | |
| 107 | + continue; | |
| 108 | + } | |
| 109 | + DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice, device, null); | |
| 110 | + deviceChannel.setDeviceId(take.getDevice().getDeviceId()); | |
| 111 | + | |
| 112 | + channelList.add(deviceChannel); | |
| 115 | 113 | } |
| 116 | - DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice, device, null); | |
| 117 | - deviceChannel.setDeviceId(take.getDevice().getDeviceId()); | |
| 118 | - | |
| 119 | - channelList.add(deviceChannel); | |
| 120 | - } | |
| 121 | - int sn = Integer.parseInt(snElement.getText()); | |
| 122 | - catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList); | |
| 123 | - logger.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 : catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum); | |
| 124 | - if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) { | |
| 125 | - // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, | |
| 126 | - // 目前支持设备通道上线通知时和设备上线时向上级通知 | |
| 127 | - boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId())); | |
| 128 | - if (!resetChannelsResult) { | |
| 129 | - String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条"; | |
| 130 | - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg); | |
| 131 | - } else { | |
| 132 | - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); | |
| 114 | + int sn = Integer.parseInt(snElement.getText()); | |
| 115 | + catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList); | |
| 116 | + logger.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 : catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum); | |
| 117 | + if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) { | |
| 118 | + // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, | |
| 119 | + // 目前支持设备通道上线通知时和设备上线时向上级通知 | |
| 120 | + boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId())); | |
| 121 | + if (!resetChannelsResult) { | |
| 122 | + String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条"; | |
| 123 | + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg); | |
| 124 | + } else { | |
| 125 | + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); | |
| 126 | + } | |
| 133 | 127 | } |
| 134 | 128 | } |
| 135 | - } | |
| 136 | 129 | |
| 130 | + } | |
| 131 | + }catch (Exception e) { | |
| 132 | + logger.warn("[收到通道] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest()); | |
| 137 | 133 | } |
| 138 | 134 | } |
| 139 | - taskQueueHandlerRun = false; | |
| 140 | 135 | }); |
| 141 | 136 | } |
| 142 | 137 | ... | ... |
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java
| ... | ... | @@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP |
| 9 | 9 | import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; |
| 10 | 10 | import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; |
| 11 | 11 | import com.genersoft.iot.vmp.utils.DateUtil; |
| 12 | -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; | |
| 13 | 12 | import gov.nist.javax.sip.message.SIPRequest; |
| 14 | 13 | import org.dom4j.DocumentException; |
| 15 | 14 | import org.dom4j.Element; |
| ... | ... | @@ -21,17 +20,17 @@ import org.springframework.beans.factory.annotation.Qualifier; |
| 21 | 20 | import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
| 22 | 21 | import org.springframework.stereotype.Component; |
| 23 | 22 | import org.springframework.util.ObjectUtils; |
| 24 | -import org.springframework.util.StringUtils; | |
| 25 | 23 | |
| 26 | 24 | import javax.sip.InvalidArgumentException; |
| 27 | 25 | import javax.sip.RequestEvent; |
| 28 | 26 | import javax.sip.SipException; |
| 29 | 27 | import javax.sip.message.Response; |
| 30 | 28 | import java.text.ParseException; |
| 31 | -import java.util.*; | |
| 32 | -import java.util.concurrent.BlockingQueue; | |
| 29 | +import java.util.ArrayList; | |
| 30 | +import java.util.Collections; | |
| 31 | +import java.util.Iterator; | |
| 32 | +import java.util.List; | |
| 33 | 33 | import java.util.concurrent.ConcurrentLinkedQueue; |
| 34 | -import java.util.concurrent.LinkedBlockingQueue; | |
| 35 | 34 | |
| 36 | 35 | import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; |
| 37 | 36 | |
| ... | ... | @@ -46,7 +45,6 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent |
| 46 | 45 | |
| 47 | 46 | private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); |
| 48 | 47 | |
| 49 | - private boolean taskQueueHandlerRun = false; | |
| 50 | 48 | @Autowired |
| 51 | 49 | private ResponseMessageHandler responseMessageHandler; |
| 52 | 50 | |
| ... | ... | @@ -70,6 +68,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent |
| 70 | 68 | |
| 71 | 69 | @Override |
| 72 | 70 | public void handForDevice(RequestEvent evt, Device device, Element rootElement) { |
| 71 | + boolean isEmpty = taskQueue.isEmpty(); | |
| 73 | 72 | try { |
| 74 | 73 | // 回复200 OK |
| 75 | 74 | responseAck((SIPRequest) evt.getRequest(), Response.OK); |
| ... | ... | @@ -77,8 +76,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent |
| 77 | 76 | logger.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage()); |
| 78 | 77 | } |
| 79 | 78 | taskQueue.offer(new HandlerCatchData(evt, device, rootElement)); |
| 80 | - if (!taskQueueHandlerRun) { | |
| 81 | - taskQueueHandlerRun = true; | |
| 79 | + if (isEmpty) { | |
| 82 | 80 | taskExecutor.execute(()->{ |
| 83 | 81 | while (!taskQueue.isEmpty()) { |
| 84 | 82 | try { |
| ... | ... | @@ -151,9 +149,10 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent |
| 151 | 149 | } |
| 152 | 150 | } catch (DocumentException e) { |
| 153 | 151 | logger.error("xml解析异常: ", e); |
| 152 | + } catch (Exception e) { | |
| 153 | + logger.warn("[国标录像] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest()); | |
| 154 | 154 | } |
| 155 | 155 | } |
| 156 | - taskQueueHandlerRun = false; | |
| 157 | 156 | }); |
| 158 | 157 | } |
| 159 | 158 | } | ... | ... |
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java
| ... | ... | @@ -38,8 +38,6 @@ public class RedisAlarmMsgListener implements MessageListener { |
| 38 | 38 | @Autowired |
| 39 | 39 | private IVideoManagerStorage storage; |
| 40 | 40 | |
| 41 | - private boolean taskQueueHandlerRun = false; | |
| 42 | - | |
| 43 | 41 | private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); |
| 44 | 42 | |
| 45 | 43 | @Qualifier("taskExecutor") |
| ... | ... | @@ -49,69 +47,68 @@ public class RedisAlarmMsgListener implements MessageListener { |
| 49 | 47 | @Override |
| 50 | 48 | public void onMessage(@NotNull Message message, byte[] bytes) { |
| 51 | 49 | logger.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody())); |
| 52 | - | |
| 50 | + boolean isEmpty = taskQueue.isEmpty(); | |
| 53 | 51 | taskQueue.offer(message); |
| 54 | - if (!taskQueueHandlerRun) { | |
| 55 | - taskQueueHandlerRun = true; | |
| 52 | + if (isEmpty) { | |
| 56 | 53 | logger.info("[线程池信息]活动线程数:{}, 最大线程数: {}", taskExecutor.getActiveCount(), taskExecutor.getMaxPoolSize()); |
| 57 | 54 | taskExecutor.execute(() -> { |
| 58 | 55 | while (!taskQueue.isEmpty()) { |
| 59 | 56 | Message msg = taskQueue.poll(); |
| 60 | - | |
| 61 | - AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class); | |
| 62 | - if (alarmChannelMessage == null) { | |
| 63 | - logger.warn("[REDIS的ALARM通知]消息解析失败"); | |
| 64 | - continue; | |
| 65 | - } | |
| 66 | - String gbId = alarmChannelMessage.getGbId(); | |
| 67 | - | |
| 68 | - DeviceAlarm deviceAlarm = new DeviceAlarm(); | |
| 69 | - deviceAlarm.setCreateTime(DateUtil.getNow()); | |
| 70 | - deviceAlarm.setChannelId(gbId); | |
| 71 | - deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription()); | |
| 72 | - deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn()); | |
| 73 | - deviceAlarm.setAlarmPriority("1"); | |
| 74 | - deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601()); | |
| 75 | - deviceAlarm.setAlarmType("1"); | |
| 76 | - deviceAlarm.setLongitude(0); | |
| 77 | - deviceAlarm.setLatitude(0); | |
| 78 | - | |
| 79 | - if (ObjectUtils.isEmpty(gbId)) { | |
| 80 | - // 发送给所有的上级 | |
| 81 | - List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true); | |
| 82 | - if (parentPlatforms.size() > 0) { | |
| 83 | - for (ParentPlatform parentPlatform : parentPlatforms) { | |
| 84 | - try { | |
| 85 | - commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); | |
| 86 | - } catch (SipException | InvalidArgumentException | ParseException e) { | |
| 87 | - logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage()); | |
| 88 | - } | |
| 89 | - } | |
| 57 | + try { | |
| 58 | + AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class); | |
| 59 | + if (alarmChannelMessage == null) { | |
| 60 | + logger.warn("[REDIS的ALARM通知]消息解析失败"); | |
| 61 | + continue; | |
| 90 | 62 | } |
| 91 | - }else { | |
| 92 | - Device device = storage.queryVideoDevice(gbId); | |
| 93 | - ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId); | |
| 94 | - if (device != null && platform == null) { | |
| 95 | - try { | |
| 96 | - commander.sendAlarmMessage(device, deviceAlarm); | |
| 97 | - } catch (InvalidArgumentException | SipException | ParseException e) { | |
| 98 | - logger.error("[命令发送失败] 发送报警: {}", e.getMessage()); | |
| 99 | - } | |
| 100 | - }else if (device == null && platform != null){ | |
| 101 | - try { | |
| 102 | - commanderForPlatform.sendAlarmMessage(platform, deviceAlarm); | |
| 103 | - } catch (InvalidArgumentException | SipException | ParseException e) { | |
| 104 | - logger.error("[命令发送失败] 发送报警: {}", e.getMessage()); | |
| 63 | + String gbId = alarmChannelMessage.getGbId(); | |
| 64 | + | |
| 65 | + DeviceAlarm deviceAlarm = new DeviceAlarm(); | |
| 66 | + deviceAlarm.setCreateTime(DateUtil.getNow()); | |
| 67 | + deviceAlarm.setChannelId(gbId); | |
| 68 | + deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription()); | |
| 69 | + deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn()); | |
| 70 | + deviceAlarm.setAlarmPriority("1"); | |
| 71 | + deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601()); | |
| 72 | + deviceAlarm.setAlarmType("1"); | |
| 73 | + deviceAlarm.setLongitude(0); | |
| 74 | + deviceAlarm.setLatitude(0); | |
| 75 | + | |
| 76 | + if (ObjectUtils.isEmpty(gbId)) { | |
| 77 | + // 发送给所有的上级 | |
| 78 | + List<ParentPlatform> parentPlatforms = storage.queryEnableParentPlatformList(true); | |
| 79 | + if (parentPlatforms.size() > 0) { | |
| 80 | + for (ParentPlatform parentPlatform : parentPlatforms) { | |
| 81 | + try { | |
| 82 | + commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); | |
| 83 | + } catch (SipException | InvalidArgumentException | ParseException e) { | |
| 84 | + logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage()); | |
| 85 | + } | |
| 86 | + } | |
| 105 | 87 | } |
| 106 | 88 | }else { |
| 107 | - logger.warn("无法确定" + gbId + "是平台还是设备"); | |
| 89 | + Device device = storage.queryVideoDevice(gbId); | |
| 90 | + ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId); | |
| 91 | + if (device != null && platform == null) { | |
| 92 | + try { | |
| 93 | + commander.sendAlarmMessage(device, deviceAlarm); | |
| 94 | + } catch (InvalidArgumentException | SipException | ParseException e) { | |
| 95 | + logger.error("[命令发送失败] 发送报警: {}", e.getMessage()); | |
| 96 | + } | |
| 97 | + }else if (device == null && platform != null){ | |
| 98 | + try { | |
| 99 | + commanderForPlatform.sendAlarmMessage(platform, deviceAlarm); | |
| 100 | + } catch (InvalidArgumentException | SipException | ParseException e) { | |
| 101 | + logger.error("[命令发送失败] 发送报警: {}", e.getMessage()); | |
| 102 | + } | |
| 103 | + }else { | |
| 104 | + logger.warn("无法确定" + gbId + "是平台还是设备"); | |
| 105 | + } | |
| 108 | 106 | } |
| 107 | + }catch (Exception e) { | |
| 108 | + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); | |
| 109 | 109 | } |
| 110 | 110 | } |
| 111 | - taskQueueHandlerRun = false; | |
| 112 | 111 | }); |
| 113 | 112 | } |
| 114 | - | |
| 115 | - | |
| 116 | 113 | } |
| 117 | 114 | } | ... | ... |
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
| ... | ... | @@ -88,8 +88,6 @@ public class RedisGbPlayMsgListener implements MessageListener { |
| 88 | 88 | @Autowired |
| 89 | 89 | private ZlmHttpHookSubscribe subscribe; |
| 90 | 90 | |
| 91 | - private boolean taskQueueHandlerRun = false; | |
| 92 | - | |
| 93 | 91 | private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); |
| 94 | 92 | |
| 95 | 93 | @Qualifier("taskExecutor") |
| ... | ... | @@ -111,107 +109,103 @@ public class RedisGbPlayMsgListener implements MessageListener { |
| 111 | 109 | |
| 112 | 110 | @Override |
| 113 | 111 | public void onMessage(Message message, byte[] bytes) { |
| 114 | - | |
| 112 | + boolean isEmpty = taskQueue.isEmpty(); | |
| 115 | 113 | taskQueue.offer(message); |
| 116 | - if (!taskQueueHandlerRun) { | |
| 117 | - taskQueueHandlerRun = true; | |
| 114 | + if (isEmpty) { | |
| 118 | 115 | taskExecutor.execute(() -> { |
| 119 | 116 | while (!taskQueue.isEmpty()) { |
| 120 | 117 | Message msg = taskQueue.poll(); |
| 121 | - JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); | |
| 122 | - WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); | |
| 123 | - if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { | |
| 124 | - continue; | |
| 125 | - } | |
| 126 | - if (WvpRedisMsg.isRequest(wvpRedisMsg)) { | |
| 127 | - logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); | |
| 128 | - | |
| 129 | - switch (wvpRedisMsg.getCmd()){ | |
| 130 | - case WvpRedisMsgCmd.GET_SEND_ITEM: | |
| 131 | - RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); | |
| 132 | - requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | |
| 133 | - break; | |
| 134 | - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | |
| 135 | - RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; | |
| 136 | - requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | |
| 137 | - break; | |
| 138 | - default: | |
| 139 | - break; | |
| 118 | + try { | |
| 119 | + JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); | |
| 120 | + WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); | |
| 121 | + if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { | |
| 122 | + continue; | |
| 140 | 123 | } |
| 141 | - | |
| 142 | - }else { | |
| 143 | - logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody())); | |
| 144 | - switch (wvpRedisMsg.getCmd()){ | |
| 145 | - case WvpRedisMsgCmd.GET_SEND_ITEM: | |
| 146 | - | |
| 147 | - WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); | |
| 148 | - | |
| 149 | - String key = wvpRedisMsg.getSerial(); | |
| 150 | - switch (content.getCode()) { | |
| 151 | - case 0: | |
| 152 | - ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); | |
| 153 | - PlayMsgCallback playMsgCallback = callbacks.get(key); | |
| 154 | - if (playMsgCallback != null) { | |
| 155 | - callbacksForError.remove(key); | |
| 156 | - try { | |
| 157 | - playMsgCallback.handler(responseSendItemMsg); | |
| 158 | - } catch (ParseException e) { | |
| 159 | - logger.error("[REDIS消息处理异常] ", e); | |
| 124 | + if (WvpRedisMsg.isRequest(wvpRedisMsg)) { | |
| 125 | + logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); | |
| 126 | + | |
| 127 | + switch (wvpRedisMsg.getCmd()){ | |
| 128 | + case WvpRedisMsgCmd.GET_SEND_ITEM: | |
| 129 | + RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); | |
| 130 | + requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | |
| 131 | + break; | |
| 132 | + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | |
| 133 | + RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; | |
| 134 | + requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | |
| 135 | + break; | |
| 136 | + default: | |
| 137 | + break; | |
| 138 | + } | |
| 139 | + | |
| 140 | + }else { | |
| 141 | + logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody())); | |
| 142 | + switch (wvpRedisMsg.getCmd()){ | |
| 143 | + case WvpRedisMsgCmd.GET_SEND_ITEM: | |
| 144 | + | |
| 145 | + WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); | |
| 146 | + | |
| 147 | + String key = wvpRedisMsg.getSerial(); | |
| 148 | + switch (content.getCode()) { | |
| 149 | + case 0: | |
| 150 | + ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); | |
| 151 | + PlayMsgCallback playMsgCallback = callbacks.get(key); | |
| 152 | + if (playMsgCallback != null) { | |
| 153 | + callbacksForError.remove(key); | |
| 154 | + try { | |
| 155 | + playMsgCallback.handler(responseSendItemMsg); | |
| 156 | + } catch (ParseException e) { | |
| 157 | + logger.error("[REDIS消息处理异常] ", e); | |
| 158 | + } | |
| 159 | + } | |
| 160 | + break; | |
| 161 | + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | |
| 162 | + case ERROR_CODE_OFFLINE: | |
| 163 | + case ERROR_CODE_TIMEOUT: | |
| 164 | + PlayMsgErrorCallback errorCallback = callbacksForError.get(key); | |
| 165 | + if (errorCallback != null) { | |
| 166 | + callbacks.remove(key); | |
| 167 | + errorCallback.handler(content); | |
| 168 | + } | |
| 169 | + break; | |
| 170 | + default: | |
| 171 | + break; | |
| 172 | + } | |
| 173 | + break; | |
| 174 | + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | |
| 175 | + WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); | |
| 176 | + String serial = wvpRedisMsg.getSerial(); | |
| 177 | + switch (wvpResult.getCode()) { | |
| 178 | + case 0: | |
| 179 | + JSONObject jsonObject = (JSONObject)wvpResult.getData(); | |
| 180 | + PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); | |
| 181 | + if (playMsgCallback != null) { | |
| 182 | + callbacksForError.remove(serial); | |
| 183 | + playMsgCallback.handler(jsonObject); | |
| 184 | + } | |
| 185 | + break; | |
| 186 | + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | |
| 187 | + case ERROR_CODE_OFFLINE: | |
| 188 | + case ERROR_CODE_TIMEOUT: | |
| 189 | + PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); | |
| 190 | + if (errorCallback != null) { | |
| 191 | + callbacks.remove(serial); | |
| 192 | + errorCallback.handler(wvpResult); | |
| 160 | 193 | } |
| 161 | - } | |
| 162 | - break; | |
| 163 | - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | |
| 164 | - case ERROR_CODE_OFFLINE: | |
| 165 | - case ERROR_CODE_TIMEOUT: | |
| 166 | - PlayMsgErrorCallback errorCallback = callbacksForError.get(key); | |
| 167 | - if (errorCallback != null) { | |
| 168 | - callbacks.remove(key); | |
| 169 | - errorCallback.handler(content); | |
| 170 | - } | |
| 171 | - break; | |
| 172 | - default: | |
| 173 | - break; | |
| 174 | - } | |
| 175 | - break; | |
| 176 | - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | |
| 177 | - WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); | |
| 178 | - String serial = wvpRedisMsg.getSerial(); | |
| 179 | - switch (wvpResult.getCode()) { | |
| 180 | - case 0: | |
| 181 | - JSONObject jsonObject = (JSONObject)wvpResult.getData(); | |
| 182 | - PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); | |
| 183 | - if (playMsgCallback != null) { | |
| 184 | - callbacksForError.remove(serial); | |
| 185 | - playMsgCallback.handler(jsonObject); | |
| 186 | - } | |
| 187 | - break; | |
| 188 | - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | |
| 189 | - case ERROR_CODE_OFFLINE: | |
| 190 | - case ERROR_CODE_TIMEOUT: | |
| 191 | - PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); | |
| 192 | - if (errorCallback != null) { | |
| 193 | - callbacks.remove(serial); | |
| 194 | - errorCallback.handler(wvpResult); | |
| 195 | - } | |
| 196 | - break; | |
| 197 | - default: | |
| 198 | - break; | |
| 199 | - } | |
| 200 | - break; | |
| 201 | - default: | |
| 202 | - break; | |
| 194 | + break; | |
| 195 | + default: | |
| 196 | + break; | |
| 197 | + } | |
| 198 | + break; | |
| 199 | + default: | |
| 200 | + break; | |
| 201 | + } | |
| 203 | 202 | } |
| 203 | + }catch (Exception e) { | |
| 204 | + logger.warn("[RedisGbPlayMsg] 发现未处理的异常, {}",e.getMessage()); | |
| 204 | 205 | } |
| 205 | 206 | } |
| 206 | - taskQueueHandlerRun = false; | |
| 207 | 207 | }); |
| 208 | 208 | } |
| 209 | - | |
| 210 | - | |
| 211 | - | |
| 212 | - | |
| 213 | - | |
| 214 | - | |
| 215 | 209 | } |
| 216 | 210 | |
| 217 | 211 | /** | ... | ... |
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java
| ... | ... | @@ -27,8 +27,6 @@ public class RedisGpsMsgListener implements MessageListener { |
| 27 | 27 | |
| 28 | 28 | private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class); |
| 29 | 29 | |
| 30 | - private boolean taskQueueHandlerRun = false; | |
| 31 | - | |
| 32 | 30 | @Autowired |
| 33 | 31 | private IRedisCatchStorage redisCatchStorage; |
| 34 | 32 | |
| ... | ... | @@ -44,17 +42,20 @@ public class RedisGpsMsgListener implements MessageListener { |
| 44 | 42 | |
| 45 | 43 | @Override |
| 46 | 44 | public void onMessage(@NotNull Message message, byte[] bytes) { |
| 45 | + boolean isEmpty = taskQueue.isEmpty(); | |
| 47 | 46 | taskQueue.offer(message); |
| 48 | - if (!taskQueueHandlerRun) { | |
| 49 | - taskQueueHandlerRun = true; | |
| 47 | + if (isEmpty) { | |
| 50 | 48 | taskExecutor.execute(() -> { |
| 51 | 49 | while (!taskQueue.isEmpty()) { |
| 52 | 50 | Message msg = taskQueue.poll(); |
| 53 | - GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class); | |
| 54 | - // 只是放入redis缓存起来 | |
| 55 | - redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); | |
| 51 | + try { | |
| 52 | + GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class); | |
| 53 | + // 只是放入redis缓存起来 | |
| 54 | + redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); | |
| 55 | + }catch (Exception e) { | |
| 56 | + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); | |
| 57 | + } | |
| 56 | 58 | } |
| 57 | - taskQueueHandlerRun = false; | |
| 58 | 59 | }); |
| 59 | 60 | } |
| 60 | 61 | } | ... | ... |
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
| 1 | 1 | package com.genersoft.iot.vmp.service.redisMsg; |
| 2 | 2 | |
| 3 | 3 | import com.alibaba.fastjson2.JSON; |
| 4 | -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; | |
| 5 | 4 | import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; |
| 6 | 5 | import org.slf4j.Logger; |
| 7 | 6 | import org.slf4j.LoggerFactory; |
| ... | ... | @@ -26,8 +25,6 @@ public class RedisPushStreamResponseListener implements MessageListener { |
| 26 | 25 | |
| 27 | 26 | private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class); |
| 28 | 27 | |
| 29 | - private boolean taskQueueHandlerRun = false; | |
| 30 | - | |
| 31 | 28 | private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); |
| 32 | 29 | |
| 33 | 30 | @Qualifier("taskExecutor") |
| ... | ... | @@ -43,24 +40,27 @@ public class RedisPushStreamResponseListener implements MessageListener { |
| 43 | 40 | |
| 44 | 41 | @Override |
| 45 | 42 | public void onMessage(Message message, byte[] bytes) { |
| 46 | - logger.warn("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); | |
| 43 | + logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); | |
| 44 | + boolean isEmpty = taskQueue.isEmpty(); | |
| 47 | 45 | taskQueue.offer(message); |
| 48 | - if (!taskQueueHandlerRun) { | |
| 49 | - taskQueueHandlerRun = true; | |
| 46 | + if (isEmpty) { | |
| 50 | 47 | taskExecutor.execute(() -> { |
| 51 | 48 | while (!taskQueue.isEmpty()) { |
| 52 | 49 | Message msg = taskQueue.poll(); |
| 53 | - MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); | |
| 54 | - if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ | |
| 55 | - logger.info("[REDIS消息-请求推流结果]:参数不全"); | |
| 56 | - continue; | |
| 57 | - } | |
| 58 | - // 查看正在等待的invite消息 | |
| 59 | - if (responseEvents.get(response.getApp() + response.getStream()) != null) { | |
| 60 | - responseEvents.get(response.getApp() + response.getStream()).run(response); | |
| 50 | + try { | |
| 51 | + MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); | |
| 52 | + if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ | |
| 53 | + logger.info("[REDIS消息-请求推流结果]:参数不全"); | |
| 54 | + continue; | |
| 55 | + } | |
| 56 | + // 查看正在等待的invite消息 | |
| 57 | + if (responseEvents.get(response.getApp() + response.getStream()) != null) { | |
| 58 | + responseEvents.get(response.getApp() + response.getStream()).run(response); | |
| 59 | + } | |
| 60 | + }catch (Exception e) { | |
| 61 | + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); | |
| 61 | 62 | } |
| 62 | 63 | } |
| 63 | - taskQueueHandlerRun = false; | |
| 64 | 64 | }); |
| 65 | 65 | } |
| 66 | 66 | } | ... | ... |
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java
| ... | ... | @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; |
| 6 | 6 | import com.genersoft.iot.vmp.service.IGbStreamService; |
| 7 | 7 | import com.genersoft.iot.vmp.service.IMediaServerService; |
| 8 | 8 | import com.genersoft.iot.vmp.service.IStreamPushService; |
| 9 | -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; | |
| 10 | 9 | import com.genersoft.iot.vmp.utils.DateUtil; |
| 11 | 10 | import org.slf4j.Logger; |
| 12 | 11 | import org.slf4j.LoggerFactory; |
| ... | ... | @@ -18,7 +17,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
| 18 | 17 | import org.springframework.stereotype.Component; |
| 19 | 18 | |
| 20 | 19 | import javax.annotation.Resource; |
| 21 | -import java.util.*; | |
| 20 | +import java.util.ArrayList; | |
| 21 | +import java.util.List; | |
| 22 | 22 | import java.util.concurrent.ConcurrentLinkedQueue; |
| 23 | 23 | |
| 24 | 24 | /** |
| ... | ... | @@ -38,7 +38,6 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { |
| 38 | 38 | @Resource |
| 39 | 39 | private IGbStreamService gbStreamService; |
| 40 | 40 | |
| 41 | - private boolean taskQueueHandlerRun = false; | |
| 42 | 41 | |
| 43 | 42 | private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); |
| 44 | 43 | |
| ... | ... | @@ -49,54 +48,56 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { |
| 49 | 48 | @Override |
| 50 | 49 | public void onMessage(Message message, byte[] bytes) { |
| 51 | 50 | logger.info("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody())); |
| 52 | - | |
| 51 | + boolean isEmpty = taskQueue.isEmpty(); | |
| 53 | 52 | taskQueue.offer(message); |
| 54 | - if (!taskQueueHandlerRun) { | |
| 55 | - taskQueueHandlerRun = true; | |
| 53 | + if (isEmpty) { | |
| 56 | 54 | taskExecutor.execute(() -> { |
| 57 | 55 | while (!taskQueue.isEmpty()) { |
| 58 | 56 | Message msg = taskQueue.poll(); |
| 59 | - List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); | |
| 60 | - //查询全部的app+stream 用于判断是添加还是修改 | |
| 61 | - List<String> allAppAndStream = streamPushService.getAllAppAndStream(); | |
| 57 | + try { | |
| 58 | + List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); | |
| 59 | + //查询全部的app+stream 用于判断是添加还是修改 | |
| 60 | + List<String> allAppAndStream = streamPushService.getAllAppAndStream(); | |
| 62 | 61 | |
| 63 | - /** | |
| 64 | - * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 | |
| 65 | - */ | |
| 66 | - List<StreamPushItem> streamPushItemForSave = new ArrayList<>(); | |
| 67 | - List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>(); | |
| 68 | - for (StreamPushItem streamPushItem : streamPushItems) { | |
| 69 | - String app = streamPushItem.getApp(); | |
| 70 | - String stream = streamPushItem.getStream(); | |
| 71 | - boolean contains = allAppAndStream.contains(app + stream); | |
| 72 | - //不存在就添加 | |
| 73 | - if (!contains) { | |
| 74 | - streamPushItem.setStreamType("push"); | |
| 75 | - streamPushItem.setCreateTime(DateUtil.getNow()); | |
| 76 | - streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); | |
| 77 | - streamPushItem.setOriginType(2); | |
| 78 | - streamPushItem.setOriginTypeStr("rtsp_push"); | |
| 79 | - streamPushItem.setTotalReaderCount("0"); | |
| 80 | - streamPushItemForSave.add(streamPushItem); | |
| 81 | - } else { | |
| 82 | - //存在就只修改 name和gbId | |
| 83 | - streamPushItemForUpdate.add(streamPushItem); | |
| 62 | + /** | |
| 63 | + * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 | |
| 64 | + */ | |
| 65 | + List<StreamPushItem> streamPushItemForSave = new ArrayList<>(); | |
| 66 | + List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>(); | |
| 67 | + for (StreamPushItem streamPushItem : streamPushItems) { | |
| 68 | + String app = streamPushItem.getApp(); | |
| 69 | + String stream = streamPushItem.getStream(); | |
| 70 | + boolean contains = allAppAndStream.contains(app + stream); | |
| 71 | + //不存在就添加 | |
| 72 | + if (!contains) { | |
| 73 | + streamPushItem.setStreamType("push"); | |
| 74 | + streamPushItem.setCreateTime(DateUtil.getNow()); | |
| 75 | + streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); | |
| 76 | + streamPushItem.setOriginType(2); | |
| 77 | + streamPushItem.setOriginTypeStr("rtsp_push"); | |
| 78 | + streamPushItem.setTotalReaderCount("0"); | |
| 79 | + streamPushItemForSave.add(streamPushItem); | |
| 80 | + } else { | |
| 81 | + //存在就只修改 name和gbId | |
| 82 | + streamPushItemForUpdate.add(streamPushItem); | |
| 83 | + } | |
| 84 | 84 | } |
| 85 | - } | |
| 86 | - if (streamPushItemForSave.size() > 0) { | |
| 85 | + if (streamPushItemForSave.size() > 0) { | |
| 87 | 86 | |
| 88 | - logger.info("添加{}条",streamPushItemForSave.size()); | |
| 89 | - logger.info(JSONObject.toJSONString(streamPushItemForSave)); | |
| 90 | - streamPushService.batchAdd(streamPushItemForSave); | |
| 87 | + logger.info("添加{}条",streamPushItemForSave.size()); | |
| 88 | + logger.info(JSONObject.toJSONString(streamPushItemForSave)); | |
| 89 | + streamPushService.batchAdd(streamPushItemForSave); | |
| 91 | 90 | |
| 92 | - } | |
| 93 | - if(streamPushItemForUpdate.size()>0){ | |
| 94 | - logger.info("修改{}条",streamPushItemForUpdate.size()); | |
| 95 | - logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); | |
| 96 | - gbStreamService.updateGbIdOrName(streamPushItemForUpdate); | |
| 91 | + } | |
| 92 | + if(streamPushItemForUpdate.size()>0){ | |
| 93 | + logger.info("修改{}条",streamPushItemForUpdate.size()); | |
| 94 | + logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); | |
| 95 | + gbStreamService.updateGbIdOrName(streamPushItemForUpdate); | |
| 96 | + } | |
| 97 | + }catch (Exception e) { | |
| 98 | + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); | |
| 97 | 99 | } |
| 98 | 100 | } |
| 99 | - taskQueueHandlerRun = false; | |
| 100 | 101 | }); |
| 101 | 102 | } |
| 102 | 103 | } | ... | ... |
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java
| ... | ... | @@ -29,8 +29,6 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic |
| 29 | 29 | |
| 30 | 30 | private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class); |
| 31 | 31 | |
| 32 | - private boolean taskQueueHandlerRun = false; | |
| 33 | - | |
| 34 | 32 | @Autowired |
| 35 | 33 | private IRedisCatchStorage redisCatchStorage; |
| 36 | 34 | |
| ... | ... | @@ -50,37 +48,40 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic |
| 50 | 48 | |
| 51 | 49 | @Override |
| 52 | 50 | public void onMessage(Message message, byte[] bytes) { |
| 51 | + boolean isEmpty = taskQueue.isEmpty(); | |
| 53 | 52 | logger.warn("[REDIS消息-推流设备状态变化]: {}", new String(message.getBody())); |
| 54 | 53 | taskQueue.offer(message); |
| 55 | 54 | |
| 56 | - if (!taskQueueHandlerRun) { | |
| 57 | - taskQueueHandlerRun = true; | |
| 55 | + if (isEmpty) { | |
| 58 | 56 | taskExecutor.execute(() -> { |
| 59 | 57 | while (!taskQueue.isEmpty()) { |
| 60 | 58 | Message msg = taskQueue.poll(); |
| 61 | - PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class); | |
| 62 | - if (statusChangeFromPushStream == null) { | |
| 63 | - logger.warn("[REDIS消息]推流设备状态变化消息解析失败"); | |
| 64 | - continue; | |
| 65 | - } | |
| 66 | - // 取消定时任务 | |
| 67 | - dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); | |
| 68 | - if (statusChangeFromPushStream.isSetAllOffline()) { | |
| 69 | - // 所有设备离线 | |
| 70 | - streamPushService.allStreamOffline(); | |
| 71 | - } | |
| 72 | - if (statusChangeFromPushStream.getOfflineStreams() != null | |
| 73 | - && statusChangeFromPushStream.getOfflineStreams().size() > 0) { | |
| 74 | - // 更新部分设备离线 | |
| 75 | - streamPushService.offline(statusChangeFromPushStream.getOfflineStreams()); | |
| 76 | - } | |
| 77 | - if (statusChangeFromPushStream.getOnlineStreams() != null && | |
| 78 | - statusChangeFromPushStream.getOnlineStreams().size() > 0) { | |
| 79 | - // 更新部分设备上线 | |
| 80 | - streamPushService.online(statusChangeFromPushStream.getOnlineStreams()); | |
| 59 | + try { | |
| 60 | + PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class); | |
| 61 | + if (statusChangeFromPushStream == null) { | |
| 62 | + logger.warn("[REDIS消息]推流设备状态变化消息解析失败"); | |
| 63 | + continue; | |
| 64 | + } | |
| 65 | + // 取消定时任务 | |
| 66 | + dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); | |
| 67 | + if (statusChangeFromPushStream.isSetAllOffline()) { | |
| 68 | + // 所有设备离线 | |
| 69 | + streamPushService.allStreamOffline(); | |
| 70 | + } | |
| 71 | + if (statusChangeFromPushStream.getOfflineStreams() != null | |
| 72 | + && statusChangeFromPushStream.getOfflineStreams().size() > 0) { | |
| 73 | + // 更新部分设备离线 | |
| 74 | + streamPushService.offline(statusChangeFromPushStream.getOfflineStreams()); | |
| 75 | + } | |
| 76 | + if (statusChangeFromPushStream.getOnlineStreams() != null && | |
| 77 | + statusChangeFromPushStream.getOnlineStreams().size() > 0) { | |
| 78 | + // 更新部分设备上线 | |
| 79 | + streamPushService.online(statusChangeFromPushStream.getOnlineStreams()); | |
| 80 | + } | |
| 81 | + }catch (Exception e) { | |
| 82 | + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); | |
| 81 | 83 | } |
| 82 | 84 | } |
| 83 | - taskQueueHandlerRun = false; | |
| 84 | 85 | }); |
| 85 | 86 | } |
| 86 | 87 | } | ... | ... |
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
| ... | ... | @@ -33,8 +33,6 @@ public class RedisStreamMsgListener implements MessageListener { |
| 33 | 33 | @Autowired |
| 34 | 34 | private ZLMMediaListManager zlmMediaListManager; |
| 35 | 35 | |
| 36 | - private boolean taskQueueHandlerRun = false; | |
| 37 | - | |
| 38 | 36 | private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); |
| 39 | 37 | |
| 40 | 38 | @Qualifier("taskExecutor") |
| ... | ... | @@ -43,48 +41,50 @@ public class RedisStreamMsgListener implements MessageListener { |
| 43 | 41 | |
| 44 | 42 | @Override |
| 45 | 43 | public void onMessage(Message message, byte[] bytes) { |
| 46 | - | |
| 44 | + boolean isEmpty = taskQueue.isEmpty(); | |
| 47 | 45 | taskQueue.offer(message); |
| 48 | - if (!taskQueueHandlerRun) { | |
| 49 | - taskQueueHandlerRun = true; | |
| 46 | + if (isEmpty) { | |
| 50 | 47 | taskExecutor.execute(() -> { |
| 51 | 48 | while (!taskQueue.isEmpty()) { |
| 52 | 49 | Message msg = taskQueue.poll(); |
| 53 | - JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); | |
| 54 | - if (steamMsgJson == null) { | |
| 55 | - logger.warn("[收到redis 流变化]消息解析失败"); | |
| 56 | - continue; | |
| 57 | - } | |
| 58 | - String serverId = steamMsgJson.getString("serverId"); | |
| 50 | + try { | |
| 51 | + JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); | |
| 52 | + if (steamMsgJson == null) { | |
| 53 | + logger.warn("[收到redis 流变化]消息解析失败"); | |
| 54 | + continue; | |
| 55 | + } | |
| 56 | + String serverId = steamMsgJson.getString("serverId"); | |
| 59 | 57 | |
| 60 | - if (userSetting.getServerId().equals(serverId)) { | |
| 61 | - // 自己发送的消息忽略即可 | |
| 62 | - continue; | |
| 63 | - } | |
| 64 | - logger.info("[收到redis 流变化]: {}", new String(message.getBody())); | |
| 65 | - String app = steamMsgJson.getString("app"); | |
| 66 | - String stream = steamMsgJson.getString("stream"); | |
| 67 | - boolean register = steamMsgJson.getBoolean("register"); | |
| 68 | - String mediaServerId = steamMsgJson.getString("mediaServerId"); | |
| 69 | - OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); | |
| 70 | - onStreamChangedHookParam.setSeverId(serverId); | |
| 71 | - onStreamChangedHookParam.setApp(app); | |
| 72 | - onStreamChangedHookParam.setStream(stream); | |
| 73 | - onStreamChangedHookParam.setRegist(register); | |
| 74 | - onStreamChangedHookParam.setMediaServerId(mediaServerId); | |
| 75 | - onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); | |
| 76 | - onStreamChangedHookParam.setAliveSecond(0L); | |
| 77 | - onStreamChangedHookParam.setTotalReaderCount("0"); | |
| 78 | - onStreamChangedHookParam.setOriginType(0); | |
| 79 | - onStreamChangedHookParam.setOriginTypeStr("0"); | |
| 80 | - onStreamChangedHookParam.setOriginTypeStr("unknown"); | |
| 81 | - if (register) { | |
| 82 | - zlmMediaListManager.addPush(onStreamChangedHookParam); | |
| 83 | - }else { | |
| 84 | - zlmMediaListManager.removeMedia(app, stream); | |
| 58 | + if (userSetting.getServerId().equals(serverId)) { | |
| 59 | + // 自己发送的消息忽略即可 | |
| 60 | + continue; | |
| 61 | + } | |
| 62 | + logger.info("[收到redis 流变化]: {}", new String(message.getBody())); | |
| 63 | + String app = steamMsgJson.getString("app"); | |
| 64 | + String stream = steamMsgJson.getString("stream"); | |
| 65 | + boolean register = steamMsgJson.getBoolean("register"); | |
| 66 | + String mediaServerId = steamMsgJson.getString("mediaServerId"); | |
| 67 | + OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); | |
| 68 | + onStreamChangedHookParam.setSeverId(serverId); | |
| 69 | + onStreamChangedHookParam.setApp(app); | |
| 70 | + onStreamChangedHookParam.setStream(stream); | |
| 71 | + onStreamChangedHookParam.setRegist(register); | |
| 72 | + onStreamChangedHookParam.setMediaServerId(mediaServerId); | |
| 73 | + onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); | |
| 74 | + onStreamChangedHookParam.setAliveSecond(0L); | |
| 75 | + onStreamChangedHookParam.setTotalReaderCount("0"); | |
| 76 | + onStreamChangedHookParam.setOriginType(0); | |
| 77 | + onStreamChangedHookParam.setOriginTypeStr("0"); | |
| 78 | + onStreamChangedHookParam.setOriginTypeStr("unknown"); | |
| 79 | + if (register) { | |
| 80 | + zlmMediaListManager.addPush(onStreamChangedHookParam); | |
| 81 | + }else { | |
| 82 | + zlmMediaListManager.removeMedia(app, stream); | |
| 83 | + } | |
| 84 | + }catch (Exception e) { | |
| 85 | + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); | |
| 85 | 86 | } |
| 86 | 87 | } |
| 87 | - taskQueueHandlerRun = false; | |
| 88 | 88 | }); |
| 89 | 89 | } |
| 90 | 90 | } | ... | ... |