// DataHandleProcess.java 4.93 KB
package com.bsth.data.gpsdata_v2;

import com.bsth.data.gpsdata_v2.cache.GpsCacheData;
import com.bsth.data.gpsdata_v2.entity.GpsEntity;
import com.bsth.data.gpsdata_v2.handlers.*;
import com.google.common.collect.ArrayListMultimap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * Real-time GPS signal processing.
 *
 * <p>Incoming GPS records are grouped by device id, the devices are split into
 * at most {@link #POOL_SIZE} batches, and each batch is run through the handler
 * chain on a worker thread. All records of one device always land in the same
 * batch, so they are processed sequentially and in their original order.
 *
 * Created by panzhao on 2017/11/15.
 */
@Component
public class DataHandleProcess {

    static Logger logger = LoggerFactory.getLogger(DataHandleProcess.class);
    final static int POOL_SIZE = 25;

    /** {@link #isBlock()} reports true once this many milliseconds pass without a non-empty batch. */
    private static final long BLOCK_THRESHOLD_MS = 30 * 1000L;
    /** Records whose device/server timestamps differ by more than this (ms) are skipped. */
    private static final long MAX_TIMESTAMP_DRIFT_MS = 20 * 60 * 1000L;

    static ExecutorService threadPool = Executors.newFixedThreadPool(POOL_SIZE + 1, new HandlerThreadFactory());
    /**
     * Latch of the most recently started {@link #handle(List)} call. Kept public
     * for backward compatibility; {@code handle} itself waits on a local reference.
     */
    public static CountDownLatch count;

    @Autowired
    GpsStateProcess gpsStateProcess;
    @Autowired
    StationInsideProcess stationInsideProcess;
    @Autowired
    AbnormalStateProcess abnormalStateProcess;
    @Autowired
    InStationProcess inStationProcess;
    @Autowired
    OutStationProcess outStationProcess;
    @Autowired
    ReverseRouteProcess reverseRouteProcess;

    @Autowired
    GpsRealData gpsRealData;

    /**
     * Wall-clock time (ms) at which {@link #handle(List)} last received a non-empty list.
     * Volatile because it is written by the handling thread and read through the
     * static {@link #isBlock()} (presumably from a different thread).
     */
    static volatile long lastTime;

    /**
     * @return {@code true} when more than 30 seconds have elapsed since the last
     *         non-empty batch was handed to {@link #handle(List)}; this is also
     *         the case before the first batch, because {@code lastTime} starts at 0
     */
    public static boolean isBlock() {
        return System.currentTimeMillis() - lastTime > BLOCK_THRESHOLD_MS;
    }

    /**
     * Processes a batch of GPS records and blocks until every worker has finished,
     * then stores each record in {@code gpsRealData}.
     *
     * <p>Any exception is logged and not propagated. If the calling thread is
     * interrupted while waiting, the interrupt flag is restored and the records
     * are not stored in {@code gpsRealData}.
     *
     * @param list records to process; {@code null} or empty is a no-op
     */
    public void handle(List<GpsEntity> list) {
        try {
            if (list == null || list.isEmpty())
                return;
            lastTime = System.currentTimeMillis();

            ArrayListMultimap<Integer, GpsEntity> dataListMap = groupIntoBatches(list);
            Set<Integer> ks = dataListMap.keySet();
            logger.info("analyse gps size: {}, ks: {}", list.size(), ks.size());

            // Wait on a local latch: the static field may be replaced by another call.
            CountDownLatch latch = new CountDownLatch(ks.size());
            count = latch;

            for (Integer index : ks) {
                threadPool.execute(new SignalHandleThread(dataListMap.get(index), latch));
            }

            // Wait for the worker threads to finish
            latch.await();

            // Add to the real-time GPS lookup
            for (GpsEntity gps : list)
                gpsRealData.put(gps);
        } catch (InterruptedException e) {
            // Preserve the interrupt so callers / the owning executor can react to it.
            Thread.currentThread().interrupt();
            logger.error("", e);
        } catch (Exception e) {
            logger.error("", e);
        }
    }

    /**
     * Groups the records by device id and spreads the devices over at most
     * {@link #POOL_SIZE} batches, keyed 1..k.
     */
    private ArrayListMultimap<Integer, GpsEntity> groupIntoBatches(List<GpsEntity> list) {
        // Group by device id (all records of one device must run in the same thread)
        ArrayListMultimap<String, GpsEntity> multimap = ArrayListMultimap.create();
        for (GpsEntity gps : list) {
            multimap.put(gps.getDeviceId(), gps);
        }
        List<String> deviceList = new ArrayList<>(multimap.keySet());

        // Distribute the devices evenly among the threads
        ArrayListMultimap<Integer, GpsEntity> dataListMap = ArrayListMultimap.create();
        int size = deviceList.size();
        int threadSize = devicesPerBatch(size);
        int threadIndex = 0;
        for (int i = 0; i < size; i++) {
            if (i % threadSize == 0)
                threadIndex++;
            dataListMap.putAll(threadIndex, multimap.get(deviceList.get(i)));
        }
        return dataListMap;
    }

    /**
     * Number of devices assigned to each batch: ceil(deviceCount / POOL_SIZE),
     * never less than 1 so it is always a valid modulus (plain integer division
     * yields 0 for fewer than POOL_SIZE devices and caused a division by zero).
     */
    private static int devicesPerBatch(int deviceCount) {
        return Math.max(1, (deviceCount + POOL_SIZE - 1) / POOL_SIZE);
    }

    /**
     * Worker that runs the handler chain over one batch of GPS records and
     * counts the latch down when done, whether or not processing succeeded.
     */
    public class SignalHandleThread implements Runnable {

        List<GpsEntity> list;
        CountDownLatch count;

        SignalHandleThread(List<GpsEntity> gpsList, CountDownLatch count) {
            this.list = gpsList;
            this.count = count;
        }

        @Override
        public void run() {
            try {
                for (GpsEntity gps : list) {
                    // Per-record try/catch: one bad record must not abort the rest of the batch.
                    try {
                        // Skip records whose device time is too far from the server time.
                        if (Math.abs(gps.getTimestamp() - gps.getServerTimestamp()) > MAX_TIMESTAMP_DRIFT_MS)
                            continue;

                        gpsStateProcess.process(gps);      // state handling
                        stationInsideProcess.process(gps); // inside/outside depot check
                        reverseRouteProcess.process(gps);  // reverse route handling
                        abnormalStateProcess.process(gps); // overspeed / out of bounds

                        inStationProcess.process(gps);     // station entry
                        outStationProcess.process(gps);    // station exit

                        GpsCacheData.putGps(gps);          // historical GPS cache
                    } catch (Exception e) {
                        logger.error("", e);
                    }
                }
            } finally {
                if (count != null)
                    count.countDown();
            }
        }
    }

    /** Logs any throwable that escapes a pool thread. */
    static class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            logger.error("caught " , e);
        }
    }

    /** Creates pool threads with {@link MyUncaughtExceptionHandler} installed. */
    static class HandlerThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
            return t;
        }
    }
}