RealControlSocketHandler.java
3.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.bsth.websocket.handler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import com.bsth.common.Constants;
import com.bsth.data.BasicData;
/**
* 注意:在sendMsg时,多线程状态下有几率出现连接状态脏读,建议 synchronized
* @author PanZhao
*/
@Component
public class RealControlSocketHandler implements WebSocketHandler {
Logger logger = LoggerFactory.getLogger(this.getClass());
private static final ArrayList<WebSocketSession> users;
static {
users = new ArrayList<WebSocketSession>();
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus arg1)
throws Exception {
users.remove(session);
}
@Override
public void afterConnectionEstablished(WebSocketSession session)
throws Exception {
users.add(session);
logger.info("###############users size" + users.size());
}
@Override
public void handleMessage(WebSocketSession arg0, WebSocketMessage<?> arg1)
throws Exception {
}
@Override
public void handleTransportError(WebSocketSession session, Throwable arg1)
throws Exception {
if(session.isOpen()){
session.close();
}
users.remove(session);
}
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 给所有在线用户发送消息
*
* @param message
*/
public synchronized void sendMessageToUsers(TextMessage message) {
for (WebSocketSession user : users) {
try {
if (user.isOpen()) {
user.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 给某些用户发送消息
*
* @param userId
* @param message
*/
public synchronized void sendMessageToUser(Set<String> uids, String msg) {
TextMessage message = new TextMessage(msg.getBytes());
for (WebSocketSession user : users) {
if (uids.contains(user.getAttributes().get(Constants.SESSION_USERNAME))) {
try {
if (user.isOpen()) {
user.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 根据线路推送消息
*
* @param userId
* @param message
*/
public synchronized void sendMessageToLine(String lineCode, String msg) {
Set<String> uids = BasicData.lineCode2SocketUserMap.get(lineCode);
if(null == uids || uids.size() == 0)
return;
TextMessage message = new TextMessage(msg.getBytes());
for (WebSocketSession user : users) {
if (uids.contains(user.getAttributes().get(Constants.SESSION_USERNAME))) {
try {
if (user.isOpen()) {
user.sendMessage(message);
}
} catch (IOException e) {
logger.error("sendMessageToLine error ....", e);
}
}
}
}
}