ServerService.java
4.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.bsth.service;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import com.bsth.socket.handler.ServerHandler;
import com.bsth.util.AppProperties;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.session.IoSessionConfig;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.bsth.socket.codec.MessageCodecFactory;
import com.bsth.socket.manager.MessageSessionManager;
/**
* @author Hill
*/
@Service
public class ServerService implements InitializingBean, DisposableBean {
private final static Logger log = LoggerFactory.getLogger(ServerService.class);
@Autowired
private ServerHandler serverHandler;
private NioSocketAcceptor dataAccepter;
private ScheduledExecutorService sexec = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
// TODO Auto-generated method stub
Thread t = new Thread(r);
t.setName("DeadSessionCheckExecutor");
return t;
}
});
@Override
public void afterPropertiesSet() throws Exception {
// TODO Auto-generated method stub
sexec.scheduleAtFixedRate(new DeadSessionChecker(), 5, 5, TimeUnit.MINUTES);
int port = AppProperties.getPort();
try {
dataAccepter = new NioSocketAcceptor();
LoggingFilter logger = new LoggingFilter();
logger.setMessageReceivedLogLevel(LogLevel.DEBUG);
logger.setSessionClosedLogLevel(LogLevel.WARN);
dataAccepter.getFilterChain().addLast("logger", logger);
dataAccepter.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new MessageCodecFactory()));
dataAccepter.getFilterChain().addLast("threadPool", new ExecutorFilter());
IoSessionConfig config = dataAccepter.getSessionConfig();
config.setReadBufferSize(4096);
config.setWriteTimeout(10);
config.setIdleTime(IdleStatus.BOTH_IDLE, 50);
dataAccepter.setHandler(serverHandler);
dataAccepter.setReuseAddress(true);
// 8899
dataAccepter.bind(new InetSocketAddress(port));
log.info("数据服务器启动成功!端口号:" + port);
} catch (Exception e) {
log.error("服务器启动失败:" + e.getMessage(), e);
}
}
@Override
public void destroy() throws Exception {
// TODO Auto-generated method stub
if (dataAccepter != null) {
dataAccepter.unbind();
dataAccepter.getFilterChain().clear();
dataAccepter.dispose();
dataAccepter = null;
}
sexec.shutdown();
}
final class DeadSessionChecker implements Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
try {
long now = System.currentTimeMillis();
MessageSessionManager manager = MessageSessionManager.getInstance();
log.warn("转发设备号:" + manager.getAllForwardDevice() + " cost:" + (System.currentTimeMillis() - now));
Collection<IoSession> sessions = dataAccepter.getManagedSessions().values();
for (IoSession session : sessions) {
if (!session.isActive() && session.getLastIoTime() < System.currentTimeMillis() - 300000 || session.getWriteRequestQueue().size() > 6000) {
session.getWriteRequestQueue().clear(session);
dataAccepter.getListeners().fireSessionDestroyed(session);
Class<?> clazzS = session.getClass();
Field key = clazzS.getDeclaredField("key");
key.setAccessible(true);
SelectionKey sk = (SelectionKey)key.get(session);
sk.cancel();
}
}
} catch (Exception e) {
log.error("DeadSessionChecker异常", e);
}
}
}
}