// DataToolsServiceImpl.java
package com.bsth.service.schedule.utils;

import com.bsth.service.schedule.exception.ScheduleException;
import com.google.common.io.Files;
import org.apache.tika.Tika;
import org.joda.time.DateTime;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.LoggingBuffer;
import org.pentaho.di.core.logging.LoggingRegistry;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Service;

import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Created by xu on 17/1/3.
 */
@Service
@EnableConfigurationProperties(DataToolsProperties.class)
public class DataToolsServiceImpl implements DataToolsService, InitializingBean {
    /** Logger for this service. */
    private static final Logger LOGGER = LoggerFactory.getLogger(DataToolsServiceImpl.class);

    /** Configuration: kettle.properties location, upload/output/error directories, DB variables. */
    @Autowired
    private DataToolsProperties dataToolsProperties;

    /**
     * Custom Kettle environment initialization.
     *
     * <p>A plain {@code EnvUtil.environmentInit()} fails with "kettle.properties not found"
     * on hosts where Kettle was never installed, so this method re-implements it: it loads
     * the kettle.properties bundled with the application (located via
     * {@code dataToolsProperties.getKettleProperties()}) and sets the internal variables
     * that {@code environmentInit()} would normally provide.
     *
     * @throws Exception if the bundled kettle.properties cannot be located or read
     */
    private void ktrEnvironmentInit() throws Exception {
        // Kettle resolves resources through the context class loader; make sure one is set.
        if (Thread.currentThread().getContextClassLoader() == null) {
            Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
        }

        // Load the kettle.properties bundled on the classpath instead of ~/.kettle.
        File file = new File(getClass().getResource(dataToolsProperties.getKettleProperties()).toURI());
        Properties kettleProperties = EnvUtil.readProperties(file.getAbsolutePath());
        EnvUtil.applyKettleProperties(kettleProperties);

        // Internal variables that EnvUtil.environmentInit() would normally set.
        System.setProperty("Internal.Cluster.Size", "1");
        System.setProperty("Internal.Slave.Transformation.Number", "0");
        System.setProperty("Internal.Slave.Server.Name", "slave-trans-name");
        System.setProperty("Internal.Step.CopyNr", "0");
        System.setProperty("Internal.Step.Name", "step-name");
        System.setProperty("Internal.Step.Partition.ID", "partition-id");
        System.setProperty("Internal.Step.Partition.Number", "0");
        System.setProperty("Internal.Step.Unique.Count", "1");
        System.setProperty("Internal.Step.Unique.Number", "0");
        if (!kettleProperties.containsKey("vfs.sftp.org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder.USER_DIR_IS_ROOT")) {
            System.setProperty("vfs.sftp.org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder.USER_DIR_IS_ROOT", "false");
        }
    }

    /**
     * Spring lifecycle hook: initializes the Kettle environment after property injection,
     * then applies the global ktr variables (database connection settings), overriding any
     * values loaded from kettle.properties.
     *
     * @throws Exception if Kettle environment initialization fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Initialize the Kettle environment (custom, see ktrEnvironmentInit).
        ktrEnvironmentInit();

        // Global ktr variables; the second argument overrides previously applied values.
        Map<String, String> kvars = new HashMap<>();
        kvars.put("v_db_ip", dataToolsProperties.getKvarsDbip());
        kvars.put("v_db_uname", dataToolsProperties.getKvarsDbuname());
        kvars.put("v_db_pwd", dataToolsProperties.getKvarsDbpwd());
        kvars.put("v_db_dname", dataToolsProperties.getKvarsDbdname());
        EnvUtil.applyKettleProperties(kvars, true);
        KettleEnvironment.init();
    }

    /**
     * Persists an uploaded spreadsheet into the configured upload directory.
     *
     * <p>Only legacy .xls (Excel 97-2003) content is accepted; the real content type is
     * sniffed from the bytes with Tika rather than trusted from the file name.
     *
     * @param filename original file name supplied by the client
     * @param filedata raw uploaded bytes
     * @return the file written to disk
     * @throws ScheduleException if the content is not .xls or the write fails
     */
    @Override
    public File uploadFile(String filename, byte[] filedata) throws ScheduleException {
        try {
            LOGGER.info("start uploadFile...originalFilename={}", filename);
            File newFile = new File(dataToolsProperties.getFileuploadDir() + File.separator +
                    filename + "-upload-" + new DateTime().toString("yyyyMMddHHmmss") + ".xls");
            // Detect the real content type from the bytes; legacy Office files may be
            // reported as the generic "application/x-tika-msoffice".
            Tika tika = new Tika();
            String type = tika.detect(filedata);
            LOGGER.info("文件格式={}", type);
            if ("application/vnd.ms-excel".equals(type) || "application/x-tika-msoffice".equals(type)) {
                // Legacy .xls (Excel 97-2003 binary format) — accepted.
                Files.write(filedata, newFile);
            } else if ("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet".equals(type)) {
                // .xlsx (Excel 2007+ OOXML format) is not supported yet.
                throw new UnsupportedOperationException("暂时不支持.xlsx格式文件!");
            } else {
                // Not an Excel file at all.
                throw new IllegalArgumentException("非.xls格式文件!");
            }

            LOGGER.info("uploadFile success...newFilename={}", newFile.getAbsolutePath());

            return newFile;
        } catch (Exception exp) {
            // Log at ERROR with the cause attached instead of hand-printing the stack trace.
            LOGGER.error("uploadFile failed...stackTrace...", exp);

            throw new ScheduleException("上传文件错误!");
        }
    }

