ETL引擎Kettle的使用

技术选型

在选型上工作流引擎基本上是一个没法选的状态,开源的除了kettle之外基本上是完蛋的状态,不完善。商用的形成了完善ETL功能的基本上全部都完成了Pass系统我们集成上会非常难受所以没有什么好选的基本上上就是kettle了。

项目集成

集成上采用了最简单的方案由项目代码自行进行调度,任务执行的逻辑通过编辑器产出的配置文件进行定义最后就是基于配置文件的运行。在集成上其实是比较傻逼的,原因在于maven中央仓库没有kettle的引擎包并且它采用的数据库驱动版本比我们项目使用的要落后一点就是不兼容,只有通过maven以<scope>system</scope>的方式加载依赖包。打包上为spring的maven插件开启<includeSystemScope>true</includeSystemScope>选项便可以将这些依赖打包到项目jar包之中。只需要将这些jar包放置到项目/resources/lib文件夹下就可以了

maven配置如下:

<dependency>
   <groupId>pentaho-kettle</groupId>
   <artifactId>kettle-core</artifactId>
   <version>8.2.0.7-719</version>
   <scope>system</scope>
   <systemPath>${project.basedir}/src/main/resources/lib/kettle-core-8.2.0.7-719.jar</systemPath>
</dependency>

<dependency>
   <groupId>pentaho-kettle</groupId>
   <artifactId>kettle-engine</artifactId>
   <version>8.2.0.7-719</version>
   <scope>system</scope>
   <systemPath>${project.basedir}/src/main/resources/lib/kettle-engine-8.2.0.7-719.jar</systemPath>
</dependency>

<dependency>
   <groupId>pentaho-kettle</groupId>
   <artifactId>metastore</artifactId>
   <version>8.2.0.7-719</version>
   <scope>system</scope>
   <systemPath>${project.basedir}/src/main/resources/lib/metastore-8.2.0.7-719.jar</systemPath>
</dependency>

<dependency>
   <groupId>org.apache.commons</groupId>
   <artifactId>commons-vfs2</artifactId>
   <version>2.2</version>
</dependency>

<dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>17.0</version>
</dependency>

<dependency>
   <groupId>commons-io</groupId>
   <artifactId>commons-io</artifactId>
   <version>2.2</version>
</dependency>

<dependency>
   <groupId>commons-lang</groupId>
   <artifactId>commons-lang</artifactId>
   <version>2.6</version>
</dependency>

<dependency>
   <groupId>commons-codec</groupId>
   <artifactId>commons-codec</artifactId>
   <version>1.10</version>
</dependency>

<dependency>
   <groupId>com.jcraft</groupId>
   <artifactId>jsch</artifactId>
   <version>0.1.54</version>
</dependency>

<!--不要笑,这TM是无奈之举,Kettle的mysql驱动和我们项目的驱动冲突了,目前只有这样才能同时使用多个包-->
<!--不过我们项目的驱动和Kettle的驱动不同,所以引进了也没有什么影响无非大一点-->
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-custom-connector-java</artifactId>
   <version>5.1.34</version>
   <scope>system</scope>
   <systemPath>${project.basedir}/src/main/resources/lib/mysql-connector-java-5.1.34.jar</systemPath>
</dependency>

基础封装

基础封装上比较简单,目前在KettleService之中定义了两个接口:

public interface KettleService {
   /**
    * 开始执行ETL任务(ktr文件)
    *
    * @param taskFileName 执行的任务文件名(ktr)
    * @param params 执行任务输入的参数
    * @return 运行结果
    * @throws Exception 没有找到配置文件,Kettle的运行异常不会抛出
    */
   Result runTaskKtr(String taskFileName, Map<String, String> params) throws Exception;

   /**
    * 开始执行ETL任务(kjb文件)
    *
    * @param taskFileName 执行的任务文件名(kjb)
    * @param params 执行任务输入的参数
    * @return 运行结果
    * @throws Exception 没有找到配置文件,Kettle的运行异常不会抛出
    */
   Result runTaskKjb(String taskFileName, Map<String, String> params) throws Exception;
}

接口实现如下:

@Service
public class KettleServiceImpl implements KettleService {

   @Value("${kettle.script.path}")
   private String kettleScriptPath;

   private static final Logger logger = LoggerFactory.getLogger("kettle-service-log");

   private final List<KtrMeta> KTR_METAS = new ArrayList<>();
   private final List<KjbMeta> KJB_METAS = new ArrayList<>();

   private List<String> getFiles(String path, String subName) {
      List<String> files = new ArrayList<>();
      File file = new File(path);
      File[] tempList = file.listFiles();
      if (tempList == null){
         return files;
      }
      for (File value : tempList) {
         if (value.isFile()) {
            if (Objects.equals(value.toString().substring(value.toString().length() - 3), subName)) {
               files.add(value.getName());
            }
         }
      }
      return files;
   }

