GpsBeforeBuffer.java 2.7 KB
package com.bsth.client;

import com.bsth.entity.GpsEntity;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * 从 socket client 到调度系统 的缓冲
 * Created by panzhao on 2017/5/4.
 */
@Component
@Order(2)
public class GpsBeforeBuffer implements CommandLineRunner{

    static ConcurrentLinkedQueue<GpsEntity> linkedList = new ConcurrentLinkedQueue();
    static final int MAX_SIZE = 4000 * 20;
    static int size = 0;

    public void put(GpsEntity gps){
        try {
            if(gps == null)
                return;

            if(StringUtils.isBlank(gps.getDeviceId()))
                return;

            linkedList.add(gps);
            size++;
        }catch (Exception e){
            log.error("", e);
        }
    }

    static Logger log = LoggerFactory.getLogger(GpsBeforeBuffer.class);

    public static List<GpsEntity> pollAll(){
        List<GpsEntity> rs = new ArrayList<>(300);
        GpsEntity gps;

        while (true){
            gps = linkedList.poll();
            if(gps == null){
                size = 0;
                break;
            }
            rs.add(gps);
        }

        log.info("poll size: " + rs.size() + " -current size: " + size);
        return rs;
    }

    /**
     * 清理数据,保持最大 MAX_SIZE 个数的元素
     */
    public static void clear(){
        if(size <= MAX_SIZE)
            return;
        int len = size - MAX_SIZE;
        for(int j = 0; j < len; j++){
            linkedList.poll();
            size--;
        }
        log.info("clear size: " + len + " -current size: " + size);
    }

    @Autowired
    BufferSizeCheck bufferSizeCheck;
    @Autowired
    SizeCheck sizeCheck;

    ScheduledExecutorService sexec = Executors.newScheduledThreadPool(2);

    @Override
    public void run(String... strings) throws Exception {
        sexec.scheduleWithFixedDelay(bufferSizeCheck, 60, 30, TimeUnit.SECONDS);
        sexec.scheduleWithFixedDelay(sizeCheck, 60, 60 * 5, TimeUnit.SECONDS);
    }

    @Component
    public static class BufferSizeCheck extends Thread {

        @Override
        public void run() {
            GpsBeforeBuffer.clear();
        }
    }

    @Component
    public static class SizeCheck extends Thread{

        @Override
        public void run() {
            log.info("[SizeCheck] linkedList real size: " + linkedList.size() + " -current size: " + size);
        }
    }
}