RTSPHandler.java 6.72 KB
package cn.org.hentai.jtt1078.server;

import cn.org.hentai.jtt1078.rtsp.RtspRequest;
import cn.org.hentai.jtt1078.rtsp.RtspSessionManager;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.rtsp.*;
import io.netty.util.internal.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.*;

public class RTSPHandler extends ChannelInboundHandlerAdapter {

    private final static Logger log = LoggerFactory.getLogger(RTSPHandler.class);

    private String channelId = "138000099998-1";

    private HttpMethod currentMethod = RtspMethods.OPTIONS;

    private List<HttpMethod> methods = Arrays.asList(RtspMethods.OPTIONS, RtspMethods.ANNOUNCE, RtspMethods.SETUP, RtspMethods.RECORD, null);

    private RtspRequest rtspRequest = new RtspRequest("192.168.169.100", 9555, String.format("/schedule/%s?sign=41db35390ddad33f83944f44b8b75ded", channelId));

    /*
     When notified that the channel is active, sends a message. A channel is active
     when a connection has been established, so the method is invoked when the connections
     is established.
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        log.debug("channelActive, connection established: {}", ctx);
        log.debug("Sending request to the server");
        ctx.writeAndFlush(rtspRequest.option());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.info("exceptionCaught: {}", cause);
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.info("Received from RTSP Server: {}", ctx);
        log.info("Received from RTSP msg: {}", msg.toString());
        log.debug("Received Class Type: {}", msg.getClass().getTypeName());
        if (msg instanceof DefaultHttpResponse) {
            DefaultHttpResponse res = (DefaultHttpResponse) msg;
            if (RtspResponseStatuses.OK.equals(res.status())) {
                log.debug("{}: {}", currentMethod, res.status());
                if (res.headers().contains(RtspHeaderNames.SESSION)) {
                    rtspRequest.setSessionID(res.headers().get(RtspHeaderNames.SESSION));
                }
                nextMethod(ctx);
            } else {
                ctx.close();
            }
        } else if (msg instanceof DefaultHttpContent) {
            DefaultHttpContent content = (DefaultHttpContent) msg;
            log.info("Content: {}", content);

            ByteBuf byteBuf = content.content();
        } else {
            log.debug("dataType error: {}", msg.getClass().getTypeName());
        }
    }

    private void nextMethod(ChannelHandlerContext ctx) {
        for (int i = 0;i < methods.size(); i++) {
            if (methods.get(i).equals(currentMethod) && i < methods.size() - 1) {
                currentMethod = methods.get(i + 1);
                break;
            }
        }
        if (currentMethod == null) {
            RtspSessionManager.register(channelId);
            return;
        }
        if (currentMethod == RtspMethods.ANNOUNCE) {
            ctx.writeAndFlush(rtspRequest.announce());
        }
        if (currentMethod == RtspMethods.SETUP) {
            ctx.writeAndFlush(rtspRequest.setup());
        }
        if (currentMethod == RtspMethods.RECORD) {
            ctx.writeAndFlush(rtspRequest.record());
        }
    }

    private void parseSdp(String sdp) {
        log.debug("Parsing SDP: {}", sdp);
        Map<String, List<String>> mediaMap = new HashMap<>(10);
        String[] array = sdp.split("\\n");
        String mediaName = "";
        for (int i = 0; i < array.length; i++) {
            String line = array[i];
            if (line.startsWith("m=")) {
                mediaName = line.substring(line.indexOf("=") + 1, line.indexOf(" "));
                if (mediaName.equals("video") || mediaName.equals("audio")) {
                    mediaMap.put(mediaName, new ArrayList<>());
                } else {
                    mediaName = "";
                }
            } else if (!StringUtil.isNullOrEmpty(mediaName)) {
                if (line.startsWith("b=") || line.startsWith("a=")) {
                    List<String> medialist = mediaMap.get(mediaName);
                    medialist.add(line);
                }
            }
        }
        for (String mediaKey : mediaMap.keySet()) {
            StringBuilder stringBuilder = new StringBuilder();
            List<String> mediaInfo = mediaMap.get(mediaKey);
            mediaInfo.forEach((s) -> {
                stringBuilder.append("\n");
                stringBuilder.append(s);
            });
            log.info("[>>>>> {} <<<<<] {}", mediaKey, stringBuilder.toString());
        }
    }

    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap clientBootstrap = new Bootstrap();
            clientBootstrap.group(group)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress("192.168.169.100", 9555))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
                            socketChannel.pipeline()
                                    .addLast(new RtspEncoder())
                                    .addLast(new RtspDecoder())
                                    .addLast(new RTSPHandler());
                        }
                    });
            ChannelFuture f = null;
            while (true) {
                log.info("Waiting for server connection");
                f = clientBootstrap.connect();
                f.awaitUninterruptibly();
                if (f.isSuccess()) {
                    log.info("RTSP Connection success!");
                    break;
                }
                Thread.sleep(1000);
            }

            // Wait for the server to close the connection.
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("Error ->", e);
            log.error("<- Error");
        } finally {
            // Shut down executor threads to exit.Ahkk
            try {
                log.info("ShutdownGracefully the connection group");
                group.shutdownGracefully().sync();
            } catch (InterruptedException e) {
                log.error("", e);
            }
        }
    }
}