// DataLoader.java 5.76 KB
package com.bsth.data.arrival;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.bsth.data.BasicData;
import com.bsth.data.LineConfigData;
import com.bsth.entity.realcontrol.LineConfig;
import com.bsth.util.db.DBUtils_MS;

/**
 * Loads vehicle arrival/departure (in/out of station) records from the
 * {@code bsth_c_arrival_info} table.
 *
 * <p>The table is filtered by the {@code weeks_year} column (week of year, used
 * as the table partition field) together with a {@code create_timestamp} window.
 * Rows whose {@code create_timestamp} and {@code ts} differ by one hour or more
 * are excluded by every query in this class.
 *
 * <p>Originally created 2016-08-19 09:59:21.
 *
 * @author PanZhao
 */
@Component
public class DataLoader {
	
	/**
	 * Upper bound (epoch milliseconds) of the last successful load. {@code null}
	 * until the first {@link #recovery()} or {@link #setPrveLoadTime(long)} call.
	 */
	private static Long prveLoadTime;
	
	/** One day in milliseconds. */
	private final static long DAY_TIME = 1000L * 60 * 60 * 24;
	
	/**
	 * Upper bound on the number of one-week steps taken when enumerating the
	 * weeks of a time window; WEEK_OF_YEAR has at most 53 distinct values.
	 */
	private final static int MAX_WEEK_STEPS = 53;
	
	private Logger logger = LoggerFactory.getLogger(this.getClass());
	
	@Autowired
	LineConfigData lineConfigData;
	
	/**
	 * Loads the records created since the previous load (incremental load).
	 *
	 * <p>When no previous load time is known, this delegates to
	 * {@link #recovery()}. Otherwise it queries the window
	 * {@code (previous load time, now]} and, only if the query succeeds, advances
	 * the previous load time to "now" so that a failed window is retried by the
	 * next call.
	 *
	 * @return the loaded records; an empty list (never {@code null}) when the
	 *         query fails or nothing matches
	 */
	public List<ArrivalEntity> load(){
		// Read the shared field once so the null check and the query use the same value.
		Long prev = prveLoadTime;
		if(null == prev){
			return recovery();
		}
		
		long from = prev;
		long to = System.currentTimeMillis();
		try{
			// The window may straddle a week rollover, so cover every week it touches.
			List<Integer> weeks = weeksCovering(from, to);
			List<Object> params = new ArrayList<Object>(weeks);
			params.add(from);
			params.add(to);
			
			List<ArrivalEntity> list = query(buildSql(weeks.size(), "", "create_date"), params);
			prveLoadTime = to;
			return list;
		}catch(Exception e){
			logger.error("failed to load incremental arrival data", e);
			return new ArrayList<>();
		}
	}
	
	/**
	 * Restores data from the database, line by line, starting at each line's
	 * current start time as reported by its {@link LineConfig}.
	 *
	 * <p>If a line's start time lies in the future, the start of the window is
	 * moved back by one day. A failure for one line is logged and does not stop
	 * the remaining lines. The previous load time is set to the moment this
	 * method started.
	 *
	 * @return all records restored for all configured lines (possibly empty)
	 */
	public List<ArrivalEntity> recovery(){
		Collection<LineConfig> confs = lineConfigData.getAll();
		long now = System.currentTimeMillis();
		
		List<ArrivalEntity> all = new ArrayList<>();
		for(LineConfig conf : confs){
			try{
				long st = conf.getCurrStartTime();
				if(now < st){
					st = st - DAY_TIME;
				}
				all.addAll(loadByLineAndTime(conf.getLine().getLineCode(), st, now));
			}catch(Exception e){
				logger.error("failed to recover arrival data for a line", e);
			}
		}
		
		prveLoadTime = now;
		return all;
	}
	
	/**
	 * Loads the records of one line whose {@code create_timestamp} lies in
	 * {@code (st, et]}, ordered by {@code ts}.
	 *
	 * @param lineCode value matched against the {@code line_id} column
	 * @param st exclusive lower bound of {@code create_timestamp} (epoch millis)
	 * @param et inclusive upper bound of {@code create_timestamp} (epoch millis)
	 * @return the matching records; an empty list when the query fails
	 */
	public List<ArrivalEntity> loadByLineAndTime(String lineCode, long st, long et){
		try{
			List<Integer> weeks = weeksCovering(st, et);
			List<Object> params = new ArrayList<Object>(weeks);
			params.add(lineCode);
			params.add(st);
			params.add(et);
			
			return query(buildSql(weeks.size(), " and line_id=?", "ts"), params);
		}catch(Exception e){
			logger.error("failed to load arrival data, line=" + lineCode + ", st=" + st + ", et=" + et, e);
			return new ArrayList<>();
		}
	}
	
	/**
	 * Loads the records of one line and direction whose {@code in_out} is 0 and
	 * whose {@code create_timestamp} lies in {@code (st, et]}, ordered by
	 * {@code ts}.
	 *
	 * @param lineCode value matched against the {@code line_id} column
	 * @param updown value matched against the {@code up_down} column
	 * @param st exclusive lower bound of {@code create_timestamp} (epoch millis)
	 * @param et inclusive upper bound of {@code create_timestamp} (epoch millis)
	 * @return the matching records; an empty list when the query fails
	 */
	public List<ArrivalEntity> loadByLineAndTime(String lineCode, int updown, long st, long et){
		try{
			List<Integer> weeks = weeksCovering(st, et);
			List<Object> params = new ArrayList<Object>(weeks);
			params.add(lineCode);
			params.add(updown);
			params.add(st);
			params.add(et);
			
			return query(buildSql(weeks.size(), " and line_id=? and up_down=? and in_out=0", "ts"), params);
		}catch(Exception e){
			logger.error("failed to load arrival data, line=" + lineCode + ", updown=" + updown
					+ ", st=" + st + ", et=" + et, e);
			return new ArrayList<>();
		}
	}
	
