GpsRealData.java 7.82 KB
package com.bsth.data.gpsdata;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bsth.data.BasicData;
import com.bsth.data.forecast.ForecastRealServer;
import com.bsth.data.gpsdata.arrival.GpsRealAnalyse;
import com.bsth.data.gpsdata.recovery.GpsDataRecovery;
import com.bsth.data.schedule.DayOfSchedule;
import com.bsth.entity.realcontrol.ScheduleRealInfo;
import com.bsth.util.ConfigUtil;
import com.google.common.collect.TreeMultimap;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.*;

/**
 * @author PanZhao
 * @ClassName: GpsRealData
 * @Description: TODO(实时GPS数据集合)
 * @date 2016年8月12日 下午2:04:41
 */
@Component
public class GpsRealData implements CommandLineRunner {

    static Logger logger = LoggerFactory.getLogger(GpsRealData.class);

    private static Map<String, GpsEntity> gpsMap;

    //按线路分组设备号
    private static TreeMultimap<String, String> lineCode2Devices;

    // 网关数据接口地址
    private static String url;

    @Autowired
    GpsDataLoader gpsDataLoader;

    @Autowired
    DayOfSchedule dayOfSchedule;

    @Autowired
    ForecastRealServer forecastRealServer;

    /**
     * 构造函数
     */
    public GpsRealData() {
        gpsMap = new HashMap<>();
        lineCode2Devices = TreeMultimap.create();
        url = ConfigUtil.get("http.gps.real.url");
    }

    @Override
    public void run(String... arg0) throws Exception {
        logger.info("gpsDataLoader,20,5");
        //Application.mainServices.scheduleWithFixedDelay(gpsDataLoader, 20, 5, TimeUnit.SECONDS);
    }

    public void put(GpsEntity gps) {
        String device = gps.getDeviceId();
        GpsEntity old = gpsMap.get(device);

        try {
            if (!StringUtils.isEmpty(gps.getStopNo())) {
                //站点编码改变
                if (null == old || !gps.getStopNo().equals(old.getStopNo())) {
                    gps.setArrTime(gps.getTimestamp());
                    //预测到达终点时间
                    forecastRealServer.forecast(gps.getNbbm(), gps);
                } else {
                    gps.setArrTime(old.getArrTime());
                    //不预测, 重新计算终点时间
                    gps.setExpectStopTime(forecastRealServer.expectStopTime(gps.getNbbm()));
                }
            }
        } catch (Exception e) {
            logger.error("", e);
        }

        //刷新对照
        gpsMap.put(device, gps);
        if (StringUtils.isNotBlank(gps.getLineId())) {
            //站点名称
            gps.setStationName(getStationName(gps));
            lineCode2Devices.put(gps.getLineId(), device);
        }
    }

    public String getStationName(GpsEntity gps) {
        return BasicData.getStationNameByCode(gps.getStopNo(), gps.getLineId() + "_" + gps.getUpDown() + "_");
    }

    /**
     * @Title: get @Description: TODO(设备号获取GPS)
     */
    public GpsEntity get(String deviceId) {
        return gpsMap.get(deviceId);
    }

    /**
     * @Title: get @Description: TODO(线路编码获取GPS集合) @throws
     */
    public List<GpsEntity> getByLine(String lineCode) {
        NavigableSet<String> set = lineCode2Devices.get(lineCode);

        List<GpsEntity> rs = new ArrayList<>();
        GpsEntity gps;
        ScheduleRealInfo sch;
        for (String device : set) {
            gps = gpsMap.get(device);
            //过滤异常GPS数据
            if (gps == null || gps.isIncomplete())
                continue;

            sch = dayOfSchedule.execPlanMap().get(gps.getNbbm());
            if (null != sch)
                gps.setSchId(sch.getId());
            rs.add(gps);
        }

        return rs;
    }

    public List<GpsEntity> get(List<String> pArray) {
        List<GpsEntity> list = new ArrayList<>();

        for (String code : pArray)
            list.addAll(getByLine(code));
        return list;
    }

    public Set<String> allDevices() {
        return gpsMap.keySet();
    }

    public GpsEntity findByDeviceId(String deviceId) {
        return gpsMap.get(deviceId);
    }

    public Collection<GpsEntity> all() {
        return gpsMap.values();
    }

    public void remove(String device) {
        gpsMap.remove(device);
    }

    @Component
    public static class GpsDataLoader extends Thread {

        Logger logger = LoggerFactory.getLogger(GpsDataLoader.class);

        @Autowired
        GpsRealData gpsRealData;

        @Autowired
        GpsRealAnalyse gpsRealAnalyse;

        @Override
        public void run() {
            try {
                //如果正在恢复数据
                if (GpsDataRecovery.run)
                    return;

                load();
            } catch (Exception e) {
                logger.error("", e);
            }
        }

        public void load() throws Exception {
            List<GpsEntity> list = null;
            List<GpsEntity> updateList = new ArrayList<>();
            CloseableHttpClient httpClient = null;
            CloseableHttpResponse response = null;
            try {
                httpClient = HttpClients.createDefault();
                HttpGet get = new HttpGet(url);

                response = httpClient.execute(get);

                HttpEntity entity = response.getEntity();
                if (null != entity) {
                    BufferedReader br = new BufferedReader(new InputStreamReader(entity.getContent()));
                    StringBuffer stringBuffer = new StringBuffer();
                    String str = "";
                    while ((str = br.readLine()) != null)
                        stringBuffer.append(str);

                    JSONObject jsonObj = JSON.parseObject(stringBuffer.toString());

                    if (jsonObj != null)
                        list = JSON.parseArray(jsonObj.getString("data"), GpsEntity.class);

                    String nbbm;
                    GpsEntity old;
                    for (GpsEntity gps : list) {
                        //没有设备号
                        if (StringUtils.isBlank(gps.getDeviceId()))
                            continue;

                        old = gpsMap.get(gps.getDeviceId());
                        if (old != null &&
                                old.getTimestamp() == gps.getTimestamp())
                            continue;

                        nbbm = BasicData.deviceId2NbbmMap.get(gps.getDeviceId());
                        if (StringUtils.isBlank(nbbm))
                            gps.setIncomplete(true);//标记为异常数据
                        else
                            gps.setNbbm(nbbm);
                        //有更新的点位
                        updateList.add(gps);
                    }
                    //分析数据
                    gpsRealAnalyse.analyse(updateList);
                } else
                    logger.error("real gps result is null");
            } catch (Exception e) {
                logger.error("", e);
            } finally {
                if (null != httpClient)
                    httpClient.close();
                if (null != response)
                    response.close();
            }
        }
    }
}