Commit b957ab61c7d7f54716f81f4cd9474238fd110e1d

Authored by 648540858
1 parent c77c1a95

bug修复

src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java
... ... @@ -39,9 +39,9 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
39 39 dynamicTask.stop(taskKey);
40 40 }
41 41 sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
42   -// if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
43   -// dialog = eventResult.dialog;
44   -// }
  42 + if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
  43 + dialog = eventResult.dialog;
  44 + }
45 45 ResponseEvent event = (ResponseEvent) eventResult.event;
46 46 if (event.getResponse().getRawContent() != null) {
47 47 // 成功
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
... ... @@ -419,12 +419,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
419 419 }
420 420 }
421 421 } else if (gbStream != null) {
422   - if (streamPushItem != null && streamPushItem.isStatus()) {
423   - // 在线状态
  422 + if (streamPushItem != null && streamPushItem.isPushIng()) {
  423 + // 推流状态
424 424 pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
425 425 mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
426 426 } else {
427   - // 不在线 拉起
  427 + // 未推流 拉起
428 428 notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
429 429 mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
430 430 }
... ... @@ -451,7 +451,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
451 451 int port, Boolean tcpActive, boolean mediaTransmissionTCP,
452 452 String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
453 453 // 推流
454   - if (streamPushItem.getServerId().equals(userSetting.getServerId())) {
  454 + if (streamPushItem.isSelf()) {
455 455 Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
456 456 if (streamReady) {
457 457 // 自平台内容
... ... @@ -500,7 +500,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
500 500 String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
501 501 if ("proxy".equals(gbStream.getStreamType())) {
502 502 // TODO 控制启用以使设备上线
503   - logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
  503 + logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
504 504 responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
505 505 } else if ("push".equals(gbStream.getStreamType())) {
506 506 if (!platform.isStartOfflinePush()) {
... ... @@ -508,7 +508,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
508 508 return;
509 509 }
510 510 // 发送redis消息以使设备上线
511   - logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
  511 + logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
512 512  
513 513 MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
514 514 gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
... ... @@ -518,7 +518,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
518 518 dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
519 519 logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
520 520 try {
521   - mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId());
  521 + mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
522 522 responseAck(evt, Response.REQUEST_TIMEOUT); // 超时
523 523 } catch (SipException e) {
524 524 e.printStackTrace();
... ... @@ -533,7 +533,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
533 533 Boolean finalTcpActive = tcpActive;
534 534  
535 535 // 添加在本机上线的通知
536   - mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream, serverId) -> {
  536 + mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
537 537 dynamicTask.stop(callIdHeader.getCallId());
538 538 if (serverId.equals(userSetting.getServerId())) {
539 539 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
... ... @@ -621,7 +621,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
621 621 // 离线
622 622 // 查询是否在本机上线了
623 623 StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
624   - if (currentStreamPushItem.isStatus()) {
  624 + if (currentStreamPushItem.isPushIng()) {
625 625 // 在线状态
626 626 pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
627 627 mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
... ... @@ -350,17 +350,17 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
350 350 switch (event) {
351 351 case CatalogEvent.ON:
352 352 // 上线
353   - logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId());
  353 + logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
354 354 storager.deviceChannelOnline(deviceId, channel.getChannelId());
355 355 break;
356 356 case CatalogEvent.OFF :
357 357 // 离线
358   - logger.info("收到来自设备【{}】的通道【{}】离线通知", device.getDeviceId(), channel.getChannelId());
  358 + logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
359 359 storager.deviceChannelOffline(deviceId, channel.getChannelId());
360 360 break;
361 361 case CatalogEvent.VLOST:
362 362 // 视频丢失
363   - logger.info("收到来自设备【{}】的通道【{}】视频丢失通知", device.getDeviceId(), channel.getChannelId());
  363 + logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
364 364 storager.deviceChannelOffline(deviceId, channel.getChannelId());
365 365 break;
366 366 case CatalogEvent.DEFECT:
... ... @@ -368,17 +368,17 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
368 368 break;
369 369 case CatalogEvent.ADD:
370 370 // 增加
371   - logger.info("收到来自设备【{}】的增加通道【{}】通知", device.getDeviceId(), channel.getChannelId());
  371 + logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
372 372 deviceChannelService.updateChannel(deviceId, channel);
373 373 break;
374 374 case CatalogEvent.DEL:
375 375 // 删除
376   - logger.info("收到来自设备【{}】的删除通道【{}】通知", device.getDeviceId(), channel.getChannelId());
  376 + logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
377 377 storager.delChannel(deviceId, channel.getChannelId());
378 378 break;
379 379 case CatalogEvent.UPDATE:
380 380 // 更新
381   - logger.info("收到来自设备【{}】的更新通道【{}】通知", device.getDeviceId(), channel.getChannelId());
  381 + logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
382 382 deviceChannelService.updateChannel(deviceId, channel);
383 383 break;
384 384 default:
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
... ... @@ -69,7 +69,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
69 69  
70 70 @Override
71 71 public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
72   - logger.info("收到来自设备[{}]的报警通知", device.getDeviceId());
  72 + logger.info("[收到报警通知]设备:{}", device.getDeviceId());
73 73 // 回复200 OK
74 74 try {
75 75 responseAck(evt, Response.OK);
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
... ... @@ -111,7 +111,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
111 111 int sumNum = Integer.parseInt(sumNumElement.getText());
112 112  
113 113 if (sumNum == 0) {
114   - logger.info("收到来自设备【{}】的通道: 0个", take.getDevice().getDeviceId());
  114 + logger.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId());
115 115 // 数据已经完整接收
116 116 storager.cleanChannelsForDevice(take.getDevice().getDeviceId());
117 117 catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
... ... @@ -133,7 +133,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
133 133 }
134 134 int sn = Integer.parseInt(snElement.getText());
135 135 catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList);
136   - logger.info("收到来自设备【{}】的通道: {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum);
  136 + logger.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum);
137 137 if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) {
138 138 // 数据已经完整接收
139 139 boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId()));
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
... ... @@ -240,6 +240,8 @@ public class ZLMHttpHookListener {
240 240 if (mediaInfo != null) {
241 241 assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null);
242 242 }
  243 + }else {
  244 + zlmMediaListManager.sendStreamEvent(param.getApp(),param.getStream(), param.getMediaServerId());
243 245 }
244 246  
245 247 ret.put("code", 0);
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
... ... @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
4 4 import com.genersoft.iot.vmp.conf.UserSetting;
5 5 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
6 6 import com.genersoft.iot.vmp.media.zlm.dto.*;
  7 +import com.genersoft.iot.vmp.service.IMediaServerService;
7 8 import com.genersoft.iot.vmp.service.IStreamProxyService;
8 9 import com.genersoft.iot.vmp.service.IStreamPushService;
9 10 import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
... ... @@ -63,7 +64,13 @@ public class ZLMMediaListManager {
63 64 @Autowired
64 65 private UserSetting userSetting;
65 66  
66   - private Map<String, ChannelOnlineEvent> channelOnlineEvents = new ConcurrentHashMap<>();
  67 + @Autowired
  68 + private ZLMRTPServerFactory zlmrtpServerFactory;
  69 +
  70 + @Autowired
  71 + private IMediaServerService mediaServerService;
  72 +
  73 + private Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>();
67 74  
68 75 public StreamPushItem addPush(MediaItem mediaItem) {
69 76 // 查找此直播流是否存在redis预设gbId
... ... @@ -79,9 +86,26 @@ public class ZLMMediaListManager {
79 86 }else {
80 87 streamPushMapper.update(transform);
81 88 }
  89 + if (transform != null) {
  90 + if (getChannelOnlineEventLister(transform.getApp(), transform.getStream()) != null) {
  91 + getChannelOnlineEventLister(transform.getApp(), transform.getStream()).run(transform.getApp(), transform.getStream(), transform.getServerId());
  92 + removedChannelOnlineEventLister(transform.getApp(), transform.getStream());
  93 + }
  94 + }
82 95 return transform;
83 96 }
84 97  
  98 + public void sendStreamEvent(String app, String stream, String mediaServerId) {
  99 + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  100 + // 查看推流状态
  101 + if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) {
  102 + if (getChannelOnlineEventLister(app, stream) != null) {
  103 + getChannelOnlineEventLister(app, stream).run(app, stream, mediaServerId);
  104 + removedChannelOnlineEventLister(app, stream);
  105 + }
  106 + }
  107 + }
  108 +
85 109 public int removeMedia(String app, String streamId) {
86 110 // 查找是否关联了国标, 关联了不删除, 置为离线
87 111 GbStream gbStream = gbStreamMapper.selectOne(app, streamId);
... ... @@ -89,48 +113,21 @@ public class ZLMMediaListManager {
89 113 if (gbStream == null) {
90 114 result = storager.removeMedia(app, streamId);
91 115 }else {
92   - // TODO 暂不设置为离线
93 116 result =storager.mediaOffline(app, streamId);
94 117 }
95 118 return result;
96 119 }
97 120  
98   - public void addChannelOnlineEventLister(String key, ChannelOnlineEvent callback) {
99   - this.channelOnlineEvents.put(key,callback);
  121 + public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) {
  122 + this.channelOnPublishEvents.put(app + "_" + stream, callback);
100 123 }
101 124  
102   - public void removedChannelOnlineEventLister(String key) {
103   - this.channelOnlineEvents.remove(key);
  125 + public void removedChannelOnlineEventLister(String app, String stream) {
  126 + this.channelOnPublishEvents.remove(app + "_" + stream);
104 127 }
105 128  
  129 + public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) {
  130 + return this.channelOnPublishEvents.get(app + "_" + stream);
  131 + }
106 132  
107   -
108   -// public void clearAllSessions() {
109   -// logger.info("清空所有国标相关的session");
110   -// JSONObject allSessionJSON = zlmresTfulUtils.getAllSession();
111   -// ZLMServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
112   -// HashSet<String> allLocalPorts = new HashSet();
113   -// if (allSessionJSON.getInteger("code") == 0) {
114   -// JSONArray data = allSessionJSON.getJSONArray("data");
115   -// if (data.size() > 0) {
116   -// for (int i = 0; i < data.size(); i++) {
117   -// JSONObject sessionJOSN = data.getJSONObject(i);
118   -// Integer local_port = sessionJOSN.getInteger("local_port");
119   -// if (!local_port.equals(Integer.valueOf(mediaInfo.getHttpPort())) &&
120   -// !local_port.equals(Integer.valueOf(mediaInfo.getHttpSSLport())) &&
121   -// !local_port.equals(Integer.valueOf(mediaInfo.getRtmpPort())) &&
122   -// !local_port.equals(Integer.valueOf(mediaInfo.getRtspPort())) &&
123   -// !local_port.equals(Integer.valueOf(mediaInfo.getRtspSSlport())) &&
124   -// !local_port.equals(Integer.valueOf(mediaInfo.getHookOnFlowReport()))){
125   -// allLocalPorts.add(sessionJOSN.getInteger("local_port") + "");
126   -// }
127   -// }
128   -// }
129   -// }
130   -// if (allLocalPorts.size() > 0) {
131   -// List<String> result = new ArrayList<>(allLocalPorts);
132   -// String localPortSStr = String.join(",", result);
133   -// zlmresTfulUtils.kickSessions(localPortSStr);
134   -// }
135   -// }
136 133 }
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
... ... @@ -273,8 +273,10 @@ public class ZLMRTPServerFactory {
273 273 * 查询待转推的流是否就绪
274 274 */
275 275 public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) {
276   - JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtmp", streamId);
277   - return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
  276 + JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId);
  277 + return (mediaInfo.getInteger("code") == 0
  278 + && mediaInfo.getJSONArray("data") != null
  279 + && mediaInfo.getJSONArray("data").size() > 0);
278 280 }
279 281  
280 282 /**
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java
... ... @@ -72,9 +72,10 @@ public class RedisStreamMsgListener implements MessageListener {
72 72 mediaItem.setOriginType(0);
73 73 mediaItem.setOriginTypeStr("0");
74 74 mediaItem.setOriginTypeStr("unknown");
75   -
76   - zlmMediaListManager.addPush(mediaItem);
77   -
78   -
  75 + if (register) {
  76 + zlmMediaListManager.addPush(mediaItem);
  77 + }else {
  78 + zlmMediaListManager.removeMedia(app, stream);
  79 + }
79 80 }
80 81 }
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java
... ... @@ -264,9 +264,8 @@ public class StreamPushController {
264 264 })
265 265 @GetMapping(value = "/getPlayUrl")
266 266 @ResponseBody
267   - public WVPResult<StreamInfo> getPlayUrl(HttpServletRequest request, @RequestParam String app,
268   - @RequestParam String stream,
269   - @RequestParam(required = false) String mediaServerId){
  267 + public WVPResult<StreamInfo> getPlayUrl(@RequestParam String app,@RequestParam String stream,
  268 + @RequestParam(required = false) String mediaServerId){
270 269 boolean authority = false;
271 270 // 是否登陆用户, 登陆用户返回完整信息
272 271 LoginUser userInfo = SecurityUtils.getUserInfo();
... ... @@ -275,7 +274,7 @@ public class StreamPushController {
275 274 }
276 275 WVPResult<StreamInfo> result = new WVPResult<>();
277 276 StreamPushItem push = streamPushService.getPush(app, stream);
278   - if (!userSetting.getServerId().equals(push.getServerId()) ) {
  277 + if (push != null && !push.isSelf()) {
279 278 result.setCode(-1);
280 279 result.setMsg("来自其他平台的推流信息");
281 280 return result;
... ... @@ -283,7 +282,7 @@ public class StreamPushController {
283 282 StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
284 283 if (streamInfo != null){
285 284 result.setCode(0);
286   - result.setMsg("scccess");
  285 + result.setMsg("success");
287 286 result.setData(streamInfo);
288 287 }else {
289 288 result.setCode(-1);
... ...
web_src/src/components/PushVideoList.vue
... ... @@ -76,7 +76,7 @@
76 76 <el-table-column label="操作" min-width="360" fixed="right">
77 77 <template slot-scope="scope">
78 78 <el-button size="medium" icon="el-icon-video-play"
79   - v-if="(scope.row.status == false && scope.row.gbId == null) || scope.row.status"
  79 + v-if="scope.row.pushIng === true"
80 80 @click="playPush(scope.row)" type="text">播放
81 81 </el-button>
82 82 <el-divider direction="vertical"></el-divider>
... ...
web_src/src/components/dialog/catalogEdit.vue
... ... @@ -70,12 +70,11 @@ export default {
70 70 console.log(catalogType)
71 71 // 216 为虚拟组织 215 为业务分组;目录第一级必须为业务分组, 业务分组下为虚拟组织,虚拟组织下可以有其他虚拟组织
72 72 if (this.level === 1 && catalogType !== "215") {
73   - return callback(new Error('业务分组模式下第一层目录的编号10到13位必须为215'));
  73 + return callback(new Error('业务分组模式下第一层目录的编号11到13位必须为215'));
74 74 }
75 75 if (this.level > 1 && catalogType !== "216") {
76   - return callback(new Error('业务分组模式下第一层以下目录的编号10到13位必须为216'));
  76 + return callback(new Error('业务分组模式下第一层以下目录的编号11到13位必须为216'));
77 77 }
78   -
79 78 }
80 79 callback();
81 80 }
... ...