DirectivePushQueue.java 5.56 KB
package com.bsth.data.msg_queue;

import com.bsth.data.schedule.DayOfSchedule;
import com.bsth.entity.realcontrol.ScheduleRealInfo;
import com.bsth.service.directive.DirectiveService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

/**
 * 到网关的指令推送队列 (系统发送的队列, 用户手动发送的不走这里)
 * Created by panzhao on 2017/5/11.
 */
@Component
public class DirectivePushQueue implements ApplicationContextAware {

    static ConcurrentLinkedQueue<QueueData_Directive> linkedList;
    static DataPushThread thread;
    static DirectiveService directiveService;
    static long threadT;

    /**
     * 下发运营指令6003的最小间隔时间
     */
    static final int MIN_SEND6003_SPACE = 1000 * 30;

    /**
     * 车辆 ——> 上次下发6003的时间
     */
    static ConcurrentMap<String, Long> lastSend60TimeMap;

    static {
        linkedList = new ConcurrentLinkedQueue<>();
        lastSend60TimeMap = new ConcurrentHashMap<>();
    }

    public static void put6002(ScheduleRealInfo sch, int finish, String sender, String txtPrefix){
        if(null == sch)
            return;
        QueueData_Directive qd6002 = new QueueData_Directive();
        qd6002.setSch(sch);
        qd6002.setFinish(finish);
        qd6002.setSender(sender);
        qd6002.setCode("60_02");
        qd6002.setTxtPrefix(txtPrefix);

        linkedList.add(qd6002);
        lastSend60TimeMap.put(sch.getClZbh(), System.currentTimeMillis());
    }

    public static void put6003(String nbbm, int state, int upDown, String sender){
        long t = System.currentTimeMillis();
        if(lastSend60TimeMap.containsKey(nbbm)
                && t - lastSend60TimeMap.get(nbbm) < MIN_SEND6003_SPACE)
            return; //最短下发间隔

        QueueData_Directive qd6003 = new QueueData_Directive();
        qd6003.setNbbm(nbbm);
        qd6003.setState(state);
        qd6003.setUpDown(upDown);
        qd6003.setSender(sender);
        qd6003.setCode("60_03");

        linkedList.add(qd6003);
        lastSend60TimeMap.put(nbbm, t);
    }

    public static void put6003(ScheduleRealInfo sch, String sender){
        if(null == sch)
            return;
        int state = 0;//营运状态
        if(DayOfSchedule.emptyService(sch))
            state = 1;

        put6003(sch.getClZbh(), state, Integer.parseInt(sch.getXlDir()), sender);
    }

    public static void put64(String nbbm, String lineCode, String sender){
        QueueData_Directive qd64 = new QueueData_Directive();
        qd64.setNbbm(nbbm);
        qd64.setLineCode(lineCode);
        qd64.setSender(sender);

        qd64.setCode("64");

        linkedList.add(qd64);
    }

    public static void start(){
        if(thread != null){
            thread.interrupt();
        }
        linkedList.clear();
        thread = new DataPushThread();
        thread.start();
    }

    public static int size(){
        return linkedList.size();
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        directiveService = applicationContext.getBean(DirectiveService.class);
    }

    public static class DataPushThread extends Thread {

        Logger log = LoggerFactory.getLogger(this.getClass());

        @Override
        public void run() {
            boolean sleepFlag = false;
            QueueData_Directive qd;
            String code;
            while (true) {
                try {
                    qd = linkedList.poll();
                    if (qd != null) {
                        sleepFlag = false;
                        code = qd.getCode();

                        if(code.equals("60_02")){
                            directiveService.send60Dispatch(qd.getSch(), qd.getFinish(), qd.getSender(), qd.getTxtPrefix());
                            log.info("directive 60_02 sch id: " + qd.getSch().getId());
                        }
                        else if(code.equals("60_03")){
                            directiveService.send60Operation(qd.getNbbm(), qd.getState(), qd.getUpDown(), qd.getSender());
                            log.info("directive 60_03 nbbm: " + qd.getNbbm());
                        }
                        else if(code.equals("64")){
                            directiveService.lineChange(qd.getNbbm(), qd.getLineCode(), qd.getSender());
                            log.info("directive 64 nbbm: " + qd.getNbbm() + " lineCode: " + qd.getLineCode());
                        }

                    } else{
                        Thread.sleep(500);
                        if(!sleepFlag){
                            log.info("sleep...");
                            sleepFlag = true;
                        }
                    }
                    threadT = System.currentTimeMillis();
                }
                catch(InterruptedException e){
                    log.error("", e);
                    break;
                }
                catch (Exception e) {
                    log.error("", e);
                }
            }

            log.warn("DirectivePushQueue is break...");
        }
    }
}