RtmpChunkWriter.java 14.1 KB
package com.genersoft.iot.vmp.jtt1078.rtmp;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * FLV数据到RTMP消息的转换器
 *
 * 功能:
 * 1. 解析FLV Video Tag,提取H.264 NAL单元
 * 2. 将FLV时间戳转换为RTMP时间戳
 * 3. 处理SPS/PPS序列头
 * 4. 处理I帧和P帧
 *
 * FLV Video Tag格式:
 * [PreviousTagSize(4)][Tag Type(1)][DataSize(3)][Timestamp(3)][TimestampExt(1)][StreamID(3)][VideoData]
 *
 * VideoData格式(H.264):
 * [FrameType(1)][AVCPacketType(1)][CompositionTime(3)][NALUs]
 *
 * - FrameType: 0x17=I帧, 0x27=P/B帧
 * - AVCPacketType: 0x00=AVC Sequence Header, 0x01=NALU
 * - CompositionTime: pts - dts偏移
 */
public class RtmpChunkWriter {

    static Logger logger = LoggerFactory.getLogger(RtmpChunkWriter.class);

    private RtmpConnection connection;
    private String tag;

    // 时间戳
    private int lastVideoTimestamp = 0;
    private int lastAudioTimestamp = 0;

    // 关键标记
    private boolean hasSentAVCSequenceHeader = false;
    private byte[] lastSPS = null;
    private byte[] lastPPS = null;

    // FLV Tag类型常量
    private static final byte FLV_TAG_TYPE_VIDEO = 9;
    private static final byte FLV_TAG_TYPE_AUDIO = 8;

    // AVC NAL单元类型
    private static final int NAL_UNIT_TYPE_SPS = 7;
    private static final int NAL_UNIT_TYPE_PPS = 8;
    private static final int NAL_UNIT_TYPE_IDR = 5;
    private static final int NAL_UNIT_TYPE_NON_IDR = 1;

    public RtmpChunkWriter(RtmpConnection connection, String tag) {
        this.connection = connection;
        this.tag = tag;
    }

    /**
     * 发送FLV视频数据
     *
     * @param flvData 完整的FLV Video Tag数据
     * @param timestamp FLV中的时间戳
     */
    public void sendVideoData(byte[] flvData, int timestamp) throws IOException {
        if (flvData == null || flvData.length < 16) {
            return;
        }

        // 跳过PreviousTagSize (4字节)
        int offset = 4;

        // 检查Tag Type
        if (flvData[offset] != FLV_TAG_TYPE_VIDEO) {
            return;
        }
        offset++;

        // 读取DataSize (3字节, big-endian)
        int dataSize = ((flvData[offset] & 0xFF) << 16) |
                       ((flvData[offset + 1] & 0xFF) << 8) |
                       (flvData[offset + 2] & 0xFF);
        offset += 3;

        // 跳过Timestamp (3字节) + TimestampExt (1字节)
        offset += 4;

        // 跳过StreamID (3字节)
        offset += 3;

        // 现在offset指向VideoData
        if (offset >= flvData.length) {
            return;
        }

        byte videoInfo = flvData[offset];  // FrameType + CodecID
        offset++;

        byte avcPacketType = flvData[offset];  // AVCPacketType
        offset++;

        // 读取CompositionTime (3字节)
        int compositionTime = ((flvData[offset] & 0xFF) << 16) |
                             ((flvData[offset + 1] & 0xFF) << 8) |
                             (flvData[offset + 2] & 0xFF);
        offset += 3;

        // 计算RTMP时间戳
        // FLV timestamp单位是毫秒,RTMP也是毫秒
        int rtmpTimestamp = timestamp & 0xFFFFFF;  // 24位时间戳
        lastVideoTimestamp = rtmpTimestamp;

        if (avcPacketType == 0x00) {
            // AVC Sequence Header (SPS/PPS)
            logger.info("[{}] 收到AVC Sequence Header (SPS/PPS), offset={}, remaining={}", tag, offset, flvData.length - offset);
            handleAVCSequenceHeader(flvData, offset);
        } else if (avcPacketType == 0x01) {
            // NALU
            logger.info("[{}] 收到NALU数据, timestamp={}, offset={}, remaining={}", tag, rtmpTimestamp, offset, flvData.length - offset);
            handleNALU(flvData, offset, rtmpTimestamp);
        } else {
            logger.warn("[{}] 未知AVCPacketType: 0x{}, 数据长度={}", tag, String.format("%02X", avcPacketType), flvData.length);
        }
    }

