RealControlSocketHandler.java
3.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package com.bsth.websocket.handler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import com.bsth.common.Constants;
import com.bsth.data.BasicData;
/**
* @author PanZhao
*/
@Component
@Scope("prototype")
public class RealControlSocketHandler implements WebSocketHandler {
Logger logger = LoggerFactory.getLogger(this.getClass());
private static final ArrayList<WebSocketSession> users;
static {
users = new ArrayList<WebSocketSession>();
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus arg1)
throws Exception {
users.remove(session);
}
@Override
public void afterConnectionEstablished(WebSocketSession session)
throws Exception {
users.add(session);
logger.info("###############users size" + users.size());
}
@Override
public void handleMessage(WebSocketSession arg0, WebSocketMessage<?> arg1)
throws Exception {
}
@Override
public void handleTransportError(WebSocketSession session, Throwable arg1)
throws Exception {
if(session.isOpen()){
session.close();
}
users.remove(session);
}
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 给所有在线用户发送消息
*
* @param message
*/
public synchronized void sendMessageToUsers(TextMessage message) {
for (WebSocketSession user : users) {
try {
if (user.isOpen()) {
user.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 给某些用户发送消息
*
* @param userId
* @param message
*/
public synchronized void sendMessageToUser(Set<String> uids, String msg) {
TextMessage message = new TextMessage(msg.getBytes());
for (WebSocketSession user : users) {
if (uids.contains(user.getAttributes().get(Constants.SESSION_USERNAME))) {
try {
if (user.isOpen()) {
user.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 根据线路推送消息
*
* @param userId
* @param message
*/
public void sendMessageToLine(String lineCode, String msg) {
Set<String> uids = BasicData.lineCode2SocketUserMap.get(lineCode);
if(null == uids || uids.size() == 0)
return;
TextMessage message = new TextMessage(msg.getBytes());
for (WebSocketSession user : users) {
if (uids.contains(user.getAttributes().get(Constants.SESSION_USERNAME))) {
try {
if (user.isOpen()) {
user.sendMessage(message);
}
} catch (IOException e) {
logger.error("sendMessageToLine error ....", e);
}
}
}
}
}