VehicleDataSyncServiceImpl.java 9.9 KB
package com.bsth.service.schedule.datasync;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.core.FileAppender;
import com.bsth.entity.schedule.datasync.VehicleDataSyncLog;
import com.bsth.entity.schedule.datasync.VehicleDataSyncStatusEnum;
import com.bsth.service.schedule.datasync.log.VehicleDataSyncLogService;
import com.bsth.service.schedule.datasync.task.VehicleDataSyncTaskFlag;
import com.bsth.service.schedule.datasync.task.VehicleDataSyncTaskThread;
import com.bsth.service.schedule.utils.DataToolsProperties;
import com.bsth.service.schedule.utils.DataToolsService;
import com.bsth.service.schedule.utils.DataToolsServiceImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.MessageFormat;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;

/**
 * 车辆信息同步实现。
 */
@Service("VehicleDataSyncServiceImpl2")
public class VehicleDataSyncServiceImpl implements VehicleDataSyncService, Runnable, DisposableBean {
    /** 日志记录器 */
    private final static Logger LOG = LoggerFactory.getLogger(VehicleDataSyncServiceImpl.class);

    /** 同步阻塞队列 */
    private ArrayBlockingQueue<VehicleDataSyncTaskFlag> dataSyncQueue;
    /** 队列容量 */
    private final static int QUEUE_CAPACITY = 100;

    /** bean是否销毁(用于优雅退出) */
    private volatile boolean beanDestroy = false;

    @PostConstruct
    public void init() {
        LOG.info("车辆数据同步服务启动!");
        // 创建队列,指定长度100
        dataSyncQueue = new ArrayBlockingQueue<>(100);
        LOG.info("队列容量={},待处理量={}", QUEUE_CAPACITY, dataSyncQueue.size());
        // 启动线程实例
        new Thread(this).start();
    }

    @Override
    public void addToDataSyncTaskQueue(VehicleDataSyncTaskFlag vehicleDataSyncFlag) {
        if (!this.beanDestroy) {
            // 1、初始创建同步日志对象
            VehicleDataSyncLog vehicleDataSyncLog = new VehicleDataSyncLog();
            vehicleDataSyncLog.setStartDate(new Date());
            vehicleDataSyncLog.setStatus(VehicleDataSyncStatusEnum.PREPARE);
            vehicleDataSyncLog = this.vehicleDataSyncLogService.save(vehicleDataSyncLog);
            vehicleDataSyncFlag.setVehicleDataSyncLogId(vehicleDataSyncLog.getId()); // 设定日志Id
            if (!this.dataSyncQueue.offer(vehicleDataSyncFlag)) {
                vehicleDataSyncLog.setEndDate(new Date());
                vehicleDataSyncLog.setStatus(VehicleDataSyncStatusEnum.PREPARE_FAIL);
                vehicleDataSyncLog.setProcessMsg(MessageFormat.format(
                        "同步队列已满,队列容量={0},待处理量={1}, 无法添加同步任务!",
                        QUEUE_CAPACITY, dataSyncQueue.size()));
                this.vehicleDataSyncLogService.save(vehicleDataSyncLog);
            }
        }
        LOG.info("队列容量={},待处理量={}", QUEUE_CAPACITY, dataSyncQueue.size());
    }

    @Override
    public void destroy() throws Exception {
        beanDestroy = true;
        int queueSize = this.dataSyncQueue.size();
        while (queueSize != 0) {
            // 队列中的同步操作全部完成
            try {
                // 等待1秒后继续判定
                Thread.sleep(1000);
            } catch (Exception exp) {
                exp.printStackTrace();
            }
        }
        LOG.info("车辆数据同步服务销毁!");
    }

