// WebSocketPushQueue.java 3.16 KB
package com.bsth.data.msg_queue;

import com.bsth.common.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Line-dispatch web socket push queue.
 *
 * <p>Producers enqueue messages with {@link #put(WebSocketSession, TextMessage)}; a single
 * {@link DataPushThread} polls the queue and writes each message to its session. The push
 * thread is started from {@link #run(String...)} and can be replaced at any time by calling
 * {@link #start()} again.
 *
 * Created by panzhao on 2017/5/11.
 */
@Component
public class WebSocketPushQueue implements CommandLineRunner {

    /** Pending messages; filled by {@link #put} and drained by the push thread. */
    static ConcurrentLinkedQueue<QueueData> linkedList;

    /** The current push thread; replaced on every call to {@link #start()}. */
    static volatile DataPushThread thread;

    static Logger log = LoggerFactory.getLogger(WebSocketPushQueue.class);

    /**
     * Epoch milliseconds at which the push thread last completed a loop iteration.
     * Volatile because it is written by the push thread and read by callers of
     * {@link #isIdle()} on other threads.
     */
    static volatile long t;

    /** Maximum age of {@link #t}, in milliseconds, before the push thread is reported idle. */
    static final int IDLE_TIME = 1000 * 30;

    static {
        linkedList = new ConcurrentLinkedQueue<QueueData>();
    }

    /**
     * Reports whether the push thread has gone more than {@link #IDLE_TIME} milliseconds
     * without completing a loop iteration.
     *
     * <p>{@link #t} starts at 0, so this returns {@code true} until a push thread has
     * completed its first iteration.
     *
     * @return {@code true} if the last recorded iteration is older than {@link #IDLE_TIME}
     */
    public static boolean isIdle() {
        return System.currentTimeMillis() - t > IDLE_TIME;
    }

    /**
     * Appends a message to the push queue.
     *
     * @param session the session the message will be sent to
     * @param msg     the message to send
     */
    public static void put(WebSocketSession session, TextMessage msg) {
        QueueData qd = new QueueData();
        qd.setMessage(msg);
        qd.setSession(session);

        log.info("put、[{}] 、{}", session.getAttributes().get(Constants.SESSION_USERNAME), msg.getPayload());
        linkedList.add(qd);
    }

    /**
     * Starts a new push thread, interrupting the previous one if there is one, and discards
     * every message still waiting in the queue.
     *
     * <p>Synchronized so that concurrent callers cannot each start a push thread and leave
     * one of them running unreferenced. The previous thread is only interrupted, not joined;
     * it leaves its loop the next time it checks its interrupt status.
     */
    public static synchronized void start() {
        if (thread != null) {
            thread.interrupt();
        }
        linkedList.clear();
        thread = new DataPushThread();
        thread.start();
    }

    /**
     * Returns the number of messages currently waiting in the queue.
     *
     * @return the current queue size
     */
    public static int size(){
        return linkedList.size();
    }

    /**
     * {@link CommandLineRunner} callback; starts the push thread.
     *
     * @param strings command line arguments (unused)
     */
    @Override
    public void run(String... strings) throws Exception {
        start();
    }

    /**
     * Worker thread that drains the queue and sends each message to its session.
     * It runs until it is interrupted.
     */
    public static class DataPushThread extends Thread {

        Logger log = LoggerFactory.getLogger(this.getClass());

        @Override
        public void run() {
            QueueData qd;
            WebSocketSession session;

            // Ensures "sleep..." is logged once per idle period rather than on every poll.
            boolean sleepFlag = false;
            // Checking the interrupt status here lets a superseded thread stop even when the
            // queue never drains and Thread.sleep is therefore never reached.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    qd = linkedList.poll();
                    if (qd != null) {
                        sleepFlag = false;
                        session = qd.getSession();
                        if (session.isOpen()) {
                            log.info("push start、{}[{}] 、{}", remoteHost(session),
                                    session.getAttributes().get(Constants.SESSION_USERNAME),
                                    qd.getMessage().getPayload());
                            session.sendMessage(qd.getMessage());
                            log.info("push end..");
                        }
                    } else {
                        Thread.sleep(500);
                        if (!sleepFlag) {
                            log.info("sleep...");
                            sleepFlag = true;
                        }
                    }
                    // Heartbeat read by isIdle().
                    t = System.currentTimeMillis();
                } catch (InterruptedException e) {
                    log.error("", e);
                    // Restore the flag that Thread.sleep cleared when it threw.
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    // A failure on one message must not stop the loop.
                    log.error("", e);
                }
            }

            log.warn("WebSocketPushQueue is break...");
        }

        /**
         * Returns the remote host of the session for logging, or {@code "unknown"} when the
         * session reports no remote address, so that a missing address cannot abort the send.
         */
        private static String remoteHost(WebSocketSession session) {
            java.net.InetSocketAddress address = session.getRemoteAddress();
            return address == null ? "unknown" : address.getHostString();
        }
    }
}