NettyRtmpClient.java 3.54 KB
package com.genersoft.iot.vmp.jtt1078.rtmp;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.net.URISyntaxException;

/**
 * Netty RTMP 客户端
 */
public class NettyRtmpClient {
    private static final Logger logger = LoggerFactory.getLogger(NettyRtmpClient.class);

    // 共享线程池
    private static final EventLoopGroup sharedGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);

    private Channel channel;
    private final String rtmpUrl;
    private final String app;
    private final String stream;

    // 增加一个状态标记,只有握手完成才能发送数据
    private volatile boolean isReady = false;

    public NettyRtmpClient(String rtmpUrl, String app, String stream) {
        this.rtmpUrl = rtmpUrl;
        this.app = app;
        this.stream = stream;
    }

    public void start() {
        // 1. 解析 RTMP URL 获取 IP 和 端口
        URI uri;
        try {
            // Java URI 解析 rtmp:// 有时会报错,简单处理去掉 scheme
            String tempUrl = rtmpUrl.replace("rtmp://", "http://");
            uri = new URI(tempUrl);
        } catch (URISyntaxException e) {
            logger.error("[{}] RTMP URL 格式错误: {}", stream, rtmpUrl);
            return;
        }

        String host = uri.getHost();
        int port = uri.getPort();
        if (port == -1) port = 1935; // 默认 RTMP 端口

        Bootstrap b = new Bootstrap();
        b.group(sharedGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true) // 禁用 Nagle 算法,降低延迟
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // 传入回调,当 Handler 状态变为 STREAMING 时通知 Client
                        RtmpHandshakeHandler handler = new RtmpHandshakeHandler(app, rtmpUrl, stream);
                        handler.setOnReadyListener(() -> isReady = true);
                        ch.pipeline().addLast(handler);
                    }
                });

        logger.info("[{}] 正在连接 RTMP 服务器: {}:{}", stream, host, port);
        b.connect(host, port).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                this.channel = future.channel();
                logger.info("[{}] RTMP TCP 连接成功: {}", stream, rtmpUrl);
            } else {
                logger.error("[{}] RTMP 连接失败: {}", stream, rtmpUrl, future.cause());
            }
        });
    }

    public void send(ByteBuf flvTag) {
        // 【关键修复】
        // 1. 必须 channel active
        // 2. 必须 isReady (RTMP 握手已完成)
        // 否则直接丢弃包并释放内存,绝对不能发给服务器!
        if (channel != null && channel.isActive() && isReady) {
            channel.writeAndFlush(flvTag);
        } else {
            // 如果还没握手完成就发包,服务器会报错非法 BodySize
            // 必须释放 flvTag,否则内存泄漏
            if (flvTag.refCnt() > 0) {
                flvTag.release();
            }
        }
    }

    public void close() {
        if (channel != null) {
            channel.close();
        }
    }
}