// DataHandleProcess.java 6.59 KB
package com.bsth.data.gpsdata_v2;

import com.alibaba.fastjson.JSON;
import com.bsth.data.gpsdata_v2.cache.GpsCacheData;
import com.bsth.data.gpsdata_v2.entity.GpsEntity;
import com.bsth.data.gpsdata_v2.handlers.*;
import com.bsth.email.SendEmailController;
import com.bsth.email.entity.EmailBean;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ArrayListMultimap;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * Real-time GPS signal processing.
 *
 * <p>{@link #handle(List)} receives a batch of GPS records, groups them by device id,
 * spreads the devices over at most {@link #POOL_SIZE} worker tasks and runs every record
 * through the injected processor chain. All records of one device are handled by the same
 * task so that they are processed sequentially in timestamp order.
 *
 * Created by panzhao on 2017/11/15.
 */
@Component
public class DataHandleProcess {

    @Autowired
    GpsStateProcess gpsStateProcess;
    @Autowired
    StationInsideProcess stationInsideProcess;
    @Autowired
    AbnormalStateProcess abnormalStateProcess;
    @Autowired
    InStationProcess inStationProcess;
    @Autowired
    OutStationProcess outStationProcess;
    @Autowired
    ReverseRouteProcess reverseRouteProcess;
    @Autowired
    GpsRealData gpsRealData;
    // Used to send the alert e-mail when a batch times out.
    @Autowired
    private SendEmailController sendEmailController;


    static Logger logger = LoggerFactory.getLogger(DataHandleProcess.class);

    /** Upper bound on the number of worker tasks a single batch is split into. */
    final static int POOL_SIZE = 20;

    /** {@link #isBlock()} reports a stall once no batch has arrived for this long. */
    private static final long BLOCK_THRESHOLD_MS = 1000 * 30;

    /** Maximum time {@link #handle(List)} waits for the worker tasks of one batch. */
    private static final long HANDLE_TIMEOUT_MS = 5000;

    /** Records whose device time differs from the server time by more than this are skipped. */
    private static final long MAX_CLOCK_SKEW_MS = 1000 * 60 * 20;

    /** Recipient of the timeout alert e-mail. */
    private static final String ALERT_MAIL_TO = "113252620@qq.com";

    static ExecutorService threadPool = Executors.newFixedThreadPool(POOL_SIZE + 1, new ThreadFactory() {

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("GPSProcessor");

            return t;
        }

    });

    /**
     * Wall-clock time (ms) at which the last non-empty batch entered {@link #handle(List)}.
     * Volatile because it is written by the handling thread and read through the static
     * {@link #isBlock()}, which may be called from a different thread.
     */
    static volatile long lastTime;

    /**
     * Tells whether GPS handling appears to be stalled.
     *
     * @return {@code true} when more than 30 seconds have elapsed since the last non-empty
     *         batch was passed to {@link #handle(List)} (also {@code true} before the first batch)
     */
    public static boolean isBlock() {
        return System.currentTimeMillis() - lastTime > BLOCK_THRESHOLD_MS;
    }

    /**
     * Processes one batch of GPS records.
     *
     * <p>The records are grouped by device id, the devices are divided into at most
     * {@link #POOL_SIZE} tasks, and the calling thread waits up to 5 seconds for the tasks
     * to finish. On timeout an alert e-mail is sent and the batch is logged; the worker
     * tasks are not cancelled. Afterwards every record is stored in {@code gpsRealData}.
     * Exceptions are logged and never propagated to the caller.
     *
     * @param list the batch to process; {@code null} or empty batches are ignored
     */
    public void handle(List<GpsEntity> list) {
        if (list == null || list.isEmpty()) {
            return;
        }

        // Keep a local copy: the static field may be overwritten by a concurrent call.
        final long start = System.currentTimeMillis();
        lastTime = start;

        try {
            // Group by device id (all records of one device must run on the same thread).
            ArrayListMultimap<String, GpsEntity> multimap = ArrayListMultimap.create();
            for (GpsEntity gps : list) {
                multimap.put(gps.getDeviceId(), gps);
            }
            List<String> deviceList = new ArrayList<>(multimap.keySet());

            // Spread the devices evenly over the tasks. Ceiling division guarantees that
            // no more than POOL_SIZE tasks are created (floor division could create up to
            // 2 * POOL_SIZE - 1). size >= 1 here, so devicesPerTask is never 0.
            ArrayListMultimap<Integer, GpsEntity> dataListMap = ArrayListMultimap.create();
            int size = deviceList.size();
            int devicesPerTask = (size + POOL_SIZE - 1) / POOL_SIZE;
            int threadIndex = 0;
            for (int i = 0; i < size; i++) {
                if (i % devicesPerTask == 0) {
                    threadIndex++;
                }
                dataListMap.putAll(threadIndex, multimap.get(deviceList.get(i)));
            }

            Set<Integer> ks = dataListMap.keySet();
            logger.info("analyse gps size: {}, ks: {}", list.size(), ks.size());
            CountDownLatch count = new CountDownLatch(ks.size());

            logger.info(JSON.toJSONString(ks));
            for (Integer index : ks) {
                threadPool.execute(new SignalHandleThread(dataListMap.get(index), count));
            }

            // Wait for the worker tasks to finish.
            boolean isNormal = count.await(HANDLE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (!isNormal) {
                reportTimeout(list);
            }

            // Publish the records to the real-time GPS lookup.
            for (GpsEntity gps : list) {
                gpsRealData.put(gps);
            }

            logger.info("time , {}", System.currentTimeMillis() - start);
        } catch (InterruptedException e) {
            // Restore the flag so the caller / owning executor can observe the interrupt.
            Thread.currentThread().interrupt();
            logger.error("", e);
        } catch (Exception e) {
            logger.error("", e);
        }
    }

    /**
     * Sends the timeout alert e-mail and logs the offending batch as JSON.
     * A failure to send or serialize is logged and otherwise ignored.
     */
    private void reportTimeout(List<GpsEntity> list) {
        try {
            EmailBean mail = new EmailBean();
            mail.setSubject("线调GPS处理");
            mail.setContent("GPS处理超时,检查日志信息<br/>");
            sendEmailController.sendMail(ALERT_MAIL_TO, mail);
            logger.info(new ObjectMapper().writeValueAsString(list));
            logger.info("DataHandlerProcess:邮件发送成功!");
        } catch (Exception e) {
            logger.error("DataHandlerProcess:邮件发送失败!", e);
        }
    }

    static GpsComp comp = new GpsComp();

    /**
     * Worker task: sorts its share of the batch by timestamp and runs every record through
     * the processor chain of the enclosing {@link DataHandleProcess}.
     */
    public class SignalHandleThread implements Runnable {

        List<GpsEntity> list;
        CountDownLatch count;

        /**
         * @param gpsList records assigned to this task (sorted in place by {@link #run()})
         * @param count   latch counted down once when the task ends; may be {@code null}
         */
        SignalHandleThread(List<GpsEntity> gpsList, CountDownLatch count) {
            this.list = gpsList;
            this.count = count;
        }

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            try {
                // A failure while sorting (e.g. a null timestamp) lands in the outer catch
                // and skips the whole share of this task.
                Collections.sort(list, comp);
                for (int i = 0, len = list.size(); i < len; i++) {
                    if (Thread.currentThread().isInterrupted()) {
                        break;
                    }
                    GpsEntity gps = list.get(i);

                    try {
                        // Skip records without a vehicle code (nbbm).
                        if (StringUtils.isEmpty(gps.getNbbm())) {
                            continue;
                        }
                        // Skip records whose device time is more than 20 minutes off the server time.
                        if (Math.abs(gps.getTimestamp() - gps.getServerTimestamp()) > MAX_CLOCK_SKEW_MS) {
                            continue;
                        }

                        gpsStateProcess.process(gps);      // state handling
                        stationInsideProcess.process(gps); // inside / outside depot check
                        reverseRouteProcess.process(gps);  // reverse-route handling
                        abnormalStateProcess.process(gps); // speeding / out of bounds

                        inStationProcess.process(gps);     // station entry
                        outStationProcess.process(gps);    // station exit

                        GpsCacheData.putGps(gps);          // historical GPS cache
                    } catch (Throwable e) {
                        // Best effort: one bad record must not abort the rest of the share.
                        logger.error("SignalHandleThread.run1", e);
                    }
                }
            } catch (Exception e) {
                logger.error("SignalHandleThread.run2", e);
            } finally {
                // Always release the waiting handle() call, even after a failure.
                if (count != null) {
                    count.countDown();
                }

                logger.info("list size:{} cost:{}", list.size(), System.currentTimeMillis() - start);
            }
        }
    }

    /**
     * Orders GPS records by ascending timestamp. Throws {@link NullPointerException}
     * when the timestamp of the first argument is {@code null}.
     */
    public static class GpsComp implements Comparator<GpsEntity> {

        @Override
        public int compare(GpsEntity g1, GpsEntity g2) {
            return g1.getTimestamp().compareTo(g2.getTimestamp());
        }
    }
}