FFmpegPublisher.java 12.9 KB
package com.genersoft.iot.vmp.jtt1078.publisher;

import com.genersoft.iot.vmp.jtt1078.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;

/**
 * FFmpeg 推流器 (仅处理视频直播流)
 *
 * 实现了IStreamPublisher接口,支持通过工厂创建
 *
 * 修改记录:
 * 1. 实现IStreamPublisher接口,支持工厂模式
 * 2. 添加详细的启动和关闭日志,便于排查问题
 * 3. 优化关闭逻辑,确保FFmpeg进程被正确终止
 * 4. 添加FFmpeg输出日志监控
 * 5. 兼容Java 8
 */
public class FFmpegPublisher extends Thread implements IStreamPublisher
{
    static Logger logger = LoggerFactory.getLogger(FFmpegPublisher.class);

    String tag = null;
    Process process = null;
    private volatile boolean running = true;
    private volatile boolean connected = false;

    /** FFmpeg路径 */
    private String ffmpegPath = null;

    /** 目标RTMP地址 */
    private String rtmpUrl = null;

    /** FFmpeg命令格式标志 */
    private String formatFlag = null;

    /** 进程启动时间 */
    private long processStartTime = 0;

    /** 推流器类型标识 */
    private static final String PUBLISHER_TYPE = "ffmpeg";

    public FFmpegPublisher(String tag)
    {
        this.tag = tag;
        this.setName("FFmpegPublisher-" + tag);
        this.setDaemon(true);
    }

    @Override
    public void start(String tag) {
        this.start();
    }

