Commit 5a75381a00a555443925bbbd8e333b14473b3ed1

Authored by 648540858
1 parent 7aa8444e

基于新的云端录像结构实现国标录像

src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java
... ... @@ -75,6 +75,33 @@ public class VideoStreamSessionManager {
75 75 return (SsrcTransaction)redisTemplate.opsForValue().get(scanResult.get(0));
76 76 }
77 77  
  78 + public SsrcTransaction getSsrcTransactionByCallId(String callId){
  79 +
  80 + if (ObjectUtils.isEmpty(callId)) {
  81 + return null;
  82 + }
  83 + String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_*_*_" + callId+ "_*";
  84 + List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
  85 + if (!scanResult.isEmpty()) {
  86 + return (SsrcTransaction)redisTemplate.opsForValue().get(scanResult.get(0));
  87 + }else {
  88 + key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_*_*_play_*";
  89 + scanResult = RedisUtil.scan(redisTemplate, key);
  90 + if (scanResult.isEmpty()) {
  91 + return null;
  92 + }
  93 + for (Object keyObj : scanResult) {
  94 + SsrcTransaction ssrcTransaction = (SsrcTransaction)redisTemplate.opsForValue().get(keyObj);
  95 + if (ssrcTransaction.getSipTransactionInfo() != null &&
  96 + ssrcTransaction.getSipTransactionInfo().getCallId().equals(callId)) {
  97 + return ssrcTransaction;
  98 + }
  99 + }
  100 + return null;
  101 + }
  102 +
  103 + }
  104 +