    /**
     * Imports data from an uploaded file by running the ktr transformation named in
     * {@code params.get("transpath")}.
     *
     * <p>Sets the {@code filepath} and {@code erroroutputdir} named parameters before
     * execution; every ktr is expected to declare them.
     *
     * @param file   data file to import (produced by {@link #uploadFile})
     * @param params transformation parameters; must contain {@code transpath}
     * @throws ScheduleException if the parameter is missing or the transformation fails
     */
    @Override
    public void importData(File file, Map<String, Object> params) throws ScheduleException {
        try {
            LOGGER.info("start importData...originalFilename={}", file.getAbsolutePath());
            // Validate parameters and resolve the ktr transformation file.
            File ktrFile = requireKtrFile(params);
            // Tell the transformation which data file to read and where to write error rows.
            params.put("filepath", file.getAbsolutePath());
            params.put("erroroutputdir", dataToolsProperties.getTransErrordir());

            runTransformation(ktrFile, params);
            LOGGER.info("importData success...");
        } catch (Exception exp) {
            LOGGER.error("importData failed...stackTrace...", exp);

            throw new ScheduleException("导入数据错误!");
        }
    }

    /**
     * Exports data by running the ktr transformation named in {@code params.get("transpath")}
     * and returns the generated .xls file.
     *
     * @param params transformation parameters; must contain {@code transpath}, may contain
     *               {@code filename} (defaults to "temp") as the output base name
     * @return the generated output file in the configured output directory
     * @throws ScheduleException if the parameter is missing or the transformation fails
     */
    @Override
    public File exportData(Map<String, Object> params) throws ScheduleException {
        try {
            LOGGER.info("start exportData...");
            // Optional output base name; default to "temp" when the caller supplied none.
            String filename = String.valueOf(params.get("filename"));
            if ("null".equals(filename)) {
                filename = "temp";
            }
            // Validate parameters and resolve the ktr transformation file.
            File ktrFile = requireKtrFile(params);
            // Timestamped output path handed to the transformation.
            String filepath = dataToolsProperties.getFileoutputDir() +
                    File.separator +
                    filename +
                    new DateTime().toString("yyyyMMddHHmmss") + ".xls";
            params.put("filepath", filepath);

            runTransformation(ktrFile, params);
            LOGGER.info("exportData success...");

            return new File(filepath);
        } catch (Exception exp) {
            LOGGER.error("exportData failed...stackTrace...", exp);

            throw new ScheduleException("导出数据错误!");
        }
    }

    /**
     * Resolves the mandatory {@code transpath} parameter to the ktr transformation file.
     *
     * @param params transformation parameters
     * @return the ktr file referenced by {@code transpath}
     * @throws IllegalArgumentException when {@code transpath} is absent (String.valueOf
     *         turns a missing/null value into the literal string "null")
     */
    private static File requireKtrFile(Map<String, Object> params) {
        String transpath = String.valueOf(params.get("transpath"));
        if ("null".equals(transpath)) {
            throw new IllegalArgumentException(
                    "没有指定transpath参数值,无法确定ktr转换文件!");
        }
        return new File(transpath);
    }

    /**
     * Runs a ktr transformation with the given named parameters and blocks until it finishes.
     * Shared by {@link #importData} and {@link #exportData}.
     *
     * <p>The buffered Kettle log lines for both log channels are always discarded in the
     * finally block so the shared {@code KettleLogStore} does not grow without bound.
     *
     * @param ktrFile the ktr transformation file to execute
     * @param params  named parameters; every ktr is expected to declare all of them
     * @throws Exception when the transformation reports one or more errors — the exception
     *         message carries the captured Kettle log — or when setup/execution fails
     */
    private void runTransformation(File ktrFile, Map<String, Object> params) throws Exception {
        String transLogId = "";
        String transMetaLogId = "";
        try {
            // Kettle is initialized once in afterPropertiesSet; build meta + transformation.
            TransMeta transMeta = new TransMeta(ktrFile.getAbsolutePath());
            Trans trans = new Trans(transMeta);
            // Apply the named parameters (data file path, output dirs, ...).
            for (Map.Entry<String, Object> entry : params.entrySet()) {
                trans.setParameterValue(entry.getKey(), String.valueOf(entry.getValue()));
            }
            // Execute and wait for completion.
            trans.execute(null);
            trans.waitUntilFinished();

            // Capture the per-channel log output.
            transLogId = trans.getLogChannelId();
            transMetaLogId = transMeta.getLogChannelId();

            LoggingBuffer loggingBuffer = KettleLogStore.getAppender();
            StringBuffer logText = loggingBuffer.getBuffer(transLogId, false);
            if (trans.getErrors() > 0) {
                // Surface the Kettle log as the failure message.
                throw new IllegalStateException(logText.toString());
            }
            LOGGER.info(logText.toString());
        } finally {
            // Release buffered log lines for both channels and deregister the log channel.
            KettleLogStore.discardLines(transLogId, true);
            KettleLogStore.discardLines(transMetaLogId, true);
            LoggingRegistry.getInstance().removeIncludingChildren(transLogId);
        }
    }
}