    /**
     * 处理AVC Sequence Header
     *
     * FLV中封装的AVCDecoderConfigurationRecord:
     * [configurationVersion(1)][AVCProfileIndication(1)][profile_compatibility(1)][AVCLevelIndication(1)]
     * [lengthSizeMinusOne(1)][numOfSequenceParameterSets(1)][SPS length(2)][SPS data][numOfPictureParameterSets(1)][PPS length(2)][PPS data]
     */
    private void handleAVCSequenceHeader(byte[] data, int offset) {
        try {
            // 解析AVCDecoderConfigurationRecord
            int pos = offset;

            byte configurationVersion = data[pos++];
            byte AVCProfileIndication = data[pos++];
            byte profileCompatibility = data[pos++];
            byte AVCLevelIndication = data[pos++];

            byte lengthSizeMinusOne = data[pos++];  // 通常是3,表示NALU长度占4字节

            // SPS
            byte numOfSequenceParameterSets = data[pos++];
            int spsLength = ((data[pos] & 0xFF) << 8) | (data[pos + 1] & 0xFF);
            pos += 2;
            lastSPS = new byte[spsLength];
            System.arraycopy(data, pos, lastSPS, 0, spsLength);
            pos += spsLength;

            // PPS
            byte numOfPictureParameterSets = data[pos++];
            int ppsLength = ((data[pos] & 0xFF) << 8) | (data[pos + 1] & 0xFF);
            pos += 2;
            lastPPS = new byte[ppsLength];
            System.arraycopy(data, pos, lastPPS, 0, ppsLength);

            hasSentAVCSequenceHeader = false;  // 重置,强制发送新的

            // 调试日志:打印SPS/PPS的十六进制
            StringBuilder spsHex = new StringBuilder();
            for (int i = 0; i < Math.min(lastSPS.length, 20); i++) {
                spsHex.append(String.format("%02X ", lastSPS[i] & 0xFF));
            }
            StringBuilder ppsHex = new StringBuilder();
            for (int i = 0; i < Math.min(lastPPS.length, 10); i++) {
                ppsHex.append(String.format("%02X ", lastPPS[i] & 0xFF));
            }
            logger.info("[{}] 解析到AVC Sequence Header: SPS长度={}, PPS长度={}, SPS前20字节=[{}], PPS前10字节=[{}], profile={}, level={}",
                    tag, spsLength, ppsLength, spsHex.toString(), ppsHex.toString(),
                    String.format("0x%02X", AVCProfileIndication), String.format("0x%02X", AVCLevelIndication));

        } catch (Exception e) {
            logger.error("[{}] 解析AVC Sequence Header失败: {}", tag, e.getMessage());
        }
    }