	/**
	 * Converts every remaining row of the result set into an
	 * {@link ArrivalEntity}.
	 *
	 * <p>Rows whose {@code device_id} has no entry in
	 * {@code BasicData.deviceId2NbbmMap} are logged and skipped. The stop name is
	 * looked up from {@code BasicData.stationCode2NameMap} and every returned
	 * entity is marked enabled.
	 *
	 * @param rs result set positioned before the first row to convert
	 * @return the converted rows, in result-set order
	 * @throws SQLException if reading from the result set fails
	 */
	public List<ArrivalEntity> resultSet2Set(ResultSet rs) throws SQLException{
		List<ArrivalEntity> list = new ArrayList<>();
		
		while(rs.next()){
			ArrivalEntity arr = new ArrivalEntity();
			arr.setDeviceId(rs.getString("device_id"));
			arr.setNbbm(BasicData.deviceId2NbbmMap.get(arr.getDeviceId()));
			if(null == arr.getNbbm()){
				logger.warn("未注册的设备号," + arr.getDeviceId());
				continue;
			}
			
			arr.setTs(rs.getLong("ts"));
			arr.setLineCode(rs.getString("line_id"));
			arr.setUpDown(rs.getInt("up_down"));
			arr.setStopNo(rs.getString("stop_no"));
			arr.setStopName(BasicData.stationCode2NameMap.get(arr.getStopNo()));
			arr.setInOut(rs.getInt("in_out"));
			arr.setCreateDate(rs.getLong("create_timestamp"));
			arr.setWeeksYear(rs.getInt("weeks_year"));
			arr.setEnable(true);
			
			list.add(arr);
		}
		return list;
	}
	
	/**
	 * Overrides the previous load time used by {@link #load()}.
	 *
	 * @param t new previous load time (epoch milliseconds)
	 */
	public static void setPrveLoadTime(long t){
		prveLoadTime = t;
	}
	
	/**
	 * Lists the distinct WEEK_OF_YEAR values (default calendar) touched by the
	 * window from {@code st} to {@code et}: the week of {@code st}, each week
	 * reached by stepping seven calendar days forward while still before
	 * {@code et} (at most {@link #MAX_WEEK_STEPS} steps), and the week of
	 * {@code et}.
	 */
	private static List<Integer> weeksCovering(long st, long et){
		List<Integer> weeks = new ArrayList<>();
		Calendar cal = Calendar.getInstance();
		cal.setTimeInMillis(st);
		weeks.add(cal.get(Calendar.WEEK_OF_YEAR));
		
		for(int i = 0; i < MAX_WEEK_STEPS; i++){
			// Calendar.add keeps the local wall-clock time, so each step lands in the next week.
			cal.add(Calendar.DATE, 7);
			if(cal.getTimeInMillis() >= et){
				break;
			}
			int week = cal.get(Calendar.WEEK_OF_YEAR);
			if(!weeks.contains(week)){
				weeks.add(week);
			}
		}
		
		cal.setTimeInMillis(et);
		int lastWeek = cal.get(Calendar.WEEK_OF_YEAR);
		if(!weeks.contains(lastWeek)){
			weeks.add(lastWeek);
		}
		return weeks;
	}
	
	/**
	 * Builds the select statement: a {@code weeks_year in (...)} filter with
	 * {@code weekCount} placeholders, then {@code extraFilter}, then the shared
	 * timestamp predicates, ordered by {@code orderBy}.
	 */
	private static String buildSql(int weekCount, String extraFilter, String orderBy){
		StringBuilder sql = new StringBuilder("select * from bsth_c_arrival_info where weeks_year in (");
		for(int i = 0; i < weekCount; i++){
			if(i > 0){
				sql.append(',');
			}
			sql.append('?');
		}
		sql.append(')')
			.append(extraFilter)
			.append(" AND create_timestamp > ? AND create_timestamp <=? AND ABS(create_timestamp - ts) < 3600000 order by ")
			.append(orderBy);
		return sql.toString();
	}
	
	/**
	 * Runs {@code sql} with {@code params} bound in order and converts the rows;
	 * the JDBC resources are always released through {@code DBUtils_MS.close}.
	 */
	private List<ArrivalEntity> query(String sql, List<Object> params) throws Exception{
		Connection conn = null;
		PreparedStatement ps = null;
		ResultSet rs = null;
		try{
			conn = DBUtils_MS.getConnection();
			ps = conn.prepareStatement(sql);
			for(int i = 0; i < params.size(); i++){
				bind(ps, i + 1, params.get(i));
			}
			rs = ps.executeQuery();
			return resultSet2Set(rs);
		}finally {
			DBUtils_MS.close(rs, ps, conn);
		}
	}
	
	/**
	 * Binds one parameter with the typed setter matching its runtime type
	 * (Integer, Long, otherwise String).
	 */
	private static void bind(PreparedStatement ps, int index, Object value) throws SQLException{
		if(value instanceof Integer){
			ps.setInt(index, (Integer) value);
		}else if(value instanceof Long){
			ps.setLong(index, (Long) value);
		}else{
			ps.setString(index, (String) value);
		}
	}
}