// VideoServerApp.java 9.56 KB
package com.genersoft.iot.vmp.jtt1078.app;

import cn.hutool.core.collection.ConcurrentHashSet;
import com.genersoft.iot.vmp.jtt1078.http.GeneralResponseWriter;
import com.genersoft.iot.vmp.jtt1078.http.NettyHttpServerHandler;
import com.genersoft.iot.vmp.jtt1078.publisher.PublishManager;
import com.genersoft.iot.vmp.jtt1078.server.Jtt1078Handler;
import com.genersoft.iot.vmp.jtt1078.server.Jtt1078MessageDecoder;
import com.genersoft.iot.vmp.jtt1078.server.SessionManager;
import com.genersoft.iot.vmp.jtt1078.util.Configs;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import sun.misc.Signal;
import sun.misc.SignalHandler;

import java.net.InetAddress;


/**
 * Created by matrixy on 2019/4/9.
 */
@Configuration
public class VideoServerApp
{
    /** Logger shared by this class and its nested server classes. */
    private static final Logger logger = LoggerFactory.getLogger(VideoServerApp.class);

    /** Publicly shared concurrent set of tag strings; it is not read or modified inside this class. */
    public static final ConcurrentHashSet<String> TAG_SET = new ConcurrentHashSet<>();

    /** Value of {@code spring.profiles.active}; {@code run} uses it to pick the properties file. */
    @Value("${spring.profiles.active}")
    private String activeProfile;

    /**
     * Loads the properties file selected by the active Spring profile, initialises
     * {@code PublishManager} and {@code SessionManager}, and then starts the video server,
     * the history video server and the HTTP server.
     *
     * <p>{@code HttpServer.start()} waits on the HTTP channel's close future and therefore
     * blocks the calling thread until that channel is closed. It is started last so that
     * both video servers are already listening while it blocks.
     *
     * <p>Any exception raised during start-up is logged and swallowed by this method.
     *
     * @param args not used by this method
     * @throws Exception declared for callers; start-up failures are logged, not rethrown
     */
    public void run(String... args) throws Exception {
        try {
            String configProperties = null;
            // A null profile would make the switch throw a NullPointerException; treat it
            // as an unknown profile so the dedicated configuration error below is raised.
            if (activeProfile != null) {
                switch (activeProfile) {
                    case "wx-local":
                        configProperties = "/app.properties";
                        break;
                    case "jt1078-dev100":
                        configProperties = "/app-dev100.properties";
                        break;
                    case "jt1078-dev103":
                        configProperties = "/app-dev103.properties";
                        break;
                    default:
                        break;
                }
            }
            if (configProperties == null) {
                throw new RuntimeException(String.format("推流配置文件错误 [ %s ]", activeProfile));
            }
            Configs.init(configProperties);
            PublishManager.init();
            SessionManager.init();

            final int port = Configs.getInt("server.port", 1078);
            final int historyPort = Configs.getInt("server.history.port", 1078);
            final VideoServer videoServer = new VideoServer(port);
            final HttpServer httpServer = new HttpServer();
            final VideoServer historyVideoServer = new VideoServer(historyPort);

            // On SIGTERM, stop all three servers.
            Signal.handle(new Signal("TERM"), signal -> {
                videoServer.shutdown();
                httpServer.shutdown();
                historyVideoServer.shutdown();
            });

            videoServer.start();

            if (historyPort == port) {
                // Both ports default to 1078; a second bind on the same port cannot succeed
                // and must not prevent the HTTP server from starting.
                logger.warn("History video server port {} is the same as the video server port; "
                        + "history video server not started", historyPort);
            } else {
                historyVideoServer.historyStart();
            }

            // Blocks until the HTTP channel is closed, so it must be the last server started.
            httpServer.start();
        } catch (Exception e) {
            logger.error("端口监听异常 ===》 {}",e.getMessage(),e);
        }
    }

    /**
     * Builds a new {@link VideoServer} for the given port. The returned server has not been
     * started; a fresh instance is created on every call.
     *
     * @param port port the returned server will bind to once started
     * @return a new, unstarted server instance
     */
    public VideoServer getVideoServer(Integer port) {
        final VideoServer server = this.new VideoServer(port);
        return server;
    }

