Channel.java 5.46 KB
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.genersoft.iot.vmp.jt1078.publisher;

import com.genersoft.iot.vmp.jt1078.codec.AudioCodec;
import com.genersoft.iot.vmp.jt1078.entity.MediaEncoding;
import com.genersoft.iot.vmp.jt1078.entity.Media.Type;
import com.genersoft.iot.vmp.jt1078.flv.FlvEncoder;
import com.genersoft.iot.vmp.jt1078.server.Jtt1078Handler;
import com.genersoft.iot.vmp.jt1078.subscriber.RTMPPublisher;
import com.genersoft.iot.vmp.jt1078.subscriber.Subscriber;
import com.genersoft.iot.vmp.jt1078.subscriber.VideoSubscriber;
import com.genersoft.iot.vmp.jt1078.util.ByteHolder;
import com.genersoft.iot.vmp.jt1078.util.Configs;
import io.netty.channel.ChannelHandlerContext;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Channel {
    static Logger logger = LoggerFactory.getLogger(Channel.class);
    ConcurrentLinkedQueue<Subscriber> subscribers;
    RTMPPublisher rtmpPublisher;
    String tag;
    boolean publishing;
    ByteHolder buffer;
    AudioCodec audioCodec;
    FlvEncoder flvEncoder;
    private long firstTimestamp = -1L;
    private Integer httpPort;
    private boolean flag = true;

    public Channel(String tag, Integer httpPort) {

        this.tag = tag;
        this.subscribers = new ConcurrentLinkedQueue();
        this.flvEncoder = new FlvEncoder(true, true);
        this.buffer = new ByteHolder(204800);
        this.httpPort = httpPort;
        if (!StringUtils.isEmpty(Configs.get("rtmp.url"))) {
            this.rtmpPublisher = new RTMPPublisher(tag, httpPort);
            this.rtmpPublisher.start();
        }

    }

    public boolean isPublishing() {
        return this.publishing;
    }

    public Subscriber subscribe(ChannelHandlerContext ctx) {
        logger.info("channel: {} -> {}, subscriber: {}", new Object[]{Long.toHexString((long)this.hashCode() & 4294967295L), this.tag, ctx.channel().remoteAddress().toString()});
        Subscriber subscriber = new VideoSubscriber(this.tag, ctx);
        this.subscribers.add(subscriber);


        return subscriber;
    }

    public void writeAudio(long timestamp, int pt, byte[] data) {
        if (this.audioCodec == null) {
            this.audioCodec = AudioCodec.getCodec(pt);
            logger.info("audio codec: {}", MediaEncoding.getEncoding(Type.Audio, pt));
        }

        this.broadcastAudio(timestamp, this.audioCodec.toPCM(data));
    }

    public void writeVideo(long sequence, long timeoffset, int payloadType, byte[] h264) {
        if (this.firstTimestamp == -1L) {
            this.firstTimestamp = timeoffset;
        }

        this.publishing = true;
        this.buffer.write(h264);

        while(true) {
            byte[] nalu = this.readNalu();
            if (nalu == null) {
                return;
            }

            if (nalu.length >= 4) {
                byte[] flvTag = this.flvEncoder.write(nalu, (int)(timeoffset - this.firstTimestamp));
                if (flvTag != null) {
                    this.broadcastVideo(timeoffset, flvTag);
                }
            }
        }
    }

    public void broadcastVideo(long timeoffset, byte[] flvTag) {
        Iterator var4 = this.subscribers.iterator();

        while(var4.hasNext()) {
            Subscriber subscriber = (Subscriber)var4.next();
            subscriber.onVideoData(timeoffset, flvTag, this.flvEncoder);
        }

    }

    public void broadcastAudio(long timeoffset, byte[] flvTag) {
        Iterator var4 = this.subscribers.iterator();

        while(var4.hasNext()) {
            Subscriber subscriber = (Subscriber)var4.next();
            subscriber.onAudioData(timeoffset, flvTag, this.flvEncoder);
        }

    }

    public void unsubscribe(long watcherId) {
        Iterator<Subscriber> itr = this.subscribers.iterator();

        Subscriber subscriber;
        do {
            if (!itr.hasNext()) {
                return;
            }

            subscriber = (Subscriber)itr.next();
        } while(subscriber.getId() != watcherId);

        itr.remove();
        subscriber.close();
    }

    public long getWatcherId(String tag) {
        Iterator<Subscriber> itr = this.subscribers.iterator();

        Subscriber subscriber;
        do {
            if (!itr.hasNext()) {
                return -1100L;
            }

            subscriber = (Subscriber)itr.next();
        } while(!StringUtils.equals(tag, subscriber.getTag()));

        return subscriber.getId();
    }

    public void close() {
        Iterator<Subscriber> itr = this.subscribers.iterator();

        while(itr.hasNext()) {
            Subscriber subscriber = (Subscriber)itr.next();
            subscriber.close();
            itr.remove();
        }

        if (this.rtmpPublisher != null) {
            this.rtmpPublisher.close();
        }

    }

    private byte[] readNalu() {
        for(int i = 0; i < this.buffer.size(); ++i) {
            int a = this.buffer.get(i + 0) & 255;
            int b = this.buffer.get(i + 1) & 255;
            int c = this.buffer.get(i + 2) & 255;
            int d = this.buffer.get(i + 3) & 255;
            if (a == 0 && b == 0 && c == 0 && d == 1 && i != 0) {
                byte[] nalu = new byte[i];
                this.buffer.sliceInto(nalu, i);
                return nalu;
            }
        }

        return null;
    }
}