ZlmRtpPublisher.java 5.74 KB
package com.genersoft.iot.vmp.jtt1078.subscriber;

import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.genersoft.iot.vmp.jtt1078.util.Configs;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

public class ZlmRtpPublisher {
    private static final Logger logger = LoggerFactory.getLogger(ZlmRtpPublisher.class);
    private static final EventLoopGroup group = new NioEventLoopGroup();

    private Channel nettyChannel;
    private InetSocketAddress zlmRtpAddress;
    private String streamId;

    // [核心修复1] 音视频分离的状态
    private int audioSsrc;
    private int videoSsrc;
    private int audioSeq = 0;
    private int videoSeq = 0;

    public ZlmRtpPublisher(String streamId) {
        this.streamId = streamId;
    }

    public void start() {
        try {
            String zlmHost = Configs.get("zlm.host");
            int zlmHttpPort = Configs.getInt("zlm.http.port", 80);
            String zlmSecret = Configs.get("zlm.secret");

            // 生成 SSRC: 视频用 Hash,音频用 Hash+1,确保不冲突
            int baseSsrc = (streamId.hashCode() & 0x7FFFFFFF);
            this.videoSsrc = baseSsrc;
            this.audioSsrc = baseSsrc + 1;

            // 申请端口
            String url = String.format("http://%s:%d/index/api/openRtpServer?secret=%s&stream_id=%s&ssrc=%s&port=0&enable_tcp=0&app=schedule",
                    zlmHost, zlmHttpPort, zlmSecret, streamId, videoSsrc);

            logger.info("[{}] 申请 RTP 端口: {}", streamId, url);
            String response = HttpUtil.get(url);
            JSONObject json = JSONUtil.parseObj(response);

            if (json.getInt("code") != 0) {
                throw new RuntimeException("ZLM openRtpServer error: " + response);
            }

            int rtpPort = json.getInt("port");
            this.zlmRtpAddress = new InetSocketAddress(zlmHost, rtpPort);

            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
                    .handler(new io.netty.channel.ChannelHandlerAdapter() {});

            this.nettyChannel = b.bind(0).sync().channel();
            logger.info("[{}] RTP 推流就绪 (UDP) -> {}:{}", streamId, zlmHost, rtpPort);

        } catch (Exception e) {
            logger.error("[{}] 启动 RTP 推流失败", streamId, e);
        }
    }

    public void sendVideo(long timestampMs, byte[] nalu) {
        if (nettyChannel == null || zlmRtpAddress == null) return;

        // [核心修复2] 智能剔除 H.264 Start Code (00 00 00 01 或 00 00 01)
        int offset = 0;
        if (nalu.length > 4 && nalu[0] == 0 && nalu[1] == 0 && nalu[2] == 0 && nalu[3] == 1) {
            offset = 4;
        } else if (nalu.length > 3 && nalu[0] == 0 && nalu[1] == 0 && nalu[2] == 1) {
            offset = 3;
        }

        int length = nalu.length - offset;
        if (length <= 0) return; // 空包不发

        long rtpTimestamp = timestampMs * 90; // 90kHz
        int maxPacketSize = 1400;

        if (length <= maxPacketSize) {
            // 单包
            ByteBuf packet = createRtpPacket(rtpTimestamp, true, 96, videoSsrc, videoSeq++);
            packet.writeBytes(nalu, offset, length);
            sendRtp(packet);
        } else {
            // FU-A 分片
            byte naluHeader = nalu[offset];
            int nri = naluHeader & 0x60;
            int type = naluHeader & 0x1f;

            offset += 1;
            length -= 1;

            boolean first = true;
            while (length > 0) {
                int currLen = Math.min(length, maxPacketSize);
                boolean last = (length == currLen);

                // 只有最后一包打 Marker
                ByteBuf packet = createRtpPacket(rtpTimestamp, last, 96, videoSsrc, videoSeq++);

                packet.writeByte(nri | 28); // FU Indicator

                int fuHeader = type;
                if (first) fuHeader |= 0x80;
                if (last) fuHeader |= 0x40;
                packet.writeByte(fuHeader); // FU Header

                packet.writeBytes(nalu, offset, currLen);
                sendRtp(packet);

                offset += currLen;
                length -= currLen;
                first = false;
            }
        }
    }

    public void sendAudio(long timestampMs, byte[] data) {
        if (nettyChannel == null || zlmRtpAddress == null) return;

        // 音频必须使用独立的序列号 audioSeq
        // RTP 音频时钟 8kHz
        long rtpTimestamp = timestampMs * 8;

        ByteBuf packet = createRtpPacket(rtpTimestamp, true, 8, audioSsrc, audioSeq++);
        packet.writeBytes(data);
        sendRtp(packet);
    }

    // 统一封装 RTP Header
    private ByteBuf createRtpPacket(long timestamp, boolean marker, int pt, int ssrc, int seqNum) {
        ByteBuf buf = Unpooled.buffer(1500);
        buf.writeByte(0x80); // V=2
        buf.writeByte((marker ? 0x80 : 0x00) | pt); // M | PT
        buf.writeShort(seqNum); // Sequence
        buf.writeInt((int) timestamp); // Timestamp
        buf.writeInt(ssrc); // SSRC
        return buf;
    }

    private void sendRtp(ByteBuf packet) {
        nettyChannel.writeAndFlush(new DatagramPacket(packet, zlmRtpAddress));
    }

    public void close() {
        if (nettyChannel != null) nettyChannel.close();
    }
}