Channel.java 5.32 KB
package com.genersoft.iot.vmp.jtt1078.publisher;

import com.genersoft.iot.vmp.jtt1078.codec.AudioCodec;
import com.genersoft.iot.vmp.jtt1078.entity.Media;
import com.genersoft.iot.vmp.jtt1078.entity.MediaEncoding;
import com.genersoft.iot.vmp.jtt1078.flv.FlvEncoder;
// [恢复] 引用 FFmpeg 进程管理类
import com.genersoft.iot.vmp.jtt1078.subscriber.RTMPPublisher;
import com.genersoft.iot.vmp.jtt1078.subscriber.Subscriber;
import com.genersoft.iot.vmp.jtt1078.subscriber.VideoSubscriber;
import com.genersoft.iot.vmp.jtt1078.util.ByteHolder;
import com.genersoft.iot.vmp.jtt1078.util.Configs;
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Channel
{
    static Logger logger = LoggerFactory.getLogger(Channel.class);

    ConcurrentLinkedQueue<Subscriber> subscribers;

    // [恢复] FFmpeg 推流进程管理器
    RTMPPublisher rtmpPublisher;
    // [删除] ZlmRtpPublisher rtpPublisher;

    String tag;
    boolean publishing;
    ByteHolder buffer;
    AudioCodec audioCodec;
    FlvEncoder flvEncoder;
    private long firstTimestamp = -1;

    public Channel(String tag)
    {
        this.tag = tag;
        this.subscribers = new ConcurrentLinkedQueue<>();
        this.flvEncoder = new FlvEncoder(true, true);
        this.buffer = new ByteHolder(2048 * 100);

        // [恢复] 启动 FFmpeg 进程
        // 只要配置了 rtmp.url,就启动 FFmpeg 去拉取当前的 HTTP 流并转码推送
        String rtmpUrl = Configs.get("rtmp.url");
        if (StringUtils.isNotBlank(rtmpUrl))
        {
            logger.info("[{}] 启动 FFmpeg 进程推流至: {}", tag, rtmpUrl);
            rtmpPublisher = new RTMPPublisher(tag);
            rtmpPublisher.start();
        }
    }

    public boolean isPublishing()
    {
        return publishing;
    }

    public Subscriber subscribe(ChannelHandlerContext ctx)
    {
        Subscriber subscriber = new VideoSubscriber(this.tag, ctx);
        this.subscribers.add(subscriber);
        return subscriber;
    }

    public void writeAudio(long timestamp, int pt, byte[] data)
    {
        // 1. 转码为 PCM (用于 FLV 封装,FFmpeg 会从 FLV 中读取 PCM 并转码为 AAC)
        if (audioCodec == null)
        {
            audioCodec = AudioCodec.getCodec(pt);
            logger.info("audio codec: {}", MediaEncoding.getEncoding(Media.Type.Audio, pt));
        }
        // 写入到内部广播,FFmpeg 通过 HTTP 拉取这个数据
        broadcastAudio(timestamp, audioCodec.toPCM(data));

        // [删除] rtpPublisher.sendAudio(...)
    }

    public void writeVideo(long sequence, long timeoffset, int payloadType, byte[] h264)
    {
        if (firstTimestamp == -1) firstTimestamp = timeoffset;
        this.publishing = true;
        this.buffer.write(h264);

        while (true)
        {
            byte[] nalu = readNalu();
            if (nalu == null) break;
            if (nalu.length < 4) continue;

            // 1. 封装为 FLV Tag (必须)
            // FFmpeg 通过 HTTP 读取这些 FLV Tag
            byte[] flvTag = this.flvEncoder.write(nalu, (int) (timeoffset - firstTimestamp));

            if (flvTag == null) continue;

            // 广播给所有的观众
            broadcastVideo(timeoffset, flvTag);
        }
    }

    public void broadcastVideo(long timeoffset, byte[] flvTag)
    {
        for (Subscriber subscriber : subscribers)
        {
            subscriber.onVideoData(timeoffset, flvTag, flvEncoder);
        }
    }

    public void broadcastAudio(long timeoffset, byte[] flvTag)
    {
        for (Subscriber subscriber : subscribers)
        {
            subscriber.onAudioData(timeoffset, flvTag, flvEncoder);
        }
    }

    public void unsubscribe(long watcherId)
    {
        for (Iterator<Subscriber> itr = subscribers.iterator(); itr.hasNext(); )
        {
            Subscriber subscriber = itr.next();
            if (subscriber.getId() == watcherId)
            {
                itr.remove();
                subscriber.close();
                return;
            }
        }
    }

    public void close()
    {
        for (Iterator<Subscriber> itr = subscribers.iterator(); itr.hasNext(); )
        {
            Subscriber subscriber = itr.next();
            subscriber.close();
            itr.remove();
        }

        // [恢复] 关闭 FFmpeg 进程
        if (rtmpPublisher != null) {
            rtmpPublisher.close();
            rtmpPublisher = null;
        }
    }

    // [恢复] 原版 readNalu (FFmpeg 偏好带 StartCode 的数据,或者 FlvEncoder 需要)
    // 之前为了 RTP 特意修改了切片逻辑,现在改回原版简单逻辑即可
    private byte[] readNalu()
    {
        // 寻找 00 00 00 01
        for (int i = 0; i < buffer.size() - 3; i++)
        {
            int a = buffer.get(i + 0) & 0xff;
            int b = buffer.get(i + 1) & 0xff;
            int c = buffer.get(i + 2) & 0xff;
            int d = buffer.get(i + 3) & 0xff;
            if (a == 0x00 && b == 0x00 && c == 0x00 && d == 0x01)
            {
                if (i == 0) continue;
                byte[] nalu = new byte[i];
                buffer.sliceInto(nalu, i);
                return nalu;
            }
        }
        return null;
    }
}