78 105 public List<SsrcTransaction> getSsrcTransactionForAll(String deviceId, String channelId, String callId, String stream){
79 106 if (ObjectUtils.isEmpty(deviceId)) {
80 107 deviceId ="*";
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
... ... @@ -31,6 +31,7 @@ import javax.sip.header.CallIdHeader;
31 31 import javax.sip.message.Response;
32 32 import java.text.ParseException;
33 33 import java.util.HashMap;
  34 +import java.util.List;
34 35 import java.util.Map;
35 36  
36 37 /**
... ... @@ -149,7 +150,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
149 150 }else {
150 151  
151 152 // 可能是设备发送的停止
152   - SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, callIdHeader.getCallId(), null);
  153 + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId());
153 154 if (ssrcTransaction == null) {
154 155 return;
155 156 }
... ...
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java
... ... @@ -76,7 +76,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
76 76  
77 77 RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress());
78 78 if (!device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) {
79   - logger.info("[心跳] 设备{}地址变化, 远程地址为: {}:{}", device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort());
  79 + logger.info("[收到心跳] 设备{}地址变化, 远程地址为: {}:{}", device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort());
80 80 device.setPort(remoteAddressInfo.getPort());
81 81 device.setHostAddress(remoteAddressInfo.getIp().concat(":").concat(String.valueOf(remoteAddressInfo.getPort())));
82 82 device.setIp(remoteAddressInfo.getIp());
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java
... ... @@ -9,12 +9,16 @@ import org.jetbrains.annotations.NotNull;
9 9 import org.slf4j.Logger;
10 10 import org.slf4j.LoggerFactory;
11 11 import org.springframework.stereotype.Component;
  12 +import org.springframework.util.ObjectUtils;
12 13  
13 14 import java.io.IOException;
14 15 import java.net.ConnectException;
  16 +import java.net.SocketTimeoutException;
15 17 import java.util.HashMap;
  18 +import java.util.List;
16 19 import java.util.Map;
17 20 import java.util.Objects;
  21 +import java.util.concurrent.TimeUnit;
18 22  
19 23 @Component
20 24 public class AssistRESTfulUtils {
... ... @@ -22,21 +26,43 @@ public class AssistRESTfulUtils {
22 26 private final static Logger logger = LoggerFactory.getLogger(AssistRESTfulUtils.class);
23 27  
24 28  
  29 + private OkHttpClient client;
  30 +
  31 +
  32 +
  33 +
25 34 public interface RequestCallback{
26 35 void run(JSONObject response);
27 36 }
28 37  
29 38 private OkHttpClient getClient(){
30   - OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
31   - if (logger.isDebugEnabled()) {
32   - HttpLoggingInterceptor logging = new HttpLoggingInterceptor(message -> {
33   - logger.debug("http请求参数:" + message);
34   - });
35   - logging.setLevel(HttpLoggingInterceptor.Level.BASIC);
36   - // OkHttp進行添加攔截器loggingInterceptor
37   - httpClientBuilder.addInterceptor(logging);
  39 + return getClient(null);
  40 + }
  41 +
  42 + private OkHttpClient getClient(Integer readTimeOut){
  43 + if (client == null) {
  44 + if (readTimeOut == null) {
  45 + readTimeOut = 10;
  46 + }
  47 + OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
  48 + // 设置连接超时时间
  49 + httpClientBuilder.connectTimeout(8, TimeUnit.SECONDS);
  50 + // 设置读取超时时间
  51 + httpClientBuilder.readTimeout(readTimeOut,TimeUnit.SECONDS);
  52 + // 设置连接池
  53 + httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES));
  54 + if (logger.isDebugEnabled()) {
  55 + HttpLoggingInterceptor logging = new HttpLoggingInterceptor(message -> {
  56 + logger.debug("http请求参数:" + message);
  57 + });
  58 + logging.setLevel(HttpLoggingInterceptor.Level.BASIC);
  59 + // OkHttp進行添加攔截器loggingInterceptor
  60 + httpClientBuilder.addInterceptor(logging);
  61 + }
  62 + client = httpClientBuilder.build();
38 63 }
39   - return httpClientBuilder.build();
  64 + return client;
  65 +
40 66 }
41 67  
42 68  
... ... @@ -124,13 +150,91 @@ public class AssistRESTfulUtils {
124 150 return responseJSON;
125 151 }
126 152  
  153 + public JSONObject sendPost(MediaServerItem mediaServerItem, String api, JSONObject param, ZLMRESTfulUtils.RequestCallback callback, Integer readTimeOut) {
  154 + OkHttpClient client = getClient(readTimeOut);
127 155  
128   - public JSONObject fileDuration(MediaServerItem mediaServerItem, String app, String stream, RequestCallback callback){
129   - Map<String, Object> param = new HashMap<>();
130   - param.put("app",app);
131   - param.put("stream",stream);
132   - param.put("recordIng",true);
133   - return sendGet(mediaServerItem, "api/record/file/duration",param, callback);
  156 + if (mediaServerItem == null) {
  157 + return null;
  158 + }
  159 + String url = String.format("http://%s:%s/%s", mediaServerItem.getIp(), mediaServerItem.getRecordAssistPort(), api);
  160 + JSONObject responseJSON = new JSONObject();
  161 + //-2自定义流媒体 调用错误码
  162 + responseJSON.put("code",-2);
  163 + responseJSON.put("msg","ASSIST调用失败");
  164 +
  165 + RequestBody requestBodyJson = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), param.toString());
  166 +
  167 + Request request = new Request.Builder()
  168 + .post(requestBodyJson)
  169 + .url(url)
  170 + .addHeader("Content-Type", "application/json")
  171 + .build();
  172 + if (callback == null) {
  173 + try {
  174 + Response response = client.newCall(request).execute();
  175 + if (response.isSuccessful()) {
  176 + ResponseBody responseBody = response.body();
  177 + if (responseBody != null) {
  178 + String responseStr = responseBody.string();
  179 + responseJSON = JSON.parseObject(responseStr);
  180 + }
  181 + }else {
  182 + response.close();
  183 + Objects.requireNonNull(response.body()).close();
  184 + }
  185 + }catch (IOException e) {
  186 + logger.error(String.format("[ %s ]ASSIST请求失败: %s", url, e.getMessage()));
  187 +
  188 + if(e instanceof SocketTimeoutException){
  189 + //读取超时超时异常
  190 + logger.error(String.format("读取ASSIST数据失败: %s, %s", url, e.getMessage()));
  191 + }
  192 + if(e instanceof ConnectException){
  193 + //判断连接异常,我这里是报Failed to connect to 10.7.5.144
  194 + logger.error(String.format("连接ASSIST失败: %s, %s", url, e.getMessage()));
  195 + }
  196 +
  197 + }catch (Exception e){
  198 + logger.error(String.format("访问ASSIST失败: %s, %s", url, e.getMessage()));
  199 + }
  200 + }else {
  201 + client.newCall(request).enqueue(new Callback(){
  202 +
  203 + @Override
  204 + public void onResponse(@NotNull Call call, @NotNull Response response){
  205 + if (response.isSuccessful()) {
  206 + try {
  207 + String responseStr = Objects.requireNonNull(response.body()).string();
  208 + callback.run(JSON.parseObject(responseStr));
  209 + } catch (IOException e) {
  210 + logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
  211 + }
  212 +
  213 + }else {
  214 + response.close();
  215 + Objects.requireNonNull(response.body()).close();
  216 + }
  217 + }
  218 +
  219 + @Override
  220 + public void onFailure(@NotNull Call call, @NotNull IOException e) {
  221 + logger.error(String.format("连接ZLM失败: %s, %s", call.request().toString(), e.getMessage()));
  222 +
  223 + if(e instanceof SocketTimeoutException){
  224 + //读取超时超时异常
  225 + logger.error(String.format("读取ZLM数据失败: %s, %s", call.request().toString(), e.getMessage()));
  226 + }
  227 + if(e instanceof ConnectException){
  228 + //判断连接异常,我这里是报Failed to connect to 10.7.5.144
  229 + logger.error(String.format("连接ZLM失败: %s, %s", call.request().toString(), e.getMessage()));
  230 + }
  231 + }
  232 + });
  233 + }
  234 +
  235 +
  236 +
  237 + return responseJSON;
134 238 }
135 239  
136 240 public JSONObject getInfo(MediaServerItem mediaServerItem, RequestCallback callback){
... ... @@ -138,33 +242,33 @@ public class AssistRESTfulUtils {
138 242 return sendGet(mediaServerItem, "api/record/info",param, callback);
139 243 }
140 244  
141   - public JSONObject addStreamCallInfo(MediaServerItem mediaServerItem, String app, String stream, String callId, RequestCallback callback){
142   - Map<String, Object> param = new HashMap<>();
143   - param.put("app",app);
144   - param.put("stream",stream);
145   - param.put("callId",callId);
146   - return sendGet(mediaServerItem, "api/record/addStreamCallInfo",param, callback);
147   - }
  245 + public JSONObject addTask(MediaServerItem mediaServerItem, String app, String stream, String startTime,
  246 + String endTime, String callId, List<String> filePathList, String remoteHost) {
  247 +
  248 + JSONObject videoTaskInfoJSON = new JSONObject();
  249 + videoTaskInfoJSON.put("app", app);
  250 + videoTaskInfoJSON.put("stream", stream);
  251 + videoTaskInfoJSON.put("startTime", startTime);
  252 + videoTaskInfoJSON.put("endTime", endTime);
  253 + videoTaskInfoJSON.put("callId", callId);
  254 + videoTaskInfoJSON.put("filePathList", filePathList);
  255 + if (!ObjectUtils.isEmpty(remoteHost)) {
  256 + videoTaskInfoJSON.put("remoteHost", remoteHost);
  257 + }
148 258  
149   - public JSONObject getDateList(MediaServerItem mediaServerItem, String app, String stream, int year, int month) {
150   - Map<String, Object> param = new HashMap<>();
151   - param.put("app", app);
152   - param.put("stream", stream);
153   - param.put("year", year);
154   - param.put("month", month);
155   - return sendGet(mediaServerItem, "api/record/date/list", param, null);
  259 + return sendPost(mediaServerItem, "api/record/file/download/task/add", videoTaskInfoJSON, null, 30);
156 260 }
157 261  
158   - public JSONObject getFileList(MediaServerItem mediaServerItem, int page, int count, String app, String stream,
159   - String startTime, String endTime) {
  262 + public JSONObject queryTaskList(MediaServerItem mediaServerItem, String taskId, Boolean isEnd) {
160 263 Map<String, Object> param = new HashMap<>();
161   - param.put("app", app);
162   - param.put("stream", stream);
163   - param.put("page", page);
164   - param.put("count", count);
165   - param.put("startTime", startTime);
166   - param.put("endTime", endTime);
167   - return sendGet(mediaServerItem, "api/record/file/listWithDate", param, null);
  264 + if (!ObjectUtils.isEmpty(taskId)) {
  265 + param.put("taskId", taskId);
  266 + }
  267 + if (!ObjectUtils.isEmpty(isEnd)) {
  268 + param.put("isEnd", isEnd);
  269 + }
  270 +
  271 + return sendGet(mediaServerItem, "api/record/file/download/task/list", param, null);
168 272 }
169 273  
170 274 }
... ...
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
... ... @@ -234,12 +234,6 @@ public class ZLMHttpHookListener {
234 234 streamAuthorityInfo.setSign(sign);
235 235 // 鉴权通过
236 236 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
237   - // 通知assist新的callId
238   - if (mediaInfo != null && mediaInfo.getRecordAssistPort() > 0) {
239   - taskExecutor.execute(() -> {
240   - assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null);
241   - });
242   - }
243 237 }
244 238 } else {
245 239 zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
... ... @@ -267,15 +261,28 @@ public class ZLMHttpHookListener {
267 261 }
268 262 // 替换流地址
269 263 if ("rtp".equals(param.getApp()) && !mediaInfo.isRtpEnable()) {
270   - String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16));;
271   - InviteInfo inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc);
272   - if (inviteInfo != null) {
273   - result.setStream_replace(inviteInfo.getStream());
274   - logger.info("[ZLM HOOK]推流鉴权 stream: {} 替换为 {}", param.getStream(), inviteInfo.getStream());
  264 + if (!mediaInfo.isRtpEnable()) {
  265 + String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16));;
  266 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc);
  267 + if (inviteInfo != null) {
  268 + result.setStream_replace(inviteInfo.getStream());
  269 + logger.info("[ZLM HOOK]推流鉴权 stream: {} 替换为 {}", param.getStream(), inviteInfo.getStream());
  270 + }
275 271 }
  272 +
276 273 }
  274 +
277 275 List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, param.getStream());
278 276 if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) {
  277 +
  278 + // 为录制国标模拟一个鉴权信息
  279 + StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
  280 + streamAuthorityInfo.setApp(param.getApp());
  281 + streamAuthorityInfo.setStream(ssrcTransactionForAll.get(0).getStream());
  282 + streamAuthorityInfo.setCallId(ssrcTransactionForAll.get(0).getSipTransactionInfo().getCallId());
  283 +
  284 + redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), ssrcTransactionForAll.get(0).getStream(), streamAuthorityInfo);
  285 +
279 286 String deviceId = ssrcTransactionForAll.get(0).getDeviceId();
280 287 String channelId = ssrcTransactionForAll.get(0).getChannelId();
281 288 DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
... ... @@ -349,13 +356,11 @@ public class ZLMHttpHookListener {
349 356  
350 357 List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks();
351 358 // TODO 重构此处逻辑
352   - boolean isPush = false;
353 359 if (param.isRegist()) {
354   - // 处理流注册的鉴权信息
  360 + // 处理流注册的鉴权信息, 流注销这里不再删除鉴权信息,下次来了新的鉴权信息会对就的进行覆盖
355 361 if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
356 362 || param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
357 363 || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
358   - isPush = true;
359 364 StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream());
360 365 if (streamAuthorityInfo == null) {
361 366 streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
... ... @@ -365,8 +370,6 @@ public class ZLMHttpHookListener {
365 370 }
366 371 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
367 372 }
368   - } else {
369   - redisCatchStorage.removeStreamAuthorityInfo(param.getApp(), param.getStream());
370 373 }
371 374  
372 375 if ("rtsp".equals(param.getSchema())) {
... ...
src/main/java/com/genersoft/iot/vmp/service/ICloudRecordService.java
1 1 package com.genersoft.iot.vmp.service;
2 2  
  3 +import com.alibaba.fastjson2.JSONArray;
  4 +import com.alibaba.fastjson2.JSONObject;
3 5 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
4 6 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
5 7 import com.genersoft.iot.vmp.service.bean.CloudRecordItem;
... ... @@ -28,4 +30,14 @@ public interface ICloudRecordService {
28 30 */
29 31 List<String> getDateList(String app, String stream, int year, int month, List<MediaServerItem> mediaServerItems);
30 32  
  33 + /**
  34 + * 添加合并任务
  35 + */
  36 + String addTask(String app, String stream, String mediaServerId, String startTime, String endTime, String callId, String remoteHost);
  37 +
  38 +
  39 + /**
  40 + * 查询合并任务列表
  41 + */
  42 + JSONArray queryTask(String taskId, String mediaServerId, Boolean isEnd);
31 43 }
... ...
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
... ... @@ -87,21 +87,10 @@ public interface IMediaServerService {
87 87  
88 88 void updateMediaServerKeepalive(String mediaServerId, ServerKeepaliveData data);
89 89  
90   - boolean checkRtpServer(MediaServerItem mediaServerItem, String rtp, String stream);
91   -
92 90 /**
93 91 * 获取负载信息
94 92 * @return
95 93 */
96 94 MediaServerLoad getLoad(MediaServerItem mediaServerItem);
97 95  
98   - /**
99   - * 按时间查找录像文件
100   - */
101   - List<RecordFile> getRecords(String app, String stream, String startTime, String endTime, List<MediaServerItem> mediaServerItems);
102   -
103   - /**
104   - * 查找存在录像文件的时间
105   - */
106   - List<String> getRecordDates(String app, String stream, int year, int month, List<MediaServerItem> mediaServerItems);
107 96 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java
1 1 package com.genersoft.iot.vmp.service.impl;
2 2  
  3 +import com.alibaba.fastjson2.JSONArray;
  4 +import com.alibaba.fastjson2.JSONObject;
3 5 import com.genersoft.iot.vmp.conf.exception.ControllerException;
4 6 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
  7 +import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
  8 +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  9 +import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
5 10 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
6 11 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
7 12 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
8 13 import com.genersoft.iot.vmp.service.ICloudRecordService;
  14 +import com.genersoft.iot.vmp.service.IMediaServerService;
9 15 import com.genersoft.iot.vmp.service.bean.CloudRecordItem;
10 16 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
11 17 import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
... ... @@ -32,8 +38,17 @@ public class CloudRecordServiceImpl implements ICloudRecordService {
32 38 private CloudRecordServiceMapper cloudRecordServiceMapper;
33 39  
34 40 @Autowired
  41 + private IMediaServerService mediaServerService;
  42 +
  43 + @Autowired
35 44 private IRedisCatchStorage redisCatchStorage;
36 45  
  46 + @Autowired
  47 + private AssistRESTfulUtils assistRESTfulUtils;
  48 +
  49 + @Autowired
  50 + private VideoStreamSessionManager streamSession;
  51 +
37 52 @Override
38 53 public PageInfo<CloudRecordItem> getList(int page, int count, String app, String stream, String startTime, String endTime, List<MediaServerItem> mediaServerItems) {
39 54 // 开始时间和结束时间在数据库中都是以秒为单位的
... ... @@ -54,7 +69,8 @@ public class CloudRecordServiceImpl implements ICloudRecordService {
54 69  
55 70 }
56 71 PageHelper.startPage(page, count);
57   - List<CloudRecordItem> all = cloudRecordServiceMapper.getList(app, stream, startTimeStamp, endTimeStamp, mediaServerItems);
  72 + List<CloudRecordItem> all = cloudRecordServiceMapper.getList(app, stream, startTimeStamp, endTimeStamp,
  73 + null, mediaServerItems);
58 74 return new PageInfo<>(all);
59 75 }
60 76  
... ... @@ -69,7 +85,8 @@ public class CloudRecordServiceImpl implements ICloudRecordService {
69 85 }
70 86 long startTimeStamp = startDate.atStartOfDay().toInstant(ZoneOffset.ofHours(8)).getEpochSecond();
71 87 long endTimeStamp = endDate.atStartOfDay().toInstant(ZoneOffset.ofHours(8)).getEpochSecond();
72   - List<CloudRecordItem> cloudRecordItemList = cloudRecordServiceMapper.getList(app, stream, startTimeStamp, endTimeStamp, mediaServerItems);
  88 + List<CloudRecordItem> cloudRecordItemList = cloudRecordServiceMapper.getList(app, stream, startTimeStamp,
  89 + endTimeStamp, null, mediaServerItems);
73 90 if (cloudRecordItemList.isEmpty()) {
74 91 return new ArrayList<>();
75 92 }
... ... @@ -83,12 +100,71 @@ public class CloudRecordServiceImpl implements ICloudRecordService {
83 100  
84 101 @Override
85 102 public void addRecord(OnRecordMp4HookParam param) {
86   - CloudRecordItem cloudRecordItem = CloudRecordItem.getInstance(param);
  103 + CloudRecordItem cloudRecordItem = CloudRecordItem.getInstance(param);
87 104 StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream());
88 105 if (streamAuthorityInfo != null) {
89 106 cloudRecordItem.setCallId(streamAuthorityInfo.getCallId());
90 107 }
91   - logger.info("[添加录像记录] {}/{} 文件大小:{}", param.getApp(), param.getStream(), param.getFile_size());
  108 + logger.info("[添加录像记录] {}/{} 文件大小:{}, 时长: {}秒", param.getApp(), param.getStream(), param.getFile_size(),param.getTime_len());
92 109 cloudRecordServiceMapper.add(cloudRecordItem);
93 110 }
  111 +
  112 + @Override
  113 + public String addTask(String app, String stream, String mediaServerId, String startTime, String endTime, String callId, String remoteHost) {
  114 + // 参数校验
  115 + assert app != null;
  116 + assert stream != null;
  117 + MediaServerItem mediaServerItem = null;
  118 + if (mediaServerId == null) {
  119 + mediaServerItem = mediaServerService.getDefaultMediaServer();
  120 + }else {
  121 + mediaServerItem = mediaServerService.getOne(mediaServerId);
  122 + }
  123 + if (mediaServerItem == null) {
  124 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的流媒体");
  125 + }else {
  126 + if (remoteHost == null) {
  127 + remoteHost = "http://" + mediaServerItem.getStreamIp() + ":" + mediaServerItem.getRecordAssistPort();
  128 + }
  129 + }
  130 + Long startTimeStamp = null;
  131 + Long endTimeStamp = null;
  132 + if (startTime != null) {
  133 + startTimeStamp = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
  134 + }
  135 + if (endTime != null) {
  136 + endTimeStamp = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
  137 + }
  138 +
  139 + List<MediaServerItem> mediaServers = new ArrayList<>();
  140 + mediaServers.add(mediaServerItem);
  141 + // 检索相关的录像文件
  142 + List<String> filePathList = cloudRecordServiceMapper.queryRecordFilePathList(app, stream, startTimeStamp, endTimeStamp, callId, mediaServers);
  143 + if (filePathList == null || filePathList.isEmpty()) {
  144 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未检索到视频文件");
  145 + }
  146 + JSONObject result = assistRESTfulUtils.addTask(mediaServerItem, app, stream, startTime, endTime, callId, filePathList, remoteHost);
  147 + if (result.getInteger("code") != 0) {
  148 + throw new ControllerException(result.getInteger("code"), result.getString("msg"));
  149 + }
  150 + return result.getString("data");
  151 + }
  152 +
  153 + @Override
  154 + public JSONArray queryTask(String taskId, String mediaServerId, Boolean isEnd) {
  155 + MediaServerItem mediaServerItem = null;
  156 + if (mediaServerId == null) {
  157 + mediaServerItem = mediaServerService.getDefaultMediaServer();
  158 + }else {
  159 + mediaServerItem = mediaServerService.getOne(mediaServerId);
  160 + }
  161 + if (mediaServerItem == null) {
  162 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的流媒体");
  163 + }
  164 + JSONObject result = assistRESTfulUtils.queryTaskList(mediaServerItem, taskId, isEnd);
  165 + if (result.getInteger("code") != 0) {
  166 + throw new ControllerException(result.getInteger("code"), result.getString("msg"));
  167 + }
  168 + return result.getJSONArray("data");
  169 + }
94 170 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
... ... @@ -742,15 +742,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
742 742 }
743 743  
744 744 @Override
745   - public boolean checkRtpServer(MediaServerItem mediaServerItem, String app, String stream) {
746   - JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, stream);
747   - if(rtpInfo.getInteger("code") == 0){
748   - return rtpInfo.getBoolean("exist");
749   - }
750   - return false;
751   - }
752   -
753   - @Override
754 745 public MediaServerLoad getLoad(MediaServerItem mediaServerItem) {
755 746 MediaServerLoad result = new MediaServerLoad();
756 747 result.setId(mediaServerItem.getId());
... ... @@ -761,90 +752,4 @@ public class MediaServerServiceImpl implements IMediaServerService {
761 752 result.setGbSend(redisCatchStorage.getGbSendCount(mediaServerItem.getId()));
762 753 return result;
763 754 }
764   -
765   - @Override
766   - public List<RecordFile> getRecords(String app, String stream, String startTime, String endTime, List<MediaServerItem> mediaServerItems) {
767   - Assert.notNull(app, "app不存在");
768   - Assert.notNull(stream, "stream不存在");
769   - Assert.notNull(startTime, "startTime不存在");
770   - Assert.notNull(endTime, "endTime不存在");
771   - Assert.notEmpty(mediaServerItems, "流媒体列表为空");
772   -
773   - CompletableFuture[] completableFutures = new CompletableFuture[mediaServerItems.size()];
774   - for (int i = 0; i < mediaServerItems.size(); i++) {
775   - completableFutures[i] = getRecordFilesForOne(app, stream, startTime, endTime, mediaServerItems.get(i));
776   - }
777   - List<RecordFile> result = new ArrayList<>();
778   - for (int i = 0; i < completableFutures.length; i++) {
779   - try {
780   - List<RecordFile> list = (List<RecordFile>) completableFutures[i].get();
781   - if (!list.isEmpty()) {
782   - for (int g = 0; g < list.size(); g++) {
783   - list.get(g).setMediaServerId(mediaServerItems.get(i).getId());
784   - }
785   - result.addAll(list);
786   - }
787   - } catch (InterruptedException e) {
788   - throw new RuntimeException(e);
789   - } catch (ExecutionException e) {
790   - throw new RuntimeException(e);
791   - }
792   - }
793   - Comparator<RecordFile> comparator = Comparator.comparing(RecordFile::getFileName);
794   - result.sort(comparator);
795   - return result;
796   - }
797   -
798   - @Override
799   - public List<String> getRecordDates(String app, String stream, int year, int month, List<MediaServerItem> mediaServerItems) {
800   - Assert.notNull(app, "app不存在");
801   - Assert.notNull(stream, "stream不存在");
802   - Assert.notEmpty(mediaServerItems, "流媒体列表为空");
803   - CompletableFuture[] completableFutures = new CompletableFuture[mediaServerItems.size()];
804   -
805   - for (int i = 0; i < mediaServerItems.size(); i++) {
806   - completableFutures[i] = getRecordDatesForOne(app, stream, year, month, mediaServerItems.get(i));
807   - }
808   - List<String> result = new ArrayList<>();
809   - CompletableFuture.allOf(completableFutures).join();
810   - for (CompletableFuture completableFuture : completableFutures) {
811   - try {
812   - List<String> list = (List<String>) completableFuture.get();
813   - result.addAll(list);
814   - } catch (InterruptedException e) {
815   - throw new RuntimeException(e);
816   - } catch (ExecutionException e) {
817   - throw new RuntimeException(e);
818   - }
819   - }
820   - Collections.sort(result);
821   - return result;
822   - }
823   -
824   - @Async
825   - public CompletableFuture<List<String>> getRecordDatesForOne(String app, String stream, int year, int month, MediaServerItem mediaServerItem) {
826   - JSONObject fileListJson = assistRESTfulUtils.getDateList(mediaServerItem, app, stream, year, month);
827   - if (fileListJson != null && !fileListJson.isEmpty()) {
828   - if (fileListJson.getString("code") != null && fileListJson.getInteger("code") == 0) {
829   - JSONArray data = fileListJson.getJSONArray("data");
830   - return CompletableFuture.completedFuture(data.toJavaList(String.class));
831   - }
832   - }
833   - return CompletableFuture.completedFuture(new ArrayList<>());
834   - }
835   -
836   - @Async
837   - public CompletableFuture<List<RecordFile>> getRecordFilesForOne(String app, String stream, String startTime, String endTime, MediaServerItem mediaServerItem) {
838   - JSONObject fileListJson = assistRESTfulUtils.getFileList(mediaServerItem, 1, 100000000, app, stream, startTime, endTime);
839   - if (fileListJson != null && !fileListJson.isEmpty()) {
840   - if (fileListJson.getString("code") != null && fileListJson.getInteger("code") == 0) {
841   - JSONObject data = fileListJson.getJSONObject("data");
842   - JSONArray list = data.getJSONArray("list");
843   - if (list != null) {
844   - return CompletableFuture.completedFuture(list.toJavaList(RecordFile.class));
845   - }
846   - }
847   - }
848   - return CompletableFuture.completedFuture(new ArrayList<>());
849   - }
850 755 }
... ...
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
... ... @@ -27,11 +27,13 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
27 27 import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
28 28 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
29 29 import com.genersoft.iot.vmp.service.*;
  30 +import com.genersoft.iot.vmp.service.bean.CloudRecordItem;
30 31 import com.genersoft.iot.vmp.service.bean.ErrorCallback;
31 32 import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
32 33 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
33 34 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
34 35 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  36 +import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
35 37 import com.genersoft.iot.vmp.utils.DateUtil;
36 38 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
37 39 import gov.nist.javax.sip.message.SIPResponse;
... ... @@ -107,6 +109,9 @@ public class PlayServiceImpl implements IPlayService {
107 109 @Autowired
108 110 private ZlmHttpHookSubscribe subscribe;
109 111  
  112 + @Autowired
  113 + private CloudRecordServiceMapper cloudRecordServiceMapper;
  114 +
110 115  
111 116 @Override
112 117 public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) {
... ... @@ -749,31 +754,43 @@ public class PlayServiceImpl implements IPlayService {
749 754 logger.warn("查询录像信息时发现节点已离线");
750 755 return null;
751 756 }
752   - if (mediaServerItem.getRecordAssistPort() > 0) {
753   - JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream(), null);
754   - if (jsonObject == null) {
755   - throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接Assist服务失败");
756   - }
757   - if (jsonObject.getInteger("code") == 0) {
758   - long duration = jsonObject.getLong("data");
759   -
760   - if (duration == 0) {
761   - inviteInfo.getStreamInfo().setProgress(0);
762   - } else {
763   - String startTime = inviteInfo.getStreamInfo().getStartTime();
764   - String endTime = inviteInfo.getStreamInfo().getEndTime();
765   - long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
766   - long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
767   -
768   - BigDecimal currentCount = new BigDecimal(duration / 1000);
769   - BigDecimal totalCount = new BigDecimal(end - start);
770   - BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
771   - double process = divide.doubleValue();
772   - inviteInfo.getStreamInfo().setProgress(process);
773   - }
774   - inviteStreamService.updateInviteInfo(inviteInfo);
775   - }
  757 + if (mediaServerItem.getRecordAssistPort() == 0) {
  758 + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未配置Assist服务,无法完成录像下载");
  759 + }
  760 + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream);
  761 +
  762 + if (ssrcTransaction == null) {
  763 + logger.warn("[获取下载进度],未找到下载事务信息");
  764 + return null;
776 765 }
  766 +
  767 + // 为了支持多个数据库,这里不能使用求和函数来直接获取总数了
  768 + List<CloudRecordItem> cloudRecordItemList = cloudRecordServiceMapper.getList("rtp", inviteInfo.getStream(), null, null, ssrcTransaction.getCallId(), null);
  769 +
  770 + if (cloudRecordItemList.isEmpty()) {
  771 + logger.warn("[获取下载进度],未找到下载视频信息");
  772 + return null;
  773 + }
  774 + long duration = 0;
  775 + for (CloudRecordItem cloudRecordItem : cloudRecordItemList) {
  776 + duration += cloudRecordItem.getTimeLen();
  777 + }
  778 + if (duration == 0) {
  779 + inviteInfo.getStreamInfo().setProgress(0);
  780 + } else {
  781 + String startTime = inviteInfo.getStreamInfo().getStartTime();
  782 + String endTime = inviteInfo.getStreamInfo().getEndTime();
  783 + long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
  784 + long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
  785 +
  786 + BigDecimal currentCount = new BigDecimal(duration);
  787 + BigDecimal totalCount = new BigDecimal(end - start);
  788 + BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP);
  789 + double process = divide.doubleValue();
  790 + inviteInfo.getStreamInfo().setProgress(process);
  791 + }
  792 + inviteStreamService.updateInviteInfo(inviteInfo);
  793 +
