// DirectivesPstThread.java 10.8 KB
package com.bsth.data.directive;

import com.alibaba.fastjson.JSON;
import com.bsth.data.schedule.DayOfSchedule;
import com.bsth.entity.directive.D60;
import com.bsth.entity.directive.D64;
import com.bsth.entity.directive.Directive;
import com.bsth.repository.directive.D60Repository;
import com.bsth.repository.directive.D64Repository;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Directive persistence task.
 *
 * <p>One call to {@link #run()} performs three steps, in order:
 * <ol>
 *   <li>polls up to {@code MAX_BATCH} entries from {@code DayOfDirectives.pstDirectives},
 *       stamps each with its date string and splits them into D60 / D64 lists;</li>
 *   <li>batch-inserts both lists, each in its own transaction;</li>
 *   <li>polls up to {@code MAX_BATCH} entries from {@code DayOfDirectives.pstD60s} and
 *       batch-updates their reply fields.</li>
 * </ol>
 *
 * <p>Rows removed from the queues are not re-queued when a batch fails; the failing batch
 * is only written to the error log.
 *
 * <p>NOTE(review): how {@code run()} is scheduled is not visible in this file - presumably it is
 * invoked repeatedly by an external scheduler; confirm against the caller.
 *
 * Created by panzhao on 2017/3/6.
 */
@Component
public class DirectivesPstThread extends Thread {

    /** Upper bound on the number of queue entries handled per queue in a single run. */
    private static final int MAX_BATCH = 2000;

    /** Formats a directive timestamp as the {@code rq} (date) column value. */
    private static final DateTimeFormatter RQ_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd");

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    D60Repository d60Repository;

    @Autowired
    D64Repository d64Repository;

    @Autowired
    DayOfSchedule dayOfSchedule;

    @Autowired
    JdbcTemplate jdbcTemplate;

    /**
     * Drains the pending queues and persists their content.
     * Any exception escaping a step is logged here and ends the current run.
     */
    @Override
    public void run() {
        try {
            ConcurrentLinkedQueue<Directive> pending = DayOfDirectives.pstDirectives;

            List<D60> d60s = new ArrayList<>();
            List<D64> d64s = new ArrayList<>();
            // Group the polled directives by concrete type (60 / 64).
            for (int i = 0; i < MAX_BATCH; i++) {
                Directive directive = pending.poll();
                if (null == directive) {
                    break;
                }

                // Date string derived from the directive timestamp.
                directive.setRq(RQ_FORMAT.print(directive.getTimestamp()));

                if (directive instanceof D60) {
                    D60 d60 = (D60) directive;
                    if (!isDelete(d60)) {
                        d60s.add(d60);
                    }
                } else if (directive instanceof D64) {
                    d64s.add((D64) directive);
                }
            }

            // Insert 60 directives.
            save60(d60s);
            // Insert 64 directives.
            save64(d64s);

            // 60 directive updates (vehicle-side responses); update60 ignores an empty list.
            ConcurrentLinkedQueue<D60> updateQueue = DayOfDirectives.pstD60s;
            update60(drain(updateQueue, MAX_BATCH));
        } catch (Exception e) {
            logger.error("指令入库出现异常", e);
        }
    }

    /**
     * Removes at most {@code limit} elements from the head of {@code queue}.
     *
     * @param queue source queue, polled until it is empty or the limit is reached
     * @param limit maximum number of elements to remove
     * @return the removed elements in poll order, possibly empty
     */
    private static <T> List<T> drain(ConcurrentLinkedQueue<T> queue, int limit) {
        List<T> drained = new ArrayList<>();
        for (int i = 0; i < limit; i++) {
            T item = queue.poll();
            if (null == item) {
                break;
            }
            drained.add(item);
        }
        return drained;
    }

    /**
     * Executes one JDBC batch inside its own programmatic transaction.
     * On success the transaction is committed and {@code successPrefix + rows.size()} is logged;
     * on failure the transaction is rolled back (if still open) and the rows are logged as JSON.
     *
     * @param sql           parameterised statement to execute for every row
     * @param setter        binds the parameters of row {@code i} and reports the batch size
     * @param rows          the rows being written; used for logging only
     * @param successPrefix log prefix written in front of the row count after a commit
     */
    private void executeBatch(String sql, BatchPreparedStatementSetter setter, List<?> rows, String successPrefix) {
        // Programmatic transaction. getTransaction stays outside the try block on purpose:
        // if no transaction can be opened the exception propagates to run(), exactly as before.
        DataSourceTransactionManager tran = new DataSourceTransactionManager(jdbcTemplate.getDataSource());
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        TransactionStatus status = tran.getTransaction(def);

        try {
            jdbcTemplate.batchUpdate(sql, setter);
            tran.commit(status);
            logger.info(successPrefix + rows.size());
        } catch (Exception e) {
            // Roll back before logging: serialising the rows may itself fail, and that must
            // not leave the transaction open.
            rollbackQuietly(tran, status);
            logger.error(String.format("错误数据:%s", JSON.toJSONString(rows)), e);
        }
    }

    /**
     * Rolls the transaction back unless it has already been completed.
     * A failed commit completes the transaction, and calling rollback on a completed
     * transaction raises an exception of its own, so that case is skipped. A failure of the
     * rollback itself is logged instead of being propagated.
     *
     * @param tran   transaction manager that created {@code status}
     * @param status status of the transaction to roll back
     */
    private void rollbackQuietly(DataSourceTransactionManager tran, TransactionStatus status) {
        if (status.isCompleted()) {
            return;
        }
        try {
            tran.rollback(status);
        } catch (Exception rollbackError) {
            logger.error("指令入库事务回滚失败", rollbackError);
        }
    }

    /**
     * Batch-inserts the given D64 directives into {@code bsth_v_directive_64}.
     *
     * @param d64s directives to insert; {@code null} or empty is a no-op
     */
    private void save64(final List<D64> d64s) {
        if (null == d64s || d64s.isEmpty()) {
            return;
        }

        String sql = "insert into bsth_v_directive_64(device_id,error_text,http_code,oper_code,rq,sender,timestamp,city_code,line_id,txt_content,resp_ack) " +
                " values(?,?,?,?,?,?,?,?,?,?,?)";

        executeBatch(sql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                D64 d64 = d64s.get(i);
                ps.setString(1, d64.getDeviceId());
                ps.setString(2, isNvl(d64.getErrorText()));
                ps.setInt(3, d64.getHttpCode());
                ps.setShort(4, isNvl(d64.getOperCode()));
                ps.setString(5, d64.getRq());

                ps.setString(6, isNvl(d64.getSender()));
                ps.setLong(7, d64.getTimestamp());

                ps.setShort(8, isNvl(d64.getData().getCityCode()));
                ps.setString(9, isNvl(d64.getData().getLineId()));
                ps.setString(10, isNvl(d64.getData().getTxtContent()));
                ps.setShort(11, isNvl(d64.getRespAck()));
            }

            @Override
            public int getBatchSize() {
                return d64s.size();
            }
        }, d64s, "64 入库成功: ");
    }

    /**
     * Batch-updates the reply columns of already stored D60 directives.
     * Rows are matched on {@code device_id}, {@code timestamp} and {@code msg_id}.
     *
     * @param d60s directives carrying the new reply values; {@code null} or empty is a no-op
     */
    private void update60(final List<D60> d60s) {
        if (null == d60s || d60s.isEmpty()) {
            return;
        }

        String sql = "update bsth_v_directive_60 set reply46=?,reply46time=?,reply47=?,reply47time=? where device_id=? and timestamp=? and msg_id=?";

        executeBatch(sql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                D60 d60 = d60s.get(i);
                ps.setShort(1, isNvl(d60.getReply46()));
                // Reply times are stored as SQL NULL when absent.
                if (null == d60.getReply46Time()) {
                    ps.setNull(2, Types.BIGINT);
                } else {
                    ps.setLong(2, d60.getReply46Time());
                }

                ps.setShort(3, isNvl(d60.getReply47()));

                if (null == d60.getReply47Time()) {
                    ps.setNull(4, Types.BIGINT);
                } else {
                    ps.setLong(4, d60.getReply47Time());
                }
                ps.setString(5, d60.getDeviceId());
                ps.setLong(6, d60.getTimestamp());
                ps.setInt(7, d60.getMsgId());
            }

            @Override
            public int getBatchSize() {
                return d60s.size();
            }
        }, d60s, "60 更新成功: ");
    }

    /**
     * Batch-inserts the given D60 directives into {@code bsth_v_directive_60}.
     *
     * @param d60s directives to insert; {@code null} or empty is a no-op
     */
    private void save60(final List<D60> d60s) {
        if (null == d60s || d60s.isEmpty()) {
            return;
        }

        String sql = "insert into bsth_v_directive_60(device_id,error_text,http_code,oper_code,rq,sender,timestamp" +
                ",alarm_time,company_code,dispatch_instruct,instruct_type,msg_id,service_state,txt_content,is_dispatch" +
                ",line_code,reply46,reply46time,reply47,reply47time,sch) " +
                " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

        executeBatch(sql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                D60 d60 = d60s.get(i);
                ps.setString(1, d60.getDeviceId());
                ps.setString(2, isNvl(d60.getErrorText()));
                ps.setInt(3, d60.getHttpCode());
                // Same null-to-zero treatment as save64, so a missing oper code cannot
                // fail the whole batch.
                ps.setShort(4, isNvl(d60.getOperCode()));
                ps.setString(5, d60.getRq());
                ps.setString(6, d60.getSender());
                ps.setLong(7, d60.getTimestamp());

                ps.setLong(8, isNvl(d60.getData().getAlarmTime()));
                ps.setShort(9, isNvl(d60.getData().getCompanyCode()));
                ps.setShort(10, isNvl(d60.getData().getDispatchInstruct()));
                ps.setInt(11, d60.getData().getInstructType());
                ps.setInt(12, d60.getData().getMsgId());
                ps.setLong(13, d60.getData().getServiceState());
                ps.setString(14, d60.getData().getTxtContent());
                ps.setBoolean(15, d60.isDispatch());

                ps.setString(16, isNvl(d60.getLineCode()));
                ps.setShort(17, isNvl(d60.getReply46()));

                // Reply times and the schedule id are stored as SQL NULL when absent.
                if (null == d60.getReply46Time()) {
                    ps.setNull(18, Types.BIGINT);
                } else {
                    ps.setLong(18, d60.getReply46Time());
                }

                ps.setShort(19, isNvl(d60.getReply47()));

                if (null == d60.getReply47Time()) {
                    ps.setNull(20, Types.BIGINT);
                } else {
                    ps.setLong(20, d60.getReply47Time());
                }

                if (d60.getSch() == null) {
                    ps.setNull(21, Types.BIGINT);
                } else {
                    ps.setLong(21, d60.getSch().getId());
                }
            }

            @Override
            public int getBatchSize() {
                return d60s.size();
            }
        }, d60s, "60 入库成功: ");
    }

    /** Returns {@code v}, or the empty string when {@code v} is {@code null}. */
    private String isNvl(String v) {
        return v == null ? "" : v;
    }

    /** Returns {@code v}, or {@code 0} when {@code v} is {@code null}. */
    private short isNvl(Short v) {
        return v == null ? 0 : v;
    }

    /** Returns {@code v}, or {@code 0} when {@code v} is {@code null}. */
    private long isNvl(Long v) {
        return v == null ? 0 : v;
    }

    /**
     * Tells whether a dispatch directive refers to a schedule that has been deleted.
     * Such directives are dropped instead of persisted (expected to be very rare).
     *
     * @param d60 directive to inspect
     * @return {@code true} only for a dispatch directive whose schedule is flagged deleted;
     *         {@code false} otherwise, including when no schedule is attached or the check fails
     */
    private boolean isDelete(D60 d60) {
        try {
            // A directive without a schedule cannot point at a deleted one; save60 stores
            // a NULL sch for it.
            if (!d60.isDispatch() || d60.getSch() == null) {
                return false;
            }
            if (d60.getSch().isDeleted()) {
                logger.warn("save 指令,发现 deleted=true 的班次,id=" + d60.getSch().getId());
                return true;
            }
        } catch (Exception e) {
            // Best effort: when the check itself fails, keep the directive.
            logger.error("", e);
        }

        return false;
    }
}