    @Override
    public void run()
    {
        InputStream stderr = null;
        InputStream stdout = null;
        int len = -1;
        byte[] buff = new byte[512];
        boolean debugMode = "on".equalsIgnoreCase(Configs.get("debug.mode"));

        try
        {
            // 获取FFmpeg配置
            String sign = "41db35390ddad33f83944f44b8b75ded";
            rtmpUrl = Configs.get("rtmp.url").replaceAll("\\{TAG\\}", tag).replaceAll("\\{sign\\}",sign);
            ffmpegPath = Configs.get("ffmpeg.path");

            // 自动判断协议格式
            formatFlag = "";
            if (rtmpUrl.startsWith("rtsp://")) {
                formatFlag = "-f rtsp";
            } else if (rtmpUrl.startsWith("rtmp://")) {
                formatFlag = "-f flv";
            }

            // 构造FFmpeg命令
            String cmd = String.format("%s -i http://127.0.0.1:%d/video/%s -vcodec copy -acodec aac %s %s",
                    ffmpegPath,
                    Configs.getInt("server.http.port", 3333),
                    tag,
                    formatFlag,
                    rtmpUrl
            );

            logger.info("===========================================");
            logger.info("[{}] ====== FFmpeg推流任务启动 ======", tag);
            logger.info("[{}] FFmpeg路径: {}", tag, ffmpegPath);
            logger.info("[{}] 目标地址: {}", tag, rtmpUrl);
            logger.info("[{}] 完整命令: {}", tag, cmd);
            logger.info("[{}] HTTP端口: {}", tag, Configs.getInt("server.http.port", 3333));
            logger.info("[{}] 源流地址: http://127.0.0.1:{}/video/{}", tag, Configs.getInt("server.http.port", 3333), tag);
            logger.info("[{}] 启动时间: {}", tag, new java.util.Date());
            logger.info("===========================================");

            // 执行FFmpeg命令
            process = Runtime.getRuntime().exec(cmd);
            processStartTime = System.currentTimeMillis();
            connected = true;

            // 记录进程启动信息(Java 8兼容方式)
            logger.info("[{}] FFmpeg进程已启动", tag);

            stderr = process.getErrorStream();
            stdout = process.getInputStream();

            // 启动一个线程消费 stdout,防止缓冲区满导致进程阻塞
            final InputStream finalStdout = stdout;
            Thread stdoutConsumer = new Thread(() -> {
                try {
                    byte[] buffer = new byte[512];
                    int count = 0;
                    while (running && finalStdout.read(buffer) > -1) {
                        count++;
                        // 每1000次读取才打印一次(避免刷屏)
                        if (debugMode && count % 1000 == 0) {
                            logger.debug("[{}] FFmpeg stdout消费中... count: {}", tag, count);
                        }
                    }
                    logger.info("[{}] FFmpeg stdout消费结束, 共消费{}次", tag, count);
                } catch (Exception e) {
                    // 忽略异常
                }
            }, "FFmpeg-stdout-" + tag);
            stdoutConsumer.setDaemon(true);
            stdoutConsumer.start();

            // 消费 stderr 日志流
            StringBuilder errorLog = new StringBuilder();
            int errorCount = 0;
            while (running && (len = stderr.read(buff)) > -1)
            {
                if (debugMode) {
                    System.out.print(new String(buff, 0, len));
                }

                // 收集错误日志(便于排查问题)
                errorLog.append(new String(buff, 0, len));
                errorCount++;

                // 每100条错误日志打印一次摘要
                if (errorCount % 100 == 0) {
                    String lastError = errorLog.length() > 500
                        ? errorLog.substring(errorLog.length() - 500)
                        : errorLog.toString();
                    logger.debug("[{}] FFmpeg错误日志摘要: {}", tag, lastError);
                }
            }

            // 进程退出处理
            int exitCode = process.waitFor();
            long runDuration = System.currentTimeMillis() - processStartTime;
            connected = false;
            logger.warn("===========================================");
            logger.warn("[{}] ====== FFmpeg推流任务结束 ======", tag);
            logger.warn("[{}] 退出代码: {}", tag, exitCode);
            logger.warn("[{}] 运行时间: {} ms", tag, runDuration);
            logger.warn("[{}] 错误日志条数: {}", tag, errorCount);

            // 分析退出原因
            if (exitCode == 0) {
                logger.info("[{}] FFmpeg正常退出", tag);
            } else if (exitCode == -1 || exitCode == 255) {
                logger.warn("[{}] FFmpeg被信号终止 (exitCode={})", tag, exitCode);
            } else {
                // 保存最后一段错误日志
                String lastError = errorLog.length() > 1000
                    ? errorLog.substring(errorLog.length() - 1000)
                    : errorLog.toString();
                logger.error("[{}] FFmpeg异常退出, 最后错误日志:\n{}", tag, lastError);
            }
            logger.warn("===========================================");

        }
        catch(InterruptedException ex)
        {
            logger.info("[{}] FFmpegPublisher被中断: {}", tag, ex.getMessage());
            Thread.currentThread().interrupt();
        }
        catch(Exception ex)
        {
            logger.error("[{}] FFmpegPublisher异常: {}", tag, ex);
        }
        finally
        {
            connected = false;
            // 确保所有流都被关闭
            closeQuietly(stderr);
            closeQuietly(stdout);
            if (process != null) {
                closeQuietly(process.getInputStream());
                closeQuietly(process.getOutputStream());
                closeQuietly(process.getErrorStream());
            }
            logger.info("[{}] FFmpegPublisher资源已释放", tag);
        }
    }

