VehicleDataSyncTaskServiceImpl.java 9.92 KB
package com.bsth.service.schedule.datasync;

import com.bsth.entity.schedule.datasync.VehicleDataSyncTask;
import com.bsth.entity.schedule.datasync.VehicleDataSyncTaskStatusEnum;
import com.bsth.service.schedule.datasync.task.VehicleDataSyncTaskThread;
import com.bsth.service.schedule.exception.ScheduleException;
import com.bsth.service.schedule.impl.BServiceImpl;
import com.bsth.service.schedule.utils.DataToolsProperties;
import com.bsth.service.schedule.utils.DataToolsService;
import com.bsth.service.schedule.utils.DataToolsServiceImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.MessageFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;

/**
 * 车辆信息同步实现。
 */
@Service("VehicleDataSyncServiceImpl2")
public class VehicleDataSyncTaskServiceImpl extends BServiceImpl<VehicleDataSyncTask, Long> implements VehicleDataSyncTaskService, Runnable, DisposableBean {
    /** 日志记录器 */
    private final static Logger LOG = LoggerFactory.getLogger(VehicleDataSyncTaskServiceImpl.class);

    /** 同步阻塞队列 */
    private ArrayBlockingQueue<VehicleDataSyncTask> dataSyncQueue;
    /** 队列容量 */
    private final static int QUEUE_CAPACITY = 100;

    /** bean是否销毁(用于优雅退出) */
    private volatile boolean beanDestroy = false;

    @PostConstruct
    public void init() {
        LOG.info("车辆数据同步服务启动!");
        // 创建队列,指定长度100
        dataSyncQueue = new ArrayBlockingQueue<>(100);
        LOG.info("队列容量={},待处理量={}", QUEUE_CAPACITY, dataSyncQueue.size());
        // 启动线程实例
        new Thread(this).start();
    }

    //--------------------- 禁止override某些方法(如下) -----------------//
    @Override
    public VehicleDataSyncTask save(VehicleDataSyncTask vehicleDataSyncTask) {
        throw new RuntimeException("VehicleDataSyncTaskServiceImpl 不支持save操作!");
    }

    @Override
    public <S extends VehicleDataSyncTask> List<S> bulkSave(List<S> entities) {
        throw new RuntimeException("VehicleDataSyncTaskServiceImpl 不支持bulkSave操作!");
    }

    @Override
    public void delete(Long aLong) throws ScheduleException {
        throw new RuntimeException("VehicleDataSyncTaskServiceImpl 不支持delete操作!");
    }
    //--------------------- 禁止override某些方法(如上) -----------------//

    @Override
    public void addToDataSyncTaskQueue(VehicleDataSyncTask task) {
        if (!this.beanDestroy) {
            // 1、初始创建同步日志对象
            VehicleDataSyncTask internalTask = VehicleDataSyncTask.builder()
                    .paramClzbh(task.getParamClzbh())
                    .paramFrom(task.getParamFrom())
                    .paramTo(task.getParamTo())
                    .type(task.getType())
                    .startDate(new Date())
                    .status(VehicleDataSyncTaskStatusEnum.PREPARE)
                    .build();
            internalTask = super.save(internalTask);
            if (!this.dataSyncQueue.offer(internalTask)) {
                internalTask.setEndDate(new Date());
                internalTask.setStatus(VehicleDataSyncTaskStatusEnum.PREPARE_FAIL);
                internalTask.setProcessMsg(MessageFormat.format(
                        "同步队列已满,队列容量={0},待处理量={1}, 无法添加同步任务!",
                        QUEUE_CAPACITY, dataSyncQueue.size()));
                super.save(internalTask);
            }
        }
        LOG.info("队列容量={},待处理量={}", QUEUE_CAPACITY, dataSyncQueue.size());
    }

    @Override
    public void destroy() throws Exception {
        beanDestroy = true;
        int queueSize = this.dataSyncQueue.size();
        while (queueSize != 0) {
            // 队列中的同步操作全部完成
            try {
                // 等待1秒后继续判定
                Thread.sleep(1000);
            } catch (Exception exp) {
                exp.printStackTrace();
            }
        }
        LOG.info("车辆数据同步服务销毁!");
    }

