// DirectivePushQueue.java (4.69 KB)
package com.bsth.data.msg_queue;

import com.bsth.entity.realcontrol.ScheduleRealInfo;
import com.bsth.service.directive.DirectiveService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Queue of directives to be pushed to the gateway (system-issued directives only;
 * directives sent manually by users do not pass through here).
 *
 * <p>Producers enqueue work through the static {@code putXxx} methods; a single
 * {@link DataPushThread}, created by {@link #start()}, drains the queue and forwards
 * each entry to the {@link DirectiveService} obtained from the Spring context.
 *
 * Created by panzhao on 2017/5/11.
 */
@Component
public class DirectivePushQueue implements ApplicationContextAware {

    /** Pending directives, consumed by {@link DataPushThread}. */
    static ConcurrentLinkedQueue<QueueData_Directive> linkedList = new ConcurrentLinkedQueue<>();

    /** The current worker thread, or {@code null} if {@link #start()} has not been called. */
    static DataPushThread thread;

    /** Service that actually sends directives; set in {@link #setApplicationContext}. */
    static DirectiveService directiveService;

    /**
     * Timestamp (epoch millis) of the worker's last completed loop iteration.
     * Volatile because it is written by the worker thread and read by callers of
     * {@link #isIdle()}; a plain {@code long} guarantees neither visibility nor atomic access.
     */
    static volatile long t;

    /** Milliseconds without a worker heartbeat after which {@link #isIdle()} reports true. */
    static final int IDLE_TIME = 1000 * 30;

    /**
     * Enqueues a "60_02" directive, which the worker sends via
     * {@code DirectiveService.send60Dispatch}.
     *
     * @param sch    the schedule to dispatch; the call is a no-op when {@code null}
     * @param finish value passed through to {@code send60Dispatch}
     * @param sender value passed through to {@code send60Dispatch}
     */
    public static void put6002(ScheduleRealInfo sch, int finish, String sender) {
        if (null == sch) {
            return;
        }
        QueueData_Directive qd6002 = new QueueData_Directive();
        qd6002.setSch(sch);
        qd6002.setFinish(finish);
        qd6002.setSender(sender);
        qd6002.setCode("60_02");

        linkedList.add(qd6002);
    }

    /**
     * Enqueues a "60_03" directive, which the worker sends via
     * {@code DirectiveService.send60Operation}.
     *
     * @param nbbm   value passed through to {@code send60Operation}
     * @param state  value passed through to {@code send60Operation}
     * @param upDown value passed through to {@code send60Operation}
     * @param sender value passed through to {@code send60Operation}
     */
    public static void put6003(String nbbm, int state, int upDown, String sender) {
        QueueData_Directive qd6003 = new QueueData_Directive();
        qd6003.setNbbm(nbbm);
        qd6003.setState(state);
        qd6003.setUpDown(upDown);
        qd6003.setSender(sender);
        qd6003.setCode("60_03");

        linkedList.add(qd6003);
    }

    /**
     * Enqueues a "64" directive, which the worker sends via
     * {@code DirectiveService.lineChange}.
     *
     * @param nbbm     value passed through to {@code lineChange}
     * @param lineCode value passed through to {@code lineChange}
     * @param sender   value passed through to {@code lineChange}
     */
    public static void put64(String nbbm, String lineCode, String sender) {
        QueueData_Directive qd64 = new QueueData_Directive();
        qd64.setNbbm(nbbm);
        qd64.setLineCode(lineCode);
        qd64.setSender(sender);
        qd64.setCode("64");

        linkedList.add(qd64);
    }

    /**
     * Reports whether the worker has gone more than {@link #IDLE_TIME} milliseconds
     * without completing a loop iteration. Because {@link #t} starts at zero, this
     * also returns {@code true} before any worker has run.
     *
     * @return {@code true} if the last heartbeat is older than {@link #IDLE_TIME}
     */
    public static boolean isIdle() {
        return System.currentTimeMillis() - t > IDLE_TIME;
    }

    /**
     * Interrupts the current worker (if any), discards everything still queued and
     * starts a fresh worker thread. Nothing in this class calls this method; the
     * worker only runs once some other component invokes it.
     *
     * <p>Synchronized so that concurrent callers cannot each create a worker and
     * leave more than one consumer draining the queue.
     */
    public static synchronized void start() {
        if (thread != null) {
            thread.interrupt();
        }
        linkedList.clear();
        thread = new DataPushThread();
        thread.start();
    }

    /**
     * Returns the number of queued directives. The backing
     * {@link ConcurrentLinkedQueue#size()} traverses the queue and may be inaccurate
     * while other threads are adding or removing elements.
     *
     * @return the current queue length
     */
    public static int size() {
        return linkedList.size();
    }

    /**
     * Captures the {@link DirectiveService} bean so the static worker can use it.
     *
     * @param applicationContext the Spring context supplying the bean
     * @throws BeansException if the bean cannot be obtained
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        directiveService = applicationContext.getBean(DirectiveService.class);
    }

    /**
     * Worker that polls the queue, forwards each directive to {@link DirectiveService}
     * and sleeps 500 ms whenever the queue is empty. It stops when interrupted.
     */
    public static class DataPushThread extends Thread {

        static final Logger log = LoggerFactory.getLogger(DataPushThread.class);

        @Override
        public void run() {
            // Ensures "sleep..." is logged once per idle period rather than every 500 ms.
            boolean sleepFlag = false;

            // Checking the flag here lets an interrupt delivered while sending (not
            // only while sleeping) stop this worker, so start() cannot leave two
            // consumers running when the queue never drains.
            while (!isInterrupted()) {
                try {
                    QueueData_Directive qd = linkedList.poll();
                    if (qd != null) {
                        sleepFlag = false;
                        dispatch(qd);
                    } else {
                        Thread.sleep(500);
                        if (!sleepFlag) {
                            log.info("sleep...");
                            sleepFlag = true;
                        }
                    }
                    // Heartbeat read by isIdle().
                    // NOTE(review): not refreshed when a send throws - confirm that is intended.
                    t = System.currentTimeMillis();
                } catch (InterruptedException e) {
                    log.error("", e);
                    // Restore the flag cleared by the throwing call before leaving.
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    // A failed send must not kill the worker; log it and move on.
                    log.error("", e);
                }
            }

            log.warn("DirectivePushQueue is break...");
        }

        /**
         * Sends a single queued directive through the service method matching its code.
         * Entries with a missing or unrecognized code are logged and dropped.
         */
        private void dispatch(QueueData_Directive qd) {
            String code = qd.getCode();

            // Constant-first comparisons so a null code falls through to the warning
            // instead of raising a NullPointerException.
            if ("60_02".equals(code)) {
                directiveService.send60Dispatch(qd.getSch(), qd.getFinish(), qd.getSender());
                log.info("directive 60_02 sch id: {}", qd.getSch().getId());
            } else if ("60_03".equals(code)) {
                directiveService.send60Operation(qd.getNbbm(), qd.getState(), qd.getUpDown(), qd.getSender());
                log.info("directive 60_03 nbbm: {}", qd.getNbbm());
            } else if ("64".equals(code)) {
                directiveService.lineChange(qd.getNbbm(), qd.getLineCode(), qd.getSender());
                log.info("directive 64 nbbm: {} lineCode: {}", qd.getNbbm(), qd.getLineCode());
            } else {
                log.warn("unknown directive code: {}", code);
            }
        }
    }
}