RTMPPublisher.java 5.35 KB
package com.genersoft.iot.vmp.jtt1078.subscriber;

import com.genersoft.iot.vmp.jtt1078.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;

/**
 * FFmpeg 推流器 (仅处理视频直播流)
 */
public class RTMPPublisher extends Thread
{
    static Logger logger = LoggerFactory.getLogger(RTMPPublisher.class);

    String tag = null;
    Process process = null;
    private volatile boolean running = true;

    public RTMPPublisher(String tag)
    {
        this.tag = tag;
        this.setName("RTMPPublisher-" + tag);
        this.setDaemon(true);
    }

    @Override
    public void run()
    {
        InputStream stderr = null;
        InputStream stdout = null;
        int len = -1;
        byte[] buff = new byte[512];
        boolean debugMode = "on".equalsIgnoreCase(Configs.get("debug.mode"));

        try
        {
            String sign = "41db35390ddad33f83944f44b8b75ded";
            String rtmpUrl = Configs.get("rtmp.url").replaceAll("\\{TAG\\}", tag).replaceAll("\\{sign\\}",sign);

            // 【修复】自动判断协议格式,避免硬编码 -f rtsp 导致 RTMP 推流失败
            String formatFlag = "";
            if (rtmpUrl.startsWith("rtsp://")) {
                formatFlag = "-f rtsp";
            } else if (rtmpUrl.startsWith("rtmp://")) {
                formatFlag = "-f flv";
            }

            // 低延迟:缩短 HTTP-FLV 探测与缓冲,尽快向 ZLM 输出(仍受设备 GOP/关键帧限制)
            String inputLowLatency =
                    "-fflags +nobuffer+discardcorrupt -flags low_delay -probesize 32 -analyzeduration 0";
            String outputLowLatency = "-flush_packets 1";

            // 构造命令:只处理视频流和非对讲的音频流
            String cmd = String.format(
                    "%s %s -i http://127.0.0.1:%d/video/%s -vcodec copy -acodec aac %s %s %s",
                    Configs.get("ffmpeg.path"),
                    inputLowLatency,
                    Configs.getInt("server.http.port", 3333),
                    tag,
                    outputLowLatency,
                    formatFlag,
                    rtmpUrl
            );

            logger.info("FFmpeg Push Started. Tag: {}, CMD: {}", tag, cmd);

            process = Runtime.getRuntime().exec(cmd);
            stderr = process.getErrorStream();
            stdout = process.getInputStream();

            // 启动一个线程消费 stdout,防止缓冲区满导致进程阻塞
            final InputStream finalStdout = stdout;
            Thread stdoutConsumer = new Thread(() -> {
                try {
                    byte[] buffer = new byte[512];
                    while (running && finalStdout.read(buffer) > -1) {
                        // 只消费,不输出
                    }
                } catch (Exception e) {
                    // 忽略异常
                }
            }, "FFmpeg-stdout-" + tag);
            stdoutConsumer.setDaemon(true);
            stdoutConsumer.start();

            // 消费 stderr 日志流,防止阻塞
            while (running && (len = stderr.read(buff)) > -1)
            {
                if (debugMode) {
                    System.out.print(new String(buff, 0, len));
                }
            }

            // 进程退出处理
            int exitCode = process.waitFor();
            logger.warn("FFmpeg process exited. Code: {}, Tag: {}", exitCode, tag);
        }
        catch(InterruptedException ex)
        {
            logger.info("RTMPPublisher interrupted: {}", tag);
            Thread.currentThread().interrupt();
        }
        catch(Exception ex)
        {
            logger.error("RTMPPublisher Error: " + tag, ex);
        }
        finally
        {
            // 确保所有流都被关闭
            closeQuietly(stderr);
            closeQuietly(stdout);
            if (process != null) {
                closeQuietly(process.getInputStream());
                closeQuietly(process.getOutputStream());
                closeQuietly(process.getErrorStream());
            }
        }
    }

    public void close()
    {
        try {
            // 设置停止标志
            running = false;

            if (process != null) {
                // 先尝试正常终止
                process.destroy();

                // 等待最多 3 秒
                boolean exited = process.waitFor(3, TimeUnit.SECONDS);

                if (!exited) {
                    // 如果还没退出,强制终止
                    logger.warn("FFmpeg process did not exit gracefully, forcing termination: {}", tag);
                    process.destroyForcibly();
                    process.waitFor(2, TimeUnit.SECONDS);
                }

                logger.info("FFmpeg process terminated: {}", tag);
            }

            // 中断线程(如果还在阻塞读取)
            this.interrupt();

            // 等待线程结束
            this.join(2000);

        } catch(Exception e) {
            logger.error("Error closing RTMPPublisher: " + tag, e);
        }
    }

    /**
     * 安全关闭流,忽略异常
     */
    private void closeQuietly(Closeable stream) {
        if (stream != null) {
            try {
                stream.close();
            } catch (Exception e) {
                // 忽略
            }
        }
    }
}