// SocketServer.java 4.55 KB
package com.ruoyi.system.server;

import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.system.handler.SocketServerHandler;
import com.ruoyi.system.protocol.ThinkraceUtil;
import com.ruoyi.system.protocol.adapter.ICommandAdapter;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.util.Map;

/**
 * Spring component that starts a Netty TCP server on a dedicated thread once the
 * application has booted, and shuts its event loops down when the bean is destroyed.
 *
 * <p>The server is only started when {@code netty.enabled} is {@code true}; it listens on
 * {@code netty.socket.port} (default 8989).
 *
 * @author 20412
 */
@Component
public class SocketServer implements CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(SocketServer.class);

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private RedisCache redisCache;

    // volatile: assigned on the Netty server thread, read by destroy() on another thread.
    volatile EventLoopGroup bossGroup = null;
    volatile EventLoopGroup workerGroup = null;

    @Value("${netty.socket.port:8989}")
    private Integer port;

    @Value("${netty.enabled:false}")
    private Boolean enabled;

    /**
     * Registers the command adapters and then launches the Netty server on its own thread.
     *
     * <p>{@link CommandLineRunner#run} is invoked on the caller's thread, and the Netty
     * server blocks until its channel closes, so it must not run here directly.
     *
     * @param args application command-line arguments (unused)
     * @throws Exception if adapter initialisation fails
     */
    @Override
    public void run(String... args) throws Exception {
        if (!Boolean.TRUE.equals(enabled)) {
            return;
        }
        // Register adapters BEFORE the server can accept connections, so that an early
        // client never hits an empty adapter registry.
        // NOTE(review): assumes SocketServerHandler resolves adapters via ThinkraceUtil - confirm.
        log.info("command adapter init...");
        initCommandAdapter();

        Thread thread = new Thread(new NettyServer(), "netty-socket-server");
        log.info("netty server active...");
        thread.start();
    }

    /**
     * Hands every {@link ICommandAdapter} bean (keyed by bean name) to {@link ThinkraceUtil},
     * then lets it initialise its Redis-backed command parameters.
     */
    private void initCommandAdapter() {
        Map<String, ICommandAdapter> implementations = applicationContext.getBeansOfType(ICommandAdapter.class);
        for (Map.Entry<String, ICommandAdapter> entry : implementations.entrySet()) {
            ThinkraceUtil.addAdapter(entry.getKey(), entry.getValue());
        }
        ThinkraceUtil.initRedisThinkraceCommandParam(redisCache);
    }

    /**
     * Bean destruction hook: requests a graceful shutdown of both event loop groups.
     *
     * @throws Exception declared for interface compatibility
     */
    @PreDestroy
    public void destroy() throws Exception {
        shutdownGroups();
        log.info("netty server destroy...");
    }

    /**
     * Requests graceful shutdown of whichever event loop groups have been created.
     * May be called from both the server thread and the destroy hook.
     */
    private void shutdownGroups() {
        EventLoopGroup boss = bossGroup;
        if (boss != null) {
            boss.shutdownGracefully();
        }
        EventLoopGroup worker = workerGroup;
        if (worker != null) {
            worker.shutdownGracefully();
        }
    }

    /**
     * Runnable that configures, binds and runs the Netty server, blocking until the
     * server channel is closed.
     */
    class NettyServer implements Runnable {
        @Override
        public void run() {
            try {
                bossGroup = new NioEventLoopGroup();
                workerGroup = new NioEventLoopGroup();
                // Create the server bootstrap and configure it.
                ServerBootstrap bootstrap = new ServerBootstrap();
                // Boss group accepts connections, worker group handles their I/O.
                bootstrap.group(bossGroup, workerGroup)
                        // Server channel implementation.
                        .channel(NioServerSocketChannel.class)
                        // Accept backlog size.
                        .option(ChannelOption.SO_BACKLOG, 128)
                        // Enable TCP keep-alive on accepted connections.
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        // Install the handler on every accepted channel's pipeline.
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new SocketServerHandler());
                            }
                        });
                // Bind the port and wait until the server is up.
                ChannelFuture channelFuture = bootstrap.bind(port).sync();
                // Block until the server channel is closed.
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                log.error("netty server thread interrupted", e);
            } catch (Exception e) {
                // sync() rethrows the underlying failure cause (e.g. port already in use),
                // which is not an InterruptedException.
                log.error("netty server failed on port {}", port, e);
            } finally {
                // Never leave event loop threads running once the server has stopped or failed.
                shutdownGroups();
            }
        }
    }

}