DataHandleProcess.java 5.74 KB
package com.bsth.data.gpsdata_v2;

import com.bsth.data.gpsdata_v2.cache.GpsCacheData;
import com.bsth.data.gpsdata_v2.entity.GpsEntity;
import com.bsth.data.gpsdata_v2.handlers.*;
import com.google.common.collect.ArrayListMultimap;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.*;

/**
 * 实时信号数据处理
 * Created by panzhao on 2017/11/15.
 */
@Component
public class DataHandleProcess {

    @Autowired
    GpsStateProcess gpsStateProcess;
    @Autowired
    StationInsideProcess stationInsideProcess;
    @Autowired
    AbnormalStateProcess abnormalStateProcess;
    @Autowired
    InStationProcess inStationProcess;
    @Autowired
    OutStationProcess outStationProcess;
    @Autowired
    ReverseRouteProcess reverseRouteProcess;
    @Autowired
    GpsRealData gpsRealData;


    static Logger logger = LoggerFactory.getLogger(DataHandleProcess.class);

    final static int POOL_SIZE = 25;

    static ExecutorService threadPool = Executors.newFixedThreadPool(POOL_SIZE + 1);
    public static CountDownLatch count;

    static long lastTime;

    public static boolean isBlock() {
        return System.currentTimeMillis() - lastTime > 1000 * 30;
    }

    public void handle(List<GpsEntity> list) {
        try {
            if (list.size() == 0)
                return;
            lastTime = System.currentTimeMillis();
            //按设备号分组数据(一个设备的多条数据,必须在一个线程里跑)
            ArrayListMultimap multimap = ArrayListMultimap.create();
            for (GpsEntity gps : list) {
                multimap.put(gps.getDeviceId(), gps);
            }
            List<String> deviceList = new ArrayList<>(multimap.keySet());

            //数据均分给线程
            ArrayListMultimap dataListMap = ArrayListMultimap.create();
            int size = deviceList.size(), threadIndex = 0, threadSize = size / POOL_SIZE;
            for (int i = 0; i < size; i++) {
                if (i % threadSize == 0)
                    threadIndex++;
                dataListMap.putAll(threadIndex, multimap.get(deviceList.get(i)));
            }
            Set<Integer> ks = dataListMap.keySet();
            logger.info("analyse gps size: " + list.size() + ", ks: " + ks.size());
            count = new CountDownLatch(ks.size());

            List<Future> fRs = new ArrayList<>(ks.size());
            for (Integer index : ks) {
                fRs.add(threadPool.submit(new SignalHandleThread(dataListMap.get(index), count)));
            }

            //按线路分组gps
            /*ArrayListMultimap multimap = ArrayListMultimap.create();
            for (GpsEntity gps : list) {
                multimap.put(gps.getLineId(), gps);
            }

            Set<String> ks = multimap.keySet();

            logger.info("analyse gps size: " + list.size() + ", ks: " + ks.size());
            count = new CountDownLatch(ks.size());

            for (String lineCode : ks) {
                threadPool.execute(new SignalHandleThread(multimap.get(lineCode), count));
            }*/

            //等待子线程结束
            count.await();

            for (Future f : fRs) {
                try {
                    f.get();
                } catch (InterruptedException e) {
                } catch (ExecutionException e) {
                    logger.error(e.getCause().getMessage());
                }
            }

            //加入实时gps对照
            for (GpsEntity gps : list)
                gpsRealData.put(gps);

            logger.info("time , " + (System.currentTimeMillis() - lastTime));
        } catch (Exception e) {
            logger.error("", e);
        }
    }

    static GpsComp comp = new GpsComp();

    public class SignalHandleThread implements Runnable {

        List<GpsEntity> list;
        CountDownLatch count;

        SignalHandleThread(List<GpsEntity> gpsList, CountDownLatch count) {
            this.list = gpsList;
            this.count = count;
        }

        @Override
        public void run() {
            try {
                Collections.sort(list, comp);
                for (GpsEntity gps : list) {
                    try {
                        if (StringUtils.isEmpty(gps.getNbbm()))
                            continue;
                        if (Math.abs(gps.getTimestamp() - gps.getServerTimestamp()) > 1000 * 60 * 20)
                            continue;

                        gpsStateProcess.process(gps);//状态处理
                        stationInsideProcess.process(gps);//场站内外判定
                        reverseRouteProcess.process(gps);//反向路由处理
                        abnormalStateProcess.process(gps);//超速越界

                        inStationProcess.process(gps);//进站
                        outStationProcess.process(gps);//出站

                        GpsCacheData.putGps(gps);//历史gps缓存
                    } catch (Exception e) {
                        logger.error("", e);
                    }
                }

            } finally {
                if (count != null)
                    count.countDown();
                logger.info(Thread.currentThread().getName() + " -countDown : " + count.getCount());
            }
        }
    }

    public static class GpsComp implements Comparator<GpsEntity> {

        @Override
        public int compare(GpsEntity g1, GpsEntity g2) {
            return g1.getTimestamp().compareTo(g2.getTimestamp());
        }
    }
}