777 794 return inviteInfo.getStreamInfo();
778 795 }
779 796 return null;
... ...
src/main/java/com/genersoft/iot/vmp/storager/dao/CloudRecordServiceMapper.java
... ... @@ -42,17 +42,37 @@ public interface CloudRecordServiceMapper {
42 42  
43 43 @Select(" <script>" +
44 44 "select * " +
45   - "from wvp_cloud_record " +
46   - "where 0 = 0" +
  45 + " from wvp_cloud_record " +
  46 + " where 0 = 0" +
47 47 " <if test= 'app != null '> and app=#{app}</if>" +
48 48 " <if test= 'stream != null '> and stream=#{stream}</if>" +
49   - " <if test= 'startTimeStamp != null '> and start_time &gt;= #{startTimeStamp}</if>" +
50   - " <if test= 'endTimeStamp != null '> and end_time &lt;= #{endTimeStamp}</if>" +
  49 + " <if test= 'startTimeStamp != null '> and end_time &gt;= #{startTimeStamp}</if>" +
  50 + " <if test= 'endTimeStamp != null '> and start_time &lt;= #{endTimeStamp}</if>" +
  51 + " <if test= 'callId != null '> and call_id = #{callId}</if>" +
51 52 " <if test= 'mediaServerItemList != null ' > and media_server_id in " +
52 53 " <foreach collection='mediaServerItemList' item='item' open='(' separator=',' close=')' > #{item.id}</foreach>" +
53 54 " </if>" +
54 55 " </script>")
55 56 List<CloudRecordItem> getList(@Param("app") String app, @Param("stream") String stream,
56 57 @Param("startTimeStamp")Long startTimeStamp, @Param("endTimeStamp")Long endTimeStamp,
57   - List<MediaServerItem> mediaServerItemList);
  58 + @Param("callId")String callId, List<MediaServerItem> mediaServerItemList);
  59 +
  60 +
  61 + @Select(" <script>" +
  62 + "select file_path" +
  63 + " from wvp_cloud_record " +
  64 + " where 0 = 0" +
  65 + " <if test= 'app != null '> and app=#{app}</if>" +
  66 + " <if test= 'stream != null '> and stream=#{stream}</if>" +
  67 + " <if test= 'startTimeStamp != null '> and end_time &gt;= #{startTimeStamp}</if>" +
  68 + " <if test= 'endTimeStamp != null '> and start_time &lt;= #{endTimeStamp}</if>" +
  69 + " <if test= 'callId != null '> and call_id = #{callId}</if>" +
  70 + " <if test= 'mediaServerItemList != null ' > and media_server_id in " +
  71 + " <foreach collection='mediaServerItemList' item='item' open='(' separator=',' close=')' > #{item.id}</foreach>" +
  72 + " </if>" +
  73 + " </script>")
  74 + List<String> queryRecordFilePathList(@Param("app") String app, @Param("stream") String stream,
  75 + @Param("startTimeStamp")Long startTimeStamp, @Param("endTimeStamp")Long endTimeStamp,
  76 + @Param("callId")String callId, List<MediaServerItem> mediaServerItemList);
  77 +
