已经有一篇文章,经测试是可以的,不过好像不太正宗,可以到参考原文。
官方的连接repository的示例代码如下:
KettleEnvironment.init();RepositoriesMeta repositoriesMeta = new RepositoriesMeta();repositoriesMeta.readData();RepositoryMeta repositoryMeta = repositoriesMeta.findRepository( repositoryName );PluginRegistry registry = PluginRegistry.getInstance();Repository repository = registry.loadClass( RepositoryPluginType.class, repositoryMeta, Repository.class );repository.init(repositoryMeta);repository.connect(username, password);RepositoryDirectoryInterface tree=repository.loadRepositoryDirectoryTree();TransMeta transMeta=repository.loadTransformation("first",tree,listener,false,null);Trans Trans = new Trans(transMeta);trans.execute(arguments);trans.waitUntilFinished();Result result = trans.getResult();//这样就获取到了数据库形式的资源库,可以加载转换和job并运行了。repositoryName就是在spoon中创建的资源库名称,这里会读取用户目录下.kettle/repositories.xml文件,并根据repositoryName获取资源库的具体信息。
添加 parameters, variables 和 arguments
try { for (String key : parameterMap.keySet()) { transMeta.setParameterValue(key, parameterMap.get(key)); } for (String key : variableMap.keySet()) { transMeta.setVariable(key, variableMap.get(key)); }} catch (UnknownParamException e) { error(e.getMessage());}transMeta.setArguments(arguments);
获取转换或job中某一步骤的输入和输出的数据,需要添加一个RowListener(RowAdapter):
Trans Trans = new Trans(transMeta);// prepare the execution of the transformation (instead of simply execute)trans.prepareExecution(arguments);// Find a step thread (ready to run but not yet started)// You can also use method Trans.findBaseStep which gives back a list of all the step copies//先获取感兴趣的那个步骤,也可以使用Trans.findBaseStep方法获取所有步骤的一个副本。StepInterface step = trans.findRunThread("Your Step Name");// Attach a row listener to a step copystep.addRowListener(new RowAdapter() { public void rowReadEvent(RowMetaInterface rowMeta, Object[] row) throws KettleStepException { // Here you get the rows as they are read by the step //该步骤读取的数据,即上一步骤传入的数据 } public void rowWrittenEvent(RowMetaInterface rowMeta, Object[] row) throws KettleStepException { // Here you get the rows as they are written by the step //该步骤输出的数据,即处理后传给下一步骤的数据 } });// Now start the transformation threads...trans.startThreads();// If you want to wait until the transformation is finished...trans.waitUntilFinished(); // If you want to know about the execution result.Result result = trans.getResult();
还可以手动向转换中添加数据,不需要从文件或数据库中读取,详情。
Options
Option | Definition |
---|---|
Step name | The name of this step as it appears in the transformation workspace. |
Fieldname | Specify the field name of the rows to inject. |
Type | Specify the type of data. |
Length | For Number: Total number of significant figures in a number; For String: total length of string; For Date: length of printed output of the string. |
Precision | For Number: Number of floating point digits; For String, Date, Boolean: unused. |
Example
Here is some information on how to do it:
- You can ask a Trans object for a RowProducer object
- Also see the unit test case:
Use this type of code:
Trans trans = new Trans(... TransMeta ...);trans.prepareExecution(args);RowProcuder rp = trans.addRowProducer(String stepname, int stepCopy);
After that you start the threads in the transformation. Then you can inject the rows while the transformation is running:
trans.startThreads();...rp.putRow(some row you need to inject);...
You can also specify the rows you are expecting to be injected. This makes it easier to build transformations because you have the meta-data at design time.
嵌入到其它项目中使用,参考
未完待续,如有更好的相关博文,可以留言贴地址,继续参考丰富一下。