    /**
     * 关闭FFmpeg推流
     * 优化关闭逻辑,确保进程被正确终止(Java 8兼容)
     */
    @Override
    public void close()
    {
        logger.info("[{}] ====== 开始关闭FFmpeg推流 ======", tag);
        logger.info("[{}] 关闭请求时间: {}", tag, new java.util.Date());

        try {
            // 设置停止标志
            running = false;
            connected = false;

            if (process != null) {
                long runDuration = processStartTime > 0 ? System.currentTimeMillis() - processStartTime : 0;
                logger.info("[{}] 正在终止FFmpeg进程... (已运行{}ms)", tag, runDuration);

                // 先尝试正常终止
                process.destroy();

                // 等待最多3秒
                boolean exited = process.waitFor(3, TimeUnit.SECONDS);

                if (!exited) {
                    logger.warn("[{}] FFmpeg进程未能在3秒内正常退出,开始强制终止...", tag);

                    // 强制终止(Java 8的方式)
                    process.destroyForcibly();

                    // 再等待2秒
                    try {
                        exited = process.waitFor(2, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    if (!exited) {
                        logger.error("[{}] FFmpeg进程强制终止失败,可能存在资源泄漏", tag);
                    } else {
                        logger.info("[{}] FFmpeg进程已强制终止", tag);
                    }
                } else {
                    int exitCode = process.exitValue();
                    logger.info("[{}] FFmpeg进程已正常终止, 退出代码: {}", tag, exitCode);
                }

                // 检查是否需要杀掉残留进程
                checkAndKillOrphanedProcesses();

            } else {
                logger.info("[{}] FFmpeg进程为空,无需关闭", tag);
            }

            logger.info("[{}] ====== FFmpeg推流已关闭 ======", tag);

            // 中断线程(如果还在阻塞读取)
            this.interrupt();

            // 等待线程结束
            this.join(2000);
            if (this.isAlive()) {
                logger.warn("[{}] FFmpegPublisher线程未能正常结束", tag);
            }

        } catch(Exception e) {
            logger.error("[{}] 关闭FFmpegPublisher时出错: {}", tag, e);
        }
    }

    /**
     * 检查并杀掉可能残留的FFmpeg进程
     * 某些情况下FFmpeg进程可能没有被正确回收
     */
    private void checkAndKillOrphanedProcesses() {
        try {
            // 根据FFmpeg命令特征查找残留进程
            // 注意:这里只是记录日志,实际杀进程需要谨慎
            logger.debug("[{}] 检查是否有FFmpeg残留进程...", tag);
        } catch (Exception e) {
            logger.debug("[{}] 检查残留进程时出错: {}", tag, e.getMessage());
        }
    }

    /**
     * 安全关闭流,忽略异常
     */
    private void closeQuietly(Closeable stream) {
        if (stream != null) {
            try {
                stream.close();
            } catch (Exception e) {
                // 忽略
            }
        }
    }

    /**
     * 发送FLV数据 - FFmpeg模式不需要此方法
     * 数据通过HTTP接口由FFmpeg自动拉取
     */
    @Override
    public void sendVideoData(byte[] flvData, int timestamp) {
        // FFmpeg模式不需要实现此方法
        // 数据通过HTTP接口由FFmpeg自动拉取
    }

    /**
     * 发送AVC序列头 - FFmpeg模式不需要此方法
     * FFmpeg模式下,SPS变化通过重启FFmpeg进程来解决
     */
    @Override
    public void sendAVCSequenceHeader() {
        // FFmpeg模式不需要实现此方法
        // SPS变化时通过restartRtmpPublisher()重启FFmpeg解决
        logger.debug("[{}] FFmpeg模式不需要sendAVCSequenceHeader", tag);
    }

    /**
     * 获取推流状态
     */
    @Override
    public boolean isConnected() {
        return connected && this.isAlive() && process != null;
    }

    /**
     * 获取推流器类型
     */
    @Override
    public String getType() {
        return PUBLISHER_TYPE;
    }

    /**
     * 获取推流器信息(用于调试)
     */
    @Override
    public String getStatus() {
        if (process == null) {
            return "FFmpeg进程未启动";
        }
        try {
            long runDuration = processStartTime > 0 ? System.currentTimeMillis() - processStartTime : 0;
            return String.format("FFmpeg推流器, 已运行%dms, 进程存活=%s, 连接状态=%s",
                    runDuration, process.isAlive(), connected ? "已连接" : "未连接");
        } catch (Exception e) {
            return "获取FFmpeg状态失败: " + e.getMessage();
        }
    }

    /**
     * 获取FFmpeg进程信息(用于调试)
     */
    public String getProcessInfo() {
        if (process == null) {
            return "process is null";
        }
        try {
            long runDuration = processStartTime > 0 ? System.currentTimeMillis() - processStartTime : 0;
            return String.format("FFmpeg进程, 已运行%dms, alive=%s",
                    runDuration, process.isAlive());
        } catch (Exception e) {
            return "获取进程信息失败: " + e.getMessage();
        }
    }
}