Commit 6c13317faf11cb72d659bdedad555ac896bb8922

Authored by 648540858
2 parents 14d55498 f4f3e60a

Merge branch 'wvp-28181-2.0' into main-dev

sql/2.6.9更新.sql
@@ -5,4 +5,4 @@ alter table wvp_platform @@ -5,4 +5,4 @@ alter table wvp_platform
5 add auto_push_channel bool default false 5 add auto_push_channel bool default false
6 6
7 alter table wvp_stream_proxy 7 alter table wvp_stream_proxy
8 - add stream_key varying(255) 8 + add stream_key character varying(255)
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
@@ -89,17 +89,17 @@ public class CatalogSubscribeTask implements ISubscribeTask { @@ -89,17 +89,17 @@ public class CatalogSubscribeTask implements ISubscribeTask {
89 ResponseEvent event = (ResponseEvent) eventResult.event; 89 ResponseEvent event = (ResponseEvent) eventResult.event;
90 if (event.getResponse().getRawContent() != null) { 90 if (event.getResponse().getRawContent() != null) {
91 // 成功 91 // 成功
92 - logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId()); 92 + logger.info("[取消目录订阅]成功: {}", device.getDeviceId());
93 }else { 93 }else {
94 // 成功 94 // 成功
95 - logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId()); 95 + logger.info("[取消目录订阅]成功: {}", device.getDeviceId());
96 } 96 }
97 },eventResult -> { 97 },eventResult -> {
98 // 失败 98 // 失败
99 - logger.warn("[取消目录订阅订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); 99 + logger.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
100 }); 100 });
101 } catch (InvalidArgumentException | SipException | ParseException e) { 101 } catch (InvalidArgumentException | SipException | ParseException e) {
102 - logger.error("[命令发送失败] 取消目录订阅订阅: {}", e.getMessage()); 102 + logger.error("[命令发送失败] 取消目录订阅: {}", e.getMessage());
103 } 103 }
104 } 104 }
105 } 105 }
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
@@ -132,7 +132,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @@ -132,7 +132,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
132 132
133 if (CmdType.CATALOG.equals(cmd)) { 133 if (CmdType.CATALOG.equals(cmd)) {
134 logger.info("接收到Catalog通知"); 134 logger.info("接收到Catalog通知");
135 - processNotifyCatalogList(take.getEvt());  
136 notifyRequestForCatalogProcessor.process(take.getEvt()); 135 notifyRequestForCatalogProcessor.process(take.getEvt());
137 } else if (CmdType.ALARM.equals(cmd)) { 136 } else if (CmdType.ALARM.equals(cmd)) {
138 logger.info("接收到Alarm通知"); 137 logger.info("接收到Alarm通知");
@@ -371,114 +370,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @@ -371,114 +370,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
371 } 370 }
372 } 371 }
373 372
374 - /***  
375 - * 处理catalog设备目录列表Notify  
376 - *  
377 - * @param evt  
378 - */  
379 - private void processNotifyCatalogList(RequestEvent evt) {  
380 - try {  
381 - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);  
382 - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);  
383 -  
384 - Device device = redisCatchStorage.getDevice(deviceId);  
385 - if (device == null || !device.isOnLine()) {  
386 - logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));  
387 - return;  
388 - }  
389 - Element rootElement = getRootElement(evt, device.getCharset());  
390 - if (rootElement == null) {  
391 - logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());  
392 - return;  
393 - }  
394 - Element deviceListElement = rootElement.element("DeviceList");  
395 - if (deviceListElement == null) {  
396 - return;  
397 - }  
398 - Iterator<Element> deviceListIterator = deviceListElement.elementIterator();  
399 - if (deviceListIterator != null) {  
400 -  
401 - // 遍历DeviceList  
402 - while (deviceListIterator.hasNext()) {  
403 - Element itemDevice = deviceListIterator.next();  
404 - Element channelDeviceElement = itemDevice.element("DeviceID");  
405 - if (channelDeviceElement == null) {  
406 - continue;  
407 - }  
408 - Element eventElement = itemDevice.element("Event");  
409 - String event;  
410 - if (eventElement == null) {  
411 - logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));  
412 - event = CatalogEvent.ADD;  
413 - }else {  
414 - event = eventElement.getText().toUpperCase();  
415 - }  
416 - DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf);  
417 - if (channel == null) {  
418 - logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));  
419 - continue;  
420 - }  
421 - if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {  
422 - channel.setParentId(null);  
423 - }  
424 - channel.setDeviceId(device.getDeviceId());  
425 - logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());  
426 - switch (event) {  
427 - case CatalogEvent.ON:  
428 - // 上线  
429 - logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());  
430 - storager.deviceChannelOnline(deviceId, channel.getChannelId());  
431 - break;  
432 - case CatalogEvent.OFF :  
433 - // 离线  
434 - logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());  
435 - if (userSetting.getRefuseChannelStatusChannelFormNotify()) {  
436 - storager.deviceChannelOffline(deviceId, channel.getChannelId());  
437 - }else {  
438 - logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());  
439 - }  
440 - break;  
441 - case CatalogEvent.VLOST:  
442 - // 视频丢失  
443 - logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());  
444 - if (userSetting.getRefuseChannelStatusChannelFormNotify()) {  
445 - storager.deviceChannelOffline(deviceId, channel.getChannelId());  
446 - }else {  
447 - logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());  
448 - }  
449 - break;  
450 - case CatalogEvent.DEFECT:  
451 - // 故障  
452 - break;  
453 - case CatalogEvent.ADD:  
454 - // 增加  
455 - logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());  
456 - deviceChannelService.updateChannel(deviceId, channel);  
457 - break;  
458 - case CatalogEvent.DEL:  
459 - // 删除  
460 - logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());  
461 - storager.delChannel(deviceId, channel.getChannelId());  
462 - break;  
463 - case CatalogEvent.UPDATE:  
464 - // 更新  
465 - logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());  
466 - deviceChannelService.updateChannel(deviceId, channel);  
467 - break;  
468 - default:  
469 - logger.warn("[ NotifyCatalog ] event not found : {}", event );  
470 -  
471 - }  
472 - // 转发变化信息  
473 - eventPublisher.catalogEventPublish(null, channel, event);  
474 -  
475 - }  
476 - }  
477 - } catch (DocumentException e) {  
478 - logger.error("未处理的异常 ", e);  
479 - }  
480 - }  
481 -  
482 public void setCmder(SIPCommander cmder) { 373 public void setCmder(SIPCommander cmder) {
483 } 374 }
484 375
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
@@ -218,6 +218,21 @@ public class ZLMRESTfulUtils { @@ -218,6 +218,21 @@ public class ZLMRESTfulUtils {
218 } 218 }
219 } 219 }
220 220
  221 + public JSONObject isMediaOnline(MediaServerItem mediaServerItem, String app, String stream, String schema){
  222 + Map<String, Object> param = new HashMap<>();
  223 + if (app != null) {
  224 + param.put("app",app);
  225 + }
  226 + if (stream != null) {
  227 + param.put("stream",stream);
  228 + }
  229 + if (schema != null) {
  230 + param.put("schema",schema);
  231 + }
  232 + param.put("vhost","__defaultVhost__");
  233 + return sendPost(mediaServerItem, "isMediaOnline", param, null);
  234 + }
  235 +
