ClientApp.java 7.85 KB
package com.bsth.client;

import com.bsth.client.pd.codec.PdMessageCodecFactory;
import com.bsth.client.pd.handler.PdClientHandler;
import com.bsth.client.pd.protocol.Pd_31_0;
import com.bsth.client.pf.handler.PfClientHandler;
import com.bsth.util.ConfigUtil;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.session.IoSessionConfig;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.concurrent.*;

/**
 * Created by panzhao on 2017/5/4.
 */
@Component
@Order(1)
public class ClientApp implements CommandLineRunner{

    private static NioSocketConnector pdDataConnector;
    //private static NioSocketConnector pfDataConnector;

    @Autowired
    private PdClientHandler pdClient;
    @Autowired
    private PfClientHandler pfClient;
    @Autowired
    GpsBeforeBuffer gpsBeforeBuffer;

    static Logger logger = LoggerFactory.getLogger(ClientApp.class);
    private static ExecutorService exec;

    private ScheduledExecutorService sexec;

    public static boolean dconnect(String device) {
        boolean flag = false;
        try {
            ConnectFuture con = pdDataConnector.connect(new InetSocketAddress(ConfigUtil.get("gps.server.pd"), Integer.parseInt(ConfigUtil.get("gps.port.pd"))));
            con.awaitUninterruptibly();
            IoSession session = con.getSession();
            session.setAttribute("deviceId", device);
            com.bsth.client.pd.protocol.PdMessage msg = new com.bsth.client.pd.protocol.PdMessage();
            Pd_31_0 body = new Pd_31_0();
            body.setFunCode((short)0x15);
            body.setLineId(0);
            body.setDeviceId(device);
            msg.setMessageBody(body);
            msg.setVersion((short)1);
            msg.setSerialNo((short)1);
            msg.setCommandType((short)0x31);
            byte[] bytes = msg.write();
            WriteFuture write = session.write(bytes);
            write.awaitUninterruptibly();
            flag = true;

            logger.info("dconnect...");
            pdSession = session;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    public static void pdconnect(final String device) {
        exec.submit(new Runnable() {

            @Override
            public void run() {
                // TODO Auto-generated method stub
                long now = System.currentTimeMillis();
                boolean flag = false;
                while (!flag) {
                    flag = dconnect(device);
                }
                System.out.println("设备编号:" + device + "重连, cost time: " + (System.currentTimeMillis() - now));
            }
        });
    }

/*    public static void pfconnect(final String device) {
        exec.submit(new Runnable() {

            @Override
            public void run() {
                // TODO Auto-generated method stub
                long now = System.currentTimeMillis();
                boolean flag = false;
                while (!flag) {
                    flag = fconnect(device);
                }
                System.out.println("重连, cost time: " + (System.currentTimeMillis() - now));
            }
        });
    }*/

    public static void pdreconn(){
        pdconnect(ConfigUtil.get("forward.device.name"));
    }

    /*public static void pfreconn(){
        pfconnect(ConfigUtil.get("forward.device.name"));
    }*/

    public void destroy(){
        try {
            logger.warn("socket client destroy!!!");
            exec.shutdownNow();
            sexec.shutdownNow();

            pdDataConnector.dispose(true);
            //pfDataConnector.dispose(true);
        } catch (Exception e) {
            logger.error("", e);
        }
    }


    public static void pdClose(){
        pdSession.closeNow();
    }

    public static void pfClose(){
        pfSession.closeNow();
    }

/*    public static boolean fconnect(String device) {
        boolean flag = false;
        try {
            ConnectFuture con = pfDataConnector.connect(new InetSocketAddress(ConfigUtil.get("gps.server.pf"),  Integer.parseInt(ConfigUtil.get("gps.port.pf"))));
            con.awaitUninterruptibly();
            IoSession session = con.getSession();
            session.setAttribute("deviceId", device);
            flag = true;

            logger.info("fconnect...");
            pfSession = session;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }*/

    public void init() {
        logger.warn("socket client init...");
        exec = Executors.newFixedThreadPool(50);
        sexec = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                // TODO Auto-generated method stub
                Thread t = new Thread(r);
                t.setName("SessionCheckExecutor");
                return t;
            }
        });
        sexec.scheduleAtFixedRate(new SessionChecker(), 30, 30, TimeUnit.SECONDS);
        /*******************************浦东********************************/
        pdDataConnector = new NioSocketConnector();

        LoggingFilter log = new LoggingFilter();
        log.setMessageReceivedLogLevel(LogLevel.DEBUG);
        pdDataConnector.getFilterChain().addLast("logger", log);

        pdDataConnector.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new PdMessageCodecFactory()));

        IoSessionConfig config = pdDataConnector.getSessionConfig();

        config.setReadBufferSize(4096);
        config.setWriteTimeout(10000);
        config.setWriterIdleTime(60000);

        config.setIdleTime(IdleStatus.BOTH_IDLE, 60);

        pdDataConnector.setHandler(pdClient);

        pdconnect(ConfigUtil.get("forward.device.name"));
        /*******************************浦东转发*******************************
        pfDataConnector = new NioSocketConnector();

        LoggingFilter log1 = new LoggingFilter();
        log1.setMessageReceivedLogLevel(LogLevel.DEBUG);
        pfDataConnector.getFilterChain().addLast("logger", log1);

        pfDataConnector.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new PfMessageCodecFactory()));

        IoSessionConfig config1 = pfDataConnector.getSessionConfig();

        config1.setReadBufferSize(4096);
        config1.setWriteTimeout(10000);
        config1.setWriterIdleTime(60000);

        config1.setIdleTime(IdleStatus.BOTH_IDLE, 60);

        pfDataConnector.setHandler(pfClient);
        pfconnect(ConfigUtil.get("forward.device.name"));*/
    }


    static IoSession pdSession;
    static IoSession pfSession;

    @Override
    public void run(String... strings) throws Exception {
        init();
    }

    final class SessionChecker implements Runnable {

        @Override
        public void run() {
            // TODO Auto-generated method stub
            try {

                if(!pdSession.isActive()){
                    logger.warn("浦东设备注销");
                    ClientApp.pdreconn();
                }

                /*if(!pfSession.isActive()){
                    logger.warn("浦东转发设备注销");
                    ClientApp.pfreconn();
                }*/
            } catch (Exception e) {
                logger.error("SessionChecker异常", e);
            }
        }
    }
}