Commit 2b3b7dbc7973def2342eecd8caf7514f0a367c1b

Authored by panlinlin
1 parent 760f1f4d

使用设备Id+通道Id作为session的识别标识,解决点播异常时无法释放session的问题

src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java
... ... @@ -16,6 +16,7 @@ public class VideoStreamSessionManager {
16 16  
17 17 private ConcurrentHashMap<String, ClientTransaction> sessionMap = new ConcurrentHashMap<>();
18 18 private ConcurrentHashMap<String, String> ssrcMap = new ConcurrentHashMap<>();
  19 + private ConcurrentHashMap<String, String> streamIdMap = new ConcurrentHashMap<>();
19 20  
20 21 public String createPlaySsrc(){
21 22 return SsrcUtil.getPlaySsrc();
... ... @@ -25,18 +26,23 @@ public class VideoStreamSessionManager {
25 26 return SsrcUtil.getPlayBackSsrc();
26 27 }
27 28  
28   - public void put(String streamId,String ssrc,ClientTransaction transaction){
29   - sessionMap.put(streamId, transaction);
30   - ssrcMap.put(streamId, ssrc);
  29 + public void put(String deviceId, String channelId ,String ssrc, String streamId, ClientTransaction transaction){
  30 + sessionMap.put(deviceId + "_" + channelId, transaction);
  31 + ssrcMap.put(deviceId + "_" + channelId, ssrc);
  32 + streamIdMap.put(deviceId + "_" + channelId, streamId);
31 33 }
32 34  
33   - public ClientTransaction get(String streamId){
34   - return sessionMap.get(streamId);
  35 + public ClientTransaction getTransaction(String deviceId, String channelId){
  36 + return sessionMap.get(deviceId + "_" + channelId);
  37 + }
  38 +
  39 + public String getStreamId(String deviceId, String channelId){
  40 + return streamIdMap.get(deviceId + "_" + channelId);
35 41 }
36 42  
37   - public void remove(String streamId) {
38   - sessionMap.remove(streamId);
39   - SsrcUtil.releaseSsrc(ssrcMap.get(streamId));
40   - ssrcMap.remove(streamId);
  43 + public void remove(String deviceId, String channelId) {
  44 + sessionMap.remove(deviceId + "_" + channelId);
  45 + SsrcUtil.releaseSsrc(ssrcMap.get(deviceId + "_" + channelId));
  46 + ssrcMap.remove(deviceId + "_" + channelId);
41 47 }
42 48 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
... ... @@ -87,7 +87,6 @@ public interface ISIPCommander {
87 87  
88 88 /**
89 89 * 请求预览视频流
90   - *
91 90 * @param device 视频设备
92 91 * @param channelId 预览通道
93 92 */
... ... @@ -108,8 +107,8 @@ public interface ISIPCommander {
108 107 *
109 108 * @param ssrc ssrc
110 109 */
111   - void streamByeCmd(String ssrc, SipSubscribe.Event okEvent);
112   - void streamByeCmd(String ssrc);
  110 + void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent);
  111 + void streamByeCmd(String deviceId, String channelId);
113 112  
114 113 /**
115 114 * 语音广播
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
... ... @@ -332,17 +332,17 @@ public class SIPCommander implements ISIPCommander {
332 332  
333 333 /**
334 334 * 请求预览视频流
335   - * @param device 视频设备
336   - * @param channelId 预览通道
337   - * @param event hook订阅
338   - * @param errorEvent sip错误订阅
339   - */
  335 + * @param device 视频设备
  336 + * @param channelId 预览通道
  337 + * @param event hook订阅
  338 + * @param errorEvent sip错误订阅
  339 + */
340 340 @Override
341 341 public void playStreamCmd(Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) {
  342 + String streamId = null;
342 343 try {
343 344 if (device == null) return;
344 345 String ssrc = streamSession.createPlaySsrc();
345   - String streamId = null;
346 346 if (rtpEnable) {
347 347 streamId = String.format("gb_play_%s_%s", device.getDeviceId(), channelId);
348 348 }else {
... ... @@ -444,9 +444,12 @@ public class SIPCommander implements ISIPCommander {
444 444  
445 445 Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, "FromInvt" + tm, null, ssrc, callIdHeader);
446 446  
447   - ClientTransaction transaction = transmitRequest(device, request, errorEvent);
448   - streamSession.put(streamId,ssrc, transaction);
449   -
  447 + ClientTransaction transaction = transmitRequest(device, request, (e -> {
  448 + streamSession.remove(device.getDeviceId(), channelId);
  449 + errorEvent.response(e);
  450 + }));
  451 + streamSession.put(device.getDeviceId(), channelId ,ssrc,streamId, transaction);
  452 +
450 453 } catch ( SipException | ParseException | InvalidArgumentException e) {
451 454 e.printStackTrace();
452 455 }
... ... @@ -552,7 +555,7 @@ public class SIPCommander implements ISIPCommander {
552 555 Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader);
553 556  
554 557 ClientTransaction transaction = transmitRequest(device, request, errorEvent);
555   - streamSession.put(streamId, ssrc, transaction);
  558 + streamSession.put(device.getDeviceId(), channelId, ssrc, streamId, transaction);
556 559  
557 560 } catch ( SipException | ParseException | InvalidArgumentException e) {
558 561 e.printStackTrace();
... ... @@ -566,17 +569,17 @@ public class SIPCommander implements ISIPCommander {
566 569 *
567 570 */
568 571 @Override
569   - public void streamByeCmd(String ssrc) {
570   - streamByeCmd(ssrc, null);
  572 + public void streamByeCmd(String deviceId, String channelId) {
  573 + streamByeCmd(deviceId, channelId, null);
571 574 }
572 575 @Override
573   - public void streamByeCmd(String streamId, SipSubscribe.Event okEvent) {
  576 + public void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent) {
574 577  
575 578 try {
576   - ClientTransaction transaction = streamSession.get(streamId);
  579 + ClientTransaction transaction = streamSession.getTransaction(deviceId, channelId);
577 580 // 服务重启后
578 581 if (transaction == null) {
579   - StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
  582 + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
580 583 if (streamInfo != null) {
581 584  
582 585 }
... ... @@ -613,14 +616,9 @@ public class SIPCommander implements ISIPCommander {
613 616 }
614 617  
615 618 dialog.sendRequest(clientTransaction);
616   -
617   - streamSession.remove(streamId);
618   - zlmrtpServerFactory.closeRTPServer(streamId);
619   - } catch (TransactionDoesNotExistException e) {
620   - e.printStackTrace();
621   - } catch (SipException e) {
622   - e.printStackTrace();
623   - } catch (ParseException e) {
  619 + zlmrtpServerFactory.closeRTPServer(streamSession.getStreamId(deviceId, channelId));
  620 + streamSession.remove(deviceId, channelId);
  621 + } catch (SipException | ParseException e) {
624 622 e.printStackTrace();
625 623 }
626 624 }
... ... @@ -641,7 +639,6 @@ public class SIPCommander implements ISIPCommander {
641 639 * 语音广播
642 640 *
643 641 * @param device 视频设备
644   - * @param channelId 预览通道
645 642 */
646 643 @Override
647 644 public boolean audioBroadcastCmd(Device device) {
... ... @@ -1140,7 +1137,7 @@ public class SIPCommander implements ISIPCommander {
1140 1137 * @param device 视频设备
1141 1138 * @param startPriority 报警起始级别(可选)
1142 1139 * @param endPriority 报警终止级别(可选)
1143   - * @param alarmMethods 报警方式条件(可选)
  1140 + * @param alarmMethod 报警方式条件(可选)
1144 1141 * @param alarmType 报警类型
1145 1142 * @param startTime 报警发生起始时间(可选)
1146 1143 * @param endTime 报警发生终止时间(可选)
... ... @@ -1428,5 +1425,6 @@ public class SIPCommander implements ISIPCommander {
1428 1425 String streamId = String.format("gb_play_%s_%s", device.getDeviceId(), channelId);
1429 1426 zlmrtpServerFactory.closeRTPServer(streamId);
1430 1427 }
  1428 + streamSession.remove(device.getDeviceId(), channelId);
1431 1429 }
1432 1430 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
... ... @@ -58,7 +58,7 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
58 58 redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
59 59 if (zlmrtpServerFactory.totalReaderCount(sendRtpItem.getApp(), streamId) == 0) {
60 60 System.out.println(streamId + "无其它观看者,通知设备停止推流");
61   - cmder.streamByeCmd(streamId);
  61 + cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId);
62 62 }
63 63 }
64 64 } catch (SipException e) {
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java
... ... @@ -922,7 +922,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
922 922 StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, "*");
923 923 if (streamInfo != null) {
924 924 redisCatchStorage.stopPlayback(streamInfo);
925   - cmder.streamByeCmd(streamInfo.getStreamId());
  925 + cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId());
926 926 }
927 927 }
928 928 } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
... ... @@ -306,12 +306,12 @@ public class ZLMHttpHookListener {
306 306 if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) {
307 307 ret.put("close", false);
308 308 } else {
309   - cmder.streamByeCmd(streamId);
  309 + cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId());
310 310 redisCatchStorage.stopPlay(streamInfo);
311 311 storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
312 312 }
313 313 }else{
314   - cmder.streamByeCmd(streamId);
  314 + cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId());
315 315 streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
316 316 redisCatchStorage.stopPlayback(streamInfo);
317 317 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
... ... @@ -63,7 +63,16 @@ public class PlayServiceImpl implements IPlayService {
63 63 playResult.setResult(result);
64 64 // 录像查询以channelId作为deviceId查询
65 65 resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result);
66   -
  66 + // 超时处理
  67 + result.onTimeout(()->{
  68 + logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  69 + // 释放rtpserver
  70 + cmder.closeRTPServer(playResult.getDevice(), channelId);
  71 + RequestMessage msg = new RequestMessage();
  72 + msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + playResult.getUuid());
  73 + msg.setData("Timeout");
  74 + resultHolder.invokeResult(msg);
  75 + });
67 76 if (streamInfo == null) {
68 77 // 发送点播消息
69 78 cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
... ... @@ -76,6 +85,7 @@ public class PlayServiceImpl implements IPlayService {
76 85 RequestMessage msg = new RequestMessage();
77 86 msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
78 87 Response response = event.getResponse();
  88 + cmder.closeRTPServer(playResult.getDevice(), channelId);
79 89 msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
80 90 resultHolder.invokeResult(msg);
81 91 if (errorEvent != null) {
... ... @@ -107,6 +117,7 @@ public class PlayServiceImpl implements IPlayService {
107 117 logger.info("收到订阅消息: " + response.toJSONString());
108 118 onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
109 119 }, event -> {
  120 + cmder.closeRTPServer(playResult.getDevice(), channelId);
110 121 RequestMessage msg = new RequestMessage();
111 122 msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
112 123 Response response = event.getResponse();
... ...
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
... ... @@ -36,7 +36,7 @@ public interface IRedisCatchStorage {
36 36  
37 37 StreamInfo queryPlaybackByStreamId(String steamId);
38 38  
39   - StreamInfo queryPlayByDevice(String deviceId, String code);
  39 + StreamInfo queryPlayByDevice(String deviceId, String channelId);
40 40  
41 41 /**
42 42 * 更新流媒体信息
... ...
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
... ... @@ -75,11 +75,11 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
75 75 }
76 76  
77 77 @Override
78   - public StreamInfo queryPlayByDevice(String deviceId, String code) {
  78 + public StreamInfo queryPlayByDevice(String deviceId, String channelId) {
79 79 // List<Object> playLeys = redis.keys(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX,
80 80 List<Object> playLeys = redis.scan(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX,
81 81 deviceId,
82   - code));
  82 + channelId));
83 83 if (playLeys == null || playLeys.size() == 0) return null;
84 84 return (StreamInfo)redis.get(playLeys.get(0).toString());
85 85 }
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
... ... @@ -75,27 +75,19 @@ public class PlayController {
75 75  
76 76 PlayResult playResult = playService.play(deviceId, channelId, null, null);
77 77  
78   - // 超时处理
79   - playResult.getResult().onTimeout(()->{
80   - logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
81   - // 释放rtpserver
82   - cmder.closeRTPServer(playResult.getDevice(), channelId);
83   - RequestMessage msg = new RequestMessage();
84   - msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + playResult.getUuid());
85   - msg.setData("Timeout");
86   - resultHolder.invokeResult(msg);
87   - });
  78 +
88 79 return playResult.getResult();
89 80 }
90 81  
91 82 @ApiOperation("停止点播")
92 83 @ApiImplicitParams({
93   - @ApiImplicitParam(name = "streamId", value = "视频流ID", dataTypeClass = String.class),
  84 + @ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class),
  85 + @ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class),
94 86 })
95   - @GetMapping("/stop/{streamId}")
96   - public DeferredResult<ResponseEntity<String>> playStop(@PathVariable String streamId) {
  87 + @GetMapping("/stop/{deviceId}/{channelId}")
  88 + public DeferredResult<ResponseEntity<String>> playStop(@PathVariable String deviceId, @PathVariable String channelId) {
97 89  
98   - logger.debug(String.format("设备预览/回放停止API调用,streamId:%s", streamId));
  90 + logger.debug(String.format("设备预览/回放停止API调用,streamId:%s/$s", deviceId, channelId ));
99 91  
100 92 UUID uuid = UUID.randomUUID();
101 93 DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>();
... ... @@ -103,8 +95,8 @@ public class PlayController {
103 95 // 录像查询以channelId作为deviceId查询
104 96 resultHolder.put(DeferredResultHolder.CALLBACK_CMD_STOP + uuid, result);
105 97  
106   - cmder.streamByeCmd(streamId, event -> {
107   - StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
  98 + cmder.streamByeCmd(deviceId, channelId, event -> {
  99 + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
108 100 if (streamInfo == null) {
109 101 RequestMessage msg = new RequestMessage();
110 102 msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
... ... @@ -121,9 +113,10 @@ public class PlayController {
121 113 }
122 114 });
123 115  
124   - if (streamId != null) {
  116 + if (deviceId != null || channelId != null) {
125 117 JSONObject json = new JSONObject();
126   - json.put("streamId", streamId);
  118 + json.put("deviceId", deviceId);
  119 + json.put("channelId", channelId);
127 120 RequestMessage msg = new RequestMessage();
128 121 msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
129 122 msg.setData(json.toString());
... ... @@ -138,7 +131,7 @@ public class PlayController {
138 131  
139 132 // 超时处理
140 133 result.onTimeout(()->{
141   - logger.warn(String.format("设备预览/回放停止超时,streamId:%s ", streamId));
  134 + logger.warn(String.format("设备预览/回放停止超时,deviceId/channelId:%s/$s ", deviceId, channelId));
142 135 RequestMessage msg = new RequestMessage();
143 136 msg.setId(DeferredResultHolder.CALLBACK_CMD_STOP + uuid);
144 137 msg.setData("Timeout");
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java
... ... @@ -84,7 +84,7 @@ public class PlaybackController {
84 84 StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId);
85 85 if (streamInfo != null) {
86 86 // 停止之前的回放
87   - cmder.streamByeCmd(streamInfo.getStreamId());
  87 + cmder.streamByeCmd(deviceId, channelId);
88 88 }
89 89 resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result);
90 90 cmder.playbackStreamCmd(device, channelId, startTime, endTime, (JSONObject response) -> {
... ... @@ -103,20 +103,22 @@ public class PlaybackController {
103 103  
104 104 @ApiOperation("停止视频回放")
105 105 @ApiImplicitParams({
106   - @ApiImplicitParam(name = "ssrc", value = "视频流标识", dataTypeClass = String.class),
  106 + @ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class),
  107 + @ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class),
107 108 })
108   - @GetMapping("/stop/{ssrc}")
109   - public ResponseEntity<String> playStop(@PathVariable String ssrc) {
  109 + @GetMapping("/stop/{deviceId}/{channelId}")
  110 + public ResponseEntity<String> playStop(@PathVariable String deviceId, @PathVariable String channelId) {
110 111  
111   - cmder.streamByeCmd(ssrc);
  112 + cmder.streamByeCmd(deviceId, channelId);
112 113  
113 114 if (logger.isDebugEnabled()) {
114   - logger.debug(String.format("设备录像回放停止 API调用,ssrc:%s", ssrc));
  115 + logger.debug(String.format("设备录像回放停止 API调用,deviceId/channelId:%s/%s", deviceId, channelId));
115 116 }
116 117  
117   - if (ssrc != null) {
  118 + if (deviceId != null && channelId != null) {
118 119 JSONObject json = new JSONObject();
119   - json.put("ssrc", ssrc);
  120 + json.put("deviceId", deviceId);
  121 + json.put("channelId", channelId);
120 122 return new ResponseEntity<String>(json.toString(), HttpStatus.OK);
121 123 } else {
122 124 logger.warn("设备录像回放停止API调用失败!");
... ...
src/main/java/com/genersoft/iot/vmp/web/ApiStreamController.java
... ... @@ -163,7 +163,7 @@ public class ApiStreamController {
163 163 result.put("error","未找到流信息");
164 164 return result;
165 165 }
166   - cmder.streamByeCmd(streamInfo.getStreamId());
  166 + cmder.streamByeCmd(serial, code);
167 167 redisCatchStorage.stopPlay(streamInfo);
168 168 storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
169 169 return null;
... ...
src/main/resources/application.yml
1 1 spring:
2 2 profiles:
3   - active: dev
4 3 \ No newline at end of file
  4 + active: local
5 5 \ No newline at end of file
... ...
web_src/src/components/channelList.vue
... ... @@ -216,12 +216,12 @@ export default {
216 216 var that = this;
217 217 this.$axios({
218 218 method: 'get',
219   - url: '/api/play/stop/' + itemData.streamId
  219 + url: '/api/play/stop/' + this.deviceId + "/" + itemData.channelId
220 220 }).then(function (res) {
221 221 console.log(JSON.stringify(res));
222 222 that.initData();
223 223 }).catch(function (error) {
224   - if (error.response.status == 402) { // 已经停止过
  224 + if (error.response.status === 402) { // 已经停止过
225 225 that.initData();
226 226 }else {
227 227 console.log(error)
... ... @@ -253,7 +253,7 @@ export default {
253 253  
254 254 this.$axios({
255 255 method: 'get',
256   - url:`/api/device/query/sub_channels/${this.deviceId}/${this.parentChannelId}/channels`,
  256 + url:`/api/device/query/sub_channels/${this.deviceId}/${this.parentChannelId}/channels`,
257 257 params: {
258 258 page: that.currentPage,
259 259 count: that.count,
... ...
web_src/src/components/dialog/devicePlayer.vue
... ... @@ -415,7 +415,7 @@ export default {
415 415 this.videoUrl = '';
416 416 this.$axios({
417 417 method: 'get',
418   - url: '/api/playback/stop/' + this.streamId
  418 + url: '/api/playback/stop/' + this.deviceId + "/" + this.channelId
419 419 }).then(function (res) {
420 420 if (callback) callback()
421 421 });
... ...