Commit 8870f5f5a182f4af527dc2b89ad75063019df14f

Authored by 648540858
1 parent eb2a4139

优化使用来源ip作为流ip

src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java
... ... @@ -2,10 +2,10 @@ package com.genersoft.iot.vmp.common;
2 2  
3 3 import io.swagger.v3.oas.annotations.media.Schema;
4 4  
5   -import java.net.URL;
  5 +import java.io.Serializable;
6 6  
7 7 @Schema(description = "流信息")
8   -public class StreamInfo {
  8 +public class StreamInfo implements Serializable {
9 9  
10 10 @Schema(description = "应用名")
11 11 private String app;
... ... @@ -79,6 +79,94 @@ public class StreamInfo {
79 79 @Schema(description = "是否暂停(录像回放使用)")
80 80 private boolean pause;
81 81  
  82 + public void setFlv(StreamURL flv) {
  83 + this.flv = flv;
  84 + }
  85 +
  86 + public void setHttps_flv(StreamURL https_flv) {
  87 + this.https_flv = https_flv;
  88 + }
  89 +
  90 + public void setWs_flv(StreamURL ws_flv) {
  91 + this.ws_flv = ws_flv;
  92 + }
  93 +
  94 + public void setWss_flv(StreamURL wss_flv) {
  95 + this.wss_flv = wss_flv;
  96 + }
  97 +
  98 + public void setFmp4(StreamURL fmp4) {
  99 + this.fmp4 = fmp4;
  100 + }
  101 +
  102 + public void setHttps_fmp4(StreamURL https_fmp4) {
  103 + this.https_fmp4 = https_fmp4;
  104 + }
  105 +
  106 + public void setWs_fmp4(StreamURL ws_fmp4) {
  107 + this.ws_fmp4 = ws_fmp4;
  108 + }
  109 +
  110 + public void setWss_fmp4(StreamURL wss_fmp4) {
  111 + this.wss_fmp4 = wss_fmp4;
  112 + }
  113 +
  114 + public void setHls(StreamURL hls) {
  115 + this.hls = hls;
  116 + }
  117 +
  118 + public void setHttps_hls(StreamURL https_hls) {
  119 + this.https_hls = https_hls;
  120 + }
  121 +
  122 + public void setWs_hls(StreamURL ws_hls) {
  123 + this.ws_hls = ws_hls;
  124 + }
  125 +
  126 + public void setWss_hls(StreamURL wss_hls) {
  127 + this.wss_hls = wss_hls;
  128 + }
  129 +
  130 + public void setTs(StreamURL ts) {
  131 + this.ts = ts;
  132 + }
  133 +
  134 + public void setHttps_ts(StreamURL https_ts) {
  135 + this.https_ts = https_ts;
  136 + }
  137 +
  138 + public void setWs_ts(StreamURL ws_ts) {
  139 + this.ws_ts = ws_ts;
  140 + }
  141 +
  142 + public void setWss_ts(StreamURL wss_ts) {
  143 + this.wss_ts = wss_ts;
  144 + }
  145 +
  146 + public void setRtmp(StreamURL rtmp) {
  147 + this.rtmp = rtmp;
  148 + }
  149 +
  150 + public void setRtmps(StreamURL rtmps) {
  151 + this.rtmps = rtmps;
  152 + }
  153 +
  154 + public void setRtsp(StreamURL rtsp) {
  155 + this.rtsp = rtsp;
  156 + }
  157 +
  158 + public void setRtsps(StreamURL rtsps) {
  159 + this.rtsps = rtsps;
  160 + }
  161 +
  162 + public void setRtc(StreamURL rtc) {
  163 + this.rtc = rtc;
  164 + }
  165 +
  166 + public void setRtcs(StreamURL rtcs) {
  167 + this.rtcs = rtcs;
  168 + }
  169 +
82 170 public void setRtmp(String host, int port, int sslPort, String app, String stream, String callIdParam) {
83 171 String file = String.format("%s/%s/%s", app, stream, callIdParam);
84 172 this.rtmp = new StreamURL("rtmp", host, port, file);
... ...
src/main/java/com/genersoft/iot/vmp/common/StreamURL.java
... ... @@ -2,9 +2,11 @@ package com.genersoft.iot.vmp.common;
2 2  
3 3 import io.swagger.v3.oas.annotations.media.Schema;
4 4  
  5 +import java.io.Serializable;
  6 +
5 7  
6 8 @Schema(description = "流地址信息")
7   -public class StreamURL {
  9 +public class StreamURL implements Serializable {
8 10  
9 11 @Schema(description = "协议")
10 12 private String protocol;
... ...
src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java
... ... @@ -20,8 +20,6 @@ public class SipConfig {
20 20  
21 21 Integer ptzSpeed = 50;
22 22  
23   - Integer keepaliveTimeOut = 255;
24   -
25 23 Integer registerTimeInterval = 120;
26 24  
27 25 private boolean alarm;
... ... @@ -50,9 +48,6 @@ public class SipConfig {
50 48 this.ptzSpeed = ptzSpeed;
51 49 }
52 50  
53   - public void setKeepaliveTimeOut(Integer keepaliveTimeOut) {
54   - this.keepaliveTimeOut = keepaliveTimeOut;
55   - }
56 51  
57 52 public void setRegisterTimeInterval(Integer registerTimeInterval) {
58 53 this.registerTimeInterval = registerTimeInterval;
... ... @@ -86,10 +81,6 @@ public class SipConfig {
86 81 return ptzSpeed;
87 82 }
88 83  
89   - public Integer getKeepaliveTimeOut() {
90   - return keepaliveTimeOut;
91   - }
92   -
93 84 public Integer getRegisterTimeInterval() {
94 85 return registerTimeInterval;
95 86 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
... ... @@ -11,6 +11,7 @@ import org.springframework.beans.factory.annotation.Autowired;
11 11 import org.springframework.boot.CommandLineRunner;
12 12 import org.springframework.core.annotation.Order;
13 13 import org.springframework.stereotype.Component;
  14 +import org.springframework.util.ObjectUtils;
14 15  
15 16 import javax.sip.*;
16 17 import java.util.*;
... ... @@ -107,6 +108,9 @@ public class SipLayer implements CommandLineRunner {
107 108 }
108 109  
109 110 public SipProviderImpl getUdpSipProvider(String ip) {
  111 + if (ObjectUtils.isEmpty(ip)) {
  112 + return null;
  113 + }
110 114 return udpSipProviderMap.get(ip);
111 115 }
112 116  
... ... @@ -125,6 +129,9 @@ public class SipLayer implements CommandLineRunner {
125 129 }
126 130  
127 131 public SipProviderImpl getTcpSipProvider(String ip) {
  132 + if (ObjectUtils.isEmpty(ip)) {
  133 + return null;
  134 + }
128 135 return tcpSipProviderMap.get(ip);
129 136 }
130 137 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java
... ... @@ -5,22 +5,19 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
5 5 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
6 6 import com.genersoft.iot.vmp.utils.GitUtil;
7 7 import gov.nist.javax.sip.SipProviderImpl;
8   -import gov.nist.javax.sip.message.SIPRequest;
9 8 import org.slf4j.Logger;
10 9 import org.slf4j.LoggerFactory;
11 10 import org.springframework.beans.factory.annotation.Autowired;
12   -import org.springframework.beans.factory.annotation.Qualifier;
13 11 import org.springframework.stereotype.Component;
  12 +import org.springframework.util.ObjectUtils;
14 13  
15 14 import javax.sip.SipException;
16   -import javax.sip.SipFactory;
17 15 import javax.sip.header.CallIdHeader;
18 16 import javax.sip.header.UserAgentHeader;
19 17 import javax.sip.header.ViaHeader;
20 18 import javax.sip.message.Message;
21 19 import javax.sip.message.Request;
22 20 import javax.sip.message.Response;
23   -import java.net.InetAddress;
24 21 import java.text.ParseException;
25 22  
26 23 /**
... ... @@ -109,6 +106,10 @@ public class SIPSender {
109 106 }
110 107  
111 108 public CallIdHeader getNewCallIdHeader(String ip, String transport){
  109 + if (ObjectUtils.isEmpty(ip) || ObjectUtils.isEmpty(transport)) {
  110 + return transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider().getNewCallId()
  111 + : sipLayer.getUdpSipProvider().getNewCallId();
  112 + }
112 113 return transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider(ip).getNewCallId()
113 114 : sipLayer.getUdpSipProvider(ip).getNewCallId();
114 115 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java
1 1 package com.genersoft.iot.vmp.gb28181.transmit.callback;
2 2  
3   -import java.util.HashMap;
  3 +import com.alibaba.fastjson2.JSON;
  4 +import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
  5 +import org.springframework.stereotype.Component;
  6 +import org.springframework.util.ObjectUtils;
  7 +import org.springframework.web.context.request.async.DeferredResult;
  8 +
  9 +import java.util.Collection;
4 10 import java.util.Map;
5 11 import java.util.Set;
6 12 import java.util.concurrent.ConcurrentHashMap;
7 13  
8   -import org.springframework.http.HttpStatus;
9   -import org.springframework.http.ResponseEntity;
10   -import org.springframework.stereotype.Component;
11   -import org.springframework.web.context.request.async.DeferredResult;
12   -
13 14 /**
14 15 * @description: 异步请求处理
15 16 * @author: swwheihei
... ... @@ -51,31 +52,48 @@ public class DeferredResultHolder {
51 52  
52 53 public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST";
53 54  
54   - private Map<String, Map<String, DeferredResult>> map = new ConcurrentHashMap<>();
  55 + private Map<String, Map<String, DeferredResultEx>> map = new ConcurrentHashMap<>();
55 56  
56 57  
57   - public void put(String key, String id, DeferredResult result) {
58   - Map<String, DeferredResult> deferredResultMap = map.get(key);
  58 + public void put(String key, String id, DeferredResultEx result) {
  59 + Map<String, DeferredResultEx> deferredResultMap = map.get(key);
59 60 if (deferredResultMap == null) {
60 61 deferredResultMap = new ConcurrentHashMap<>();
61 62 map.put(key, deferredResultMap);
62 63 }
63 64 deferredResultMap.put(id, result);
64 65 }
65   -
66   - public DeferredResult get(String key, String id) {
67   - Map<String, DeferredResult> deferredResultMap = map.get(key);
  66 +
  67 + public void put(String key, String id, DeferredResult result) {
  68 + Map<String, DeferredResultEx> deferredResultMap = map.get(key);
68 69 if (deferredResultMap == null) {
  70 + deferredResultMap = new ConcurrentHashMap<>();
  71 + map.put(key, deferredResultMap);
  72 + }
  73 + deferredResultMap.put(id, new DeferredResultEx(result));
  74 + }
  75 +
  76 + public DeferredResultEx get(String key, String id) {
  77 + Map<String, DeferredResultEx> deferredResultMap = map.get(key);
  78 + if (deferredResultMap == null || ObjectUtils.isEmpty(id)) {
69 79 return null;
70 80 }
71 81 return deferredResultMap.get(id);
72 82 }
73 83  
  84 + public Collection<DeferredResultEx> getAllByKey(String key) {
  85 + Map<String, DeferredResultEx> deferredResultMap = map.get(key);
  86 + if (deferredResultMap == null) {
  87 + return null;
  88 + }
  89 + return deferredResultMap.values();
  90 + }
  91 +
74 92 public boolean exist(String key, String id){
75 93 if (key == null) {
76 94 return false;
77 95 }
78   - Map<String, DeferredResult> deferredResultMap = map.get(key);
  96 + Map<String, DeferredResultEx> deferredResultMap = map.get(key);
79 97 if (id == null) {
80 98 return deferredResultMap != null;
81 99 }else {
... ... @@ -88,15 +106,15 @@ public class DeferredResultHolder {
88 106 * @param msg
89 107 */
90 108 public void invokeResult(RequestMessage msg) {
91   - Map<String, DeferredResult> deferredResultMap = map.get(msg.getKey());
  109 + Map<String, DeferredResultEx> deferredResultMap = map.get(msg.getKey());
92 110 if (deferredResultMap == null) {
93 111 return;
94 112 }
95   - DeferredResult result = deferredResultMap.get(msg.getId());
  113 + DeferredResultEx result = deferredResultMap.get(msg.getId());
96 114 if (result == null) {
97 115 return;
98 116 }
99   - result.setResult(msg.getData());
  117 + result.getDeferredResult().setResult(msg.getData());
100 118 deferredResultMap.remove(msg.getId());
101 119 if (deferredResultMap.size() == 0) {
102 120 map.remove(msg.getKey());
... ... @@ -108,18 +126,27 @@ public class DeferredResultHolder {
108 126 * @param msg
109 127 */
110 128 public void invokeAllResult(RequestMessage msg) {
111   - Map<String, DeferredResult> deferredResultMap = map.get(msg.getKey());
  129 + Map<String, DeferredResultEx> deferredResultMap = map.get(msg.getKey());
112 130 if (deferredResultMap == null) {
113 131 return;
114 132 }
115 133 Set<String> ids = deferredResultMap.keySet();
116 134 for (String id : ids) {
117   - DeferredResult result = deferredResultMap.get(id);
  135 + DeferredResultEx result = deferredResultMap.get(id);
118 136 if (result == null) {
119 137 return;
120 138 }
121   - result.setResult(msg.getData());
  139 + if (result.getFilter() != null) {
  140 + Object handler = result.getFilter().handler(msg.getData());
  141 + System.out.println(JSON.toJSONString(handler));
  142 + result.getDeferredResult().setResult(handler);
  143 + }else {
  144 + result.getDeferredResult().setResult(msg.getData());
  145 + }
  146 +
122 147 }
123 148 map.remove(msg.getKey());
124 149 }
  150 +
  151 +
125 152 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
... ... @@ -8,14 +8,13 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
8 8 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
9 9 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
10 10 import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
11   -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
12 11 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
13 12 import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
14 13 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
15 14 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
16   -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
17 15 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
18 16 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
  17 +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
19 18 import com.genersoft.iot.vmp.media.zlm.dto.*;
20 19 import com.genersoft.iot.vmp.service.IMediaServerService;
21 20 import com.genersoft.iot.vmp.service.IPlayService;
... ... @@ -39,9 +38,10 @@ import org.springframework.beans.factory.annotation.Autowired;
39 38 import org.springframework.stereotype.Component;
40 39  
41 40 import javax.sdp.*;
42   -import javax.sip.*;
  41 +import javax.sip.InvalidArgumentException;
  42 +import javax.sip.RequestEvent;
  43 +import javax.sip.SipException;
43 44 import javax.sip.header.CallIdHeader;
44   -import javax.sip.message.Request;
45 45 import javax.sip.message.Response;
46 46 import java.text.ParseException;
47 47 import java.time.Instant;
... ... @@ -479,7 +479,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
479 479 playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> {
480 480 logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId);
481 481 redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
482   - }, null);
  482 + });
483 483 } else {
484 484 sendRtpItem.setStreamId(playTransaction.getStream());
485 485 // 写入redis, 超时时回复
... ...
src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
... ... @@ -13,7 +13,6 @@ import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
13 13 import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
14 14 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
15 15 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
16   -import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
17 16 import org.springframework.web.context.request.async.DeferredResult;
18 17  
19 18 import javax.sip.InvalidArgumentException;
... ... @@ -25,12 +24,12 @@ import java.text.ParseException;
25 24 */
26 25 public interface IPlayService {
27 26  
28   - void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid);
  27 + void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId);
29 28  
30 29 void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
31 30 ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
32   - InviteTimeOutCallback timeoutCallback, String uuid);
33   - PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback);
  31 + InviteTimeOutCallback timeoutCallback);
  32 + void play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback);
34 33  
35 34 MediaServerItem getNewMediaServerItem(Device device);
36 35  
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
1 1 package com.genersoft.iot.vmp.service.impl;
2 2  
3   -import java.math.BigDecimal;
4   -import java.math.RoundingMode;
5   -import java.text.ParseException;
6   -import java.util.*;
7   -
8   -import javax.sip.InvalidArgumentException;
9   -import javax.sip.ResponseEvent;
10   -import javax.sip.SipException;
11   -
12   -import com.genersoft.iot.vmp.conf.exception.ControllerException;
13   -import com.genersoft.iot.vmp.conf.exception.ServiceException;
14   -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
15   -import com.genersoft.iot.vmp.gb28181.bean.*;
16   -import com.genersoft.iot.vmp.service.IDeviceService;
17   -import com.genersoft.iot.vmp.utils.redis.RedisUtil;
18   -import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
19   -import org.slf4j.Logger;
20   -import org.slf4j.LoggerFactory;
21   -import org.springframework.beans.factory.annotation.Autowired;
22   -import org.springframework.beans.factory.annotation.Qualifier;
23   -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
24   -import org.springframework.stereotype.Service;
25   -import org.springframework.util.ObjectUtils;
26   -import org.springframework.web.context.request.async.DeferredResult;
27   -
28 3 import com.alibaba.fastjson2.JSON;
29 4 import com.alibaba.fastjson2.JSONArray;
30 5 import com.alibaba.fastjson2.JSONObject;
31 6 import com.genersoft.iot.vmp.common.StreamInfo;
32 7 import com.genersoft.iot.vmp.conf.DynamicTask;
33 8 import com.genersoft.iot.vmp.conf.UserSetting;
  9 +import com.genersoft.iot.vmp.conf.exception.ControllerException;
  10 +import com.genersoft.iot.vmp.conf.exception.ServiceException;
  11 +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
  12 +import com.genersoft.iot.vmp.gb28181.bean.*;
34 13 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
35 14 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
36 15 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
37 16 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
38 17 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
39 18 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
40   -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
41   -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
42   -import com.genersoft.iot.vmp.utils.DateUtil;
43 19 import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
44   -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
45 20 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  21 +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
  22 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  23 +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
46 24 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  25 +import com.genersoft.iot.vmp.service.IDeviceService;
47 26 import com.genersoft.iot.vmp.service.IMediaServerService;
48 27 import com.genersoft.iot.vmp.service.IMediaService;
49 28 import com.genersoft.iot.vmp.service.IPlayService;
... ... @@ -53,8 +32,27 @@ import com.genersoft.iot.vmp.service.bean.PlayBackResult;
53 32 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
54 33 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
55 34 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  35 +import com.genersoft.iot.vmp.utils.DateUtil;
  36 +import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  37 +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
56 38 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
57   -import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
  39 +import org.slf4j.Logger;
  40 +import org.slf4j.LoggerFactory;
  41 +import org.springframework.beans.factory.annotation.Autowired;
  42 +import org.springframework.beans.factory.annotation.Qualifier;
  43 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  44 +import org.springframework.stereotype.Service;
  45 +import org.springframework.util.ObjectUtils;
  46 +import org.springframework.web.context.request.async.DeferredResult;
  47 +
  48 +import javax.sip.InvalidArgumentException;
  49 +import javax.sip.ResponseEvent;
  50 +import javax.sip.SipException;
  51 +import java.math.BigDecimal;
  52 +import java.math.RoundingMode;
  53 +import java.text.ParseException;
  54 +import java.util.List;
  55 +import java.util.UUID;
58 56  
59 57 @SuppressWarnings(value = {"rawtypes", "unchecked"})
60 58 @Service
... ... @@ -111,46 +109,19 @@ public class PlayServiceImpl implements IPlayService {
111 109 private ThreadPoolTaskExecutor taskExecutor;
112 110  
113 111 @Override
114   - public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
115   - ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
116   - Runnable timeoutCallback) {
  112 + public void play(MediaServerItem mediaServerItem, String deviceId, String channelId,
  113 + ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
  114 + Runnable timeoutCallback) {
117 115 if (mediaServerItem == null) {
118 116 throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
119 117 }
120   - PlayResult playResult = new PlayResult();
  118 +
121 119 RequestMessage msg = new RequestMessage();
122 120 String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
123 121 msg.setKey(key);
124   - String uuid = UUID.randomUUID().toString();
125   - msg.setId(uuid);
126   - playResult.setUuid(uuid);
127   - DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
128   - playResult.setResult(result);
129   - // 录像查询以channelId作为deviceId查询
130   - resultHolder.put(key, uuid, result);
131 122  
132 123 Device device = redisCatchStorage.getDevice(deviceId);
133 124 StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
134   - playResult.setDevice(device);
135   -
136   - result.onCompletion(() -> {
137   - // 点播结束时调用截图接口
138   - taskExecutor.execute(() -> {
139   - // TODO 应该在上流时调用更好,结束也可能是错误结束
140   - String path = "snap";
141   - String fileName = deviceId + "_" + channelId + ".jpg";
142   - WVPResult wvpResult = (WVPResult) result.getResult();
143   - if (Objects.requireNonNull(wvpResult).getCode() == 0) {
144   - StreamInfo streamInfoForSuccess = (StreamInfo) wvpResult.getData();
145   - MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
146   - String streamUrl = streamInfoForSuccess.getFmp4().getUrl();
147   -
148   - // 请求截图
149   - logger.info("[请求截图]: " + fileName);
150   - zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
151   - }
152   - });
153   - });
154 125  
155 126 if (streamInfo != null) {
156 127 String streamId = streamInfo.getStream();
... ... @@ -160,7 +131,7 @@ public class PlayServiceImpl implements IPlayService {
160 131 wvpResult.setMsg("点播失败, redis缓存streamId等于null");
161 132 msg.setData(wvpResult);
162 133 resultHolder.invokeAllResult(msg);
163   - return playResult;
  134 + return;
164 135 }
165 136 String mediaServerId = streamInfo.getMediaServerId();
166 137 MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
... ... @@ -178,14 +149,13 @@ public class PlayServiceImpl implements IPlayService {
178 149 msg.setData(wvpResult);
179 150  
180 151 resultHolder.invokeAllResult(msg);
181   - return playResult;
  152 + return;
182 153 } else {
183 154 WVPResult wvpResult = new WVPResult();
184 155 wvpResult.setCode(ErrorCode.SUCCESS.getCode());
185 156 wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
186 157 wvpResult.setData(streamInfo);
187 158 msg.setData(wvpResult);
188   -
189 159 resultHolder.invokeAllResult(msg);
190 160 if (hookEvent != null) {
191 161 hookEvent.response(mediaServerItem, JSON.parseObject(JSON.toJSONString(streamInfo)));
... ... @@ -211,7 +181,6 @@ public class PlayServiceImpl implements IPlayService {
211 181 streamId = String.format("%s_%s", device.getDeviceId(), channelId);
212 182 }
213 183 SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
214   - logger.info(JSONObject.toJSONString(ssrcInfo));
215 184 play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response) -> {
216 185 if (hookEvent != null) {
217 186 hookEvent.response(mediaServerItem, response);
... ... @@ -238,16 +207,15 @@ public class PlayServiceImpl implements IPlayService {
238 207 msg.setData(wvpResult);
239 208 // 回复之前所有的点播请求
240 209 resultHolder.invokeAllResult(msg);
241   - }, uuid);
  210 + });
242 211 }
243   - return playResult;
244 212 }
245 213  
246 214  
247 215 @Override
248 216 public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
249 217 ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
250   - InviteTimeOutCallback timeoutCallback, String uuid) {
  218 + InviteTimeOutCallback timeoutCallback) {
251 219  
252 220 String streamId = null;
253 221 if (mediaServerItem.isRtpEnable()) {
... ... @@ -281,6 +249,16 @@ public class PlayServiceImpl implements IPlayService {
281 249 //端口获取失败的ssrcInfo 没有必要发送点播指令
282 250 if (ssrcInfo.getPort() <= 0) {
283 251 logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
  252 + dynamicTask.stop(timeOutTaskKey);
  253 + // 释放ssrc
  254 + mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  255 +
  256 + streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  257 +
  258 + RequestMessage msg = new RequestMessage();
  259 + msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + device.getDeviceId() + channelId);
  260 + msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "点播端口分配异常"));
  261 + resultHolder.invokeAllResult(msg);
284 262 return;
285 263 }
286 264 try {
... ... @@ -289,9 +267,15 @@ public class PlayServiceImpl implements IPlayService {
289 267 System.out.println("停止超时任务: " + timeOutTaskKey);
290 268 dynamicTask.stop(timeOutTaskKey);
291 269 // hook响应
292   - onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid);
  270 + onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId);
293 271 hookEvent.response(mediaServerItemInuse, response);
294 272 logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
  273 + String streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", stream);
  274 + String path = "snap";
  275 + String fileName = device.getDeviceId() + "_" + channelId + ".jpg";
  276 + // 请求截图
  277 + logger.info("[请求截图]: " + fileName);
  278 + zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
295 279  
296 280 }, (event) -> {
297 281 ResponseEvent responseEvent = (ResponseEvent) event.event;
... ... @@ -331,7 +315,7 @@ public class PlayServiceImpl implements IPlayService {
331 315 logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
332 316 dynamicTask.stop(timeOutTaskKey);
333 317 // hook响应
334   - onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
  318 + onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId);
335 319 hookEvent.response(mediaServerItemInUse, response);
336 320 });
337 321 }
... ... @@ -367,13 +351,41 @@ public class PlayServiceImpl implements IPlayService {
367 351 }
368 352  
369 353 @Override
370   - public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
  354 + public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {
  355 + StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
371 356 RequestMessage msg = new RequestMessage();
372   - if (uuid != null) {
373   - msg.setId(uuid);
  357 + msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
  358 + if (streamInfo != null) {
  359 + DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  360 + if (deviceChannel != null) {
  361 + deviceChannel.setStreamId(streamInfo.getStream());
  362 + storager.startPlay(deviceId, channelId, streamInfo.getStream());
  363 + }
  364 + redisCatchStorage.startPlay(streamInfo);
  365 +
  366 + WVPResult wvpResult = new WVPResult();
  367 + wvpResult.setCode(ErrorCode.SUCCESS.getCode());
  368 + wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
  369 + wvpResult.setData(streamInfo);
  370 +
  371 + msg.setData(wvpResult);
  372 + resultHolder.invokeAllResult(msg);
  373 +
  374 + } else {
  375 + logger.warn("设备预览API调用失败!");
  376 + msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!"));
  377 + resultHolder.invokeAllResult(msg);
374 378 }
  379 + }
  380 +
  381 + private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
  382 + RequestMessage msg = new RequestMessage();
375 383 msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
  384 + if (!ObjectUtils.isEmpty(uuid)) {
  385 + msg.setId(uuid);
  386 + }
376 387 StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
  388 +
377 389 if (streamInfo != null) {
378 390 DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
379 391 if (deviceChannel != null) {
... ... @@ -390,8 +402,8 @@ public class PlayServiceImpl implements IPlayService {
390 402  
391 403 resultHolder.invokeAllResult(msg);
392 404 } else {
393   - logger.warn("设备预览API调用失败!");
394   - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!"));
  405 + logger.warn("录像回放调用失败!");
  406 + msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "录像回放调用失败!"));
395 407 resultHolder.invokeAllResult(msg);
396 408 }
397 409 }
... ... @@ -545,7 +557,7 @@ public class PlayServiceImpl implements IPlayService {
545 557 logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
546 558 dynamicTask.stop(playBackTimeOutTaskKey);
547 559 // hook响应
548   - onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
  560 + onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
549 561 hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
550 562 });
551 563 }
... ... @@ -568,6 +580,8 @@ public class PlayServiceImpl implements IPlayService {
568 580 return result;
569 581 }
570 582  
  583 +
  584 +