    @Autowired
    @Qualifier(value = "dataToolsServiceImpl")
    private DataToolsService dataToolsService;
    @Autowired
    private DataToolsProperties dataToolsProperties;
    @Override
    public void run() {
        int queueSize = this.dataSyncQueue.size();
        // 如果没有销毁bean,则一直检索队列,如果销毁bean,则需要等待所有队列元素执行完毕
        while (!this.beanDestroy || queueSize > 0) {
            // task出队列
            VehicleDataSyncTask task = this.dataSyncQueue.poll();
            if (task != null) {
                try {
                    // 1、设定logback MDC key,用于子线程单独输出日志
                    String taskLogKey = String.format(
                            "%s.%s",
                            DateFormatUtils.format(task.getStartDate(), "yyyy-MM-dd"),
                            String.valueOf(task.getId()));
                    MDC.put("taskLogKey", taskLogKey);
                    // 2、开始同步
                    task.setStatus(VehicleDataSyncTaskStatusEnum.SYNCING);
                    task = super.save(task);
                    // 2-1、启动同步线程
                    CountDownLatch countDownLatch = new CountDownLatch(1);
                    new Thread(VehicleDataSyncTaskThread.builder()
                            .countDownLatch(countDownLatch)
                            .dataToolsService((DataToolsServiceImpl) dataToolsService)
                            .dataToolsProperties(dataToolsProperties)
                            .vehicleDataSyncTask(task)
                            .mdcCopyOfContextMap(MDC.getCopyOfContextMap())
                            .build()).start();
                    // 3、等待同步结束
                    countDownLatch.await();
                    // 3-1、更新日志
                    task.setEndDate(new Date());
                    long runMills = task.getEndDate().getTime() - task.getStartDate().getTime();
                    task.setProcessSeconds((int) runMills / 1000);
                    task.setStatus(VehicleDataSyncTaskStatusEnum.END);
                    task.setProcessMsg("成功!");
                    super.save(task);
                } catch (Exception exp) {
                    // 获取异常stack信息
                    StringWriter sw = new StringWriter();
                    PrintWriter pw = new PrintWriter(sw);
                    exp.printStackTrace(pw);

                    // 更新日志
                    task.setEndDate(new Date());
                    long runMills = task.getEndDate().getTime() - task.getStartDate().getTime();
                    task.setProcessSeconds((int) runMills / 1000);
                    task.setStatus(VehicleDataSyncTaskStatusEnum.END);
                    task.setProcessMsg("有问题:" + StringUtils.substring(sw.toString(), 0, 1800)); // 日志限定长度
                    super.save(task);
                } finally {
                    MDC.clear();
                }
            }

            // 等待100毫秒后继续判定
            try {
                Thread.sleep(100);
            } catch (Exception exp) {
                exp.printStackTrace();
            }

        }

    }

    //----------------------- 获取日志相关文件 ------------------//
    @Override
    public File getTaskLogFile(Long vehicleDataSyncTaskId) {
        return this.getInternalFile(vehicleDataSyncTaskId, ".log");
    }

    @Override
    public File getTaskValidateErrorFile(Long vehicleDataSyncTaskId) {
        return this.getInternalFile(vehicleDataSyncTaskId, "-validate_error.xls");
    }

    @Override
    public File getTaskPrepareFile(Long vehicleDataSyncTaskId) {
        return this.getInternalFile(vehicleDataSyncTaskId, "-prepare.xls");
    }

    @Override
    public File getTaskAddErrorFile(Long vehicleDataSyncTaskId) {
        return this.getInternalFile(vehicleDataSyncTaskId, "-add_error.xls");
    }

    @Override
    public File getTaskUpdateErrorFile(Long vehicleDataSyncTaskId) {
        return this.getInternalFile(vehicleDataSyncTaskId, "-update_error.xls");
    }

    @Override
    public File getTaskDeviceAddErrorFile(Long vehicleDataSyncTaskId) {
        return this.getInternalFile(vehicleDataSyncTaskId, "-device_add_error.xls");
    }

    /** 文件前缀格式 */
    private final static String FILE_PREFIX_PATTERN = "vehicle-data-sync.%s.%s";
    private File getInternalFile(Long vehicleDataSyncLogId, String fileSuffix) {
        VehicleDataSyncTask vehicleDataSyncLog = this.findById(vehicleDataSyncLogId);
        if (vehicleDataSyncLog == null) {
            throw new RuntimeException("日志Id=" + vehicleDataSyncLogId + ",不存在!");
        }
        // 组合日志文件名称
        String fileName = String.format(
                FILE_PREFIX_PATTERN,
                DateFormatUtils.format(vehicleDataSyncLog.getStartDate(), "yyyy-MM-dd"),
                String.valueOf(vehicleDataSyncLogId)) + fileSuffix;
        File file = null;
        try {
            Path rootPath = VehicleDataSyncTaskThread.getLogbackFilePath();
            Path logFilePath = Paths.get(rootPath.toString(), "datasync", "vehicle", fileName);
            return logFilePath.toFile();
        } catch (Exception exp) {
            exp.printStackTrace();
        }
        return file;
    }

}