   @PostConstruct
   public void init() throws KettleException {
      logger.info("----------------------开始初始化ETL配置------------------------");
      KettleEnvironment.init();
      List<String> ktrFiles = getFiles(kettleScriptPath, "ktr");
      List<String> kjbFiles = getFiles(kettleScriptPath, "kjb");
      logger.info("需要加载的转换为:" + ktrFiles.toString());
      logger.info("需要加载的任务为:" + kjbFiles.toString());
      logger.info("----------------------开始加载ETL配置--------------------------");
      for (String ktrFile : ktrFiles) {
         KtrMeta ktrMeta = new KtrMeta();
         ktrMeta.setName(ktrFile);
         ktrMeta.setTransMeta(new TransMeta(kettleScriptPath + ktrFile));
         KTR_METAS.add(ktrMeta);
         logger.info("成功加载转换配置:" + ktrFile);
      }
      for (String kjbFile : kjbFiles) {
         KjbMeta kjbMeta = new KjbMeta();
         kjbMeta.setName(kjbFile);
         kjbMeta.setJobMeta(new JobMeta(kettleScriptPath + kjbFile, null));
         KJB_METAS.add(kjbMeta);
         logger.info("成功加载任务配置:" + kjbFile);
      }
      logger.info("----------------------全部ETL配置加载完毕-----------------------");
   }

   @Override
   public Result runTaskKtr(String taskFileName, Map<String, String> params) throws Exception {
      logger.info("开始执行转换:" + taskFileName);
      TransMeta transMeta = null;
      for (KtrMeta ktrMeta : KTR_METAS) {
         if(Objects.equals(taskFileName,ktrMeta.getName())){
            transMeta = ktrMeta.getTransMeta();
         }
      }
      if (transMeta == null) {
         logger.error("没有找到配置文件:" + taskFileName);
         throw new Exception("没有找到配置文件");
      }
      Trans trans = new Trans(transMeta);
      if (params != null) {
         for (Map.Entry<String, String> entry : params.entrySet()) {
            trans.setParameterValue(entry.getKey(), entry.getValue());
         }
      }
      trans.prepareExecution(null);
      trans.startThreads();
      trans.waitUntilFinished();
      return trans.getResult();
   }

   @Override
   public Result runTaskKjb(String taskFileName, Map<String, String> params) throws Exception {
      logger.info("开始执行任务:" + taskFileName);
      JobMeta jobMeta = null;
      for (KjbMeta kjbMeta : KJB_METAS) {
         if(Objects.equals(taskFileName,kjbMeta.getName())){
            jobMeta = kjbMeta.getJobMeta();
         }
      }
      if (jobMeta == null) {
         logger.error("没有找到配置文件:" + taskFileName);
         throw new Exception("没有找到配置文件");
      }
      Job job = new Job(null, jobMeta);
      if (params != null) {
         for (Map.Entry<String, String> entry : params.entrySet()) {
            job.setParameterValue(entry.getKey(), entry.getValue());
         }
      }
      job.start();
      job.waitUntilFinished();
      return job.getResult();
   }
}

kettle之中定义好ktr文件与kjb文件之后只要导入到系统之中便可以直接运行,这里有一个问题那就是配置文件的加载需要创建对应对象,这个创建过程非常慢,一个配置文件差不多要20几秒但是一旦初始化完成之后这些对象并不会怎么变动所以我们采用了单例设计,程序初始化时候就去构造,在最终使用的时候一定要注意线程安全,最好加一个锁

还有一个需要注意的事情,这两个接口执行都会对线程进行阻塞,虽然本质上都是多线程任务但是为了能够获取结果所以把他搞阻塞了,如果项目需要搞异步操作那么直接改代码即可。

最后就是参数问题,所有的ktr文件与kjb文件可以传入参数要不然作为查询的条件要不然作为写入的常量,接口之中的params便是传入的参数,参数在具体的转换流程之中可以作为一个基础的初始值。

Kettle的使用

创建配置文件需要使用kettle的Spoon编辑器(下载地址)。编辑器本身还是比较复杂涉及的转换功能不少甚至可以使用JS脚本预约或者JAVA代码进行细节控制。整体上基本都是图形界面上手很简单想要精通的上限也很高。不过需要kettle进场的机会不会特别多,更多的是一对一的跨数据库迁移。转换上仅仅依靠kettle不可能完成非常复杂的转换大部分情况是对字段值修改、值类型设置、字段过滤、字段选择等等这类基础的操作,基本上就是一些SQL可以干的事情。

Kettle的边界

kettle也不是万能的它也是有明确的功能边界,它真正的优势在于这个东西可以跨数据源进行统一的数据操作,比如读写xmlexceljson等文件或者SQLServerMySQLOracleRedisMongoDB这些数据库这就不需要进行各种集成便可以直接使用。如果复杂数据集成或者是各种分析、计算得到结果数据的ETL工具依旧是无法实现的。

ETL引擎Kettle的使用》有2条留言

留下回复