    public class VideoServer
    {
        private ServerBootstrap serverBootstrap;

        private EventLoopGroup bossGroup;
        private EventLoopGroup workerGroup;
        private final Integer port;

        public VideoServer(Integer port) {
            this.port = port;
        }

        private void start() throws Exception
        {
            serverBootstrap = new ServerBootstrap();
            serverBootstrap.option(ChannelOption.SO_BACKLOG, Configs.getInt("server.backlog", 102400));
            bossGroup = new NioEventLoopGroup(Configs.getInt("server.worker-count", Runtime.getRuntime().availableProcessors()));
            workerGroup = new NioEventLoopGroup();
            serverBootstrap.group(bossGroup, workerGroup)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(final SocketChannel channel) throws Exception {
                            ChannelPipeline p = channel.pipeline();
                            // p.addLast(new IdleStateHandler(10,0,0, TimeUnit.SECONDS));
                            p.addLast(new Jtt1078MessageDecoder());
                            // p.addLast(new Jtt808MessageEncoder());
                            // p.addLast(new JTT808Handler());
                            p.addLast(new Jtt1078Handler());
                        }
                    });

//            int historyPort = Configs.getInt("server.history.port", 30001);
            Channel ch = serverBootstrap.bind(InetAddress.getByName("0.0.0.0"), port).sync().channel();
//            Channel historyCh = serverBootstrap.bind(InetAddress.getByName("0.0.0.0"), historyPort).sync().channel();
            logger.info("Video Server started at: {}", port);
//            logger.info("Video history Server started at: {}", historyPort);
            ch.closeFuture();
//            historyCh.closeFuture();
        }

        public void historyStart() throws Exception
        {
            serverBootstrap = new ServerBootstrap();
            serverBootstrap.option(ChannelOption.SO_BACKLOG, Configs.getInt("server.backlog", 102400));
            bossGroup = new NioEventLoopGroup(Configs.getInt("server.worker-count", Runtime.getRuntime().availableProcessors()));
            workerGroup = new NioEventLoopGroup();
            serverBootstrap.group(bossGroup, workerGroup)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(final SocketChannel channel) throws Exception {
                            ChannelPipeline p = channel.pipeline();
                            // p.addLast(new IdleStateHandler(10,0,0, TimeUnit.SECONDS));
                            p.addLast(new Jtt1078MessageDecoder());
                            // p.addLast(new Jtt808MessageEncoder());
                            // p.addLast(new JTT808Handler());
                            p.addLast(new Jtt1078Handler(port));
                        }
                    });
            Channel ch = serverBootstrap.bind(InetAddress.getByName("0.0.0.0"), port).sync().channel();
            logger.info("Video Server started at: {}", port);
            ch.closeFuture();
        }

        public void shutdown()
        {
            try
            {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
            catch(Exception e)
            {
                e.printStackTrace();
            }
        }
    }

    class HttpServer
    {
        private ServerBootstrap serverBootstrap;

        private EventLoopGroup bossGroup;
        private EventLoopGroup workerGroup;

        private void start() throws Exception
        {
            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());

            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>()
                    {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception
                        {
                            ch.pipeline().addLast(
                                    new GeneralResponseWriter(),
                                    new HttpResponseEncoder(),
                                    new HttpRequestDecoder(),
                                    new HttpObjectAggregator(1024 * 64),
                                    new NettyHttpServerHandler()
                            );
                        }
                    }).option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            try
            {
                int port = Configs.getInt("server.http.port", 3333);
//                int historyPort = Configs.getInt("server.history.http.port", 3334);
                ChannelFuture f = bootstrap.bind(InetAddress.getByName("0.0.0.0"), port).sync();
//                ChannelFuture historyF = bootstrap.bind(InetAddress.getByName("0.0.0.0"), historyPort).sync();
                logger.info("HTTP Server started at: {}", port);
//                logger.info("HTTP Server started at: {}", historyPort);
                f.channel().closeFuture().sync();
//                historyF.channel().closeFuture().sync();
            }
            catch (InterruptedException e)
            {
                logger.error("http server error", e);
            }
            finally
            {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }

        private void shutdown()
        {
            try
            {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
            catch(Exception e)
            {
                e.printStackTrace();
            }
        }
    }
}