RTMPPublisher.java 15.3 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402
package com.genersoft.iot.vmp.jtt1078.subscriber;

import com.genersoft.iot.vmp.jtt1078.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;

/**
 * FFmpeg 推流器 (仅处理视频直播流)
 *
 * 修改记录:
 * 1. 添加详细的启动和关闭日志,便于排查问题
 * 2. 优化关闭逻辑,确保FFmpeg进程被正确终止
 * 3. 添加FFmpeg输出日志监控
 * 4. 兼容Java 8
 */
public class RTMPPublisher extends Thread
{
    static Logger logger = LoggerFactory.getLogger(RTMPPublisher.class);

    String tag = null;
    Process process = null;
    private volatile boolean running = true;

    /** FFmpeg路径 */
    private String ffmpegPath = null;

    /** 目标RTMP地址 */
    private String rtmpUrl = null;

    /** FFmpeg命令格式标志 */
    private String formatFlag = null;

    /** 进程启动时间 */
    private long processStartTime = 0;

    public RTMPPublisher(String tag)
    {
        this.tag = tag;
        this.setName("RTMPPublisher-" + tag);
        this.setDaemon(true);
    }

    @Override
    public void run()
    {
        InputStream stderr = null;
        InputStream stdout = null;
        int len = -1;
        byte[] buff = new byte[512];
        boolean debugMode = "on".equalsIgnoreCase(Configs.get("debug.mode"));

        try
        {
            // 获取FFmpeg配置
            String sign = "41db35390ddad33f83944f44b8b75ded";
            rtmpUrl = Configs.get("rtmp.url").replaceAll("\\{TAG\\}", tag).replaceAll("\\{sign\\}",sign);
            ffmpegPath = Configs.get("ffmpeg.path");

            // 自动判断协议格式
            formatFlag = "";
            if (rtmpUrl.startsWith("rtsp://")) {
                formatFlag = "-f rtsp";
            } else if (rtmpUrl.startsWith("rtmp://")) {
                formatFlag = "-f flv";
            }

            // 构造FFmpeg命令
            String cmd = String.format("%s -i http://127.0.0.1:%d/video/%s -vcodec copy -acodec aac %s %s",
                    ffmpegPath,
                    Configs.getInt("server.http.port", 3333),
                    tag,
                    formatFlag,
                    rtmpUrl
            );

            logger.info("===========================================");
            logger.info("[{}] ====== FFmpeg推流任务启动 ======", tag);
            logger.info("[{}] FFmpeg路径: {}", tag, ffmpegPath);
            logger.info("[{}] 目标地址: {}", tag, rtmpUrl);
            logger.info("[{}] 完整命令: {}", tag, cmd);
            logger.info("[{}] HTTP端口: {}", tag, Configs.getInt("server.http.port", 3333));
            logger.info("[{}] 源流地址: http://127.0.0.1:{}/video/{}", tag, Configs.getInt("server.http.port", 3333), tag);
            logger.info("[{}] 启动时间: {}", tag, new java.util.Date());
            logger.info("===========================================");

            // 执行FFmpeg命令
            process = Runtime.getRuntime().exec(cmd);
            processStartTime = System.currentTimeMillis();

            // 记录进程启动信息(Java 8兼容方式)
            logger.info("[{}] FFmpeg进程已启动", tag);

            stderr = process.getErrorStream();
            stdout = process.getInputStream();

            // 启动一个线程消费 stdout,防止缓冲区满导致进程阻塞
            final InputStream finalStdout = stdout;
            Thread stdoutConsumer = new Thread(() -> {
                try {
                    byte[] buffer = new byte[512];
                    int count = 0;
                    while (running && finalStdout.read(buffer) > -1) {
                        count++;
                        // 每1000次读取才打印一次(避免刷屏)
                        if (debugMode && count % 1000 == 0) {
                            logger.debug("[{}] FFmpeg stdout消费中... count: {}", tag, count);
                        }
                    }
                    logger.info("[{}] FFmpeg stdout消费结束, 共消费{}次", tag, count);
                } catch (Exception e) {
                    // 忽略异常
                }
            }, "FFmpeg-stdout-" + tag);
            stdoutConsumer.setDaemon(true);
            stdoutConsumer.start();

            // 消费 stderr 日志流
            StringBuilder errorLog = new StringBuilder();
            int errorCount = 0;
            while (running && (len = stderr.read(buff)) > -1)
            {
                if (debugMode) {
                    System.out.print(new String(buff, 0, len));
                }

                // 收集错误日志(便于排查问题)
                errorLog.append(new String(buff, 0, len));
                errorCount++;

                // 每100条错误日志打印一次摘要
                if (errorCount % 100 == 0) {
                    String lastError = errorLog.length() > 500
                        ? errorLog.substring(errorLog.length() - 500)
                        : errorLog.toString();
                    logger.debug("[{}] FFmpeg错误日志摘要: {}", tag, lastError);
                }
            }

            // 进程退出处理
            int exitCode = process.waitFor();
            long runDuration = System.currentTimeMillis() - processStartTime;
            logger.warn("===========================================");
            logger.warn("[{}] ====== FFmpeg推流任务结束 ======", tag);
            logger.warn("[{}] 退出代码: {}", tag, exitCode);
            logger.warn("[{}] 运行时间: {} ms", tag, runDuration);
            logger.warn("[{}] 错误日志条数: {}", tag, errorCount);

            // 分析退出原因
            if (exitCode == 0) {
                logger.info("[{}] FFmpeg正常退出", tag);
            } else if (exitCode == -1 || exitCode == 255) {
                logger.warn("[{}] FFmpeg被信号终止 (exitCode={})", tag, exitCode);
            } else {
                // 保存最后一段错误日志
                String lastError = errorLog.length() > 1000
                    ? errorLog.substring(errorLog.length() - 1000)
                    : errorLog.toString();
                logger.error("[{}] FFmpeg异常退出, 最后错误日志:\n{}", tag, lastError);
            }
            logger.warn("===========================================");

        }
        catch(InterruptedException ex)
        {
            logger.info("[{}] RTMPPublisher被中断: {}", tag, ex.getMessage());
            Thread.currentThread().interrupt();
        }
        catch(Exception ex)
        {
            logger.error("[{}] RTMPPublisher异常: {}", tag, ex);
        }
        finally
        {
            // 确保所有流都被关闭
            closeQuietly(stderr);
            closeQuietly(stdout);
            if (process != null) {
                closeQuietly(process.getInputStream());
                closeQuietly(process.getOutputStream());
                closeQuietly(process.getErrorStream());
            }
            logger.info("[{}] RTMPPublisher资源已释放", tag);
        }
    }

