SchedulePstThread.java 8.04 KB
package com.bsth.data.schedule.thread;

import com.bsth.data.schedule.DayOfSchedule;
import com.bsth.email.SendEmailController;
import com.bsth.email.entity.EmailBean;
import com.bsth.entity.realcontrol.ScheduleRealInfo;
import com.bsth.repository.realcontrol.ScheduleRealInfoRepository;
import com.bsth.util.IpUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
 * Asynchronous persistence of real-time schedule records.
 *
 * <p>Each call to {@link #run()} polls up to {@code MAX_BATCH_SIZE} records from
 * {@link DayOfSchedule#pstBuffer}, skips records flagged as deleted, and writes the rest to the
 * {@code bsth_c_s_sp_info_real} table with a single batched {@code UPDATE} inside a
 * programmatic transaction. When the batch fails, the records are put back into
 * {@code pstBuffer}, the failure is logged and an alert mail is sent.
 *
 * <p>NOTE(review): how {@code run()} is triggered is not visible in this file; it is presumably
 * invoked repeatedly by a scheduler. {@code saveList} is a shared static list without
 * synchronization, so concurrent invocations are presumably not expected - confirm with the caller.
 *
 * @author PanZhao
 * @date 2016-08-24 01:47:05
 */
@Component
public class SchedulePstThread extends Thread {

    /** Maximum number of records polled from the buffer per {@link #run()} invocation. */
    private static final int MAX_BATCH_SIZE = 500;

    /** Batched update statement; parameter order must match the setter in {@link #update2Db()}. */
    private static final String UPDATE_SQL =
            "update bsth_c_s_sp_info_real set bc_type=?,bcs=?,bcsj=?,cl_zbh=?,create_date=?" +
                    ",dfsj=?,directive_state=?,fcno=?,fcsj=?,fcsj_actual=?,j_gh=?,j_name=?,jhlc=?,lp_name=?,qdz_code=?" +
                    ",qdz_name=?,real_exec_date=?,remarks=?,s_gh=?,s_name=?,schedule_date=?,schedule_date_str=?,sflj=?" +
                    ",sp_id=?,status=?,update_date=?,xl_bm=?,xl_dir=?,xl_name=?,zdsj=?,zdsj_actual=?,zdz_code=?,zdz_name=?" +
                    ",ccno=?,df_auto=?,fgs_bm=?,fgs_name=?,gs_bm=?,gs_name=?,online=?,adjust_exps=?,reissue=?,jhlc_orig=?" +
                    ",sigin_compate=?,drift_status=?,cc_service=?,major_station_name=? where id=?";

    @Autowired
    ScheduleRealInfoRepository scheduleRepository;

    @Autowired
    JdbcTemplate jdbcTemplate;

    @Autowired
    DayOfSchedule dayOfSchedule;

    /** Used only to serialize a failed batch into the error log. */
    @Autowired
    private ObjectMapper mapper;

    @Autowired
    private SendEmailController sendEmailController;

    /** Alert recipients; only the first entry is used when a batch fails. */
    @Value("${waybill.emails}")
    private String[] emails;

    Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Records collected for the current batch; emptied by {@link #save()} after the write attempt. */
    static List<ScheduleRealInfo> saveList = new ArrayList<>();

    /**
     * Collects up to {@code MAX_BATCH_SIZE} records from the persistence buffer and saves them.
     * Any exception is logged and swallowed so that it never propagates to the caller.
     */
    @Override
    public void run() {
        try {
            for (int i = 0; i < MAX_BATCH_SIZE; i++) {
                ScheduleRealInfo schedule = DayOfSchedule.pstBuffer.poll();
                if (schedule == null) {
                    // Buffer drained.
                    break;
                }

                if (schedule.isDeleted()) {
                    // Deleted records are dropped, not persisted.
                    logger.error("save 发现 deleted=true 的班次,id: {}", schedule.getId());
                    continue;
                }

                saveList.add(schedule);
            }

            // Write the collected records to the database.
            save();
        } catch (Exception e) {
            logger.error("", e);
        }
    }

    /**
     * Persists the collected records and then empties {@code saveList}.
     * Does nothing when no records were collected. If {@link #update2Db()} throws, the list is
     * intentionally left untouched so the records are still present on the next invocation.
     */
    private void save() {
        if (saveList.isEmpty()) {
            return;
        }
        logger.info("real schedule update size: {}", saveList.size());

        // Batched write.
        update2Db();

        // Reset the container for the next run.
        saveList.clear();
        logger.info("update end! ");
    }

    /**
     * Writes every record in {@code saveList} with one batched update inside a programmatic
     * transaction. On failure the transaction is rolled back (when still open), the records are
     * returned to {@link DayOfSchedule#pstBuffer}, the error is logged together with the batch
     * content, and an alert mail is sent.
     */
    private void update2Db() {
        final List<ScheduleRealInfo> pstList = saveList;
        // Programmatic transaction around the whole batch.
        DataSourceTransactionManager tran = new DataSourceTransactionManager(jdbcTemplate.getDataSource());
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        TransactionStatus status = tran.getTransaction(def);

        try {
            jdbcTemplate.batchUpdate(UPDATE_SQL, new BatchPreparedStatementSetter() {
                @Override
                public void setValues(PreparedStatement ps, int i) throws SQLException {
                    ScheduleRealInfo sch = pstList.get(i);
                    ps.setString(1, sch.getBcType());
                    // Nullable numeric fields are written as 0.
                    ps.setInt(2, sch.getBcs()==null?0:sch.getBcs());
                    ps.setInt(3, sch.getBcsj()==null?0:sch.getBcsj());
                    ps.setString(4, sch.getClZbh());
                    ps.setTimestamp(5, new java.sql.Timestamp(sch.getCreateDate().getTime()));
                    ps.setString(6, sch.getDfsj());
                    ps.setInt(7, sch.getDirectiveState());
                    ps.setInt(8, sch.getFcno()==null?0:sch.getFcno());
                    ps.setString(9, sch.getFcsj());
                    ps.setString(10, sch.getFcsjActual());
                    ps.setString(11, sch.getjGh());
                    ps.setString(12, sch.getjName());
                    ps.setDouble(13, sch.getJhlc());
                    ps.setString(14, sch.getLpName());
                    ps.setString(15, sch.getQdzCode());
                    ps.setString(16, sch.getQdzName());
                    ps.setString(17, sch.getRealExecDate());
                    ps.setString(18, sch.getRemarks());
                    ps.setString(19, sch.getsGh());
                    ps.setString(20, sch.getsName());
                    ps.setTimestamp(21, new java.sql.Timestamp(sch.getScheduleDate().getTime()));
                    ps.setString(22, sch.getScheduleDateStr());
                    ps.setBoolean(23, sch.isSflj());
                    ps.setLong(24, sch.getSpId());
                    ps.setInt(25, sch.getStatus());
                    ps.setTimestamp(26, new java.sql.Timestamp(sch.getUpdateDate().getTime()));
                    ps.setString(27, sch.getXlBm());
                    ps.setString(28, sch.getXlDir());
                    ps.setString(29, sch.getXlName());
                    ps.setString(30, sch.getZdsj());
                    ps.setString(31, sch.getZdsjActual());
                    ps.setString(32, sch.getZdzCode());
                    ps.setString(33, sch.getZdzName());
                    ps.setInt(34, sch.getCcno()==null?0:sch.getCcno());
                    ps.setBoolean(35, sch.isDfAuto());
                    ps.setString(36, sch.getFgsBm());
                    ps.setString(37, sch.getFgsName());
                    ps.setString(38, sch.getGsBm());
                    ps.setString(39, sch.getGsName());
                    ps.setBoolean(40, sch.isOnline());
                    ps.setString(41, sch.getAdjustExps());
                    ps.setBoolean(42, sch.isReissue());
                    ps.setDouble(43, sch.getJhlcOrig()==null?0:sch.getJhlcOrig());
                    ps.setInt(44, sch.getSiginCompate());
                    ps.setInt(45, sch.getDriftStatus());
                    ps.setBoolean(46, sch.isCcService());
                    ps.setString(47, sch.getMajorStationName());

                    // WHERE id=?
                    ps.setLong(48, sch.getId());
                }

                @Override
                public int getBatchSize() {
                    return pstList.size();
                }
            });

            tran.commit(status);
        } catch (Exception e) {
            rollbackQuietly(tran, status);
            // Put the records back so a later run retries them.
            DayOfSchedule.pstBuffer.addAll(pstList);
            // The root cause is always logged, even when the batch cannot be serialized.
            logger.error("实际排班批量保存异常: {}", describeBatch(pstList), e);
            sendFailureMail();
        }
    }

    /**
     * Rolls the transaction back unless it is already completed, logging (not propagating) any
     * rollback error so the caller's recovery steps still run.
     *
     * <p>A failed commit leaves the transaction completed, and rolling back a completed
     * transaction raises an {@code IllegalTransactionStateException}; hence the guard.
     */
    private void rollbackQuietly(DataSourceTransactionManager tran, TransactionStatus status) {
        if (status.isCompleted()) {
            return;
        }
        try {
            tran.rollback(status);
        } catch (Exception rollbackError) {
            logger.error("rollback of real schedule batch update failed", rollbackError);
        }
    }

    /**
     * Serializes the batch to JSON for the error log; falls back to a short placeholder when
     * serialization itself fails.
     */
    private String describeBatch(List<ScheduleRealInfo> batch) {
        try {
            return mapper.writeValueAsString(batch);
        } catch (JsonProcessingException | RuntimeException jsonError) {
            logger.error("failed to serialize real schedule batch for logging", jsonError);
            return "<unserializable batch, size=" + batch.size() + ">";
        }
    }

    /** Sends the batch-failure alert to the first configured recipient; failures are only logged. */
    private void sendFailureMail() {
        try {
            if (emails == null || emails.length == 0) {
                logger.warn("no recipient configured in waybill.emails, batch failure mail not sent");
                return;
            }
            EmailBean mail = new EmailBean();
            mail.setSubject("路单批量保存");
            mail.setContent(IpUtils.getLocalIpAddress() + "路单批量保存异常,检查日志信息<br/>");
            sendEmailController.sendMail(emails[0], mail);
            logger.info("DataHandlerProcess:邮件发送成功!");
        } catch (Exception exception) {
            logger.error("DataHandlerProcess:邮件发送失败!", exception);
        }
    }
}