// RealControlSocketHandler.java (4.4 KB)
package com.bsth.websocket.handler;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

import com.alibaba.fastjson.JSONObject;
import com.bsth.common.Constants;
import com.bsth.data.BasicData;
import com.google.common.base.Splitter;
import com.google.common.collect.ArrayListMultimap;

/**
 * WebSocket handler that keeps track of connected sessions and of the line
 * codes each session has subscribed to, and pushes text messages to all
 * sessions, to sessions of given user names, or to the subscribers of a line.
 *
 * <p>The session list and the line subscription map are {@code static}, i.e.
 * shared by every instance of this prototype-scoped bean. All access to them
 * goes through {@link #REGISTRY_LOCK}; outbound sends work on a snapshot taken
 * under that lock and are serialized through {@link #SEND_LOCK}, so a slow
 * client never blocks connection bookkeeping.
 *
 * @author PanZhao
 */
@Component
@Scope("prototype")
public class RealControlSocketHandler implements WebSocketHandler {
	
	Logger logger = LoggerFactory.getLogger(this.getClass());

	/** Guards every read and write of {@link #users} and {@link #listenMap}. */
	private static final Object REGISTRY_LOCK = new Object();

	/**
	 * Serializes outbound sends across all handler instances. The instance
	 * monitor is not enough here because the bean is prototype-scoped while
	 * the sessions are shared statically.
	 */
	private static final Object SEND_LOCK = new Object();

	/** Every currently registered session. */
	private static final List<WebSocketSession> users = new ArrayList<WebSocketSession>();

	/** Line code -> sessions listening to that line. */
	private static final ArrayListMultimap<String, WebSocketSession> listenMap = ArrayListMultimap.create();

	/**
	 * Drops the closed session from the session list and from every line
	 * subscription.
	 *
	 * @param session the session that was closed
	 * @param arg1 the close status (unused)
	 */
	@Override
	public void afterConnectionClosed(WebSocketSession session, CloseStatus arg1)
			throws Exception {
		unregister(session);
	}

	/**
	 * Registers the newly opened session so that broadcasts reach it.
	 *
	 * @param session the session that was opened
	 */
	@Override
	public void afterConnectionEstablished(WebSocketSession session)
			throws Exception {
		synchronized (REGISTRY_LOCK) {
			users.add(session);
		}
	}

	/**
	 * Handles a JSON command sent by the client. The only recognised
	 * {@code operCode} is {@code register_line}, whose {@code idx} field is a
	 * comma separated list of line codes to subscribe to. Messages without an
	 * {@code operCode} are logged and ignored instead of failing with a
	 * {@link NullPointerException}.
	 *
	 * @param session the sending session
	 * @param msg the received message; its payload is parsed as a JSON object
	 */
	@Override
	public void handleMessage(WebSocketSession session, WebSocketMessage<?> msg)
			throws Exception {
		String payload = msg.getPayload().toString();
		JSONObject jsonObj = JSONObject.parseObject(payload);
		// parseObject yields null for an empty payload, and switching on a
		// null String would throw.
		String operCode = jsonObj == null ? null : jsonObj.getString("operCode");
		if (operCode == null) {
			logger.warn("web socket message without operCode ignored: " + payload);
			return;
		}

		switch (operCode) {
		case "register_line":
			// register line listeners
			registerLines(session, jsonObj.getString("idx"));
			break;

		default:
			break;
		}
		logger.info(payload);
	}

	/**
	 * Closes the session if it is still open and always removes it from the
	 * session list and from all line subscriptions, even when closing fails.
	 *
	 * @param session the session on which the error occurred
	 * @param arg1 the transport error (unused)
	 */
	@Override
	public void handleTransportError(WebSocketSession session, Throwable arg1)
			throws Exception {
		try {
			if (session.isOpen()) {
				session.close();
			}
		} finally {
			unregister(session);
		}
	}

