// DataToolsServiceImpl.java
package com.bsth.service.schedule.utils;

import com.bsth.service.schedule.exception.ScheduleException;
import com.google.common.io.Files;
import org.joda.time.DateTime;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.LoggingBuffer;
import org.pentaho.di.core.logging.LoggingRegistry;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Kettle (Pentaho Data Integration) backed implementation of {@code DataToolsService}:
 * stores uploaded spreadsheets and imports/exports data by running ktr transformations.
 *
 * <p>Thread-safety: Kettle initialization is guarded by an {@link AtomicBoolean} so it
 * runs at most once per JVM, regardless of which entry point is hit first.</p>
 *
 * <p>Created by xu on 17/1/3.</p>
 */
@Component
public class DataToolsServiceImpl implements DataToolsService {
    /** SLF4J logger for this service. */
    private static final Logger LOGGER = LoggerFactory.getLogger(DataToolsServiceImpl.class);

    /** VFS/SFTP property Kettle normally seeds during environmentInit(). */
    private static final String SFTP_USER_DIR_IS_ROOT =
            "vfs.sftp.org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder.USER_DIR_IS_ROOT";

    /** Externalized configuration: DB variables, directories, kettle.properties location. */
    @Autowired
    private DataToolsProperties dataToolsProperties;

    /** Guards one-time Kettle initialization; compareAndSet wins exactly once. */
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    /**
     * Initializes the Kettle environment exactly once per JVM: applies the bundled
     * kettle.properties, injects the global DB variables and boots Kettle itself.
     *
     * @throws Exception if the properties resource cannot be read or Kettle fails to start
     */
    public void initKettle() throws Exception {
        if (!initialized.compareAndSet(false, true)) {
            return; // another caller already initialized (or is initializing) Kettle
        }
        LOGGER.info("kettle初始化......");

        // Custom environment bootstrap (replacement for EnvUtil.environmentInit()).
        ktrEnvironmentInit();

        // Inject global ktr variables, overriding any values from kettle.properties.
        Map<String, String> kettleVars = new HashMap<>();
        kettleVars.put("v_db_ip", dataToolsProperties.getKvarsDbip());
        kettleVars.put("v_db_uname", dataToolsProperties.getKvarsDbuname());
        kettleVars.put("v_db_pwd", dataToolsProperties.getKvarsDbpwd());
        kettleVars.put("v_db_dname", dataToolsProperties.getKvarsDbdname());
        EnvUtil.applyKettleProperties(kettleVars, true);
        KettleEnvironment.init(); // uses JNDI data sources by default

        // Custom JNDI context factory for data-source lookups at transformation runtime.
        // NOTE(review): the original set this AFTER KettleEnvironment.init(); if init()
        // itself performs JNDI lookups this is too late — confirm before reordering.
        System.setProperty("java.naming.factory.initial",
                "com.bsth.service.schedule.utils.jndi.MySimpleJndiContextFactory");
    }

    /**
     * Custom Kettle environment bootstrap.
     *
     * <p>EnvUtil.environmentInit() fails when no kettle.properties exists on the host
     * (i.e. Kettle was never installed), so the relevant steps are re-implemented here
     * against a classpath-bundled properties file.</p>
     *
     * @throws Exception if the properties resource is missing or unreadable
     */
    private void ktrEnvironmentInit() throws Exception {
        if (Thread.currentThread().getContextClassLoader() == null) {
            Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
        }

        // Load the bundled kettle.properties; fail fast with a clear message instead of
        // the opaque NullPointerException getResource(...) would otherwise produce.
        java.net.URL resource = getClass().getResource(dataToolsProperties.getKettleProperties());
        if (resource == null) {
            throw new IllegalStateException(
                    "kettle properties resource not found: " + dataToolsProperties.getKettleProperties());
        }
        File file = new File(resource.toURI());
        Properties kettleProperties = EnvUtil.readProperties(file.getAbsolutePath());
        EnvUtil.applyKettleProperties(kettleProperties);

        // Internal variables Kettle normally seeds during environmentInit().
        Properties sys = System.getProperties();
        sys.put("Internal.Cluster.Size", "1");
        sys.put("Internal.Slave.Transformation.Number", "0");
        sys.put("Internal.Slave.Server.Name", "slave-trans-name");
        sys.put("Internal.Step.CopyNr", "0");
        sys.put("Internal.Step.Name", "step-name");
        sys.put("Internal.Step.Partition.ID", "partition-id");
        sys.put("Internal.Step.Partition.Number", "0");
        sys.put("Internal.Step.Unique.Count", "1");
        sys.put("Internal.Step.Unique.Number", "0");
        if (!kettleProperties.containsKey(SFTP_USER_DIR_IS_ROOT)) {
            sys.put(SFTP_USER_DIR_IS_ROOT, "false");
        }
    }

