// ServerService.java 4.19 KB
package com.bsth.service;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import com.bsth.socket.handler.ServerHandler;
import com.bsth.util.AppProperties;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.session.IoSessionConfig;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.bsth.socket.codec.MessageCodecFactory;
import com.bsth.socket.manager.MessageSessionManager;

/**
 * Spring-managed service that owns the MINA TCP acceptor of the data server.
 *
 * <p>On start-up ({@link #afterPropertiesSet()}) it builds a
 * {@link NioSocketAcceptor} with a logging filter, the project's message codec
 * and an executor filter, binds it to the port returned by
 * {@link AppProperties#getPort()}, and schedules a {@link DeadSessionChecker}
 * that runs every {@value #CHECK_INTERVAL_MINUTES} minutes. On shutdown
 * ({@link #destroy()}) the scheduler is stopped and the acceptor is unbound and
 * disposed.
 *
 * @author Hill
 */
@Service
public class ServerService implements InitializingBean, DisposableBean {

	private static final Logger log = LoggerFactory.getLogger(ServerService.class);

	/** Initial delay and period of the dead-session sweep, in minutes. */
	private static final long CHECK_INTERVAL_MINUTES = 5;

	/** An inactive session whose last I/O is older than this (ms) is evicted. */
	private static final long DEAD_SESSION_IDLE_MILLIS = 300000L;

	/** A session with more queued write requests than this is evicted. */
	private static final int MAX_PENDING_WRITES = 6000;

	/** Name of the field that is looked up reflectively on the session to reach its SelectionKey. */
	private static final String SELECTION_KEY_FIELD = "key";

	@Autowired
	private ServerHandler serverHandler;

	/**
	 * The acceptor serving client connections. Written by the thread that runs the
	 * Spring lifecycle callbacks and read by the checker thread, hence volatile.
	 */
	private volatile NioSocketAcceptor dataAccepter;

