NettyHttpServerHandler.java 12.5 KB
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.genersoft.iot.vmp.jt1078.http;

import com.genersoft.iot.vmp.jt1078.app.VideoServerApp;
import com.genersoft.iot.vmp.jt1078.entity.Media.Type;
import com.genersoft.iot.vmp.jt1078.publisher.PublishManager;
import com.genersoft.iot.vmp.jt1078.server.Jtt1078Handler;
import com.genersoft.iot.vmp.jt1078.server.Session;
import com.genersoft.iot.vmp.jt1078.util.FileUtils;
import com.genersoft.iot.vmp.jt1078.util.Packet;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Objects;

public class NettyHttpServerHandler extends ChannelInboundHandlerAdapter {
    private String tagMapping;
    private Integer httpPort;
    static Logger logger = LoggerFactory.getLogger(NettyHttpServerHandler.class);
    static final byte[] HTTP_403_DATA = "<h1>403 Forbidden</h1><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding--><!--padding-->".getBytes();
    static final byte[] SUCCESS = "{code:0}".getBytes();
    static final String HEADER_ENCODING = "ISO-8859-1";
    private static final AttributeKey<Session> SESSION_KEY = AttributeKey.valueOf("session");

    public NettyHttpServerHandler(String tagMapping, Integer httpPort) {
        this.tagMapping = tagMapping;
        this.httpPort = httpPort;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        FullHttpRequest fhr = (FullHttpRequest) msg;
        String uri = fhr.uri();
        Packet resp = Packet.create(1024);
        String tagMapping;
        long wid;
        if (uri.startsWith("/video/")) {
            tagMapping = uri.substring("/video/".length());
            resp.addBytes("HTTP/1.1 200 OK\r\n".getBytes("ISO-8859-1"));
            resp.addBytes("Connection: keep-alive\r\n".getBytes("ISO-8859-1"));
            resp.addBytes("Content-Type: video/x-flv\r\n".getBytes("ISO-8859-1"));
            resp.addBytes("Transfer-Encoding: chunked\r\n".getBytes("ISO-8859-1"));
            resp.addBytes("Cache-Control: no-cache\r\n".getBytes("ISO-8859-1"));
            resp.addBytes("Access-Control-Allow-Origin: *\r\n".getBytes("ISO-8859-1"));
            resp.addBytes("Access-Control-Allow-Credentials: true\r\n".getBytes("ISO-8859-1"));
            resp.addBytes("\r\n".getBytes("ISO-8859-1"));
            ctx.writeAndFlush(resp.getBytes()).await();
            logger.info("Thread id:[{}]", Thread.currentThread().getId());
            if (StringUtils.isEmpty(this.tagMapping)) {
                this.tagMapping = tagMapping;
            }


            wid = PublishManager.getInstance().subscribe(this.tagMapping, Type.Video, ctx, this.httpPort).getId();
            this.setSession(ctx, (new Session()).set("subscriber-id", wid).set("tag", this.tagMapping));
//            if (wid == 0) {

            try {
                Jtt1078Handler.createStreamProxy(this.tagMapping, httpPort);
            } catch (URISyntaxException e) {
                throw new RuntimeException(e);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

//            }
        } else if (uri.equals("/test/multimedia")) {
            this.responseHTMLFile("/multimedia.html", ctx);
        } else {
            String httpPort;
            if (uri.startsWith("/stop/channel/")) {
                resp.addBytes("HTTP/1.1 200 OK\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Connection: keep-alive\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Content-Type: video/x-flv\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Transfer-Encoding: chunked\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Cache-Control: no-cache\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Access-Control-Allow-Origin: *\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Access-Control-Allow-Credentials: true\r\n".getBytes("ISO-8859-1"));
                tagMapping = uri.substring("/stop/channel/".length());
                String str = uri.substring("/stop/channel/".length());
                int endIndex = StringUtils.indexOf(str, "/");
                StringUtils.substring(str, 0, endIndex);
                Integer startIndex = endIndex + 1;
                endIndex = StringUtils.indexOf(str, "/", startIndex);
                httpPort = StringUtils.substring(str, startIndex, endIndex);
                startIndex = endIndex + 1;
                httpPort = StringUtils.substring(str, startIndex, str.length());
                PublishManager publishManager = PublishManager.getInstance();
                publishManager.unsubscribeAndClose(tagMapping);
                VideoServerApp.stopServer(Integer.parseInt(httpPort), Integer.parseInt(httpPort));
                logger.info("{}停流", tagMapping);
                ByteBuf body = Unpooled.buffer(SUCCESS.length);
                body.writeBytes(SUCCESS);
                FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(200), body);
                response.headers().add("Content-Length", SUCCESS.length);
                ctx.writeAndFlush(response).await();
                ctx.flush();
            } else if (uri.startsWith("/new/server/")) {
                resp.addBytes("HTTP/1.1 200 OK\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Connection: keep-alive\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Content-Type: video/x-flv\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Transfer-Encoding: chunked\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Cache-Control: no-cache\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Access-Control-Allow-Origin: *\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Access-Control-Allow-Credentials: true\r\n".getBytes("ISO-8859-1"));
                tagMapping = uri.substring("/new/server/".length());
                int endIndex = StringUtils.indexOf(tagMapping, "/");
                String key = StringUtils.substring(tagMapping, 0, endIndex);
                Integer startIndex = endIndex + 1;
                endIndex = StringUtils.indexOf(tagMapping, "/", startIndex);
                String port = StringUtils.substring(tagMapping, startIndex, endIndex);
                startIndex = endIndex + 1;
                httpPort = StringUtils.substring(tagMapping, startIndex, tagMapping.length());

                ByteBuf body = Unpooled.buffer(SUCCESS.length);
                body.writeBytes(SUCCESS);
                FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(200), body);
                response.headers().add("Content-Length", SUCCESS.length);
                ctx.writeAndFlush(response).await();
                ctx.flush();
            } else if (uri.startsWith("/play/history/")) {
                tagMapping = uri.substring("/play/history/".length());
                resp.addBytes("HTTP/1.1 200 OK\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Connection: keep-alive\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Content-Type: video/x-flv\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Transfer-Encoding: chunked\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Cache-Control: no-cache\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Access-Control-Allow-Origin: *\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("Access-Control-Allow-Credentials: true\r\n".getBytes("ISO-8859-1"));
                resp.addBytes("\r\n".getBytes("ISO-8859-1"));
                ctx.writeAndFlush(resp.getBytes()).await();
                logger.info("Thread id:[{}]", Thread.currentThread().getId());
                wid = PublishManager.getInstance().subscribe(tagMapping, Type.Video, ctx, this.httpPort).getId();
                this.setSession(ctx, (new Session()).set("subscriber-id", wid).set("tag", tagMapping));
            } else {
                ByteBuf body = Unpooled.buffer(HTTP_403_DATA.length);
                body.writeBytes(HTTP_403_DATA);
                FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(403), body);
                response.headers().add("Content-Length", HTTP_403_DATA.length);
                ctx.writeAndFlush(response).await();
                ctx.flush();
            }
        }

    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        Session session = this.getSession(ctx);
        if (session != null && session.has("subscriber-id") && session.has("tag")) {
            String tag = (String) session.get("tag");
            Long wid = (Long) session.get("subscriber-id");
            PublishManager.getInstance().unsubscribe(tag, wid);
        }

    }

    private void responseHTMLFile(String htmlFilePath, ChannelHandlerContext ctx) {
        byte[] fileData = FileUtils.read(NettyHttpServerHandler.class.getResourceAsStream(htmlFilePath));
        ByteBuf body = Unpooled.buffer(fileData.length);
        body.writeBytes(fileData);
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(200), body);
        response.headers().add("Content-Length", fileData.length);
        ctx.write(response);
        ctx.flush();
    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        cause.printStackTrace();
    }

    public final void setSession(ChannelHandlerContext context, Session session) {
        context.channel().attr(SESSION_KEY).set(session);
    }

    public final Session getSession(ChannelHandlerContext context) {
        Attribute<Session> attr = context.channel().attr(SESSION_KEY);
        return null == attr ? null : (Session) attr.get();
    }
}