RealControlSocketHandler.java 2.96 KB
package com.bsth.websocket.handler;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Set;

import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

import com.bsth.common.Constants;
import com.bsth.vehicle.common.CommonMapped;

/**
 * 注意:在sendMsg时,多线程状态下有几率出现连接状态脏读,建议 synchronized
 * @author PanZhao
 */
@Component
public class RealControlSocketHandler implements WebSocketHandler {

	private static final ArrayList<WebSocketSession> users;

	static {
		users = new ArrayList<WebSocketSession>();
	}
	
	@Override
	public void afterConnectionClosed(WebSocketSession session, CloseStatus arg1)
			throws Exception {
        users.remove(session);
	}

	@Override
	public void afterConnectionEstablished(WebSocketSession session)
			throws Exception {
		users.add(session);
	}

	@Override
	public void handleMessage(WebSocketSession arg0, WebSocketMessage<?> arg1)
			throws Exception {
	}

	@Override
	public void handleTransportError(WebSocketSession session, Throwable arg1)
			throws Exception {
		if(session.isOpen()){
            session.close();
        }
        users.remove(session);
	}

	@Override
	public boolean supportsPartialMessages() {
		return false;
	}

	/**
	 * 给所有在线用户发送消息
	 *
	 * @param message
	 */
	public synchronized void sendMessageToUsers(TextMessage message) {
		for (WebSocketSession user : users) {
			try {
				if (user.isOpen()) {
					user.sendMessage(message);
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * 给某些用户发送消息
	 *
	 * @param userId
	 * @param message
	 */
	public synchronized void sendMessageToUser(Set<String> uids, String msg) {
		TextMessage message = new TextMessage(msg.getBytes());
		for (WebSocketSession user : users) {
			if (uids.contains(user.getAttributes().get(Constants.SESSION_USERNAME))) {
				try {
					if (user.isOpen()) {
						user.sendMessage(message);
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
	
	/**
	 * 根据线路推送消息
	 *
	 * @param userId
	 * @param message
	 */
	public synchronized void sendMessageToLine(Integer lineCode, String msg) {
		Set<String> uids = CommonMapped.lineUserMap.get(lineCode);
		if(null == uids || uids.size() == 0)
			return;
		
		TextMessage message = new TextMessage(msg.getBytes());
		for (WebSocketSession user : users) {
			if (uids.contains(user.getAttributes().get(Constants.SESSION_USERNAME))) {
				try {
					if (user.isOpen()) {
						user.sendMessage(message);
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
}