	/** Single-threaded scheduler that runs the {@link DeadSessionChecker}. */
	private final ScheduledExecutorService sexec = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {

		@Override
		public Thread newThread(Runnable r) {
			Thread t = new Thread(r);
			t.setName("DeadSessionCheckExecutor");
			return t;
		}
	});

	/**
	 * Creates, configures and binds the acceptor, then starts the periodic
	 * dead-session sweep.
	 *
	 * <p>A failure while setting up or binding the acceptor is logged and not
	 * rethrown, so the application context still starts.
	 * NOTE(review): confirm that swallowing a bind failure is intended; the
	 * server is unreachable in that case.
	 *
	 * @throws Exception if reading the port from {@link AppProperties} fails
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		int port = AppProperties.getPort();
		try {
			NioSocketAcceptor acceptor = new NioSocketAcceptor();
			// Publish immediately so destroy() can dispose it even if bind() fails below.
			dataAccepter = acceptor;

			LoggingFilter logger = new LoggingFilter();
			logger.setMessageReceivedLogLevel(LogLevel.DEBUG);
			logger.setSessionClosedLogLevel(LogLevel.WARN);
			acceptor.getFilterChain().addLast("logger", logger);

			acceptor.getFilterChain().addLast("codec",
					new ProtocolCodecFilter(new MessageCodecFactory()));

			acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter());

			IoSessionConfig config = acceptor.getSessionConfig();
			config.setReadBufferSize(4096);
			config.setWriteTimeout(10);
			config.setIdleTime(IdleStatus.BOTH_IDLE, 50);

			acceptor.setHandler(serverHandler);
			acceptor.setReuseAddress(true);
			acceptor.bind(new InetSocketAddress(port));
			log.info("数据服务器启动成功!端口号:{}", port);
		} catch (Exception e) {
			log.error("服务器启动失败:" + e.getMessage(), e);
		}

		// Scheduled last so that an exception thrown earlier in this method does
		// not leave a running scheduler thread behind.
		sexec.scheduleAtFixedRate(new DeadSessionChecker(), CHECK_INTERVAL_MINUTES, CHECK_INTERVAL_MINUTES,
				TimeUnit.MINUTES);
	}

	/**
	 * Stops the periodic sweep and releases the acceptor.
	 *
	 * <p>The scheduler is shut down first so that it is stopped even when
	 * tearing down the acceptor throws.
	 *
	 * @throws Exception if unbinding or disposing the acceptor fails
	 */
	@Override
	public void destroy() throws Exception {
		sexec.shutdown();

		NioSocketAcceptor acceptor = dataAccepter;
		if (acceptor != null) {
			// Clear the field before tearing down so the checker stops using it.
			dataAccepter = null;
			acceptor.unbind();
			acceptor.getFilterChain().clear();
			acceptor.dispose();
		}
	}

	/**
	 * Periodic task that logs the currently forwarded devices and evicts sessions
	 * that look dead or are hopelessly backlogged.
	 */
	final class DeadSessionChecker implements Runnable {

		/**
		 * Runs one sweep. Exceptions are caught and logged here because a task
		 * that throws is not rescheduled by {@code scheduleAtFixedRate}.
		 */
		@Override
		public void run() {
			try {
				long start = System.currentTimeMillis();
				MessageSessionManager manager = MessageSessionManager.getInstance();
				log.warn("转发设备号:" + manager.getAllForwardDevice() + " cost:" + (System.currentTimeMillis() - start));

				// Work on a snapshot: destroy() may null the field concurrently.
				NioSocketAcceptor acceptor = dataAccepter;
				if (acceptor == null) {
					return;
				}

				Collection<IoSession> sessions = acceptor.getManagedSessions().values();
				for (IoSession session : sessions) {
					try {
						if (shouldEvict(session)) {
							evict(acceptor, session);
						}
					} catch (Exception e) {
						// Isolate failures so one bad session does not abort the whole sweep.
						log.error("DeadSessionChecker异常", e);
					}
				}
			} catch (Exception e) {
				log.error("DeadSessionChecker异常", e);
			}
		}

		/**
		 * Decides whether a session is to be evicted: either it is inactive and
		 * has had no I/O for longer than {@link #DEAD_SESSION_IDLE_MILLIS}, or its
		 * write queue holds more than {@link #MAX_PENDING_WRITES} requests.
		 */
		private boolean shouldEvict(IoSession session) {
			boolean stale = !session.isActive()
					&& session.getLastIoTime() < System.currentTimeMillis() - DEAD_SESSION_IDLE_MILLIS;
			boolean backlogged = session.getWriteRequestQueue().size() > MAX_PENDING_WRITES;
			return stale || backlogged;
		}

		/**
		 * Drops the session's pending writes, fires the destroyed event on the
		 * acceptor's listeners and cancels the session's selection key, which is
		 * reached reflectively through the field named {@link #SELECTION_KEY_FIELD}.
		 */
		private void evict(NioSocketAcceptor acceptor, IoSession session) throws IllegalAccessException {
			session.getWriteRequestQueue().clear(session);
			acceptor.getListeners().fireSessionDestroyed(session);

			Field keyField = findField(session.getClass(), SELECTION_KEY_FIELD);
			if (keyField == null) {
				log.warn("DeadSessionChecker: no '{}' field on {}", SELECTION_KEY_FIELD, session.getClass().getName());
				return;
			}
			keyField.setAccessible(true);
			Object value = keyField.get(session);
			// instanceof also covers a null field value.
			if (value instanceof SelectionKey) {
				((SelectionKey) value).cancel();
			}
		}

		/**
		 * Looks for a declared field on the given class or any of its
		 * superclasses; {@code getDeclaredField} alone does not see inherited
		 * fields.
		 *
		 * @return the field, or {@code null} when no class in the hierarchy declares it
		 */
		private Field findField(Class<?> type, String name) {
			for (Class<?> current = type; current != null; current = current.getSuperclass()) {
				try {
					return current.getDeclaredField(name);
				} catch (NoSuchFieldException ignored) {
					// Not declared here; continue with the superclass.
				}
			}
			return null;
		}
	}
}