    /**
     * 处理NALU数据
     */
    private void handleNALU(byte[] data, int offset, int timestamp) throws IOException {
        if (lastSPS == null || lastPPS == null) {
            logger.warn("[{}] 还未收到SPS/PPS,跳过NALU. lastSPS={}, lastPPS={}", tag,
                    lastSPS != null ? "set(" + lastSPS.length + ")" : "null",
                    lastPPS != null ? "set(" + lastPPS.length + ")" : "null");
            return;
        }

        // 发送AVC Sequence Header(如果还没发送)
        if (!hasSentAVCSequenceHeader) {
            sendAVCSequenceHeaderPacket();
            hasSentAVCSequenceHeader = true;
        }

        // 解析NALU
        // FLV中NALU格式: [NALU长度(4字节)][NALU数据...]
        int pos = offset;
        int nalType = -1;

        while (pos + 4 < data.length) {
            // 读取NALU长度
            int nalLength = ((data[pos] & 0xFF) << 24) |
                           ((data[pos + 1] & 0xFF) << 16) |
                           ((data[pos + 2] & 0xFF) << 8) |
                           (data[pos + 3] & 0xFF);
            pos += 4;

            if (pos + nalLength > data.length) {
                break;
            }

            nalType = data[pos] & 0x1F;

            // 构建RTMP视频数据
            // 格式: [FrameType(1)][AVCPacketType(1)][CompositionTime(3)][NALU]
            byte[] rtmpData = new byte[5 + nalLength];
            rtmpData[0] = (byte) (nalType == NAL_UNIT_TYPE_IDR ? 0x17 : 0x27);  // I帧或P帧
            rtmpData[1] = 0x01;  // AVCPacketType = NALU
            rtmpData[2] = 0x00;  // CompositionTime
            rtmpData[3] = 0x00;
            rtmpData[4] = 0x00;

            // 复制NALU数据
            System.arraycopy(data, pos, rtmpData, 5, nalLength);

            // 打印NALU的十六进制便于调试
            StringBuilder hex = new StringBuilder();
            for (int i = 0; i < Math.min(nalLength, 20); i++) {
                hex.append(String.format("%02X ", rtmpData[5 + i] & 0xFF));
            }
            if (nalLength > 20) {
                hex.append("...(").append(nalLength).append(" bytes total)");
            }
            logger.info("[{}] 发送NALU到RTMP: nalType=0x{}, timestamp={}, nalLength={}, firstBytes={}", tag,
                    Integer.toHexString(nalType), timestamp, nalLength, hex.toString());

            // 发送到RTMP
            connection.sendVideoData(rtmpData, timestamp);

            pos += nalLength;
        }
    }

    /**
     * 发送AVC Sequence Header到RTMP
     * 这是实现无缝码流切换的关键!
     *
     * 注意:lastSPS和lastPPS是从FLV数据中提取的,不包含start code
     * FLV中的SPS格式: [profile_idc(1)][constraint(1)][level_idc(1)][...]
     * 实际H.264 SPS (Annex B): [00 00 00 01][67][profile_idc][constraint][level_idc][...]
     */
    private void sendAVCSequenceHeaderPacket() throws IOException {
        if (lastSPS == null || lastPPS == null) {
            logger.warn("[{}] SPS/PPS为空,无法发送Sequence Header", tag);
            return;
        }

        // 构建AVCDecoderConfigurationRecord
        // 这个格式和FLV中的略有不同,需要转换
        int recordLength = 11 + lastSPS.length + lastPPS.length;
        byte[] configRecord = new byte[recordLength];

        int pos = 0;
        configRecord[pos++] = 0x01;  // configurationVersion
        // SPS是从FLV提取的,没有start code
        // FLV SPS: [profile_idc(1)][constraint(1)][level_idc(1)][...]
        // 所以 profile_idc 在 lastSPS[0], constraint 在 lastSPS[1], level 在 lastSPS[2]
        configRecord[pos++] = lastSPS.length > 0 ? lastSPS[0] : 0x64;  // AVCProfileIndication
        configRecord[pos++] = lastSPS.length > 1 ? lastSPS[1] : 0x00;  // profile_compatibility
        configRecord[pos++] = lastSPS.length > 2 ? lastSPS[2] : 0x1F;  // AVCLevelIndication
        configRecord[pos++] = (byte) 0xFF;  // lengthSizeMinusOne = 3 (NAL长度占4字节)

        // SPS
        configRecord[pos++] = (byte) 0xE1;  // numOfSequenceParameterSets = 1
        configRecord[pos++] = (byte) ((lastSPS.length >> 8) & 0xFF);  // SPS length high
        configRecord[pos++] = (byte) (lastSPS.length & 0xFF);  // SPS length low
        System.arraycopy(lastSPS, 0, configRecord, pos, lastSPS.length);
        pos += lastSPS.length;

        // PPS (同样没有start code)
        configRecord[pos++] = 0x01;  // numOfPictureParameterSets = 1
        configRecord[pos++] = (byte) ((lastPPS.length >> 8) & 0xFF);  // PPS length high
        configRecord[pos++] = (byte) (lastPPS.length & 0xFF);  // PPS length low
        System.arraycopy(lastPPS, 0, configRecord, pos, lastPPS.length);

        // 构建RTMP视频数据
        // [FrameType(1)][AVCPacketType(1)][CompositionTime(3)][ConfigRecord]
        byte[] rtmpData = new byte[5 + recordLength];
        rtmpData[0] = 0x17;  // AVC sequence header (I-frame)
        rtmpData[1] = 0x00;  // AVCPacketType = AVC sequence header
        rtmpData[2] = 0x00;
        rtmpData[3] = 0x00;
        rtmpData[4] = 0x00;
        System.arraycopy(configRecord, 0, rtmpData, 5, recordLength);

        // 打印Sequence Header的十六进制便于调试
        StringBuilder hex = new StringBuilder();
        for (int i = 0; i < Math.min(rtmpData.length, 64); i++) {
            hex.append(String.format("%02X ", rtmpData[i] & 0xFF));
        }
        if (rtmpData.length > 64) {
            hex.append("...(").append(rtmpData.length).append(" bytes total)");
        }
        logger.info("[{}] 发送AVC Sequence Header到RTMP, 数据长度={}, hex={}", tag, rtmpData.length, hex.toString());
        logger.info("[{}] SPS profile/level: configRecord[1]={}, configRecord[3]={}",
                tag, String.format("0x%02X", configRecord[1]), String.format("0x%02X", configRecord[3]));

        // 发送到RTMP
        connection.sendVideoData(rtmpData, 0);
    }

