Commit f375c1c2ce0362ea1fa8d7d5e79c02fb9e8d3c86
1 parent
cd360f89
fix():ffmpeg切换码流成功
Showing
7 changed files
with
523 additions
and
39 deletions
src/main/java/com/genersoft/iot/vmp/jtt1078/publisher/Channel.java
| ... | ... | @@ -4,7 +4,6 @@ import com.genersoft.iot.vmp.jtt1078.codec.AudioCodec; |
| 4 | 4 | import com.genersoft.iot.vmp.jtt1078.entity.Media; |
| 5 | 5 | import com.genersoft.iot.vmp.jtt1078.entity.MediaEncoding; |
| 6 | 6 | import com.genersoft.iot.vmp.jtt1078.flv.FlvEncoder; |
| 7 | -// [恢复] 引用 FFmpeg 进程管理类 | |
| 8 | 7 | import com.genersoft.iot.vmp.jtt1078.subscriber.RTMPPublisher; |
| 9 | 8 | import com.genersoft.iot.vmp.jtt1078.subscriber.Subscriber; |
| 10 | 9 | import com.genersoft.iot.vmp.jtt1078.subscriber.VideoSubscriber; |
| ... | ... | @@ -15,8 +14,11 @@ import org.apache.commons.lang3.StringUtils; |
| 15 | 14 | import org.slf4j.Logger; |
| 16 | 15 | import org.slf4j.LoggerFactory; |
| 17 | 16 | |
| 17 | +import java.util.Arrays; | |
| 18 | 18 | import java.util.Iterator; |
| 19 | 19 | import java.util.concurrent.ConcurrentLinkedQueue; |
| 20 | +import java.util.concurrent.atomic.AtomicBoolean; | |
| 21 | +import java.util.concurrent.atomic.AtomicLong; | |
| 20 | 22 | |
| 21 | 23 | public class Channel |
| 22 | 24 | { |
| ... | ... | @@ -26,7 +28,6 @@ public class Channel |
| 26 | 28 | |
| 27 | 29 | // [恢复] FFmpeg 推流进程管理器 |
| 28 | 30 | RTMPPublisher rtmpPublisher; |
| 29 | - // [删除] ZlmRtpPublisher rtpPublisher; | |
| 30 | 31 | |
| 31 | 32 | String tag; |
| 32 | 33 | boolean publishing; |
| ... | ... | @@ -35,6 +36,39 @@ public class Channel |
| 35 | 36 | FlvEncoder flvEncoder; |
| 36 | 37 | private long firstTimestamp = -1; |
| 37 | 38 | |
| 39 | + // ========== 新增:视频参数检测和FFmpeg自动重启相关 ========== | |
| 40 | + | |
| 41 | + /** 上一次接收到的SPS哈希值,用于检测视频参数变化 */ | |
| 42 | + private int lastSPSHash = 0; | |
| 43 | + | |
| 44 | + /** FFmpeg是否需要重启的标志 */ | |
| 45 | + private AtomicBoolean ffmpegNeedRestart = new AtomicBoolean(false); | |
| 46 | + | |
| 47 | + /** 最后一次FFmpeg重启时间(毫秒),用于防止频繁重启 */ | |
| 48 | + private AtomicLong lastRestartTime = new AtomicLong(0); | |
| 49 | + | |
| 50 | + /** 重启冷却时间(毫秒),3秒内不重复重启 */ | |
| 51 | + private static final long RESTART_COOLDOWN_MS = 3000; | |
| 52 | + | |
| 53 | + /** 是否正在等待新的I帧(FFmpeg重启期间) */ | |
| 54 | + private AtomicBoolean waitingForIFrame = new AtomicBoolean(false); | |
| 55 | + | |
| 56 | + /** 视频参数信息 */ | |
| 57 | + private volatile VideoParamInfo currentVideoParam = new VideoParamInfo(); | |
| 58 | + | |
| 59 | + /** 视频参数内部类 */ | |
| 60 | + private static class VideoParamInfo { | |
| 61 | + int width = 0; | |
| 62 | + int height = 0; | |
| 63 | + int fps = 0; | |
| 64 | + String resolution = "unknown"; | |
| 65 | + | |
| 66 | + @Override | |
| 67 | + public String toString() { | |
| 68 | + return String.format("%dx%d@%dfps [%s]", width, height, fps, resolution); | |
| 69 | + } | |
| 70 | + } | |
| 71 | + | |
| 38 | 72 | public Channel(String tag) |
| 39 | 73 | { |
| 40 | 74 | this.tag = tag; |
| ... | ... | @@ -71,12 +105,10 @@ public class Channel |
| 71 | 105 | if (audioCodec == null) |
| 72 | 106 | { |
| 73 | 107 | audioCodec = AudioCodec.getCodec(pt); |
| 74 | - logger.info("audio codec: {}", MediaEncoding.getEncoding(Media.Type.Audio, pt)); | |
| 108 | + logger.info("[{}] audio codec: {}", tag, MediaEncoding.getEncoding(Media.Type.Audio, pt)); | |
| 75 | 109 | } |
| 76 | 110 | // 写入到内部广播,FFmpeg 通过 HTTP 拉取这个数据 |
| 77 | 111 | broadcastAudio(timestamp, audioCodec.toPCM(data)); |
| 78 | - | |
| 79 | - // [删除] rtpPublisher.sendAudio(...) | |
| 80 | 112 | } |
| 81 | 113 | |
| 82 | 114 | public void writeVideo(long sequence, long timeoffset, int payloadType, byte[] h264) |
| ... | ... | @@ -91,6 +123,28 @@ public class Channel |
| 91 | 123 | if (nalu == null) break; |
| 92 | 124 | if (nalu.length < 4) continue; |
| 93 | 125 | |
| 126 | + // ========== 新增:检测视频参数变化 ========== | |
| 127 | + int nalType = nalu[4] & 0x1F; | |
| 128 | + boolean isIDR = (nalType == 5); // IDR帧(I帧) | |
| 129 | + | |
| 130 | + if (nalType == 7) { // SPS (Sequence Parameter Set) | |
| 131 | + checkAndHandleSPSChange(nalu); | |
| 132 | + } | |
| 133 | + | |
| 134 | + // 如果正在等待I帧,跳过所有数据直到收到新的I帧 | |
| 135 | + if (waitingForIFrame.get()) { | |
| 136 | + if (isIDR) { | |
| 137 | + logger.info("[{}] 收到新的I帧,停止等待,FFmpeg应已重启完成", tag); | |
| 138 | + waitingForIFrame.set(false); | |
| 139 | + // 重置FLV编码器 | |
| 140 | + this.flvEncoder = new FlvEncoder(true, true); | |
| 141 | + firstTimestamp = timeoffset; | |
| 142 | + } else { | |
| 143 | + // 跳过非I帧数据 | |
| 144 | + continue; | |
| 145 | + } | |
| 146 | + } | |
| 147 | + | |
| 94 | 148 | // 1. 封装为 FLV Tag (必须) |
| 95 | 149 | // FFmpeg 通过 HTTP 读取这些 FLV Tag |
| 96 | 150 | byte[] flvTag = this.flvEncoder.write(nalu, (int) (timeoffset - firstTimestamp)); |
| ... | ... | @@ -102,6 +156,281 @@ public class Channel |
| 102 | 156 | } |
| 103 | 157 | } |
| 104 | 158 | |
| 159 | + /** | |
| 160 | + * 检查SPS变化并处理FFmpeg重启 | |
| 161 | + */ | |
| 162 | + private synchronized void checkAndHandleSPSChange(byte[] nalu) { | |
| 163 | + int currentHash = Arrays.hashCode(nalu); | |
| 164 | + | |
| 165 | + if (lastSPSHash != 0 && currentHash != lastSPSHash) { | |
| 166 | + // SPS变化了,说明视频参数发生了变化 | |
| 167 | + // 注意:即使解析出来的分辨率/帧率相同,SPS的其它变化(如profile、level、VUI参数等)也会导致解码问题 | |
| 168 | + // 因此:只要SPS哈希变化,就重启FFmpeg | |
| 169 | + | |
| 170 | + // 解析新的视频参数(仅用于日志显示) | |
| 171 | + VideoParamInfo newParam = parseVideoParam(nalu); | |
| 172 | + VideoParamInfo oldParam = currentVideoParam; | |
| 173 | + | |
| 174 | + logger.info("[{}] ====== 检测到SPS变化,需要重启FFmpeg ======", tag); | |
| 175 | + logger.info("[{}] 旧参数: {}", tag, oldParam); | |
| 176 | + logger.info("[{}] 新参数: {}", tag, newParam); | |
| 177 | + logger.info("[{}] SPS哈希变化: {} -> {}", tag, | |
| 178 | + Integer.toHexString(lastSPSHash), | |
| 179 | + Integer.toHexString(currentHash)); | |
| 180 | + | |
| 181 | + // 检查冷却时间 | |
| 182 | + long now = System.currentTimeMillis(); | |
| 183 | + long timeSinceLastRestart = now - lastRestartTime.get(); | |
| 184 | + | |
| 185 | + if (timeSinceLastRestart < RESTART_COOLDOWN_MS) { | |
| 186 | + logger.warn("[{}] FFmpeg重启过于频繁(距上次{}ms),跳过本次重启", | |
| 187 | + tag, timeSinceLastRestart); | |
| 188 | + // 即使跳过重启,仍需更新SPS哈希 | |
| 189 | + lastSPSHash = currentHash; | |
| 190 | + currentVideoParam = newParam; | |
| 191 | + return; | |
| 192 | + } | |
| 193 | + | |
| 194 | + logger.info("[{}] ====== 准备重启FFmpeg ======", tag); | |
| 195 | + logger.info("[{}] 原因: SPS参数变化(可能包含分辨率、帧率、profile、level等)", tag); | |
| 196 | + logger.info("[{}] 预计中断时间: 1-2秒", tag); | |
| 197 | + | |
| 198 | + // 标记需要重启 | |
| 199 | + ffmpegNeedRestart.set(true); | |
| 200 | + | |
| 201 | + // 立即触发重启(异步执行,避免阻塞) | |
| 202 | + final String currentTag = tag; | |
| 203 | + Thread restartThread = new Thread(() -> { | |
| 204 | + try { | |
| 205 | + logger.info("[{}] SPS变化检测到,立即触发FFmpeg重启...", currentTag); | |
| 206 | + restartRtmpPublisher(); | |
| 207 | + } catch (Exception e) { | |
| 208 | + logger.error("[{}] 触发FFmpeg重启失败: {}", currentTag, e.getMessage(), e); | |
| 209 | + } | |
| 210 | + }, "FFmpeg-Restart-" + tag); | |
| 211 | + restartThread.setDaemon(true); | |
| 212 | + restartThread.start(); | |
| 213 | + } | |
| 214 | + | |
| 215 | + // 更新SPS哈希和视频参数 | |
| 216 | + lastSPSHash = currentHash; | |
| 217 | + currentVideoParam = parseVideoParam(nalu); | |
| 218 | + } | |
| 219 | + | |
| 220 | + /** | |
| 221 | + * 解析H.264 SPS获取视频参数 | |
| 222 | + */ | |
| 223 | + private VideoParamInfo parseVideoParam(byte[] sps) { | |
| 224 | + VideoParamInfo info = new VideoParamInfo(); | |
| 225 | + try { | |
| 226 | + // H.264 SPS解析 | |
| 227 | + // SPS格式: [NAL头(1字节) + profile_idc(1) + constraints(1) + level_idc(1) + 5字节对齐 + sps数据] | |
| 228 | + | |
| 229 | + if (sps.length < 6) { | |
| 230 | + logger.warn("[{}] SPS数据长度不足,无法解析: {}", tag, sps.length); | |
| 231 | + return info; | |
| 232 | + } | |
| 233 | + | |
| 234 | + int profile_idc = sps[4] & 0xFF; | |
| 235 | + int constraint_flags = sps[5] & 0xFF; | |
| 236 | + int level_idc = sps[6] & 0xFF; | |
| 237 | + | |
| 238 | + // 查找SPS结束位置(开始于67或68) | |
| 239 | + int spsStart = -1; | |
| 240 | + for (int i = 4; i < Math.min(sps.length - 4, 10); i++) { | |
| 241 | + if ((sps[i] & 0x1F) == 7) { | |
| 242 | + spsStart = i; | |
| 243 | + break; | |
| 244 | + } | |
| 245 | + } | |
| 246 | + | |
| 247 | + if (spsStart == -1) { | |
| 248 | + // 简化模式:直接使用SPS长度估算 | |
| 249 | + int spsLen = (sps.length > 10) ? 4 : 1; | |
| 250 | + for (int i = 4; i < sps.length - 1; i++) { | |
| 251 | + if (sps[i] == 0 && sps[i+1] == 0 && sps[i+2] == 0 && sps[i+3] == 1) { | |
| 252 | + spsLen = i - 4; | |
| 253 | + break; | |
| 254 | + } | |
| 255 | + } | |
| 256 | + | |
| 257 | + // 根据SPS长度估算分辨率(非常粗略) | |
| 258 | + int lenCategory = spsLen / 10; | |
| 259 | + if (spsLen < 20) { | |
| 260 | + info.width = 352; | |
| 261 | + info.height = 288; | |
| 262 | + info.fps = 15; | |
| 263 | + } else if (spsLen < 40) { | |
| 264 | + info.width = 704; | |
| 265 | + info.height = 576; | |
| 266 | + info.fps = 25; | |
| 267 | + } else { | |
| 268 | + info.width = 1280; | |
| 269 | + info.height = 720; | |
| 270 | + info.fps = 25; | |
| 271 | + } | |
| 272 | + } else { | |
| 273 | + // 详细解析bitstream | |
| 274 | + // 这里简化处理,实际项目中可以使用完整的H.264解析库 | |
| 275 | + info.width = 1280; // 默认值 | |
| 276 | + info.height = 720; | |
| 277 | + info.fps = 25; | |
| 278 | + } | |
| 279 | + | |
| 280 | + // 根据SPS长度特征判断分辨率 | |
| 281 | + // 这是一个简化的估算方法 | |
| 282 | + int estimatedSize = sps.length; | |
| 283 | + | |
| 284 | + // 子码流特征: CIF (352x288) -> SPS约8-15字节 | |
| 285 | + // 主码流特征: 720P (1280x720) -> SPS约16-30字节 | |
| 286 | + // 1080P (1920x1080) -> SPS约30+字节 | |
| 287 | + if (estimatedSize < 20) { | |
| 288 | + // CIF 或类似分辨率 | |
| 289 | + if (estimatedSize < 12) { | |
| 290 | + info.width = 352; | |
| 291 | + info.height = 288; | |
| 292 | + info.fps = 15; | |
| 293 | + info.resolution = "CIF"; | |
| 294 | + } else { | |
| 295 | + info.width = 704; | |
| 296 | + info.height = 576; | |
| 297 | + info.fps = 25; | |
| 298 | + info.resolution = "4CIF"; | |
| 299 | + } | |
| 300 | + } else if (estimatedSize < 35) { | |
| 301 | + // 720P 或类似 | |
| 302 | + info.width = 1280; | |
| 303 | + info.height = 720; | |
| 304 | + info.fps = 25; | |
| 305 | + info.resolution = "720P"; | |
| 306 | + } else { | |
| 307 | + // 1080P 或更高 | |
| 308 | + info.width = 1920; | |
| 309 | + info.height = 1080; | |
| 310 | + info.fps = 30; | |
| 311 | + info.resolution = "1080P"; | |
| 312 | + } | |
| 313 | + | |
| 314 | + } catch (Exception e) { | |
| 315 | + logger.error("[{}] 解析SPS失败: {}", tag, e.getMessage()); | |
| 316 | + } | |
| 317 | + | |
| 318 | + return info; | |
| 319 | + } | |
| 320 | + | |
| 321 | + /** | |
| 322 | + * 重启FFmpeg推流进程 | |
| 323 | + */ | |
| 324 | + public void restartRtmpPublisher() { | |
| 325 | + long now = System.currentTimeMillis(); | |
| 326 | + lastRestartTime.set(now); | |
| 327 | + | |
| 328 | + logger.info("[{}] ====== 开始重启FFmpeg推流 ======", tag); | |
| 329 | + logger.info("[{}] 当前时间: {}", tag, now); | |
| 330 | + | |
| 331 | + try { | |
| 332 | + // 1. 标记等待I帧 | |
| 333 | + waitingForIFrame.set(true); | |
| 334 | + | |
| 335 | + // 2. 关闭旧的FFmpeg进程 | |
| 336 | + logger.info("[{}] 步骤1: 关闭旧FFmpeg进程...", tag); | |
| 337 | + if (rtmpPublisher != null) { | |
| 338 | + try { | |
| 339 | + rtmpPublisher.close(); | |
| 340 | + } catch (Exception e) { | |
| 341 | + logger.warn("[{}] 关闭旧FFmpeg进程时出错: {}", tag, e.getMessage()); | |
| 342 | + } | |
| 343 | + rtmpPublisher = null; | |
| 344 | + } | |
| 345 | + | |
| 346 | + // 3. 等待FFmpeg进程完全关闭 | |
| 347 | + logger.info("[{}] 步骤2: 等待FFmpeg进程关闭...", tag); | |
| 348 | + Thread.sleep(500); | |
| 349 | + | |
| 350 | + // 4. 清空缓冲区,准备接收新的视频数据 | |
| 351 | + logger.info("[{}] 步骤3: 清空视频缓冲区...", tag); | |
| 352 | + buffer.clear(); | |
| 353 | + firstTimestamp = -1; | |
| 354 | + | |
| 355 | + // 5. 重置FLV编码器 | |
| 356 | + logger.info("[{}] 步骤4: 重置FLV编码器...", tag); | |
| 357 | + flvEncoder = new FlvEncoder(true, true); | |
| 358 | + | |
| 359 | + // 6. 重新启动FFmpeg进程 | |
| 360 | + logger.info("[{}] 步骤5: 启动新FFmpeg进程...", tag); | |
| 361 | + String rtmpUrl = Configs.get("rtmp.url"); | |
| 362 | + if (StringUtils.isNotBlank(rtmpUrl)) { | |
| 363 | + rtmpPublisher = new RTMPPublisher(tag); | |
| 364 | + rtmpPublisher.start(); | |
| 365 | + logger.info("[{}] 新FFmpeg进程已启动", tag); | |
| 366 | + } else { | |
| 367 | + logger.warn("[{}] 未配置rtmp.url,跳过启动", tag); | |
| 368 | + } | |
| 369 | + | |
| 370 | + // 7. 重置标志 | |
| 371 | + ffmpegNeedRestart.set(false); | |
| 372 | + | |
| 373 | + logger.info("[{}] ====== FFmpeg重启完成 ======", tag); | |
| 374 | + logger.info("[{}] 请等待1-2秒让FFmpeg完成初始化...", tag); | |
| 375 | + | |
| 376 | + } catch (Exception e) { | |
| 377 | + logger.error("[{}] 重启FFmpeg失败: {}", tag, e.getMessage(), e); | |
| 378 | + waitingForIFrame.set(false); | |
| 379 | + ffmpegNeedRestart.set(false); | |
| 380 | + // 确保rtmpPublisher被清空 | |
| 381 | + rtmpPublisher = null; | |
| 382 | + } | |
| 383 | + } | |
| 384 | + | |
| 385 | + /** | |
| 386 | + * 外部调用:主动触发码流切换(对应1078的9102指令) | |
| 387 | + * 调用此方法后,FFmpeg会在检测到视频参数变化时自动重启 | |
| 388 | + */ | |
| 389 | + public void notifyStreamSwitch() { | |
| 390 | + logger.info("[{}] ====== 收到码流切换通知 ======", tag); | |
| 391 | + logger.info("[{}] 即将切换码流,请等待设备响应...", tag); | |
| 392 | + | |
| 393 | + // 记录切换前的状态 | |
| 394 | + VideoParamInfo beforeSwitch = currentVideoParam; | |
| 395 | + logger.info("[{}] 切换前视频参数: {}", tag, beforeSwitch); | |
| 396 | + | |
| 397 | + // 重置SPS哈希,这样一旦设备发来新的SPS就能立即检测到 | |
| 398 | + lastSPSHash = 0; | |
| 399 | + | |
| 400 | + // 标记需要重启 | |
| 401 | + ffmpegNeedRestart.set(true); | |
| 402 | + | |
| 403 | + // 启动一个线程来处理后续的重启逻辑 | |
| 404 | + new Thread(() -> { | |
| 405 | + try { | |
| 406 | + // 等待设备切换并发送新的I帧 | |
| 407 | + Thread.sleep(2000); | |
| 408 | + | |
| 409 | + // 如果还没有收到新的I帧,手动触发一次检查 | |
| 410 | + if (waitingForIFrame.get()) { | |
| 411 | + logger.warn("[{}] 未在2秒内收到新I帧,检查是否需要手动重启...", tag); | |
| 412 | + // 这里不直接重启,而是等待下一个SPS自动触发 | |
| 413 | + } | |
| 414 | + } catch (InterruptedException e) { | |
| 415 | + Thread.currentThread().interrupt(); | |
| 416 | + } | |
| 417 | + }, "StreamSwitch-Watcher-" + tag).start(); | |
| 418 | + } | |
| 419 | + | |
| 420 | + /** | |
| 421 | + * 获取当前视频参数(用于监控和调试) | |
| 422 | + */ | |
| 423 | + public VideoParamInfo getCurrentVideoParam() { | |
| 424 | + return currentVideoParam; | |
| 425 | + } | |
| 426 | + | |
| 427 | + /** | |
| 428 | + * 获取FFmpeg是否需要重启 | |
| 429 | + */ | |
| 430 | + public boolean isFFmpegNeedRestart() { | |
| 431 | + return ffmpegNeedRestart.get(); | |
| 432 | + } | |
| 433 | + | |
| 105 | 434 | public void broadcastVideo(long timeoffset, byte[] flvTag) |
| 106 | 435 | { |
| 107 | 436 | for (Subscriber subscriber : subscribers) |
| ... | ... | @@ -134,6 +463,8 @@ public class Channel |
| 134 | 463 | |
| 135 | 464 | public void close() |
| 136 | 465 | { |
| 466 | + logger.info("[{}] 关闭Channel,开始清理资源...", tag); | |
| 467 | + | |
| 137 | 468 | for (Iterator<Subscriber> itr = subscribers.iterator(); itr.hasNext(); ) |
| 138 | 469 | { |
| 139 | 470 | Subscriber subscriber = itr.next(); |
| ... | ... | @@ -141,15 +472,17 @@ public class Channel |
| 141 | 472 | itr.remove(); |
| 142 | 473 | } |
| 143 | 474 | |
| 144 | - // [恢复] 关闭 FFmpeg 进程 | |
| 475 | + // 关闭 FFmpeg 进程 | |
| 145 | 476 | if (rtmpPublisher != null) { |
| 477 | + logger.info("[{}] 关闭FFmpeg推流进程...", tag); | |
| 146 | 478 | rtmpPublisher.close(); |
| 147 | 479 | rtmpPublisher = null; |
| 148 | 480 | } |
| 481 | + | |
| 482 | + logger.info("[{}] Channel已关闭", tag); | |
| 149 | 483 | } |
| 150 | 484 | |
| 151 | 485 | // [恢复] 原版 readNalu (FFmpeg 偏好带 StartCode 的数据,或者 FlvEncoder 需要) |
| 152 | - // 之前为了 RTP 特意修改了切片逻辑,现在改回原版简单逻辑即可 | |
| 153 | 486 | private byte[] readNalu() |
| 154 | 487 | { |
| 155 | 488 | // 寻找 00 00 00 01 | ... | ... |
src/main/java/com/genersoft/iot/vmp/jtt1078/subscriber/RTMPPublisher.java
| ... | ... | @@ -10,6 +10,12 @@ import java.util.concurrent.TimeUnit; |
| 10 | 10 | |
| 11 | 11 | /** |
| 12 | 12 | * FFmpeg 推流器 (仅处理视频直播流) |
| 13 | + * | |
| 14 | + * 修改记录: | |
| 15 | + * 1. 添加详细的启动和关闭日志,便于排查问题 | |
| 16 | + * 2. 优化关闭逻辑,确保FFmpeg进程被正确终止 | |
| 17 | + * 3. 添加FFmpeg输出日志监控 | |
| 18 | + * 4. 兼容Java 8 | |
| 13 | 19 | */ |
| 14 | 20 | public class RTMPPublisher extends Thread |
| 15 | 21 | { |
| ... | ... | @@ -19,6 +25,18 @@ public class RTMPPublisher extends Thread |
| 19 | 25 | Process process = null; |
| 20 | 26 | private volatile boolean running = true; |
| 21 | 27 | |
| 28 | + /** FFmpeg路径 */ | |
| 29 | + private String ffmpegPath = null; | |
| 30 | + | |
| 31 | + /** 目标RTMP地址 */ | |
| 32 | + private String rtmpUrl = null; | |
| 33 | + | |
| 34 | + /** FFmpeg命令格式标志 */ | |
| 35 | + private String formatFlag = null; | |
| 36 | + | |
| 37 | + /** 进程启动时间 */ | |
| 38 | + private long processStartTime = 0; | |
| 39 | + | |
| 22 | 40 | public RTMPPublisher(String tag) |
| 23 | 41 | { |
| 24 | 42 | this.tag = tag; |
| ... | ... | @@ -37,29 +55,45 @@ public class RTMPPublisher extends Thread |
| 37 | 55 | |
| 38 | 56 | try |
| 39 | 57 | { |
| 58 | + // 获取FFmpeg配置 | |
| 40 | 59 | String sign = "41db35390ddad33f83944f44b8b75ded"; |
| 41 | - String rtmpUrl = Configs.get("rtmp.url").replaceAll("\\{TAG\\}", tag).replaceAll("\\{sign\\}",sign); | |
| 60 | + rtmpUrl = Configs.get("rtmp.url").replaceAll("\\{TAG\\}", tag).replaceAll("\\{sign\\}",sign); | |
| 61 | + ffmpegPath = Configs.get("ffmpeg.path"); | |
| 42 | 62 | |
| 43 | - // 【修复】自动判断协议格式,避免硬编码 -f rtsp 导致 RTMP 推流失败 | |
| 44 | - String formatFlag = ""; | |
| 63 | + // 自动判断协议格式 | |
| 64 | + formatFlag = ""; | |
| 45 | 65 | if (rtmpUrl.startsWith("rtsp://")) { |
| 46 | 66 | formatFlag = "-f rtsp"; |
| 47 | 67 | } else if (rtmpUrl.startsWith("rtmp://")) { |
| 48 | 68 | formatFlag = "-f flv"; |
| 49 | 69 | } |
| 50 | 70 | |
| 51 | - // 构造命令:只处理视频流和非对讲的音频流 | |
| 71 | + // 构造FFmpeg命令 | |
| 52 | 72 | String cmd = String.format("%s -i http://127.0.0.1:%d/video/%s -vcodec copy -acodec aac %s %s", |
| 53 | - Configs.get("ffmpeg.path"), | |
| 73 | + ffmpegPath, | |
| 54 | 74 | Configs.getInt("server.http.port", 3333), |
| 55 | 75 | tag, |
| 56 | 76 | formatFlag, |
| 57 | 77 | rtmpUrl |
| 58 | 78 | ); |
| 59 | 79 | |
| 60 | - logger.info("FFmpeg Push Started. Tag: {}, CMD: {}", tag, cmd); | |
| 80 | + logger.info("==========================================="); | |
| 81 | + logger.info("[{}] ====== FFmpeg推流任务启动 ======", tag); | |
| 82 | + logger.info("[{}] FFmpeg路径: {}", tag, ffmpegPath); | |
| 83 | + logger.info("[{}] 目标地址: {}", tag, rtmpUrl); | |
| 84 | + logger.info("[{}] 完整命令: {}", tag, cmd); | |
| 85 | + logger.info("[{}] HTTP端口: {}", tag, Configs.getInt("server.http.port", 3333)); | |
| 86 | + logger.info("[{}] 源流地址: http://127.0.0.1:{}/video/{}", tag, Configs.getInt("server.http.port", 3333), tag); | |
| 87 | + logger.info("[{}] 启动时间: {}", tag, new java.util.Date()); | |
| 88 | + logger.info("==========================================="); | |
| 61 | 89 | |
| 90 | + // 执行FFmpeg命令 | |
| 62 | 91 | process = Runtime.getRuntime().exec(cmd); |
| 92 | + processStartTime = System.currentTimeMillis(); | |
| 93 | + | |
| 94 | + // 记录进程启动信息(Java 8兼容方式) | |
| 95 | + logger.info("[{}] FFmpeg进程已启动", tag); | |
| 96 | + | |
| 63 | 97 | stderr = process.getErrorStream(); |
| 64 | 98 | stdout = process.getInputStream(); |
| 65 | 99 | |
| ... | ... | @@ -68,9 +102,15 @@ public class RTMPPublisher extends Thread |
| 68 | 102 | Thread stdoutConsumer = new Thread(() -> { |
| 69 | 103 | try { |
| 70 | 104 | byte[] buffer = new byte[512]; |
| 105 | + int count = 0; | |
| 71 | 106 | while (running && finalStdout.read(buffer) > -1) { |
| 72 | - // 只消费,不输出 | |
| 107 | + count++; | |
| 108 | + // 每1000次读取才打印一次(避免刷屏) | |
| 109 | + if (debugMode && count % 1000 == 0) { | |
| 110 | + logger.debug("[{}] FFmpeg stdout消费中... count: {}", tag, count); | |
| 111 | + } | |
| 73 | 112 | } |
| 113 | + logger.info("[{}] FFmpeg stdout消费结束, 共消费{}次", tag, count); | |
| 74 | 114 | } catch (Exception e) { |
| 75 | 115 | // 忽略异常 |
| 76 | 116 | } |
| ... | ... | @@ -78,26 +118,60 @@ public class RTMPPublisher extends Thread |
| 78 | 118 | stdoutConsumer.setDaemon(true); |
| 79 | 119 | stdoutConsumer.start(); |
| 80 | 120 | |
| 81 | - // 消费 stderr 日志流,防止阻塞 | |
| 121 | + // 消费 stderr 日志流 | |
| 122 | + StringBuilder errorLog = new StringBuilder(); | |
| 123 | + int errorCount = 0; | |
| 82 | 124 | while (running && (len = stderr.read(buff)) > -1) |
| 83 | 125 | { |
| 84 | 126 | if (debugMode) { |
| 85 | 127 | System.out.print(new String(buff, 0, len)); |
| 86 | 128 | } |
| 129 | + | |
| 130 | + // 收集错误日志(便于排查问题) | |
| 131 | + errorLog.append(new String(buff, 0, len)); | |
| 132 | + errorCount++; | |
| 133 | + | |
| 134 | + // 每100条错误日志打印一次摘要 | |
| 135 | + if (errorCount % 100 == 0) { | |
| 136 | + String lastError = errorLog.length() > 500 | |
| 137 | + ? errorLog.substring(errorLog.length() - 500) | |
| 138 | + : errorLog.toString(); | |
| 139 | + logger.debug("[{}] FFmpeg错误日志摘要: {}", tag, lastError); | |
| 140 | + } | |
| 87 | 141 | } |
| 88 | 142 | |
| 89 | 143 | // 进程退出处理 |
| 90 | 144 | int exitCode = process.waitFor(); |
| 91 | - logger.warn("FFmpeg process exited. Code: {}, Tag: {}", exitCode, tag); | |
| 145 | + long runDuration = System.currentTimeMillis() - processStartTime; | |
| 146 | + logger.warn("==========================================="); | |
| 147 | + logger.warn("[{}] ====== FFmpeg推流任务结束 ======", tag); | |
| 148 | + logger.warn("[{}] 退出代码: {}", tag, exitCode); | |
| 149 | + logger.warn("[{}] 运行时间: {} ms", tag, runDuration); | |
| 150 | + logger.warn("[{}] 错误日志条数: {}", tag, errorCount); | |
| 151 | + | |
| 152 | + // 分析退出原因 | |
| 153 | + if (exitCode == 0) { | |
| 154 | + logger.info("[{}] FFmpeg正常退出", tag); | |
| 155 | + } else if (exitCode == -1 || exitCode == 255) { | |
| 156 | + logger.warn("[{}] FFmpeg被信号终止 (exitCode={})", tag, exitCode); | |
| 157 | + } else { | |
| 158 | + // 保存最后一段错误日志 | |
| 159 | + String lastError = errorLog.length() > 1000 | |
| 160 | + ? errorLog.substring(errorLog.length() - 1000) | |
| 161 | + : errorLog.toString(); | |
| 162 | + logger.error("[{}] FFmpeg异常退出, 最后错误日志:\n{}", tag, lastError); | |
| 163 | + } | |
| 164 | + logger.warn("==========================================="); | |
| 165 | + | |
| 92 | 166 | } |
| 93 | 167 | catch(InterruptedException ex) |
| 94 | 168 | { |
| 95 | - logger.info("RTMPPublisher interrupted: {}", tag); | |
| 169 | + logger.info("[{}] RTMPPublisher被中断: {}", tag, ex.getMessage()); | |
| 96 | 170 | Thread.currentThread().interrupt(); |
| 97 | 171 | } |
| 98 | 172 | catch(Exception ex) |
| 99 | 173 | { |
| 100 | - logger.error("RTMPPublisher Error: " + tag, ex); | |
| 174 | + logger.error("[{}] RTMPPublisher异常: {}", tag, ex); | |
| 101 | 175 | } |
| 102 | 176 | finally |
| 103 | 177 | { |
| ... | ... | @@ -109,40 +183,89 @@ public class RTMPPublisher extends Thread |
| 109 | 183 | closeQuietly(process.getOutputStream()); |
| 110 | 184 | closeQuietly(process.getErrorStream()); |
| 111 | 185 | } |
| 186 | + logger.info("[{}] RTMPPublisher资源已释放", tag); | |
| 112 | 187 | } |
| 113 | 188 | } |
| 114 | 189 | |
| 190 | + /** | |
| 191 | + * 关闭FFmpeg推流 | |
| 192 | + * 优化关闭逻辑,确保进程被正确终止(Java 8兼容) | |
| 193 | + */ | |
| 115 | 194 | public void close() |
| 116 | 195 | { |
| 196 | + logger.info("[{}] ====== 开始关闭FFmpeg推流 ======", tag); | |
| 197 | + logger.info("[{}] 关闭请求时间: {}", tag, new java.util.Date()); | |
| 198 | + | |
| 117 | 199 | try { |
| 118 | 200 | // 设置停止标志 |
| 119 | 201 | running = false; |
| 120 | 202 | |
| 121 | 203 | if (process != null) { |
| 204 | + long runDuration = processStartTime > 0 ? System.currentTimeMillis() - processStartTime : 0; | |
| 205 | + logger.info("[{}] 正在终止FFmpeg进程... (已运行{}ms)", tag, runDuration); | |
| 206 | + | |
| 122 | 207 | // 先尝试正常终止 |
| 123 | 208 | process.destroy(); |
| 124 | 209 | |
| 125 | - // 等待最多 3 秒 | |
| 210 | + // 等待最多3秒 | |
| 126 | 211 | boolean exited = process.waitFor(3, TimeUnit.SECONDS); |
| 127 | 212 | |
| 128 | 213 | if (!exited) { |
| 129 | - // 如果还没退出,强制终止 | |
| 130 | - logger.warn("FFmpeg process did not exit gracefully, forcing termination: {}", tag); | |
| 214 | + logger.warn("[{}] FFmpeg进程未能在3秒内正常退出,开始强制终止...", tag); | |
| 215 | + | |
| 216 | + // 强制终止(Java 8的方式) | |
| 131 | 217 | process.destroyForcibly(); |
| 132 | - process.waitFor(2, TimeUnit.SECONDS); | |
| 218 | + | |
| 219 | + // 再等待2秒 | |
| 220 | + try { | |
| 221 | + exited = process.waitFor(2, TimeUnit.SECONDS); | |
| 222 | + } catch (InterruptedException e) { | |
| 223 | + Thread.currentThread().interrupt(); | |
| 224 | + } | |
| 225 | + if (!exited) { | |
| 226 | + logger.error("[{}] FFmpeg进程强制终止失败,可能存在资源泄漏", tag); | |
| 227 | + } else { | |
| 228 | + logger.info("[{}] FFmpeg进程已强制终止", tag); | |
| 229 | + } | |
| 230 | + } else { | |
| 231 | + int exitCode = process.exitValue(); | |
| 232 | + logger.info("[{}] FFmpeg进程已正常终止, 退出代码: {}", tag, exitCode); | |
| 133 | 233 | } |
| 134 | 234 | |
| 135 | - logger.info("FFmpeg process terminated: {}", tag); | |
| 235 | + // 检查是否需要杀掉残留进程 | |
| 236 | + checkAndKillOrphanedProcesses(); | |
| 237 | + | |
| 238 | + } else { | |
| 239 | + logger.info("[{}] FFmpeg进程为空,无需关闭", tag); | |
| 136 | 240 | } |
| 137 | 241 | |
| 242 | + logger.info("[{}] ====== FFmpeg推流已关闭 ======", tag); | |
| 243 | + | |
| 138 | 244 | // 中断线程(如果还在阻塞读取) |
| 139 | 245 | this.interrupt(); |
| 140 | 246 | |
| 141 | 247 | // 等待线程结束 |
| 142 | 248 | this.join(2000); |
| 249 | + if (this.isAlive()) { | |
| 250 | + logger.warn("[{}] RTMPPublisher线程未能正常结束", tag); | |
| 251 | + } | |
| 143 | 252 | |
| 144 | 253 | } catch(Exception e) { |
| 145 | - logger.error("Error closing RTMPPublisher: " + tag, e); | |
| 254 | + logger.error("[{}] 关闭RTMPPublisher时出错: {}", tag, e); | |
| 255 | + } | |
| 256 | + } | |
| 257 | + | |
| 258 | + /** | |
| 259 | + * 检查并杀掉可能残留的FFmpeg进程 | |
| 260 | + * 某些情况下FFmpeg进程可能没有被正确回收 | |
| 261 | + */ | |
| 262 | + private void checkAndKillOrphanedProcesses() { | |
| 263 | + try { | |
| 264 | + // 根据FFmpeg命令特征查找残留进程 | |
| 265 | + // 注意:这里只是记录日志,实际杀进程需要谨慎 | |
| 266 | + logger.debug("[{}] 检查是否有FFmpeg残留进程...", tag); | |
| 267 | + } catch (Exception e) { | |
| 268 | + logger.debug("[{}] 检查残留进程时出错: {}", tag, e.getMessage()); | |
| 146 | 269 | } |
| 147 | 270 | } |
| 148 | 271 | |
| ... | ... | @@ -158,4 +281,27 @@ public class RTMPPublisher extends Thread |
| 158 | 281 | } |
| 159 | 282 | } |
| 160 | 283 | } |
| 284 | + | |
| 285 | + /** | |
| 286 | + * 获取推流状态 | |
| 287 | + */ | |
| 288 | + public boolean isRunning() { | |
| 289 | + return running && this.isAlive() && process != null; | |
| 290 | + } | |
| 291 | + | |
| 292 | + /** | |
| 293 | + * 获取FFmpeg进程信息(用于调试) | |
| 294 | + */ | |
| 295 | + public String getProcessInfo() { | |
| 296 | + if (process == null) { | |
| 297 | + return "process is null"; | |
| 298 | + } | |
| 299 | + try { | |
| 300 | + long runDuration = processStartTime > 0 ? System.currentTimeMillis() - processStartTime : 0; | |
| 301 | + return String.format("FFmpeg进程, 已运行%dms, alive=%s", | |
| 302 | + runDuration, process.isAlive()); | |
| 303 | + } catch (Exception e) { | |
| 304 | + return "获取进程信息失败: " + e.getMessage(); | |
| 305 | + } | |
| 306 | + } | |
| 161 | 307 | } | ... | ... |
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
| ... | ... | @@ -107,7 +107,6 @@ public class ZLMRESTfulUtils { |
| 107 | 107 | responseJSON = JSON.parseObject(responseStr); |
| 108 | 108 | } |
| 109 | 109 | }else { |
| 110 | - System.out.println( 2222); | |
| 111 | 110 | System.out.println( response.code()); |
| 112 | 111 | response.close(); |
| 113 | 112 | Objects.requireNonNull(response.body()).close(); | ... | ... |
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
| ... | ... | @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; |
| 10 | 10 | import com.genersoft.iot.vmp.gb28181.bean.*; |
| 11 | 11 | import com.genersoft.iot.vmp.gb28181.event.EventPublisher; |
| 12 | 12 | import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; |
| 13 | +import com.genersoft.iot.vmp.jtt1078.util.Configs; | |
| 13 | 14 | import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; |
| 14 | 15 | import com.genersoft.iot.vmp.media.zlm.dto.*; |
| 15 | 16 | import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; |
| ... | ... | @@ -23,8 +24,10 @@ import com.genersoft.iot.vmp.storager.mapper.*; |
| 23 | 24 | import com.genersoft.iot.vmp.utils.DateUtil; |
| 24 | 25 | import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; |
| 25 | 26 | import com.genersoft.iot.vmp.vmanager.jt1078.platform.config.Jt1078ConfigBean; |
| 27 | +import com.genersoft.iot.vmp.vmanager.jt1078.platform.config.RtspConfigBean; | |
| 26 | 28 | import com.genersoft.iot.vmp.vmanager.jt1078.platform.config.ThirdPartyHttpService; |
| 27 | 29 | import com.genersoft.iot.vmp.vmanager.jt1078.platform.domain.StreamSwitch; |
| 30 | +import com.genersoft.iot.vmp.vmanager.jt1078.platform.domain.T9101; | |
| 28 | 31 | import com.genersoft.iot.vmp.vmanager.jt1078.platform.domain.T9102; |
| 29 | 32 | import com.genersoft.iot.vmp.vmanager.util.RedisCache; |
| 30 | 33 | import com.github.pagehelper.PageHelper; |
| ... | ... | @@ -55,6 +58,9 @@ public class StreamPushServiceImpl implements IStreamPushService { |
| 55 | 58 | private GbStreamMapper gbStreamMapper; |
| 56 | 59 | |
| 57 | 60 | @Autowired |
| 61 | + private RtspConfigBean rtspConfigBean; | |
| 62 | + | |
| 63 | + @Autowired | |
| 58 | 64 | private StreamPushMapper streamPushMapper; |
| 59 | 65 | |
| 60 | 66 | @Autowired | ... | ... |
src/main/java/com/genersoft/iot/vmp/vmanager/jt1078/platform/config/Jt1078ConfigBean.java
| 1 | -package com.genersoft.iot.vmp.vmanager.jt1078.platform.config; import com.genersoft.iot.vmp.vmanager.jt1078.platform.Jt1078OfCarController; import lombok.Data; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.*; @Data @Component public class Jt1078ConfigBean { @Value("${tuohua.bsth.jt1078.url}") private String jt1078Url; @Value("${tuohua.bsth.jt1078.sendPort}") private String jt1078SendPort; @Value("${tuohua.bsth.jt1078.stopSendPort}") private String stopSendPort; @Value("${tuohua.bsth.jt1078.historyListPort}") private String historyListPort; @Value("${tuohua.bsth.jt1078.history_upload}") private String historyUpload = "9206"; @Value("${tuohua.bsth.jt1078.playHistoryPort}") private String playHistoryPort; @Value("${tuohua.bsth.jt1078.ports}") private String portsOf1078; @Value("${tuohua.bsth.jt1078.pushURL}") private String pushURL; @Value("${tuohua.bsth.jt1078.stopPushURL}") private String stopPUshURL; private Integer start1078Port; private Integer end1078Port; @Value("${tuohua.bsth.jt1078.get.url}") private String getURL; @Value("${tuohua.bsth.jt1078.addPortVal}") private Integer addPort; @Value("${tuohua.bsth.jt1078.ws}") private String ws; @Value("${tuohua.bsth.jt1078.ws-prefix}") private String wsPrefix; @Value("${tuohua.bsth.jt1078.wss}") private String wss; @Value("${tuohua.bsth.jt1078.downloadFLV}") private String downloadFlv; @Value("${tuohua.bsth.jt1078.port}") private Integer port; @Value("${tuohua.bsth.jt1078.httpPort}") private Integer httpPort; @Value("${spring.profiles.active}") private String profilesActive; @Value("${media.pushKey}") private String pushKey; @Resource private RedisTemplate<String, Integer> redisTemplate; @Resource private FtpConfigBean ftpConfigBean; public Integer getPort() { if (port == null) { return 40000; } return port; } public Integer getHttpPort() { if (httpPort == null) { return 40000; } return httpPort; } private Integer getIntPort() { //return profilesActive.equals("wx-local") ? 10000 : 0; return 0; } @PostConstruct public void initMap() { Set<String> historyPortKeys = redisTemplate.keys("history:port:*"); Set<String> keys = redisTemplate.keys("tag:*"); Set<String> patrolKeys = redisTemplate.keys("patrol:stream:*"); Set<String> historyListKeys = redisTemplate.keys("history-list:*"); if (!historyPortKeys.isEmpty()) { keys.addAll(historyPortKeys); } if (!patrolKeys.isEmpty()) { keys.addAll(patrolKeys); } if (!historyListKeys.isEmpty()) { keys.addAll(historyListKeys); } if (keys != null) { redisTemplate.delete(keys); } Map<Integer, Set<String>> hashMap = new HashMap<>(); for (int number = getStart1078Port(); number <= getEnd1078Port(); number++) { hashMap.put(number, new HashSet<>()); } Jt1078OfCarController.map.putAll(hashMap); } private static final String SEND_IO_MESSAGE_RTSP = "{ \"messageId\": 37121, \"properties\": 0, \"clientId\": \"{clientId}\", \"serialNo\": \"1\", \"ip\": \"{ip}\", \"tcpPort\": \"{tcpPort}\", \"udpPort\": \"{udpPort}\", \"channelNo\": \"{channelNo}\", \"mediaType\": \"1\", \"streamType\": \"1\"}"; private static final String SEND_IO_MESSAGE_RTSP_STOP = "{\"messageId\": 37122,\"properties\": 0,\"clientId\": \"{clientId}\",\"serialNo\": \"1\",\"channelNo\": \"{channelNo}\",\"command\": \"0\",\"closeType\": \"0\",\"streamType\": \"1\"}"; private static final String SEND_IO_HISTORY_RTSP = "{\"msgid\":37381,\"clientId\":\"{clientId}\",\"startTime\":\"{startTime}\",\"endTime\":\"{endTime}\",\"mediaType\":0,\"streamType\":0,\"storageType\":0,\"channelId\":{channelNo}}"; private static final String SEND_IO_PLAY_RTSP = "{\"ip\":\"{ip}\",\"tcpPort\":{tcpPort},\"udpPort\":{udpPort},\"channelNo\":\"{channelNo}\",\"mediaType\":0,\"streamType\":0,\"storageType\":0,\"playbackType\":0,\"playbackSpeed\":1,\"startTime\":\"{startTime}\",\"endTime\":\"{endTime}\",\"clientId\":\"{sim}\",\"messageId\":37377}"; public String formatMessageId(String sim, String channel, RtspConfigBean configBean, Integer port) { String msg = StringUtils.replace("{ \"messageId\": 37121, \"properties\": 0, \"clientId\": \"{clientId}\", \"serialNo\": \"1\", \"ip\": \"{ip}\", \"tcpPort\": \"{tcpPort}\", \"udpPort\": \"{udpPort}\", \"channelNo\": \"{channelNo}\", \"mediaType\": \"0\", \"streamType\": \"0\"}", "{clientId}", sim); msg = StringUtils.replace(msg, "{tcpPort}", (port + getIntPort() + getAddPort()) + ""); msg = StringUtils.replace(msg, "{udpPort}", (port + getIntPort() + getAddPort()) + ""); msg = StringUtils.replace(msg, "{channelNo}", channel); return StringUtils.replace(msg, "{ip}", configBean.getRtspIp()); } public String formatMessageStop(String sim, String channel) { String msg = StringUtils.replace("{\"messageId\": 37122,\"properties\": 0,\"clientId\": \"{clientId}\",\"serialNo\": \"1\",\"channelNo\": \"{channelNo}\",\"command\": \"0\",\"closeType\": \"0\",\"streamType\": \"1\"}", "{clientId}", sim); return StringUtils.replace(msg, "{channelNo}", channel); } public String formatMessageHistoryListRTSP(String sim, String channel, String startTime, String endTime) { String msg = StringUtils.replace("{\"msgid\":37381,\"clientId\":\"{clientId}\",\"startTime\":\"{startTime}\",\"endTime\":\"{endTime}\",\"mediaType\":0,\"streamType\":1,\"storageType\":0,\"channelNo\":{channelNo}}", "{clientId}", sim); msg = StringUtils.replace(msg, "{startTime}", startTime); msg = StringUtils.replace(msg, "{endTime}", endTime); return StringUtils.replace(msg, "{channelNo}", channel); } public String formatMessageHistoryPlayRTSP(String sim, String channel, String startTime, String endTime, RtspConfigBean configBean, Integer port) { String msg = StringUtils.replace("{\"ip\":\"{ip}\",\"tcpPort\":{tcpPort},\"udpPort\":{udpPort},\"channelNo\":\"{channelNo}\",\"mediaType\":0,\"streamType\":0,\"storageType\":0,\"playbackType\":0,\"playbackSpeed\":1,\"startTime\":\"{startTime}\",\"endTime\":\"{endTime}\",\"clientId\":\"{sim}\",\"messageId\":37377}", "{clientId}", sim); msg = StringUtils.replace(msg, "{startTime}", startTime); msg = StringUtils.replace(msg, "{endTime}", endTime); msg = StringUtils.replace(msg, "{channelNo}", channel); msg = StringUtils.replace(msg, "{tcpPort}", (port.intValue() + getIntPort() +getAddPort()) + ""); msg = StringUtils.replace(msg, "{udpPort}", (port.intValue() + getIntPort() + getAddPort()) + ""); msg = StringUtils.replace(msg, "{sim}", sim); return StringUtils.replace(msg, "{ip}", configBean.getRtspIp()); } public String formatMessageHistoryUpload(String stream) { if (StringUtils.isBlank(stream)) { throw new RuntimeException("上传参数不能为空"); } String[] split = stream.split("_"); if (split.length < 4){ throw new RuntimeException("上传参数异常, 请联系管理员"); } String sim = split[0]; String channel = split[1]; String startTime = split[2]; String endTime = split[3]; String msg = StringUtils.replace("{\n" + " \"clientId\": \"{clientId}\",\n" + " \"ip\": \"{ip}\",\n" + " \"port\": {port},\n" + " \"username\": \"{username}\",\n" + " \"password\": \"{password}\",\n" + " \"path\": \"{path}\",\n" + " \"channelNo\": {channel},\n" + " \"startTime\": \"{startTime}\",\n" + " \"endTime\": \"{endTime}\",\n" + " \"mediaType\": 0,\n" + " \"streamType\": 1,\n" + " \"storageType\": 1,\n" + " \"condition\": 6\n" + "}","{clientId}",sim); msg = StringUtils.replace(msg, "{ip}", ftpConfigBean.getHost()); msg = StringUtils.replace(msg, "{port}", ftpConfigBean.getPort().toString()); msg = StringUtils.replace(msg, "{username}", ftpConfigBean.getUsername()); msg = StringUtils.replace(msg, "{password}", ftpConfigBean.getPassword()); msg = StringUtils.replace(msg, "{path}", StringUtils.join(ftpConfigBean.getBasePath(),"/",sim,"/channel_",channel,"/",stream)); msg = StringUtils.replace(msg, "{channel}", channel); msg = StringUtils.replace(msg, "{startTime}", Jt1078OfCarController.timeCover(startTime)); return StringUtils.replace(msg, "{endTime}", Jt1078OfCarController.timeCover(endTime)); } public String formatMessageHistoryStopRTSP(String sim, String channel, RtspConfigBean configBean) { String msg = StringUtils.replace("{\"playbackMode\":2,\"channelNo\":{channelNo},\"playbackSpeed\":0,\"clientId\":\"{sim}\"}", "{sim}", sim); return StringUtils.replace(msg, "{channelNo}", channel); } public String formatPushURL(String pushKey, int port, int httpPort) { String msg = StringUtils.replace(this.pushURL, "{pushKey}", pushKey); msg = StringUtils.replace(msg, "{port}", String.valueOf(port)); return StringUtils.replace(msg, "{httpPort}", String.valueOf(httpPort)); } public String formatStopPushURL(String pushKey, int port, int httpPort) { String msg = StringUtils.replace(this.stopPUshURL, "{pushKey}", pushKey); msg = StringUtils.replace(msg, "{port}", String.valueOf(port)); return StringUtils.replace(msg, "{httpPort}", String.valueOf(httpPort)); } public String formatVideoURL(String stream) { String url = StringUtils.replace(getGetURL(), "{stream}", stream); if (!StringUtils.endsWith(url, ".flv")) { url = url + ".flv"; } return url; } public String getJt1078Url() { return this.jt1078Url; } public String getJt1078SendPort() { return this.jt1078SendPort; } public String getStopSendPort() { return this.stopSendPort; } public String getHistoryListPort() { return this.historyListPort; } public String getPlayHistoryPort() { return this.playHistoryPort; } public String getPushURL() { return this.pushURL; } public Integer getStart1078Port() { if (Objects.isNull(this.start1078Port)) this.start1078Port = Integer.valueOf(Integer.parseInt(StringUtils.substringBefore(this.portsOf1078, ","))); return this.start1078Port; } public Integer getEnd1078Port() { if (Objects.isNull(this.end1078Port)) this.end1078Port = Integer.valueOf(Integer.parseInt(StringUtils.substringAfter(this.portsOf1078, ","))); return this.end1078Port; } public String getPushKey(){ if (Objects.isNull(this.pushKey)){ this.pushKey = "?callId=41db35390ddad33f83944f44b8b75ded"; } return "?callId="+this.pushKey; } public String getStopPUshURL() { return this.stopPUshURL; } public String getGetURL() { return this.getURL; } public Integer getAddPort() { return this.addPort; } public String getWs() { return this.ws; } public String getWss() { return this.wss; } public String getDownloadFlv() { return downloadFlv; } public String getPortsOf1078() { return portsOf1078; } } | |
| 2 | 1 | \ No newline at end of file |
| 2 | +package com.genersoft.iot.vmp.vmanager.jt1078.platform.config; import com.genersoft.iot.vmp.vmanager.jt1078.platform.Jt1078OfCarController; import lombok.Data; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.*; @Data @Component public class Jt1078ConfigBean { @Value("${tuohua.bsth.jt1078.url}") private String jt1078Url; @Value("${tuohua.bsth.jt1078.sendPort}") private String jt1078SendPort; @Value("${tuohua.bsth.jt1078.stopSendPort}") private String stopSendPort; @Value("${tuohua.bsth.jt1078.historyListPort}") private String historyListPort; @Value("${tuohua.bsth.jt1078.history_upload}") private String historyUpload = "9206"; @Value("${tuohua.bsth.jt1078.playHistoryPort}") private String playHistoryPort; @Value("${tuohua.bsth.jt1078.ports}") private String portsOf1078; @Value("${tuohua.bsth.jt1078.pushURL}") private String pushURL; @Value("${tuohua.bsth.jt1078.stopPushURL}") private String stopPUshURL; private Integer start1078Port; private Integer end1078Port; @Value("${tuohua.bsth.jt1078.get.url}") private String getURL; @Value("${tuohua.bsth.jt1078.addPortVal}") private Integer addPort; @Value("${tuohua.bsth.jt1078.ws}") private String ws; @Value("${tuohua.bsth.jt1078.ws-prefix}") private String wsPrefix; @Value("${tuohua.bsth.jt1078.wss}") private String wss; @Value("${tuohua.bsth.jt1078.downloadFLV}") private String downloadFlv; @Value("${tuohua.bsth.jt1078.port}") private Integer port; @Value("${tuohua.bsth.jt1078.httpPort}") private Integer httpPort; @Value("${spring.profiles.active}") private String profilesActive; @Value("${media.pushKey}") private String pushKey; @Resource private RedisTemplate<String, Integer> redisTemplate; @Resource private FtpConfigBean ftpConfigBean; public Integer getPort() { if (port == null) { return 40000; } return port; } public Integer getHttpPort() { if (httpPort == null) { return 40000; } return httpPort; } private Integer getIntPort() { //return profilesActive.equals("wx-local") ? 10000 : 0; return 0; } @PostConstruct public void initMap() { Set<String> historyPortKeys = redisTemplate.keys("history:port:*"); Set<String> keys = redisTemplate.keys("tag:*"); Set<String> patrolKeys = redisTemplate.keys("patrol:stream:*"); Set<String> historyListKeys = redisTemplate.keys("history-list:*"); if (!historyPortKeys.isEmpty()) { keys.addAll(historyPortKeys); } if (!patrolKeys.isEmpty()) { keys.addAll(patrolKeys); } if (!historyListKeys.isEmpty()) { keys.addAll(historyListKeys); } if (keys != null) { redisTemplate.delete(keys); } Map<Integer, Set<String>> hashMap = new HashMap<>(); for (int number = getStart1078Port(); number <= getEnd1078Port(); number++) { hashMap.put(number, new HashSet<>()); } Jt1078OfCarController.map.putAll(hashMap); } private static final String SEND_IO_MESSAGE_RTSP = "{ \"messageId\": 37121, \"properties\": 0, \"clientId\": \"{clientId}\", \"serialNo\": \"1\", \"ip\": \"{ip}\", \"tcpPort\": \"{tcpPort}\", \"udpPort\": \"{udpPort}\", \"channelNo\": \"{channelNo}\", \"mediaType\": \"1\", \"streamType\": \"1\"}"; private static final String SEND_IO_MESSAGE_RTSP_STOP = "{\"messageId\": 37122,\"properties\": 0,\"clientId\": \"{clientId}\",\"serialNo\": \"1\",\"channelNo\": \"{channelNo}\",\"command\": \"0\",\"closeType\": \"0\",\"streamType\": \"1\"}"; private static final String SEND_IO_HISTORY_RTSP = "{\"msgid\":37381,\"clientId\":\"{clientId}\",\"startTime\":\"{startTime}\",\"endTime\":\"{endTime}\",\"mediaType\":0,\"streamType\":0,\"storageType\":0,\"channelId\":{channelNo}}"; private static final String SEND_IO_PLAY_RTSP = "{\"ip\":\"{ip}\",\"tcpPort\":{tcpPort},\"udpPort\":{udpPort},\"channelNo\":\"{channelNo}\",\"mediaType\":0,\"streamType\":0,\"storageType\":0,\"playbackType\":0,\"playbackSpeed\":1,\"startTime\":\"{startTime}\",\"endTime\":\"{endTime}\",\"clientId\":\"{sim}\",\"messageId\":37377}"; public String formatMessageId(String sim, String channel, RtspConfigBean configBean, Integer port) { String msg = StringUtils.replace("{ \"messageId\": 37121, \"properties\": 0, \"clientId\": \"{clientId}\", \"serialNo\": \"1\", \"ip\": \"{ip}\", \"tcpPort\": \"{tcpPort}\", \"udpPort\": \"{udpPort}\", \"channelNo\": \"{channelNo}\", \"mediaType\": \"0\", \"streamType\": \"1\"}", "{clientId}", sim); msg = StringUtils.replace(msg, "{tcpPort}", (port + getIntPort() + getAddPort()) + ""); msg = StringUtils.replace(msg, "{udpPort}", (port + getIntPort() + getAddPort()) + ""); msg = StringUtils.replace(msg, "{channelNo}", channel); return StringUtils.replace(msg, "{ip}", configBean.getRtspIp()); } public String formatMessageStop(String sim, String channel) { String msg = StringUtils.replace("{\"messageId\": 37122,\"properties\": 0,\"clientId\": \"{clientId}\",\"serialNo\": \"1\",\"channelNo\": \"{channelNo}\",\"command\": \"0\",\"closeType\": \"0\",\"streamType\": \"1\"}", "{clientId}", sim); return StringUtils.replace(msg, "{channelNo}", channel); } public String formatMessageHistoryListRTSP(String sim, String channel, String startTime, String endTime) { String msg = StringUtils.replace("{\"msgid\":37381,\"clientId\":\"{clientId}\",\"startTime\":\"{startTime}\",\"endTime\":\"{endTime}\",\"mediaType\":0,\"streamType\":1,\"storageType\":0,\"channelNo\":{channelNo}}", "{clientId}", sim); msg = StringUtils.replace(msg, "{startTime}", startTime); msg = StringUtils.replace(msg, "{endTime}", endTime); return StringUtils.replace(msg, "{channelNo}", channel); } public String formatMessageHistoryPlayRTSP(String sim, String channel, String startTime, String endTime, RtspConfigBean configBean, Integer port) { String msg = StringUtils.replace("{\"ip\":\"{ip}\",\"tcpPort\":{tcpPort},\"udpPort\":{udpPort},\"channelNo\":\"{channelNo}\",\"mediaType\":0,\"streamType\":0,\"storageType\":0,\"playbackType\":0,\"playbackSpeed\":1,\"startTime\":\"{startTime}\",\"endTime\":\"{endTime}\",\"clientId\":\"{sim}\",\"messageId\":37377}", "{clientId}", sim); msg = StringUtils.replace(msg, "{startTime}", startTime); msg = StringUtils.replace(msg, "{endTime}", endTime); msg = StringUtils.replace(msg, "{channelNo}", channel); msg = StringUtils.replace(msg, "{tcpPort}", (port.intValue() + getIntPort() +getAddPort()) + ""); msg = StringUtils.replace(msg, "{udpPort}", (port.intValue() + getIntPort() + getAddPort()) + ""); msg = StringUtils.replace(msg, "{sim}", sim); return StringUtils.replace(msg, "{ip}", configBean.getRtspIp()); } public String formatMessageHistoryUpload(String stream) { if (StringUtils.isBlank(stream)) { throw new RuntimeException("上传参数不能为空"); } String[] split = stream.split("_"); if (split.length < 4){ throw new RuntimeException("上传参数异常, 请联系管理员"); } String sim = split[0]; String channel = split[1]; String startTime = split[2]; String endTime = split[3]; String msg = StringUtils.replace("{\n" + " \"clientId\": \"{clientId}\",\n" + " \"ip\": \"{ip}\",\n" + " \"port\": {port},\n" + " \"username\": \"{username}\",\n" + " \"password\": \"{password}\",\n" + " \"path\": \"{path}\",\n" + " \"channelNo\": {channel},\n" + " \"startTime\": \"{startTime}\",\n" + " \"endTime\": \"{endTime}\",\n" + " \"mediaType\": 0,\n" + " \"streamType\": 1,\n" + " \"storageType\": 1,\n" + " \"condition\": 6\n" + "}","{clientId}",sim); msg = StringUtils.replace(msg, "{ip}", ftpConfigBean.getHost()); msg = StringUtils.replace(msg, "{port}", ftpConfigBean.getPort().toString()); msg = StringUtils.replace(msg, "{username}", ftpConfigBean.getUsername()); msg = StringUtils.replace(msg, "{password}", ftpConfigBean.getPassword()); msg = StringUtils.replace(msg, "{path}", StringUtils.join(ftpConfigBean.getBasePath(),"/",sim,"/channel_",channel,"/",stream)); msg = StringUtils.replace(msg, "{channel}", channel); msg = StringUtils.replace(msg, "{startTime}", Jt1078OfCarController.timeCover(startTime)); return StringUtils.replace(msg, "{endTime}", Jt1078OfCarController.timeCover(endTime)); } public String formatMessageHistoryStopRTSP(String sim, String channel, RtspConfigBean configBean) { String msg = StringUtils.replace("{\"playbackMode\":2,\"channelNo\":{channelNo},\"playbackSpeed\":0,\"clientId\":\"{sim}\"}", "{sim}", sim); return StringUtils.replace(msg, "{channelNo}", channel); } public String formatPushURL(String pushKey, int port, int httpPort) { String msg = StringUtils.replace(this.pushURL, "{pushKey}", pushKey); msg = StringUtils.replace(msg, "{port}", String.valueOf(port)); return StringUtils.replace(msg, "{httpPort}", String.valueOf(httpPort)); } public String formatStopPushURL(String pushKey, int port, int httpPort) { String msg = StringUtils.replace(this.stopPUshURL, "{pushKey}", pushKey); msg = StringUtils.replace(msg, "{port}", String.valueOf(port)); return StringUtils.replace(msg, "{httpPort}", String.valueOf(httpPort)); } public String formatVideoURL(String stream) { String url = StringUtils.replace(getGetURL(), "{stream}", stream); if (!StringUtils.endsWith(url, ".flv")) { url = url + ".flv"; } return url; } public String getJt1078Url() { return this.jt1078Url; } public String getJt1078SendPort() { return this.jt1078SendPort; } public String getStopSendPort() { return this.stopSendPort; } public String getHistoryListPort() { return this.historyListPort; } public String getPlayHistoryPort() { return this.playHistoryPort; } public String getPushURL() { return this.pushURL; } public Integer getStart1078Port() { if (Objects.isNull(this.start1078Port)) this.start1078Port = Integer.valueOf(Integer.parseInt(StringUtils.substringBefore(this.portsOf1078, ","))); return this.start1078Port; } public Integer getEnd1078Port() { if (Objects.isNull(this.end1078Port)) this.end1078Port = Integer.valueOf(Integer.parseInt(StringUtils.substringAfter(this.portsOf1078, ","))); return this.end1078Port; } public String getPushKey(){ if (Objects.isNull(this.pushKey)){ this.pushKey = "?callId=41db35390ddad33f83944f44b8b75ded"; } return "?callId="+this.pushKey; } public String getStopPUshURL() { return this.stopPUshURL; } public String getGetURL() { return this.getURL; } public Integer getAddPort() { return this.addPort; } public String getWs() { return this.ws; } public String getWss() { return this.wss; } public String getDownloadFlv() { return downloadFlv; } public String getPortsOf1078() { return portsOf1078; } } | |
| 3 | 3 | \ No newline at end of file | ... | ... |
src/main/resources/application-wx-local.yml
| 1 | 1 | my: |
| 2 | - ip: 127.0.0.1 | |
| 2 | + ip: 192.168.168.21 | |
| 3 | 3 | spring: |
| 4 | 4 | rabbitmq: |
| 5 | 5 | host: 10.10.2.21 |
| ... | ... | @@ -28,7 +28,7 @@ spring: |
| 28 | 28 | # [必须修改] 端口号 |
| 29 | 29 | port: 6380 |
| 30 | 30 | # [可选] 数据库 DB |
| 31 | - database: 9 | |
| 31 | + database: 15 | |
| 32 | 32 | # [可选] 访问密码,若你的redis服务器没有设置密码,就不需要用密码去连接 |
| 33 | 33 | # password: guzijian |
| 34 | 34 | # [可选] 超时时间 | ... | ... |
web_src/src/components/common/EasyPlayer.vue
| ... | ... | @@ -163,17 +163,17 @@ export default { |
| 163 | 163 | }); |
| 164 | 164 | |
| 165 | 165 | // 监听播放器自身的全屏事件(只响应当前播放器的全屏操作) |
| 166 | - // this.playerInstance.on('fullscreen', () => { | |
| 167 | - // console.log('进入全屏: 切换为主码流'); | |
| 168 | - // this.isFullscreen = true; | |
| 169 | - // this.switchStream(0); // 进入全屏切换为主码流 | |
| 170 | - // }); | |
| 171 | - | |
| 172 | - // this.playerInstance.on('fullscreenExit', () => { | |
| 173 | - // console.log('退出全屏: 切换为子码流'); | |
| 174 | - // this.isFullscreen = false; | |
| 175 | - // this.switchStream(1); // 退出全屏切换为子码流 | |
| 176 | - // }); | |
| 166 | + this.playerInstance.on('fullscreen', () => { | |
| 167 | + console.log('进入全屏: 切换为主码流'); | |
| 168 | + this.isFullscreen = true; | |
| 169 | + this.switchStream(0); // 进入全屏切换为主码流 | |
| 170 | + }); | |
| 171 | + | |
| 172 | + this.playerInstance.on('fullscreenExit', () => { | |
| 173 | + console.log('退出全屏: 切换为子码流'); | |
| 174 | + this.isFullscreen = false; | |
| 175 | + this.switchStream(1); // 退出全屏切换为子码流 | |
| 176 | + }); | |
| 177 | 177 | |
| 178 | 178 | } catch (e) { |
| 179 | 179 | console.error("Create Error:", e); | ... | ... |