    /**
     * 关闭FFmpeg推流
     * 优化关闭逻辑,确保进程被正确终止(Java 8兼容)
     */
    public void close()
    {
        logger.info("[{}] ====== 开始关闭FFmpeg推流 ======", tag);
        logger.info("[{}] 关闭请求时间: {}", tag, new java.util.Date());

        try {
            // 设置停止标志
            running = false;

            if (process != null) {
                long runDuration = processStartTime > 0 ? System.currentTimeMillis() - processStartTime : 0;
                logger.info("[{}] 正在终止FFmpeg进程... (已运行{}ms)", tag, runDuration);

                // 先尝试正常终止
                process.destroy();

                // 等待最多3秒
                boolean exited = process.waitFor(3, TimeUnit.SECONDS);

                if (!exited) {
                    logger.warn("[{}] FFmpeg进程未能在3秒内正常退出,开始强制终止...", tag);

                    // 强制终止(Java 8的方式)
                    process.destroyForcibly();

                    // 再等待2秒
                    try {
                        exited = process.waitFor(2, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    if (!exited) {
                        logger.error("[{}] FFmpeg进程强制终止失败,可能存在资源泄漏", tag);
                    } else {
                        logger.info("[{}] FFmpeg进程已强制终止", tag);
                    }
                } else {
                    int exitCode = process.exitValue();
                    logger.info("[{}] FFmpeg进程已正常终止, 退出代码: {}", tag, exitCode);
                }

                // 检查是否需要杀掉残留进程
                checkAndKillOrphanedProcesses();

            } else {
                logger.info("[{}] FFmpeg进程为空,无需关闭", tag);
            }

            logger.info("[{}] ====== FFmpeg推流已关闭 ======", tag);

            // 中断线程(如果还在阻塞读取)
            this.interrupt();

            // 等待线程结束
            this.join(2000);
            if (this.isAlive()) {
                logger.warn("[{}] RTMPPublisher线程未能正常结束", tag);
            }

        } catch(Exception e) {
            logger.error("[{}] 关闭RTMPPublisher时出错: {}", tag, e);
        }
    }

    /**
     * 检查并杀掉可能残留的FFmpeg进程
     * Java 8兼容实现,使用系统命令查找和终止进程
     */
    private void checkAndKillOrphanedProcesses() {
        try {
            logger.debug("[{}] 检查是否有FFmpeg残留进程...", tag);
            // 判断操作系统类型
            String os = System.getProperty("os.name").toLowerCase();
            boolean isWindows = os.contains("win");

            if (isWindows) {
                // Windows: 使用tasklist和taskkill
                killOrphanedProcessesWindows();
            } else {
                // Linux/Unix: 使用ps和kill
                killOrphanedProcessesUnix();
            }
        } catch (Exception e) {
            logger.error("[{}] 检查残留进程时出错: {}", tag, e.getMessage());
        }
    }

    /**
     * Windows环境下查找并终止残留的FFmpeg进程
     */
    private void killOrphanedProcessesWindows() {
        try {
            // 查找包含ffmpeg的进程
            ProcessBuilder pb = new ProcessBuilder("tasklist", "/FI", "IMAGENAME eq ffmpeg.exe", "/FO", "CSV", "/NH");
            Process process = pb.start();
            java.io.BufferedReader reader = new java.io.BufferedReader(
                    new java.io.InputStreamReader(process.getInputStream()));

            String line;
            while ((line = reader.readLine()) != null) {
                // CSV格式: "Image Name","PID","Session Name","Session#","Mem Usage"
                String[] parts = line.split(",");
                if (parts.length > 1) {
                    String pidStr = parts[1].replaceAll("\"", "").trim();
                    try {
                        long pid = Long.parseLong(pidStr);
                        // 检查命令行是否包含当前tag或wvp-jt1078
                        ProcessBuilder cmdPb = new ProcessBuilder("wmic", "process", "where", "ProcessId=" + pid, "get", "CommandLine");
                        Process cmdProcess = cmdPb.start();
                        java.io.BufferedReader cmdReader = new java.io.BufferedReader(
                                new java.io.InputStreamReader(cmdProcess.getInputStream()));

                        String cmdLine;
                        boolean shouldKill = false;
                        while ((cmdLine = cmdReader.readLine()) != null) {
                            if (cmdLine.toLowerCase().contains("ffmpeg")
                                    && (cmdLine.contains(tag) || cmdLine.contains("wvp-jt1078"))) {
                                shouldKill = true;
                                break;
                            }
                        }

                        if (shouldKill) {
                            logger.warn("[{}] 发现残留FFmpeg进程[PID:{}],正在终止...", tag, pid);
                            Process killProcess = Runtime.getRuntime().exec("taskkill /PID " + pid + " /F");
                            killProcess.waitFor();
                            logger.info("[{}] 残留FFmpeg进程已终止[PID:{}]", tag, pid);
                        }
                    } catch (NumberFormatException e) {
                        // 忽略非数字PID
                    }
                }
            }
        } catch (Exception e) {
            logger.error("[{}] Windows检查残留进程时出错: {}", tag, e.getMessage());
        }
    }

    /**
     * Linux/Unix环境下查找并终止残留的FFmpeg进程
     */
    private void killOrphanedProcessesUnix() {
        try {
            // 查找包含ffmpeg的进程
            ProcessBuilder pb = new ProcessBuilder("ps", "-ef");
            Process process = pb.start();
            java.io.BufferedReader reader = new java.io.BufferedReader(
                    new java.io.InputStreamReader(process.getInputStream()));

            String line;
            while ((line = reader.readLine()) != null) {
                // ps -ef 输出格式: UID PID PPID C STIME TTY TIME CMD
                if (line.toLowerCase().contains("ffmpeg") && line.contains(tag)) {
                    String[] parts = line.split("\\s+");
                    if (parts.length > 1) {
                        String pidStr = parts[1];
                        try {
                            long pid = Long.parseLong(pidStr);
                            logger.warn("[{}] 发现残留FFmpeg进程[PID:{}],正在终止...", tag, pid);
                            Process killProcess = Runtime.getRuntime().exec("kill -9 " + pid);
                            killProcess.waitFor();
                            logger.info("[{}] 残留FFmpeg进程已终止[PID:{}]", tag, pid);
                        } catch (NumberFormatException e) {
                            // 忽略非数字PID
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error("[{}] Unix检查残留进程时出错: {}", tag, e.getMessage());
        }
    }

    /**
     * 安全关闭流,忽略异常
     */
    private void closeQuietly(Closeable stream) {
        if (stream != null) {
            try {
                stream.close();
            } catch (Exception e) {
                // 忽略
            }
        }
    }

    /**
     * 获取推流状态
     */
    public boolean isRunning() {
        return running && this.isAlive() && process != null;
    }

    /**
     * 获取FFmpeg进程信息(用于调试)
     */
    public String getProcessInfo() {
        if (process == null) {
            return "process is null";
        }
        try {
            long runDuration = processStartTime > 0 ? System.currentTimeMillis() - processStartTime : 0;
            return String.format("FFmpeg进程, 已运行%dms, alive=%s",
                    runDuration, process.isAlive());
        } catch (Exception e) {
            return "获取进程信息失败: " + e.getMessage();
        }
    }
}