    /**
     * 强制发送新的AVC Sequence Header
     * 用于码流切换时立即发送新的SPS/PPS
     */
    public void forceSendAVCSequenceHeader() {
        hasSentAVCSequenceHeader = false;
    }

    /**
     * 更新SPS/PPS(当检测到码流变化时调用)
     */
    public void updateSPSPPS(byte[] sps, byte[] pps) {
        this.lastSPS = sps;
        this.lastPPS = pps;
        this.hasSentAVCSequenceHeader = false;  // 重置,强制发送新的
        logger.info("[{}] 更新SPS/PPS: SPS长度={}, PPS长度={}", tag,
                sps != null ? sps.length : 0, pps != null ? pps.length : 0);
    }

    /**
     * 发送FLV音频数据
     */
    public void sendAudioData(byte[] flvData, int timestamp) throws IOException {
        if (flvData == null || flvData.length < 16) {
            return;
        }

        // 跳过PreviousTagSize (4字节)
        int offset = 4;

        // 检查Tag Type
        if (flvData[offset] != FLV_TAG_TYPE_AUDIO) {
            return;
        }
        offset++;

        // 读取DataSize (3字节)
        int dataSize = ((flvData[offset] & 0xFF) << 16) |
                       ((flvData[offset + 1] & 0xFF) << 8) |
                       (flvData[offset + 2] & 0xFF);
        offset += 3;

        // 跳过Timestamp (3字节) + TimestampExt (1字节)
        offset += 4;

        // 跳过StreamID (3字节)
        offset += 3;

        // 提取音频数据
        int audioDataSize = dataSize - 1;  // 减去audioInfo的1字节
        if (offset + audioDataSize > flvData.length) {
            return;
        }

        byte[] audioData = new byte[audioDataSize];
        System.arraycopy(flvData, offset + 1, audioData, 0, audioDataSize);  // +1 跳过audioInfo

        lastAudioTimestamp = timestamp & 0xFFFFFF;
        connection.sendAudioData(audioData, lastAudioTimestamp);
    }

    public boolean hasSentAVCSequenceHeader() {
        return hasSentAVCSequenceHeader;
    }
}