// Jtt1078Handler.java 7.15 KB
package cn.org.hentai.jtt1078.server;

import cn.org.hentai.jtt1078.entity.Media;
import cn.org.hentai.jtt1078.entity.MediaEncoding;
import cn.org.hentai.jtt1078.publisher.Channel;
import cn.org.hentai.jtt1078.publisher.PublishManager;
import cn.org.hentai.jtt1078.entity.Audio;
import cn.org.hentai.jtt1078.rtp.H264Packeter;
import cn.org.hentai.jtt1078.rtsp.RtspSessionManager;
import cn.org.hentai.jtt1078.util.Packet;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.rtsp.RtspDecoder;
import io.netty.handler.codec.rtsp.RtspEncoder;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.Iterator;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * Created by matrixy on 2019/4/9.
 */
public class Jtt1078Handler extends SimpleChannelInboundHandler<Packet>
{
    static Logger logger = LoggerFactory.getLogger(Jtt1078Handler.class);
    private static final AttributeKey<Session> SESSION_KEY = AttributeKey.valueOf("session-key");

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception
    {
        io.netty.channel.Channel nettyChannel = ctx.channel();

        packet.seek(8);
        String sim = packet.nextBCD() + packet.nextBCD() + packet.nextBCD() + packet.nextBCD() + packet.nextBCD() + packet.nextBCD();
        int channel = packet.nextByte() & 0xff;
        String tag = sim + "-" + channel;

        if (SessionManager.contains(nettyChannel, "tag") == false)
        {
            Channel chl = PublishManager.getInstance().open(tag);
            SessionManager.set(nettyChannel, "tag", tag);
            new Thread(new PushTask(tag)).start();
            logger.info("start publishing: {} -> {}-{}", Long.toHexString(chl.hashCode() & 0xffffffffL), sim, channel);
        }

        Integer sequence = SessionManager.get(nettyChannel, "video-sequence");
        if (sequence == null) sequence = 0;
        // 1. 做好序号
        // 2. 音频需要转码后提供订阅
        int lengthOffset = 28;
        int dataType = (packet.seek(15).nextByte() >> 4) & 0x0f;
        int pkType = packet.seek(15).nextByte() & 0x0f;
        // 透传数据类型:0100,没有后面的时间以及Last I Frame Interval和Last Frame Interval字段
        if (dataType == 0x04) lengthOffset = 28 - 8 - 2 - 2;
        else if (dataType == 0x03) lengthOffset = 28 - 4;

        int pt = packet.seek(5).nextByte() & 0x7f;

        if (dataType == 0x00 || dataType == 0x01 || dataType == 0x02)
        {
            // 碰到结束标记时,序号+1
            if (pkType == 0 || pkType == 2)
            {
                sequence += 1;
                SessionManager.set(nettyChannel, "video-sequence", sequence);
            }
            long timestamp = packet.seek(16).nextLong();
            PublishManager.getInstance().publishVideo(tag, sequence, timestamp, pt, packet.seek(lengthOffset + 2).nextBytes());
            if (RtspSessionManager.isRegistered(tag)) {
                io.netty.channel.Channel push = RtspSessionManager.getPush(tag);
                H264Packeter packeter = (H264Packeter) push.attr(AttributeKey.valueOf(tag)).get();
                for (byte[] nalu : packeter.packet(packet.seek(lengthOffset + 2).nextBytes(), timestamp)) {
                    push.writeAndFlush(Unpooled.wrappedBuffer(nalu));
                }
            }
        }
        else if (dataType == 0x03)
        {
            long timestamp = packet.seek(16).nextLong();
            byte[] data = packet.seek(lengthOffset + 2).nextBytes();
            PublishManager.getInstance().publishAudio(tag, sequence, timestamp, pt, data);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception
    {
        super.channelInactive(ctx);
        release(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        // super.exceptionCaught(ctx, cause);
        cause.printStackTrace();
        release(ctx.channel());
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                String tag = SessionManager.get(ctx.channel(), "tag");
                logger.info("read timeout: {}",tag);
                release(ctx.channel());
            }
        }
    }

    private void release(io.netty.channel.Channel channel)
    {
        String tag = SessionManager.get(channel, "tag");
        if (tag != null)
        {
            logger.info("close netty channel: {}", tag);
            PublishManager.getInstance().close(tag);
        }
    }

    static class PushTask implements Runnable {

        private String channelId;

        private H264Packeter packeter = new H264Packeter();

        public PushTask(String channelId) {
            this.channelId = channelId;
        }

        @Override
        public void run() {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                ChannelFuture future = null;
                Bootstrap clientBootstrap = new Bootstrap();
                clientBootstrap.group(group)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .channel(NioSocketChannel.class)
                        .remoteAddress(new InetSocketAddress("192.168.169.100", 9555))
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) {
                                socketChannel.pipeline()
                                        .addLast(new RtspEncoder())
                                        .addLast(new RtspDecoder())
                                        .addLast(new RTSPHandler());
                            }
                        });
                while (true) {
                    logger.info("Waiting for server connection");
                    future = clientBootstrap.connect();
                    future.awaitUninterruptibly();
                    if (future.isSuccess()) {
                        logger.info("RTSP Connection success!");
                        break;
                    }
                    Thread.sleep(1000);
                }

                future.channel().attr(AttributeKey.valueOf(this.channelId)).set(this.packeter);
                RtspSessionManager.setPush(this.channelId, future.channel());
                // Wait for the server to close the connection.
                //future.channel().closeFuture().sync();
            } catch (Exception e) {
                logger.error("Error ->", e);
                logger.error("<- Error");
            }
        }
    }
}