	@Override
	public boolean supportsPartialMessages() {
		return false;
	}

	/**
	 * Sends a message to every registered session that is still open.
	 *
	 * @param message the message to send
	 */
	public void sendMessageToUsers(TextMessage message) {
		deliver(snapshotUsers(), message, "sendMessageToUsers");
	}

	/**
	 * Sends a message to the open sessions whose
	 * {@code Constants.SESSION_USERNAME} attribute is contained in
	 * {@code uids}.
	 *
	 * @param uids the user names to address
	 * @param msg the text to send
	 */
	public void sendMessageToUser(Set<String> uids, String msg) {
		// Pass the String itself: getBytes() would encode with the platform
		// charset while TextMessage decodes byte[] payloads as UTF-8.
		TextMessage message = new TextMessage(msg);
		List<WebSocketSession> targets = new ArrayList<WebSocketSession>();
		for (WebSocketSession user : snapshotUsers()) {
			if (uids.contains(user.getAttributes().get(Constants.SESSION_USERNAME))) {
				targets.add(user);
			}
		}
		deliver(targets, message, "sendMessageToUser");
	}
	
	/**
	 * Sends a message to every open session subscribed to the given line.
	 *
	 * @param lineCode the line code whose listeners are addressed
	 * @param msg the text to send
	 */
	public void sendMessageToLine(String lineCode, String msg) {
		TextMessage message = new TextMessage(msg);
		List<WebSocketSession> targets;
		synchronized (REGISTRY_LOCK) {
			// Copy: the multimap view must not be iterated outside the lock.
			targets = new ArrayList<WebSocketSession>(listenMap.get(lineCode));
		}
		deliver(targets, message, "sendMessageToLine");
	}

	/**
	 * Subscribes the session to each known line code in the comma separated
	 * list. Unknown codes are skipped, and a pair that is already registered
	 * is not added twice so the session does not receive duplicate pushes.
	 */
	private void registerLines(WebSocketSession session, String idx) {
		if (idx == null) {
			logger.warn("register_line without idx ignored");
			return;
		}
		List<String> lineCodes = Splitter.on(",").splitToList(idx);
		synchronized (REGISTRY_LOCK) {
			for (String lineCode : lineCodes) {
				if (BasicData.lineCode2NameMap.containsKey(lineCode)
						&& !listenMap.containsEntry(lineCode, session)) {
					listenMap.put(lineCode, session);
				}
			}
		}
	}

	/**
	 * Removes the session from the session list and from every line
	 * subscription, including repeated entries for the same line.
	 */
	private void unregister(WebSocketSession session) {
		synchronized (REGISTRY_LOCK) {
			users.remove(session);
			// clean up listeners
			Iterator<Map.Entry<String, WebSocketSession>> entries = listenMap.entries().iterator();
			while (entries.hasNext()) {
				Map.Entry<String, WebSocketSession> entry = entries.next();
				if (session.equals(entry.getValue())) {
					String lineCode = entry.getKey();
					entries.remove();
					logger.info("web socket close, remove listen K: " + lineCode);
				}
			}
			logger.info("listen values size: " + listenMap.size());
		}
	}

	/** Returns a copy of the session list taken under the registry lock. */
	private static List<WebSocketSession> snapshotUsers() {
		synchronized (REGISTRY_LOCK) {
			return new ArrayList<WebSocketSession>(users);
		}
	}

	/**
	 * Sends the message to each open target. A failure on one session is
	 * logged and does not prevent delivery to the remaining sessions.
	 */
	private void deliver(List<WebSocketSession> targets, TextMessage message, String caller) {
		synchronized (SEND_LOCK) {
			for (WebSocketSession target : targets) {
				try {
					if (target.isOpen()) {
						target.sendMessage(message);
					}
				} catch (Exception e) {
					logger.error(caller + " error ...." + message.getPayload(), e);
				}
			}
		}
	}
}