// NettyRtmpPublisher.java 7.42 KB
package com.genersoft.iot.vmp.jtt1078.publisher;

import com.genersoft.iot.vmp.jtt1078.rtmp.NettyRtmpClient;
import com.genersoft.iot.vmp.jtt1078.util.Configs;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * RTMP stream publisher backed by a {@link NettyRtmpClient}.
 *
 * <p>Connection parameters are read from {@link Configs} when the publisher is
 * constructed (and again in {@link #start(String)} if the tag changes). FLV data
 * handed to {@link #sendVideoData(byte[], int)} is wrapped in a Netty
 * {@link ByteBuf} and forwarded to the client.
 */
public class NettyRtmpPublisher implements IStreamPublisher {

    private static final Logger logger = LoggerFactory.getLogger(NettyRtmpPublisher.class);

    private static final String PUBLISHER_TYPE = "netty";

    // Defaults used when the corresponding rtmp.native.* key is missing or empty
    private static final String DEFAULT_HOST = "127.0.0.1";
    private static final int DEFAULT_PORT = 1935;
    private static final String DEFAULT_APP = "schedule";

    // Placeholder in rtmp.native.tcUrl that is replaced by the stream tag
    private static final String TAG_PLACEHOLDER = "{TAG}";

    // Fallback sign used when rtmp.url carries no usable sign parameter.
    // NOTE(review): this is a hard-coded credential-like value and it ends up in
    // the logged tcUrl — consider moving it to configuration.
    private static final String DEFAULT_SIGN = "41db35390ddad33f83944f44b8b75ded";

    private String tag;
    private volatile boolean connected = false;

    // RTMP connection settings
    private String host;
    private int port;
    private String app;
    private String tcUrl;
    private String streamName;

    // Netty RTMP client; null while not started or after close()
    private NettyRtmpClient client;

    /**
     * Creates a publisher for the given stream tag and loads its RTMP settings.
     *
     * @param tag stream tag; used as the stream name and substituted for
     *            {@code {TAG}} in a configured tcUrl
     */
    public NettyRtmpPublisher(String tag) {
        this.tag = tag;
        loadConfig();
    }

    /**
     * Loads the RTMP connection parameters from configuration.
     *
     * <p>If {@code rtmp.native.tcUrl} is set it wins: the tag placeholder is
     * substituted and host/port/app are derived from the resulting URL.
     * Otherwise the URL is assembled from {@code rtmp.native.host/port/app}
     * (with defaults) plus the sign taken from {@code rtmp.url}.
     */
    private void loadConfig() {
        String configHost = Configs.get("rtmp.native.host");
        String configPort = Configs.get("rtmp.native.port");
        String configApp = Configs.get("rtmp.native.app");
        String configTcUrl = Configs.get("rtmp.native.tcUrl");

        // The stream name is always the tag, whichever branch is taken below
        this.streamName = tag;

        if (hasText(configTcUrl)) {
            this.tcUrl = configTcUrl.replace(TAG_PLACEHOLDER, tag);
            // Parse the substituted URL; parsing the raw config value would put
            // the literal placeholder back into tcUrl/app.
            parseTcUrl(this.tcUrl);
            logger.info("[{}] 使用配置的tcUrl: {}", tag, this.tcUrl);
            return;
        }

        this.host = hasText(configHost) ? configHost : DEFAULT_HOST;
        this.port = parsePort(configPort, DEFAULT_PORT);
        this.app = hasText(configApp) ? configApp : DEFAULT_APP;

        String sign = resolveSign(Configs.get("rtmp.url"));

        // Full URL: rtmp://host:port/app/streamName?sign=xxx
        this.tcUrl = String.format("rtmp://%s:%d/%s/%s?sign=%s", host, port, app, tag, sign);

        logger.info("[{}] Netty RTMP推流器配置: host={}, port={}, app={}, stream={}, tcUrl={}",
                tag, host, port, app, streamName, tcUrl);
    }

    /**
     * Returns whether the given string is non-null and non-empty.
     */
    private static boolean hasText(String value) {
        return value != null && !value.isEmpty();
    }

    /**
     * Parses a configured port, falling back instead of throwing on bad input.
     *
     * @param raw      configured value, may be null or empty
     * @param fallback port to use when {@code raw} is missing, non-numeric or
     *                 outside 1..65535
     * @return the parsed port or {@code fallback}
     */
    private int parsePort(String raw, int fallback) {
        if (!hasText(raw)) {
            return fallback;
        }
        try {
            int value = Integer.parseInt(raw.trim());
            if (value < 1 || value > 65535) {
                logger.warn("[{}] rtmp.native.port out of range: {}, using {}", tag, raw, fallback);
                return fallback;
            }
            return value;
        } catch (NumberFormatException e) {
            logger.warn("[{}] rtmp.native.port is not a number: {}, using {}", tag, raw, fallback);
            return fallback;
        }
    }

    /**
     * Extracts the {@code sign} query parameter from the given URL.
     *
     * @param rtmpUrl value of {@code rtmp.url}, may be null
     * @return the extracted sign, or the built-in default when the URL is null,
     *         has no {@code sign=} parameter, or the value is empty or the
     *         unsubstituted {@code {sign}} placeholder
     */
    private static String resolveSign(String rtmpUrl) {
        if (rtmpUrl == null || !rtmpUrl.contains("sign=")) {
            return DEFAULT_SIGN;
        }
        int signStart = rtmpUrl.indexOf("sign=") + 5;
        int signEnd = rtmpUrl.indexOf("&", signStart);
        if (signEnd == -1) {
            signEnd = rtmpUrl.length();
        }
        String extractedSign = rtmpUrl.substring(signStart, signEnd);
        if (!extractedSign.isEmpty() && !"{sign}".equals(extractedSign)) {
            return extractedSign;
        }
        return DEFAULT_SIGN;
    }

    /**
     * Derives host, port and app from a tcUrl and normalizes {@link #tcUrl}.
     *
     * <p>Host and port are only assigned once both have been parsed, so a
     * malformed port leaves the previous values untouched. When the URL has a
     * path, {@code tcUrl} is rebuilt as {@code rtmp://host:port/path}.
     * NOTE(review): as in the original code this rebuild drops the query string
     * (including any sign) and forces the {@code rtmp} scheme — confirm that is
     * what {@link NettyRtmpClient} expects.
     *
     * @param tcUrl URL to parse, with the tag placeholder already substituted
     */
    private void parseTcUrl(String tcUrl) {
        // Strip the query string (e.g. ?sign=...) before splitting
        int queryIndex = tcUrl.indexOf('?');
        String withoutQuery = queryIndex == -1 ? tcUrl : tcUrl.substring(0, queryIndex);

        // Any scheme is accepted: rtmp://, rtsp://, http://, ...
        String prefix = "://";
        int prefixIndex = withoutQuery.indexOf(prefix);
        if (prefixIndex == -1) {
            logger.warn("[{}] tcUrl没有有效的协议前缀: {}", tag, withoutQuery);
            return;
        }

        String afterPrefix = withoutQuery.substring(prefixIndex + prefix.length());

        // Split "host:port" from the path
        int slashIndex = afterPrefix.indexOf('/');
        String hostPort = slashIndex == -1 ? afterPrefix : afterPrefix.substring(0, slashIndex);
        String path = slashIndex == -1 ? "" : afterPrefix.substring(slashIndex + 1);

        String parsedHost;
        int parsedPort;
        int colonIndex = hostPort.indexOf(':');
        if (colonIndex == -1) {
            parsedHost = hostPort;
            parsedPort = DEFAULT_PORT;
        } else {
            parsedHost = hostPort.substring(0, colonIndex);
            try {
                parsedPort = Integer.parseInt(hostPort.substring(colonIndex + 1));
            } catch (NumberFormatException e) {
                logger.error("[{}] 解析tcUrl失败: {}", tag, e.getMessage(), e);
                return;
            }
        }

        this.host = parsedHost;
        this.port = parsedPort;

        if (!path.isEmpty()) {
            this.app = appFromPath(path);
            this.streamName = tag;
            this.tcUrl = String.format("rtmp://%s:%d/%s", parsedHost, parsedPort, path);
        }
    }

    /**
     * Returns the application part of a URL path.
     *
     * <p>If the path ends with {@code /<tag>} that trailing stream segment is
     * removed, so the result matches the app used by the split-config branch
     * of {@link #loadConfig()}; otherwise the path is returned unchanged.
     */
    private String appFromPath(String path) {
        if (hasText(tag)) {
            String streamSuffix = "/" + tag;
            if (path.length() > streamSuffix.length() && path.endsWith(streamSuffix)) {
                return path.substring(0, path.length() - streamSuffix.length());
            }
        }
        return path;
    }

    /**
     * Starts the publisher by creating and starting a new {@link NettyRtmpClient}.
     *
     * <p>If {@code tag} differs from the tag the configuration was loaded for,
     * the configuration is reloaded so that tcUrl and stream name match it. Any
     * client left over from an earlier start is closed first. Failures are
     * logged and leave the publisher disconnected; they are not rethrown.
     *
     * @param tag stream tag to publish under
     */
    @Override
    public void start(String tag) {
        boolean tagChanged = tag != null && !tag.equals(this.tag);
        this.tag = tag;
        if (tagChanged) {
            loadConfig();
        }

        logger.info("[{}] ====== 启动Netty RTMP推流器 ======", tag);
        logger.info("[{}] 目标服务器: {}:{}", tag, host, port);
        logger.info("[{}] 推流地址: {}/{}", tag, tcUrl, streamName);

        // Do not leak a client from a previous start()
        NettyRtmpClient previous = client;
        if (previous != null) {
            connected = false;
            client = null;
            closeQuietly(previous);
        }

        NettyRtmpClient created = null;
        try {
            created = new NettyRtmpClient(tcUrl, app, streamName);
            created.start();

            client = created;
            connected = true;
            logger.info("[{}] ====== Netty RTMP推流器启动成功 ======", tag);

        } catch (Exception e) {
            logger.error("[{}] Netty RTMP推流器启动异常: {}", tag, e.getMessage(), e);
            connected = false;
            client = null;
            if (created != null) {
                // Release whatever the half-started client acquired
                closeQuietly(created);
            }
        }
    }

    /**
     * Closes a client, logging (at debug level) instead of propagating errors.
     */
    private void closeQuietly(NettyRtmpClient target) {
        try {
            target.close();
        } catch (Exception e) {
            logger.debug("[{}] 关闭客户端时出错: {}", tag, e.getMessage(), e);
        }
    }

    /**
     * Sends one chunk of FLV data through the client.
     *
     * <p>Does nothing when the publisher is not connected or the payload is
     * null/empty. A failure while sending is logged and marks the publisher as
     * disconnected.
     *
     * @param flvData   FLV bytes to send; wrapped without copying
     * @param timestamp timestamp of the data; only used for logging here
     */
    @Override
    public void sendVideoData(byte[] flvData, int timestamp) {
        // Read the field once so a concurrent close() cannot null it between
        // the check and the call
        NettyRtmpClient current = client;
        if (!connected || current == null) {
            logger.debug("[{}] sendVideoData: not connected", tag);
            return;
        }
        if (flvData == null || flvData.length == 0) {
            // A missing frame must not tear down the connection
            logger.debug("[{}] sendVideoData: empty payload ignored", tag);
            return;
        }

        try {
            // Wrap the byte[] in a Netty ByteBuf
            ByteBuf buf = Unpooled.wrappedBuffer(flvData);
            current.send(buf);

            logger.debug("[{}] sendVideoData: flvDataLen={}, timestamp={}", tag, flvData.length, timestamp);
        } catch (Exception e) {
            logger.error("[{}] 发送视频数据失败: {}", tag, e.getMessage(), e);
            connected = false;
        }
    }

    /**
     * No-op in this implementation; only logs at debug level.
     *
     * <p>Per the original author's note, the sequence header is carried by the
     * FLV tags themselves, so nothing is sent separately.
     */
    @Override
    public void sendAVCSequenceHeader() {
        logger.debug("[{}] sendAVCSequenceHeader: 无需操作(由FLV Tag携带)", tag);
    }

    /**
     * Marks the publisher as disconnected and closes the client, if any.
     * Errors while closing are logged at debug level and otherwise ignored.
     */
    @Override
    public void close() {
        logger.info("[{}] ====== 关闭Netty RTMP推流器 ======", tag);

        connected = false;

        NettyRtmpClient current = client;
        client = null;
        if (current != null) {
            closeQuietly(current);
        }

        logger.info("[{}] Netty RTMP推流器已关闭", tag);
    }

    /**
     * @return true when the publisher was started successfully and still holds a client
     */
    @Override
    public boolean isConnected() {
        return connected && client != null;
    }

    /**
     * @return the publisher type identifier, {@code "netty"}
     */
    @Override
    public String getType() {
        return PUBLISHER_TYPE;
    }

    /**
     * @return a human-readable status line, consistent with {@link #isConnected()}
     */
    @Override
    public String getStatus() {
        if (isConnected()) {
            return String.format("Netty RTMP推流器, 已连接, streamName=%s", streamName);
        } else {
            return "Netty RTMP推流器, 未连接";
        }
    }
}