    /**
     * Persists an uploaded spreadsheet into the configured upload directory. The type
     * is detected from the file bytes, not the name; only xls/xlsx are accepted.
     *
     * @param filename original client-side file name (incorporated into the stored name)
     * @param filedata raw uploaded content
     * @return descriptor holding the detected type and the stored file
     * @throws ScheduleException if the content is not xls/xlsx or writing fails
     */
    @Override
    public DataToolsFile uploadFile(String filename, byte[] filedata) throws ScheduleException {
        try {
            initKettle();

            LOGGER.info("start uploadFile...originalFilename={}", filename);

            // Detect the spreadsheet type from the content (only xls/xlsx supported).
            DataToolsFileType fileType;
            if (DataToolsFileType.XLS.isThisType(filedata)) {
                fileType = DataToolsFileType.XLS;
            } else if (DataToolsFileType.XLSX.isThisType(filedata)) {
                fileType = DataToolsFileType.XLSX;
            } else {
                throw new IllegalArgumentException("非.xls .xlsx 格式文件!");
            }

            DataToolsFile dataToolsFile = new DataToolsFile();
            dataToolsFile.setFileType(fileType);
            dataToolsFile.setFile(buildUploadTarget(fileType, filename));
            Files.write(filedata, dataToolsFile.getFile());

            LOGGER.info("uploadFile success...newFilename={}", dataToolsFile.getFile().getAbsolutePath());
            return dataToolsFile;
        } catch (Exception exp) {
            // Log at ERROR with the full stack trace (original logged at INFO via StringWriter).
            LOGGER.error("uploadFile failed...stackTrace...", exp);
            throw new ScheduleException(exp);
        }
    }

    /**
     * Builds the destination file for an upload — presumably
     * {@code <uploadDir>/<filename>-upload-<timestamp>.<ext>}; exact layout is
     * delegated to {@code DataToolsFileType.getNewFileName} (TODO confirm).
     */
    private File buildUploadTarget(DataToolsFileType fileType, String filename) {
        return new File(fileType.getNewFileName(
                dataToolsProperties.getFileuploadDir(),
                File.separator,
                filename,
                "-upload-",
                new DateTime().toString("yyyyMMddHHmmss")));
    }

    /**
     * Imports data from {@code file} by running the ktr transformation named by the
     * mandatory {@code transpath} parameter.
     *
     * @param file   data file to import (its path is passed to the ktr as "filepath")
     * @param params named ktr parameters; must contain "transpath"
     * @throws ScheduleException wrapping any validation, Kettle or transformation error
     */
    @Override
    public void importData(File file, Map<String, Object> params) throws ScheduleException {
        try {
            initKettle();

            LOGGER.info("start importData...originalFilename={}", file.getAbsolutePath());

            File ktrFile = requireTranspath(params);
            // Named parameters every import ktr must declare.
            params.put("filepath", file.getAbsolutePath());
            params.put("erroroutputdir", dataToolsProperties.getTransErrordir());

            runTransformation(ktrFile, params);

            LOGGER.info("importData success...");
        } catch (Exception exp) {
            LOGGER.error("importData failed...stackTrace...", exp);
            throw new ScheduleException(exp);
        }
    }