58 78 }
... ...
src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java
1 1 package com.genersoft.iot.vmp.vmanager.cloudRecord;
2 2  
  3 +import com.alibaba.fastjson2.JSONArray;
  4 +import com.alibaba.fastjson2.JSONObject;
3 5 import com.genersoft.iot.vmp.conf.DynamicTask;
4 6 import com.genersoft.iot.vmp.conf.UserSetting;
5 7 import com.genersoft.iot.vmp.conf.exception.ControllerException;
... ... @@ -145,19 +147,53 @@ public class CloudRecordController {
145 147 @Operation(summary = "添加合并任务")
146 148 @Parameter(name = "app", description = "应用名", required = true)
147 149 @Parameter(name = "stream", description = "流ID", required = true)
  150 + @Parameter(name = "mediaServerId", description = "流媒体ID", required = false)
148 151 @Parameter(name = "startTime", description = "鉴权ID", required = false)
149 152 @Parameter(name = "endTime", description = "鉴权ID", required = false)
150 153 @Parameter(name = "callId", description = "鉴权ID", required = false)
151 154 @Parameter(name = "remoteHost", description = "返回地址时的远程地址", required = false)
152 155 public String addTask(
153   - @RequestParam String app,
154   - @RequestParam String stream,
155   - @RequestParam String startTime,
156   - @RequestParam String endTime,
157   - @RequestParam String callId,
158   - @RequestParam String remoteHost
  156 + @RequestParam(required = true) String app,
  157 + @RequestParam(required = true) String stream,
  158 + @RequestParam(required = false) String mediaServerId,
  159 + @RequestParam(required = false) String startTime,
  160 + @RequestParam(required = false) String endTime,
  161 + @RequestParam(required = false) String callId,
  162 + @RequestParam(required = false) String remoteHost
  163 + ){
  164 + return cloudRecordService.addTask(app, stream, mediaServerId, startTime, endTime, callId, remoteHost);
  165 + }
  166 +
  167 + @ResponseBody
  168 + @GetMapping("/task/list")
  169 + @Operation(summary = "查询合并任务")
  170 + @Parameter(name = "taskId", description = "任务Id", required = false)
  171 + @Parameter(name = "mediaServerId", description = "流媒体ID", required = false)
  172 + @Parameter(name = "isEnd", description = "是否结束", required = false)
  173 + public JSONArray queryTaskList(
  174 + @RequestParam(required = false) String taskId,
  175 + @RequestParam(required = false) String mediaServerId,
  176 + @RequestParam(required = false) Boolean isEnd
  177 + ){
  178 + return cloudRecordService.queryTask(taskId, mediaServerId, isEnd);
  179 + }
  180 +
  181 + @ResponseBody
  182 + @GetMapping("/collect/add")
  183 + @Operation(summary = "添加收藏")
  184 + @Parameter(name = "app", description = "应用名", required = true)
  185 + @Parameter(name = "stream", description = "流ID", required = true)
  186 + @Parameter(name = "mediaServerId", description = "流媒体ID", required = false)
  187 + @Parameter(name = "startTime", description = "鉴权ID", required = false)
  188 + @Parameter(name = "endTime", description = "鉴权ID", required = false)
  189 + @Parameter(name = "callId", description = "鉴权ID", required = false)
  190 + @Parameter(name = "collectType", description = "收藏类型", required = false)
  191 + public JSONArray addCollect(
  192 + @RequestParam(required = false) String taskId,
  193 + @RequestParam(required = false) String mediaServerId,
  194 + @RequestParam(required = false) Boolean isEnd
159 195 ){
160   - return cloudRecordService.addTask(app, stream, startTime, endTime, callId, remoteHost);
  196 + return cloudRecordService.queryTask(taskId, mediaServerId, isEnd);
161 197 }
162 198  
163 199  
... ...
web_src/src/components/CloudRecordDetail.vue
... ... @@ -480,12 +480,13 @@
480 480 let that = this;
481 481 this.$axios({
482 482 method: 'get',
483   - url:`/record_proxy/${that.mediaServerId}/api/record/file/download/task/add`,
  483 + url:`/api/cloud/record/task/add`,
484 484 params: {
485   - app: that.app,
486   - stream: that.stream,
487   - startTime: moment(this.taskTimeRange[0]).format('YYYY-MM-DD HH:mm:ss'),
488   - endTime: moment(this.taskTimeRange[1]).format('YYYY-MM-DD HH:mm:ss'),
  485 + app: this.app,
  486 + stream: this.stream,
  487 + mediaServerId: this.mediaServerId,
  488 + startTime: moment(this.taskTimeRange[0]).format('YYYY-MM-DD HH:mm:ss'),
  489 + endTime: moment(this.taskTimeRange[1]).format('YYYY-MM-DD HH:mm:ss'),
489 490 }
490 491 }).then(function (res) {
491 492 if (res.data.code === 0 ) {
... ... @@ -505,8 +506,9 @@
505 506 let that = this;
506 507 this.$axios({
507 508 method: 'get',
508   - url:`/record_proxy/${that.mediaServerId}/api/record/file/download/task/list`,
  509 + url:`/api/cloud/record/task/list`,
509 510 params: {
  511 + mediaServerId: this.mediaServerId,
510 512 isEnd: isEnd,
511 513 }
512 514 }).then(function (res) {
... ...
web_src/src/components/dialog/recordDownload.vue
... ... @@ -137,10 +137,11 @@ export default {
137 137 getFileDownload: function (){
138 138 this.$axios({
139 139 method: 'get',
140   - url:`/record_proxy/${this.mediaServerId}/api/record/file/download/task/add`,
  140 + url:`/api/cloud/record/task/add`,
141 141 params: {
142 142 app: this.app,
143 143 stream: this.stream,
  144 + mediaServerId: this.mediaServerId,
144 145 startTime: null,
145 146 endTime: null,
146 147 }
... ... @@ -169,10 +170,9 @@ export default {
169 170 getProgressForFile: function (callback){
170 171 this.$axios({
171 172 method: 'get',
172   - url:`/record_proxy/${this.mediaServerId}/api/record/file/download/task/list`,
  173 + url:`/api/cloud/record/task/list`,
173 174 params: {
174   - app: this.app,
175   - stream: this.stream,
  175 + mediaServerId: this.mediaServerId,
176 176 taskId: this.taskId,
177 177 isEnd: true,
178 178 }
... ...