GpsDataRecovery.java 6.48 KB
package com.bsth.data.gpsdata.recovery;

import com.bsth.data.BasicData;
import com.bsth.data.gpsdata.GpsEntity;
import com.bsth.data.gpsdata.arrival.GeoCacheData;
import com.bsth.data.gpsdata.arrival.handlers.*;
import com.bsth.data.gpsdata.arrival.utils.CircleQueue;
import com.bsth.util.db.DBUtils_MS;
import com.google.common.collect.ArrayListMultimap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * gps数据恢复
 * Created by panzhao on 2016/12/24.
 */
@Component
public class GpsDataRecovery implements ApplicationContextAware {

    static Logger logger = LoggerFactory.getLogger(GpsDataRecovery.class);

    public static boolean run;

    static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    static OfflineSignalHandle offlineSignalHandle;
    static CorrectSignalHandle correctSignalHandle;
    static StationInsideHandle stationInsideHandle;
    static InOutStationSignalHandle inOutStationSignalHandle;
    static ReverseSignalHandle reverseSignalHandle;

    public void recovery() {
        List<GpsEntity> list = loadData();

        //按车辆分组数据
        ArrayListMultimap<String, GpsEntity> listMap = ArrayListMultimap.create();
        for (GpsEntity gps : list) {
            if (gps.getNbbm() != null)
                listMap.put(gps.getNbbm(), gps);
        }


        Set<String> keys = listMap.keySet();

        CountDownLatch count = new CountDownLatch(keys.size());
        GpsComp comp = new GpsComp();
        for (String nbbm : keys) {
            Collections.sort(listMap.get(nbbm), comp);
            threadPool.execute(new RecoveryThread(listMap.get(nbbm), count));
            /*if(nbbm.equals("W9H-003"))
                new RecoveryThread(listMap.get(nbbm), count).run();*/
        }

        try {
            count.await();
            run = false;
            logger.info("数据恢复完成....");
        } catch (InterruptedException e) {
            logger.error("", e);
        }
    }

    /**
     * 加载当天的gps数据
     *
     * @return
     */
    public List<GpsEntity> loadData() {
        Calendar calendar = Calendar.getInstance();
        int dayOfYear = calendar.get(Calendar.DAY_OF_YEAR);

        String sql = "select DEVICE_ID,LAT,LON,TS,SPEED_GPS,LINE_ID,SERVICE_STATE from bsth_c_gps_info where days_year=" + dayOfYear;
        JdbcTemplate jdbcTemplate = new JdbcTemplate(DBUtils_MS.getDataSource());

        List<GpsEntity> list =
                jdbcTemplate.query(sql, new RowMapper<GpsEntity>() {
                    @Override
                    public GpsEntity mapRow(ResultSet rs, int rowNum) throws SQLException {
                        GpsEntity gps = new GpsEntity();

                        gps.setDeviceId(rs.getString("DEVICE_ID"));
                        gps.setNbbm(BasicData.deviceId2NbbmMap.get(gps.getDeviceId()));
                        gps.setSpeed(rs.getFloat("SPEED_GPS"));
                        gps.setLat(rs.getFloat("LAT"));
                        gps.setLon(rs.getFloat("LON"));
                        gps.setLineId(rs.getString("LINE_ID"));
                        gps.setTimestamp(rs.getLong("TS"));
                        gps.setUpDown(getUpOrDown(rs.getLong("SERVICE_STATE")));
                        return gps;
                    }
                });
        return list;
    }

    /**
     * 王通 2016/6/29 9:23:24 获取车辆线路上下行
     *
     * @return -1无效 0上行 1下行
     */
    public static int getUpOrDown(long serviceState) {
        if ((serviceState & 0x00020000) == 0x00020000 || (serviceState & 0x80000000) == 0x80000000
                || (serviceState & 0x01000000) == 0x01000000 || (serviceState & 0x08000000) == 0x08000000)
            return -1;
        return (((serviceState & 0x10000000) == 0x10000000) ? 1 : 0);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        offlineSignalHandle = applicationContext.getBean(OfflineSignalHandle.class);
        correctSignalHandle = applicationContext.getBean(CorrectSignalHandle.class);
        stationInsideHandle = applicationContext.getBean(StationInsideHandle.class);
        inOutStationSignalHandle = applicationContext.getBean(InOutStationSignalHandle.class);
        reverseSignalHandle = applicationContext.getBean(ReverseSignalHandle.class);
    }

    public static class GpsComp implements Comparator<GpsEntity> {

        @Override
        public int compare(GpsEntity g1, GpsEntity g2) {
            return g1.getTimestamp().compareTo(g2.getTimestamp());
        }
    }

    public static class RecoveryThread implements Runnable {
        List<GpsEntity> list;
        CountDownLatch count;

        RecoveryThread(List<GpsEntity> list, CountDownLatch count) {
            this.list = list;
            this.count = count;
        }

        @Override
        public void run() {
            try {
                //循环gps恢复数据
                CircleQueue<GpsEntity> prevs;

                for (GpsEntity gps : list) {

                    prevs = GeoCacheData.getGps(gps.getNbbm());
                    //掉线处理
                    offlineSignalHandle.handle(gps, prevs);
                    //状态处理
                    if (!correctSignalHandle.handle(gps, prevs))
                        continue;
                    //场,站内外判断
                    stationInsideHandle.handle(gps, prevs);
                    //反向处理
                    reverseSignalHandle.handle(gps, prevs);
                    //进出站动作处理
                    inOutStationSignalHandle.handle(gps, prevs);
                    GeoCacheData.putGps(gps);

                    //Thread.sleep(50);
                }
            } catch (Exception e) {
                logger.error("", e);
            } finally {
                count.countDown();
            }
        }
    }
}