// GpsDataLoaderThread.java 7.75 KB
package com.bsth.data.gpsdata.thread;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bsth.data.BasicData;
import com.bsth.data.gpsdata.GpsEntity;
import com.bsth.data.gpsdata.GpsRealData;
import com.bsth.data.gpsdata.arrival.GpsRealAnalyse;
import com.bsth.util.ConfigUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * Created by panzhao on 2017/1/11.
 *
 * <p>Spring-managed {@link Thread} whose {@link #run()} performs a single HTTP pull of
 * real-time GPS points and forwards them to {@link GpsRealAnalyse#analyse}. The data source
 * is selected by the static {@code flag}: {@code 0} reads from the GPS client's in-memory
 * cache endpoint, any other value reads from the gateway endpoint.
 */
@Component
public class GpsDataLoaderThread extends Thread {

    static final Logger logger = LoggerFactory.getLogger(GpsDataLoaderThread.class);

    /** Timeout (ms) for leasing a connection from the HTTP client's connection manager. */
    private static final int CONNECTION_REQUEST_TIMEOUT_MS = 1000;

    /**
     * Reads both endpoint URLs from {@link ConfigUtil} into the static fields. Note that
     * every construction overwrites the values shared by all instances.
     */
    public GpsDataLoaderThread() {
        url = ConfigUtil.get("http.gps.real.url");
        clientUrl = ConfigUtil.get("http.gps.real.cache.url");
    }

    // Gateway data endpoint.
    private static String url;
    // GPS client in-memory data endpoint.
    private static String clientUrl;

    // 0: load from the GPS client's memory; -1: load from the gateway.
    // volatile so that a value written through setFlag() is visible to whichever
    // thread executes run().
    private static volatile int flag = 0;

    /**
     * Selects the data source used by subsequent {@link #run()} invocations.
     *
     * @param v {@code 0} for the client cache endpoint; any other value (conventionally
     *          {@code -1}) for the gateway endpoint
     */
    public static void setFlag(int v) {
        flag = v;
    }

    /**
     * Returns the current data-source flag.
     *
     * @param v ignored; kept only so existing callers keep compiling
     * @return the current flag value
     */
    public static int getFlag(int v) {
        return flag;
    }

    /**
     * Returns the current data-source flag.
     *
     * @return the current flag value
     */
    public static int getFlag() {
        return flag;
    }

    @Autowired
    GpsRealData gpsRealData;

    @Autowired
    GpsRealAnalyse gpsRealAnalyse;

    /**
     * Performs one load from the source selected by {@code flag}. Any exception is logged
     * and swallowed so it never escapes the thread body.
     */
    @Override
    public void run() {
        try {
            if (flag == 0) {
                load();
            } else {
                loadByGateway();
            }
        } catch (Exception e) {
            logger.error("", e);
        }
    }

    /**
     * Fetches real-time GPS data from the gateway, keeps only the points that differ from
     * the ones currently held in {@link GpsRealData}, and passes those to the analyser.
     *
     * <p>The response is expected to be a JSON object whose {@code "data"} member holds the
     * array of points. Failures are logged rather than propagated.
     *
     * @throws Exception declared for backward compatibility with existing callers
     */
    public void loadByGateway() throws Exception {
        try {
            String body = fetchBody(url, 1500, 1500);
            if (body == null) {
                logger.error("real gps result is null");
                return;
            }

            JSONObject jsonObj = JSON.parseObject(body);
            List<GpsEntity> parsed = null;
            if (jsonObj != null) {
                parsed = JSON.parseArray(jsonObj.getString("data"), GpsEntity.class);
            }
            if (parsed == null) {
                // Previously this case surfaced as a NullPointerException further down.
                logger.error("real gps result has no parsable \"data\" array");
                return;
            }

            // Drop invalid points.
            List<GpsEntity> list = filterInvalid(parsed);

            List<GpsEntity> updateList = new ArrayList<>();
            for (GpsEntity gps : list) {
                // No device id.
                if (StringUtils.isBlank(gps.getDeviceId())) {
                    continue;
                }

                // Skip points identical to the one already stored for this device.
                if (isUnchanged(gpsRealData.get(gps.getDeviceId()), gps)) {
                    continue;
                }

                resolveNbbm(gps);
                // This point carries an update.
                updateList.add(gps);
            }
            logger.info("全量点:" + list.size() + ",更新点" + updateList.size());
            // Analyse the data.
            gpsRealAnalyse.analyse(updateList);
        } catch (Exception e) {
            logger.error("", e);
        }
    }

    /**
     * Filters out invalid GPS points, keeping those whose {@code getValid()} is {@code 0}.
     * {@code null} elements are dropped as well.
     *
     * @param list points to filter; {@code null} is treated as empty
     * @return a new list with the valid points, or the original {@code list} unfiltered if
     *         filtering itself throws (best-effort fallback)
     */
    private List<GpsEntity> filterInvalid(List<GpsEntity> list) {
        List<GpsEntity> rsList = new ArrayList<>();
        if (list == null) {
            return rsList;
        }

        try {
            for (GpsEntity gps : list) {
                if (gps != null && gps.getValid() == 0) {
                    rsList.add(gps);
                }
            }

            if (rsList.size() < list.size()) {
                logger.info("过滤无效的点位 : " + (list.size() - rsList.size()));
            }
        } catch (Exception e) {
            logger.error("", e);
            rsList = list;
        }
        return rsList;
    }

    /**
     * Fetches GPS data from the GPS client's in-memory cache endpoint and passes every
     * valid point to the analyser.
     *
     * <p>The response is expected to be a JSON array of points. Failures are logged rather
     * than propagated.
     *
     * @throws Exception declared for backward compatibility with existing callers
     */
    public void load() throws Exception {
        try {
            logger.info("load start...");
            String body = fetchBody(clientUrl, 2000, 3000);
            if (body == null) {
                logger.error("client gps result is null");
                return;
            }

            List<GpsEntity> parsed = JSON.parseArray(body, GpsEntity.class);
            if (parsed == null) {
                // Previously this case surfaced as a NullPointerException further down.
                logger.error("client gps result is not a parsable array");
                return;
            }

            // Drop invalid points.
            List<GpsEntity> list = filterInvalid(parsed);

            logger.info("load end!");
            for (GpsEntity gps : list) {
                // No device id.
                // NOTE(review): unlike loadByGateway(), such points stay in the list and are
                // still handed to analyse() — confirm that this is intended.
                if (StringUtils.isBlank(gps.getDeviceId())) {
                    continue;
                }
                resolveNbbm(gps);
            }
            // Analyse the data.
            gpsRealAnalyse.analyse(list);
        } catch (Exception e) {
            logger.error("", e);
        }
    }

    /**
     * Issues a GET request and returns the response body with line terminators removed.
     *
     * <p>The body is decoded as UTF-8 instead of the platform default charset. Response,
     * client and reader are closed by try-with-resources; the response is closed before the
     * client.
     *
     * @param targetUrl        endpoint to request
     * @param connectTimeoutMs connect timeout in milliseconds
     * @param socketTimeoutMs  socket (read) timeout in milliseconds
     * @return the body text, or {@code null} when the response carries no entity
     * @throws IOException if the request fails or the body cannot be read
     */
    private String fetchBody(String targetUrl, int connectTimeoutMs, int socketTimeoutMs)
            throws IOException {
        HttpGet get = new HttpGet(targetUrl);
        // Timeouts.
        get.setConfig(RequestConfig.custom()
                .setConnectTimeout(connectTimeoutMs)
                .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
                .setSocketTimeout(socketTimeoutMs)
                .build());

        try (CloseableHttpClient httpClient = HttpClients.createDefault();
             CloseableHttpResponse response = httpClient.execute(get)) {
            HttpEntity entity = response.getEntity();
            if (entity == null) {
                return null;
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8))) {
                StringBuilder body = new StringBuilder();
                String line;
                while ((line = reader.readLine()) != null) {
                    body.append(line);
                }
                return body.toString();
            }
        }
    }

    /**
     * Tells whether {@code current} repeats the stored point: same timestamp, latitude and
     * longitude. Null-safe on every compared field.
     *
     * @param old     the point currently stored for the device, may be {@code null}
     * @param current the freshly received point
     * @return {@code true} if {@code old} exists and all three fields are equal
     */
    private static boolean isUnchanged(GpsEntity old, GpsEntity current) {
        return old != null
                && Objects.equals(old.getTimestamp(), current.getTimestamp())
                && Objects.equals(old.getLat(), current.getLat())
                && Objects.equals(old.getLon(), current.getLon());
    }

    /**
     * Looks up the point's device id in {@code BasicData.deviceId2NbbmMap} and sets the
     * result as its nbbm; when the lookup yields a blank value the point is flagged as
     * incomplete instead.
     *
     * @param gps point with a non-blank device id
     */
    private static void resolveNbbm(GpsEntity gps) {
        String nbbm = BasicData.deviceId2NbbmMap.get(gps.getDeviceId());
        if (StringUtils.isBlank(nbbm)) {
            gps.setIncomplete(true); // mark as abnormal data
        } else {
            gps.setNbbm(nbbm);
        }
    }
}