RealControlSocketHandler.java 4.3 KB
package com.bsth.websocket.handler;

import com.alibaba.fastjson.JSONObject;
import com.bsth.common.Constants;
import com.bsth.data.BasicData;
import com.bsth.data.msg_queue.WebSocketPushQueue;
import com.google.common.base.Splitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * @author PanZhao
 */
@Component
public class RealControlSocketHandler implements WebSocketHandler {

	Logger logger = LoggerFactory.getLogger(this.getClass());

	private static ArrayList<WebSocketSession> users;
	private static ConcurrentHashMap<String, CopyOnWriteArrayList<WebSocketSession>> listenMap;

	static {
		users = new ArrayList<WebSocketSession>();
		listenMap = new ConcurrentHashMap();
	}

	@Override
	public void afterConnectionClosed(WebSocketSession session, CloseStatus arg1)
			throws Exception {
		users.remove(session);
		//清理监听
		int vsCount=0;
		Collection<CopyOnWriteArrayList<WebSocketSession>> vs = listenMap.values();
		for(CopyOnWriteArrayList<WebSocketSession> list : vs){
			list.remove(session);

			vsCount += list.size();
		}
		logger.info("listen values size: " + vsCount + " -CloseStatus:" + arg1 + "conn: " + users.size());
	}

	@Override
	public void afterConnectionEstablished(WebSocketSession session)
			throws Exception {
		session.setBinaryMessageSizeLimit(52768);
		session.setTextMessageSizeLimit(52768);
		users.add(session);
	}

	@Override
	public void handleMessage(WebSocketSession session, WebSocketMessage<?> msg)
			throws Exception {
		JSONObject jsonObj = JSONObject.parseObject(msg.getPayload().toString());
		switch (jsonObj.getString("operCode")) {
			case "register_line":
				//注册线路监听
				List<String> idx = Splitter.on(",").splitToList(jsonObj.getString("idx"));
				for(String lineCode : idx){
					if(BasicData.lineCode2NameMap.containsKey(lineCode)){
						if(!listenMap.containsKey(lineCode)){
							listenMap.put(lineCode, new CopyOnWriteArrayList<WebSocketSession>());
						}
						listenMap.get(lineCode).add(session);
					}
				}
				break;

			default:
				break;
		}
		logger.info(msg.getPayload().toString());
	}

	@Override
	public void handleTransportError(WebSocketSession session, Throwable arg1)
			throws Exception {
		if(session.isOpen()){
			session.close();
		}
		users.remove(session);
	}

	@Override
	public boolean supportsPartialMessages() {
		return false;
	}


	/**
	 * 根据线路推送消息
	 */
	public void sendMessageToLine(String lineCode, String msg) {

		TextMessage message = new TextMessage(msg.getBytes());
		List<WebSocketSession> list = listenMap.get(lineCode);
		if(list == null || list.size() == 0)
			return;

		for(WebSocketSession user : list){
			WebSocketPushQueue.put(user, message);
			/*try {
				if (user.isOpen()) {
					user.sendMessage(message);
				}
			} catch (Exception e) {
				try{
					logger.error("error user...."+user.getAttributes().get(Constants.SESSION_USERNAME));
				}
				catch(Exception e2){}
				logger.error("sendMessageToLine error ...."+msg);
				logger.error("sendMessageToLine error ....", e);
			}*/
		}
	}



	/**
	 * 全局推送
	 */
	public void sendMessage(String msg) {

		TextMessage message = new TextMessage(msg.getBytes());

		Iterator<WebSocketSession> iterator = users.iterator();

		WebSocketSession user;
		while(iterator.hasNext()){
			user = iterator.next();
			WebSocketPushQueue.put(user, message);
			/*try {
				if (user.isOpen()) {
					user.sendMessage(message);
				}
			} catch (Exception e) {
				logger.error("sendMessage error ...."+msg);
			}*/
		}
	}

	/**
	 * 根据线路推送消息
	 */
	public void sendMessageToUser(String userName, String msg) {
		logger.debug(String.format("userName:%s, msg:%s", userName, msg));
		TextMessage message = new TextMessage(msg.getBytes());

		for(WebSocketSession user : users){
			if (userName.equals(user.getAttributes().get(Constants.SESSION_USERNAME))) {
				WebSocketPushQueue.put(user, message);
			}
		}
	}
}