WebSocketPushQueue.java
3.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package com.bsth.data.msg_queue;
import com.bsth.common.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Push queue for line-dispatch WebSocket messages.
 *
 * <p>Producers enqueue a message together with its target session via
 * {@link #put(WebSocketSession, TextMessage)}. A single background
 * {@link DataPushThread} polls the queue and writes each message to its
 * session, so callers of {@code put} never block on socket I/O.
 *
 * <p>The worker records a timestamp after every loop iteration; {@link #isIdle()}
 * reports whether that timestamp is older than {@link #IDLE_TIME}, and
 * {@link #start()} replaces the worker with a fresh one.
 *
 * Created by panzhao on 2017/5/11.
 */
@Component
public class WebSocketPushQueue implements CommandLineRunner {

    /** Pending messages, consumed in FIFO order by the push thread. */
    static ConcurrentLinkedQueue<QueueData> linkedList;

    /** The currently active push thread; replaced by {@link #start()}. */
    static volatile DataPushThread thread;

    static Logger log = LoggerFactory.getLogger(WebSocketPushQueue.class);

    /**
     * Time (epoch millis) at which the push thread last completed a loop iteration.
     * Volatile because it is written by the push thread and read by callers of
     * {@link #isIdle()} on other threads.
     */
    static volatile long t;

    /** Idle threshold in milliseconds (30 seconds). */
    static final int IDLE_TIME = 1000 * 30;

    static {
        linkedList = new ConcurrentLinkedQueue<QueueData>();
    }

    /**
     * Reports whether the push thread has failed to complete a loop iteration
     * within the last {@link #IDLE_TIME} milliseconds.
     *
     * @return {@code true} if the last recorded iteration is older than the threshold
     *         (also {@code true} before the push thread has completed its first iteration)
     */
    public static boolean isIdle() {
        return System.currentTimeMillis() - t > IDLE_TIME;
    }

    /**
     * Enqueues a message for asynchronous delivery to the given session.
     *
     * @param session the WebSocket session that should receive the message
     * @param msg     the text message to send
     */
    public static void put(WebSocketSession session, TextMessage msg) {
        QueueData qd = new QueueData();
        qd.setMessage(msg);
        qd.setSession(session);

        log.info("put、[{}] 、{}",
                session.getAttributes().get(Constants.SESSION_USERNAME), msg.getPayload());
        linkedList.add(qd);
    }

    /**
     * (Re)starts the push thread: interrupts the current one if present, discards
     * every pending message and launches a fresh {@link DataPushThread}.
     *
     * <p>Synchronized so that concurrent restarts cannot leave two consumer
     * threads running at the same time.
     */
    public static synchronized void start() {
        if (thread != null) {
            thread.interrupt();
        }
        linkedList.clear();
        thread = new DataPushThread();
        thread.start();
    }

    /**
     * Returns the number of messages currently waiting in the queue.
     *
     * @return the pending message count
     */
    public static int size() {
        return linkedList.size();
    }

    /**
     * {@link CommandLineRunner} callback; starts the push thread.
     *
     * @param strings command-line arguments (unused)
     */
    @Override
    public void run(String... strings) throws Exception {
        start();
    }

    /**
     * Worker thread that drains the queue and pushes each message to its session.
     * Runs until it is interrupted.
     */
    public static class DataPushThread extends Thread {

        Logger log = LoggerFactory.getLogger(this.getClass());

        @Override
        public void run() {
            // Ensures "sleep..." is logged once per idle period rather than every 500 ms.
            boolean sleepFlag = false;

            // Checking the interrupt flag here lets a superseded thread stop even when
            // the interrupt arrived while it was not sleeping.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    QueueData qd = linkedList.poll();
                    if (qd != null) {
                        sleepFlag = false;
                        push(qd);
                    } else {
                        Thread.sleep(500);
                        if (!sleepFlag) {
                            log.info("sleep...");
                            sleepFlag = true;
                        }
                    }
                    // Heartbeat read by isIdle().
                    t = System.currentTimeMillis();
                } catch (InterruptedException e) {
                    log.error("", e);
                    // Restore the flag that Thread.sleep cleared so the interruption stays visible.
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    // A failure on one message must not stop delivery of the rest.
                    log.error("", e);
                }
            }
            log.warn("WebSocketPushQueue is break...");
        }

        /**
         * Sends one queued message to its session if the session is still open;
         * messages for closed sessions are dropped.
         */
        private void push(QueueData qd) throws Exception {
            WebSocketSession session = qd.getSession();
            if (!session.isOpen()) {
                return;
            }

            // The remote address may be unavailable; a missing address must not
            // abort the send just because of a log statement.
            java.net.InetSocketAddress remote = session.getRemoteAddress();
            String host = remote == null ? "unknown" : remote.getHostString();

            log.info("push start、{}[{}] 、{}",
                    host,
                    session.getAttributes().get(Constants.SESSION_USERNAME),
                    qd.getMessage().getPayload());
            session.sendMessage(qd.getMessage());
            log.info("push end..");
        }
    }
}