    @Autowired
    private VehicleDataSyncLogService vehicleDataSyncLogService;
    @Autowired
    @Qualifier(value = "dataToolsServiceImpl")
    private DataToolsService dataToolsService;
    @Autowired
    private DataToolsProperties dataToolsProperties;
    @Override
    public void run() {
        int queueSize = this.dataSyncQueue.size();
        // 如果没有销毁bean,则一直检索队列,如果销毁bean,则需要等待所有队列元素执行完毕
        while (!this.beanDestroy || queueSize > 0) {
            // flag出队列
            VehicleDataSyncTaskFlag flag = this.dataSyncQueue.poll();
            if (flag != null) {
                // 同步日志对象
                VehicleDataSyncLog vehicleDataSyncLog = null;
                try {
                    // 1、获取同步日志
                    vehicleDataSyncLog = this.vehicleDataSyncLogService.findById(
                            flag.getVehicleDataSyncLogId());
                    // 2、设定logback MDC key,用于子线程单独输出日志
                    String taskLogKey = String.format(
                            "%s.%s",
                            DateFormatUtils.format(vehicleDataSyncLog.getStartDate(), "yyyy-MM-dd"),
                            String.valueOf(vehicleDataSyncLog.getId()));
                    MDC.put("taskLogKey", taskLogKey);
                    // 3、开始同步
                    vehicleDataSyncLog.setStatus(VehicleDataSyncStatusEnum.SYNCING);
                    vehicleDataSyncLog = this.vehicleDataSyncLogService.save(vehicleDataSyncLog);
                    // 3-1、启动同步线程
                    CountDownLatch countDownLatch = new CountDownLatch(1);
                    new Thread(VehicleDataSyncTaskThread.builder()
                            .countDownLatch(countDownLatch)
                            .dataToolsService((DataToolsServiceImpl) dataToolsService)
                            .dataToolsProperties(dataToolsProperties)
                            .vehicleDataSyncLogService(vehicleDataSyncLogService)
                            .vehicleDataSyncLog(vehicleDataSyncLog)
                            .mdcCopyOfContextMap(MDC.getCopyOfContextMap())
                            .build()).start();
                    // 4、等待同步结束
                    countDownLatch.await();
                    // 4-1、更新日志
                    vehicleDataSyncLog.setEndDate(new Date());
                    long runMills = vehicleDataSyncLog.getEndDate().getTime() - vehicleDataSyncLog.getStartDate().getTime();
                    vehicleDataSyncLog.setProcessSeconds((int) runMills / 1000);
                    vehicleDataSyncLog.setStatus(VehicleDataSyncStatusEnum.END);
                    vehicleDataSyncLog.setProcessMsg("成功!");
                    this.vehicleDataSyncLogService.save(vehicleDataSyncLog);
                } catch (Exception exp) {
                    // 获取异常stack信息
                    StringWriter sw = new StringWriter();
                    PrintWriter pw = new PrintWriter(sw);
                    exp.printStackTrace(pw);

                    // 更新日志
                    vehicleDataSyncLog.setEndDate(new Date());
                    long runMills = vehicleDataSyncLog.getEndDate().getTime() - vehicleDataSyncLog.getStartDate().getTime();
                    vehicleDataSyncLog.setProcessSeconds((int) runMills / 1000);
                    vehicleDataSyncLog.setStatus(VehicleDataSyncStatusEnum.END);
                    vehicleDataSyncLog.setProcessMsg("有问题:" + StringUtils.substring(sw.toString(), 0, 1800)); // 日志限定长度
                    this.vehicleDataSyncLogService.save(vehicleDataSyncLog);
                } finally {
                    MDC.clear();
                }
            }

            // 等待100毫秒后继续判定
            try {
                Thread.sleep(100);
            } catch (Exception exp) {
                exp.printStackTrace();
            }

        }

    }

    /** 日志格式 */
    private final static String TASK_LOG_PATTERN = "vehicle-data-sync.%s.%s.log";
    @Override
    public File getTaskLogFile(Long vehicleDataSyncLogId) {
        VehicleDataSyncLog vehicleDataSyncLog = this.vehicleDataSyncLogService.findById(vehicleDataSyncLogId);
        if (vehicleDataSyncLog == null) {
            throw new RuntimeException("日志Id=" + vehicleDataSyncLogId + ",不存在!");
        }
        // 组合日志文件名称
        String logFileName = String.format(
                TASK_LOG_PATTERN,
                DateFormatUtils.format(vehicleDataSyncLog.getStartDate(), "yyyy-MM-dd"),
                String.valueOf(vehicleDataSyncLogId));

        File file = null;
        try {
            Path rootPath = this.getLogbackFilePath();
            Path logFilePath = Paths.get(rootPath.toString(), "datasync", "vehicle", logFileName);
            return logFilePath.toFile();
        } catch (Exception exp) {
            exp.printStackTrace();
        }
        return file;
    }

    // 获取Logback中命名为File的Appender的日志文件输出路径,然后反向获取 ${LOG_BASE} 目录
    private Path getLogbackFilePath() throws Exception {
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        ch.qos.logback.classic.Logger logger = loggerContext.getLogger("ROOT");
        FileAppender fileAppender = (FileAppender) logger.getAppender("FILE");//"FILE"是logback配置中,输出日志文件的appender的name属性
        File file = new File(fileAppender.getFile());
        Path filePath = Paths.get(file.getCanonicalPath()); // 这个等于 ${LOG_BASE}/main/main.log
        return filePath.getParent().getParent();
    }

    public static void main(String[] args) throws Exception {
        String filepath = "/Users/xu/resource/project_code/bsth_project/bsth_control_parent/E:/bsth_control_logs";
        Path p = Paths.get(filepath);
        System.out.println(p.getParent().getParent());

    }
}