221 public JSONObject getMediaList(MediaServerItem mediaServerItem, String app, String stream, String schema, RequestCallback callback){ 236 public JSONObject getMediaList(MediaServerItem mediaServerItem, String app, String stream, String schema, RequestCallback callback){
222 Map<String, Object> param = new HashMap<>(); 237 Map<String, Object> param = new HashMap<>();
223 if (app != null) { 238 if (app != null) {
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher; @@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
9 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; 9 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
10 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted; 10 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
11 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; 11 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  12 +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
12 import com.genersoft.iot.vmp.service.IMediaServerService; 13 import com.genersoft.iot.vmp.service.IMediaServerService;
13 import org.slf4j.Logger; 14 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory; 15 import org.slf4j.LoggerFactory;
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -545,16 +545,18 @@ public class DeviceServiceImpl implements IDeviceService { @@ -545,16 +545,18 @@ public class DeviceServiceImpl implements IDeviceService {
545 545
546 546
547 // 目录订阅相关的信息 547 // 目录订阅相关的信息
548 - if (device.getSubscribeCycleForCatalog() > 0) {  
549 - if (deviceInStore.getSubscribeCycleForCatalog() == 0 || deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {  
550 - deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); 548 + if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
  549 + if (device.getSubscribeCycleForCatalog() > 0) {
  550 + // 若已开启订阅,但订阅周期不同,则先取消
  551 + if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
  552 + removeCatalogSubscribe(deviceInStore);
  553 + }
551 // 开启订阅 554 // 开启订阅
552 - addCatalogSubscribe(deviceInStore);  
553 - }  
554 - }else if (device.getSubscribeCycleForCatalog() == 0) {  
555 - if (deviceInStore.getSubscribeCycleForCatalog() != 0) {  
556 deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); 555 deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
  556 + addCatalogSubscribe(deviceInStore);
  557 + }else if (device.getSubscribeCycleForCatalog() == 0) {
557 // 取消订阅 558 // 取消订阅
  559 + deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
558 removeCatalogSubscribe(deviceInStore); 560 removeCatalogSubscribe(deviceInStore);
559 } 561 }
560 } 562 }
@@ -569,6 +571,8 @@ public class DeviceServiceImpl implements IDeviceService { @@ -569,6 +571,8 @@ public class DeviceServiceImpl implements IDeviceService {
569 } 571 }
570 }else if (device.getSubscribeCycleForMobilePosition() == 0) { 572 }else if (device.getSubscribeCycleForMobilePosition() == 0) {
571 if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) { 573 if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
  574 + deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
  575 + deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
572 // 取消订阅 576 // 取消订阅
573 removeMobilePositionSubscribe(deviceInStore); 577 removeMobilePositionSubscribe(deviceInStore);
574 } 578 }
src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java
@@ -257,7 +257,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { @@ -257,7 +257,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
257 ":" + inviteInfo.getDeviceId() + 257 ":" + inviteInfo.getDeviceId() +
258 ":" + inviteInfo.getChannelId() + 258 ":" + inviteInfo.getChannelId() +
259 ":" + inviteInfo.getStream() + 259 ":" + inviteInfo.getStream() +
260 - ":" + inviteInfo.getSsrcInfo().getSsrc(); 260 + ":" + ssrc;
261 if (inviteInfoInDb.getSsrcInfo() != null) { 261 if (inviteInfoInDb.getSsrcInfo() != null) {
262 inviteInfoInDb.getSsrcInfo().setSsrc(ssrc); 262 inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
263 } 263 }
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -35,15 +35,19 @@ import org.slf4j.Logger; @@ -35,15 +35,19 @@ import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory; 35 import org.slf4j.LoggerFactory;
36 import org.springframework.beans.factory.annotation.Autowired; 36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.jdbc.datasource.DataSourceTransactionManager; 37 import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  38 +import org.springframework.scheduling.annotation.Scheduled;
38 import org.springframework.stereotype.Service; 39 import org.springframework.stereotype.Service;
39 import org.springframework.transaction.TransactionDefinition; 40 import org.springframework.transaction.TransactionDefinition;
40 import org.springframework.transaction.TransactionStatus; 41 import org.springframework.transaction.TransactionStatus;
  42 +import org.springframework.util.CollectionUtils;
41 import org.springframework.util.ObjectUtils; 43 import org.springframework.util.ObjectUtils;
42 44
43 import java.util.HashMap; 45 import java.util.HashMap;
44 import java.util.List; 46 import java.util.List;
45 import java.util.Map; 47 import java.util.Map;
46 import java.util.UUID; 48 import java.util.UUID;
  49 +import java.util.function.Function;
  50 +import java.util.stream.Collectors;
47 51
48 /** 52 /**
49 * 视频代理业务 53 * 视频代理业务
@@ -554,4 +558,43 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @@ -554,4 +558,43 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
554 558
555 return new ResourceBaseInfo(total, online); 559 return new ResourceBaseInfo(total, online);
556 } 560 }
  561 +
  562 +
  563 + @Scheduled(cron = "* 0/10 * * * ?")
  564 + public void asyncCheckStreamProxyStatus() {
  565 +
  566 + List<MediaServerItem> all = mediaServerService.getAllOnline();
  567 +
  568 + if (CollectionUtils.isEmpty(all)){
  569 + return;
  570 + }
  571 +
  572 + Map<String, MediaServerItem> serverItemMap = all.stream().collect(Collectors.toMap(MediaServerItem::getId, Function.identity(), (m1, m2) -> m1));
  573 +
  574 + List<StreamProxyItem> list = videoManagerStorager.getStreamProxyListForEnable(true);
  575 +
  576 + if (CollectionUtils.isEmpty(list)){
  577 + return;
  578 + }
  579 +
  580 + for (StreamProxyItem streamProxyItem : list) {
  581 +
  582 + MediaServerItem mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId());
  583 +
  584 + // TODO 支持其他 schema
  585 + JSONObject mediaInfo = zlmresTfulUtils.isMediaOnline(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream(), "rtsp");
  586 +
  587 + if (mediaInfo == null){
  588 + streamProxyItem.setStatus(false);
  589 + } else {
  590 + if (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")) {
  591 + streamProxyItem.setStatus(true);
  592 + } else {
  593 + streamProxyItem.setStatus(false);
  594 + }
  595 + }
  596 +
  597 + updateStreamProxy(streamProxyItem);
  598 + }
  599 + }
557 } 600 }
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -506,6 +506,9 @@ public class StreamPushServiceImpl implements IStreamPushService { @@ -506,6 +506,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
506 stream.setUpdateTime(DateUtil.getNow()); 506 stream.setUpdateTime(DateUtil.getNow());
507 stream.setCreateTime(DateUtil.getNow()); 507 stream.setCreateTime(DateUtil.getNow());
508 stream.setServerId(userSetting.getServerId()); 508 stream.setServerId(userSetting.getServerId());
  509 + stream.setMediaServerId(mediaConfig.getId());
  510 + stream.setSelf(true);
  511 + stream.setPushIng(true);
509 512
510 // 放在事务内执行 513 // 放在事务内执行
511 boolean result = false; 514 boolean result = false;