    /**
     * Exports data by running the ktr transformation named by the mandatory
     * {@code transpath} parameter; the optional "filename" parameter (default "temp")
     * is combined with a timestamp to form the output path.
     *
     * @param params named ktr parameters; must contain "transpath"
     * @return descriptor for the produced .xls file
     * @throws ScheduleException wrapping any validation, Kettle or transformation error
     */
    @Override
    public DataToolsFile exportData(Map<String, Object> params) throws ScheduleException {
        try {
            initKettle();

            LOGGER.info("start exportData...");

            // Optional output name; mirrors the original "null"-string default handling.
            String filename = String.valueOf(params.get("filename"));
            if ("null".equals(filename)) {
                filename = "temp";
            }
            File ktrFile = requireTranspath(params);

            // Output path without extension; the ktr appends the extension itself.
            String filepath = dataToolsProperties.getFileoutputDir()
                    + File.separator
                    + filename
                    + new DateTime().toString("yyyyMMddHHmmss");
            params.put("filepath", filepath);
            // Optional template directory for the ktr output.
            params.put("templatepath", dataToolsProperties.getTransTemplatedir());

            runTransformation(ktrFile, params);

            LOGGER.info("exportData success...");

            // Exports are currently produced in xls format only.
            DataToolsFile dataToolsFile = new DataToolsFile();
            dataToolsFile.setFileType(DataToolsFileType.XLS);
            dataToolsFile.setFile(new File(filepath + ".xls"));
            return dataToolsFile;
        } catch (Exception exp) {
            LOGGER.error("exportData failed...stackTrace...", exp);
            throw new ScheduleException(exp);
        }
    }

    /** Resolves the mandatory "transpath" parameter into the ktr file to run. */
    private File requireTranspath(Map<String, Object> params) {
        String transpath = String.valueOf(params.get("transpath"));
        if ("null".equals(transpath)) {
            throw new IllegalArgumentException("没有指定transpath参数值,无法确定ktr转换文件!");
        }
        return new File(transpath);
    }

    /**
     * Loads and runs a ktr transformation with the given named parameters, logs its
     * Kettle output, and always discards the in-memory log buffers afterwards
     * (KettleLogStore grows unboundedly otherwise). Shared by import and export.
     *
     * @param ktrFile ktr transformation file to execute
     * @param params  named parameters the ktr must declare
     * @throws Exception if the transformation cannot be loaded/run, or finished with errors
     *                   (the Kettle log text becomes the exception message, as before)
     */
    private void runTransformation(File ktrFile, Map<String, Object> params) throws Exception {
        String transLogId = "";
        String transMetaLogId = "";
        try {
            TransMeta transMeta = new TransMeta(ktrFile.getAbsolutePath());
            Trans trans = new Trans(transMeta);
            for (Map.Entry<String, Object> entry : params.entrySet()) {
                trans.setParameterValue(entry.getKey(), String.valueOf(entry.getValue()));
            }
            trans.execute(null);
            trans.waitUntilFinished();

            // Log channel ids only become relevant once the run produced output;
            // on earlier failure they stay "" and cleanup below is a no-op.
            transLogId = trans.getLogChannelId();
            transMetaLogId = transMeta.getLogChannelId();

            LoggingBuffer loggingBuffer = KettleLogStore.getAppender();
            StringBuffer transLog = loggingBuffer.getBuffer(transLogId, false);
            if (trans.getErrors() > 0) {
                // Surface the Kettle log as the failure message, as the original did.
                throw new Exception(transLog.toString());
            }
            LOGGER.info(transLog.toString());
        } finally {
            // Release the in-memory Kettle log lines for this run.
            KettleLogStore.discardLines(transLogId, true);
            KettleLogStore.discardLines(transMetaLogId, true);
            LoggingRegistry.getInstance().removeIncludingChildren(transLogId);
        }
    }

}