571 585 @Override
572 586 public DeferredResult<WVPResult<StreamInfo>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
573 587 Device device = storager.queryVideoDevice(deviceId);
... ...
src/main/java/com/genersoft/iot/vmp/utils/redis/FastJsonRedisSerializer.java
1 1 package com.genersoft.iot.vmp.utils.redis;
2 2  
3   -import java.nio.charset.Charset;
4   -
  3 +import com.alibaba.fastjson2.JSON;
5 4 import com.alibaba.fastjson2.JSONReader;
6 5 import com.alibaba.fastjson2.JSONWriter;
7 6 import org.springframework.data.redis.serializer.RedisSerializer;
8 7 import org.springframework.data.redis.serializer.SerializationException;
9 8  
10   -import com.alibaba.fastjson2.JSON;
11   -import com.alibaba.fastjson2.JSONWriter.Feature;
  9 +import java.nio.charset.Charset;
12 10  
13 11 /**
14 12 * @description:使用fastjson实现redis的序列化
... ... @@ -31,7 +29,7 @@ public class FastJsonRedisSerializer&lt;T&gt; implements RedisSerializer&lt;T&gt; {
31 29 if (t == null) {
32 30 return new byte[0];
33 31 }
34   - return JSON.toJSONString(t, JSONWriter.Feature.WriteClassName).getBytes(DEFAULT_CHARSET);
  32 + return JSON.toJSONString(t, JSONWriter.Feature.WriteClassName, JSONWriter.Feature.WritePairAsJavaBean).getBytes(DEFAULT_CHARSET);
35 33 }
36 34  
37 35 @Override
... ... @@ -42,4 +40,6 @@ public class FastJsonRedisSerializer&lt;T&gt; implements RedisSerializer&lt;T&gt; {
42 40 String str = new String(bytes, DEFAULT_CHARSET);
43 41 return JSON.parseObject(str, clazz, JSONReader.Feature.SupportAutoType);
44 42 }
  43 +
  44 +
45 45 }
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultEx.java 0 → 100644
  1 +package com.genersoft.iot.vmp.vmanager.bean;
  2 +
  3 +import org.springframework.web.context.request.async.DeferredResult;
  4 +
  5 +public class DeferredResultEx<T> {
  6 +
  7 + private DeferredResult<T> deferredResult;
  8 +
  9 + private DeferredResultFilter filter;
  10 +
  11 + public DeferredResultEx(DeferredResult<T> result) {
  12 + this.deferredResult = result;
  13 + }
  14 +
  15 +
  16 + public DeferredResult<T> getDeferredResult() {
  17 + return deferredResult;
  18 + }
  19 +
  20 + public void setDeferredResult(DeferredResult<T> deferredResult) {
  21 + this.deferredResult = deferredResult;
  22 + }
  23 +
  24 + public DeferredResultFilter getFilter() {
  25 + return filter;
  26 + }
  27 +
  28 + public void setFilter(DeferredResultFilter filter) {
  29 + this.filter = filter;
  30 + }
  31 +}
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultFilter.java 0 → 100644
  1 +package com.genersoft.iot.vmp.vmanager.bean;
  2 +
  3 +public interface DeferredResultFilter {
  4 +
  5 + Object handler(Object o);
  6 +}
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
1 1 package com.genersoft.iot.vmp.vmanager.gb28181.play;
2 2  
3 3 import com.alibaba.fastjson2.JSONArray;
  4 +import com.alibaba.fastjson2.JSONObject;
4 5 import com.genersoft.iot.vmp.common.StreamInfo;
5 6 import com.genersoft.iot.vmp.conf.UserSetting;
6 7 import com.genersoft.iot.vmp.conf.exception.ControllerException;
7 8 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
  9 +import com.genersoft.iot.vmp.gb28181.bean.Device;
8 10 import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
9 11 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
10   -import com.genersoft.iot.vmp.gb28181.bean.Device;
11 12 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
12 13 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
  14 +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
13 15 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
14 16 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
15 17 import com.genersoft.iot.vmp.service.IMediaServerService;
  18 +import com.genersoft.iot.vmp.service.IMediaService;
  19 +import com.genersoft.iot.vmp.service.IPlayService;
16 20 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  21 +import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  22 +import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
17 23 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
18 24 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
19   -import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
20   -import com.genersoft.iot.vmp.service.IMediaService;
21   -import com.genersoft.iot.vmp.service.IPlayService;
22   -
23 25 import io.swagger.v3.oas.annotations.Operation;
24 26 import io.swagger.v3.oas.annotations.Parameter;
25 27 import io.swagger.v3.oas.annotations.tags.Tag;
26 28 import org.slf4j.Logger;
27 29 import org.slf4j.LoggerFactory;
28 30 import org.springframework.beans.factory.annotation.Autowired;
29   -import org.springframework.web.bind.annotation.CrossOrigin;
30   -import org.springframework.web.bind.annotation.GetMapping;
31   -import org.springframework.web.bind.annotation.PathVariable;
32   -import org.springframework.web.bind.annotation.PostMapping;
33   -import org.springframework.web.bind.annotation.RequestMapping;
34   -import org.springframework.web.bind.annotation.RestController;
35   -
36   -import com.alibaba.fastjson2.JSONObject;
37   -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
38   -import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  31 +import org.springframework.web.bind.annotation.*;
39 32 import org.springframework.web.context.request.async.DeferredResult;
40 33  
41   -import javax.servlet.http.HttpServlet;
42 34 import javax.servlet.http.HttpServletRequest;
43 35 import javax.sip.InvalidArgumentException;
44 36 import javax.sip.SipException;
... ... @@ -91,16 +83,52 @@ public class PlayController {
91 83 public DeferredResult<WVPResult<StreamInfo>> play(HttpServletRequest request, @PathVariable String deviceId,
92 84 @PathVariable String channelId) {
93 85  
  86 + String localAddr = request.getLocalAddr();
  87 + String localName = request.getLocalName();
  88 + String remoteHost = request.getRemoteHost();
  89 + String remoteAddr = request.getRemoteAddr();
  90 + String remoteUser = request.getRemoteUser();
  91 + String requestURI = request.getRequestURI();
  92 + System.out.println(3333333);
  93 + System.out.println(localAddr);
  94 + System.out.println(localName);
  95 + System.out.println(remoteHost);
  96 + System.out.println(remoteAddr);
  97 + System.out.println(remoteUser);
  98 + System.out.println(requestURI);
  99 + System.out.println(4444444);
94 100 // 获取可用的zlm
95 101 Device device = storager.queryVideoDevice(deviceId);
96 102 MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
97   - PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null, null);
98   - playResult.getResult().onCompletion(()->{
99   - WVPResult<StreamInfo> result = (WVPResult<StreamInfo>)playResult.getResult().getResult();
100   - result.getData().channgeStreamIp(request.getLocalAddr());
101   - playResult.getResult().setResult(result);
  103 +
  104 + RequestMessage msg = new RequestMessage();
  105 + String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
  106 + boolean exist = resultHolder.exist(key, null);
  107 + msg.setKey(key);
  108 + String uuid = UUID.randomUUID().toString();
  109 + msg.setId(uuid);
  110 + DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
  111 + DeferredResultEx<WVPResult<StreamInfo>> deferredResultEx = new DeferredResultEx<>(result);
  112 +
  113 + deferredResultEx.setFilter(result1 -> {
  114 + System.out.println(1111);
  115 + System.out.println(request.getLocalName());
  116 + WVPResult<StreamInfo> wvpResult = (WVPResult<StreamInfo>)result1;
  117 + if (wvpResult.getCode() == ErrorCode.SUCCESS.getCode()) {
  118 + StreamInfo data = wvpResult.getData();
  119 + data.channgeStreamIp(request.getLocalName());
  120 + ((WVPResult<StreamInfo>)result1).setData(data);
  121 + }
  122 + return result1;
102 123 });
103   - return playResult.getResult();
  124 +
  125 + // 录像查询以channelId作为deviceId查询
  126 + resultHolder.put(key, uuid, deferredResultEx);
  127 +
  128 + if (!exist) {
  129 + playService.play(newMediaServerItem, deviceId, channelId, null, null, null);
  130 + }
  131 + return result;
104 132 }
105 133  
106 134  
... ...
src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java
... ... @@ -112,7 +112,7 @@ public class ApiStreamController {
112 112 return resultDeferredResult;
113 113 }
114 114 MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
115   - PlayResult play = playService.play(newMediaServerItem, serial, code, (mediaServerItem, response)->{
  115 + playService.play(newMediaServerItem, serial, code, (mediaServerItem, response)->{
116 116 StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(serial, code);
117 117 JSONObject result = new JSONObject();
118 118 result.put("StreamID", streamInfo.getStream());
... ...
src/main/resources/all-application.yml
... ... @@ -105,8 +105,6 @@ sip:
105 105 id: 44010200492000000001
106 106 # [可选] 默认设备认证密码,后续扩展使用设备单独密码, 移除密码将不进行校验
107 107 password: admin123
108   - # [可选] 心跳超时时间, 建议设置为心跳周期的三倍
109   - keepalive-timeout: 255
110 108 # [可选] 国标级联注册失败,再次发起注册的时间间隔。 默认60秒
111 109 register-time-interval: 60
112 110 # [可选] 云台控制速度
... ...