
Introducing Apache Beam (dataflow)

January 20, 2016 Posted by jbonofre

As part of the Google Cloud ecosystem, Google created the Dataflow SDK. Now, in a joint effort between Google, Talend, Cask, data Artisans, PayPal, and Cloudera, we are proposing Dataflow to the Apache Incubator.

I’m proud, glad, and excited to be the champion of the Apache Dataflow proposal.

But first, I would like to thank James Malone and Frances Perry from Google for their help and their always open-minded and interesting discussions. It’s really great to work with them!

Let’s take a quick tour of what Apache Dataflow will be.

Architecture and Programming Model

Imagine you have a Hadoop cluster where you run MapReduce jobs. Now, you want to “migrate” these jobs to Spark: you have to refactor all your jobs, which requires a lot of work and costs a lot. And after that, consider the effort and cost if you want to move to a new platform like Flink: you have to refactor your jobs again.

Dataflow aims to provide an abstraction layer between your code and the execution runtime.

The SDK allows you to use a unified programming model: you implement your data processing logic using the Dataflow SDK, and the same code will run on different backends. You don’t need to refactor and change the code anymore!

If your target backend is not yet supported by Dataflow, you can implement your own runner for this backend; again, the code using the Dataflow SDK doesn’t change.

Dataflow is able to deal with batch processing jobs, but also with streaming jobs.

Architecture: pipelines, translators, runners

Using the SDK, your jobs are actually designed as pipelines. A pipeline is a chain of processes on the data.

It’s basically the only part that you have to write.

Dataflow reads the pipeline definitions and translates them for a target runner. A translator is responsible for adapting the pipeline code to the runner. For instance, the MapReduce translator transforms pipelines into MapReduce jobs, the Spark translator transforms pipelines into Spark jobs, etc.

The runners are the “execution” layer. Once a pipeline has been “translated” by a translator, it can be run directly on a runner. The runner “hides” the actual backend: a MapReduce/YARN cluster, a Spark cluster (running on YARN or Mesos), etc.

While Dataflow comes with ready-to-use translators and runners, you can also create your own.

For instance, you can implement your own runner by creating a class extending PipelineRunner. You will have to implement the different runner behaviours (like the transform evaluators, the supported options, the apply main transform hook method, etc).
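
As a minimal sketch (assuming the SDK class names com.google.cloud.dataflow.sdk.Pipeline, PipelineResult, and runners.PipelineRunner; the exact hooks depend on the SDK version), a custom runner skeleton could look like:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;

public class MyBackendRunner extends PipelineRunner<PipelineResult> {

    // runners are conventionally instantiated from the pipeline options
    public static MyBackendRunner fromOptions(PipelineOptions options) {
        // validate here the options supported by the target backend
        return new MyBackendRunner();
    }

    @Override
    public PipelineResult run(Pipeline pipeline) {
        // walk the pipeline graph, translate each transform for the
        // target backend, submit the job, and return its result
        throw new UnsupportedOperationException("translation not implemented yet");
    }
}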

SDK

The SDK is composed of four parts:

  • Pipelines are the streaming and processing logic that you want to implement. A pipeline is a chain of processes. Basically, in a pipeline, you read data from a source, apply transformations on the data, and eventually send the data to a destination (named a sink in Dataflow wording).
  • PCollection is the object transported inside a pipeline. It’s the container of the data, flowing between each step of the pipeline.
  • Transform is a step of a pipeline. It takes an incoming PCollection and creates an outgoing PCollection. You can implement your own transform functions.
  • Sink and Source are used to retrieve data as input (first step) of a pipeline, and eventually to send the data outside of the pipeline (last step).

In action

Let’s implement a very simple Wordcount job using Dataflow SDK.

First, we create a pipeline:

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("Dataflow-Wordcount");
Pipeline pipeline = Pipeline.create(options);

We can see in the options that we define the pipeline runner. Of course, it’s not a good practice to hardcode the runner in the pipeline code; you can externalize the runner definition using command-line arguments.
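
For instance, a minimal sketch using the SDK’s PipelineOptionsFactory.fromArgs() (the --runner and --project flags are the standard Dataflow options):

public static void main(String[] args) {
    // e.g. launched with: --runner=BlockingDataflowPipelineRunner --project=my-project
    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(DataflowPipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    // define the pipeline steps here, then run it
}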

We have a pipeline; we can now define the source of the data for this pipeline. It’s basically what we call a source in Dataflow:

pipeline.apply(TextIO.Read.from("/path/to/test.txt"))

Basically, the apply() method allows you to define the steps of a pipeline. A step can be:

  • a source (input)
  • a transform
  • a sink (output)

So, actually, you can see a pipeline as a chain of apply calls: pipeline.apply().apply().apply()….

Now, we have the data coming from a file (step 1). The purpose of the next step (step 2) is to identify the words in the input data. This step is a transformation step: it takes a PCollection as input (coming from the file source) and creates a resulting PCollection. To implement this transformation step, we use the ParDo function in the apply method. The ParDo function allows you to inject a DoFn (do function) on the PCollection:

  .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
     @Override
     public void processElement(ProcessContext c) {
       for (String word : c.element().split("[^a-zA-Z']+")) {
         if (!word.isEmpty()) {
           c.output(word);
         }
       }
     }
  }))

The PCollections (input and output) are wrapped in a ProcessContext.

Now that we have identified the words, we are ready to count them. It’s a new step (step 3): it transforms the words into a map containing the count for each word.
To implement this step, as always, we use the apply method. We could use the same logic as in step 2 (using ParDo and DoFn). Fortunately, Dataflow provides ready-to-use functions: in our case, Dataflow provides a Count function that we can use directly in the apply method:

.apply(Count.<String>perElement())

After this step (step 3), we have a collection of key/value pairs (word => count). We want to format each pair as a String, ready to be written in a file.
So, we add a new transformation step (step 4), again using the apply method on the pipeline. In this apply, we implement the SimpleFunction interface, providing our own apply() method:

  .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
     @Override
     public String apply(KV<String, Long> element) {
       return element.getKey() + ": " + element.getValue();
     }
  }))

After step 4, we have a String with the word and its count, for each word. We are ready to send these Strings to an output file (a sink): it’s a new (and final) step of the pipeline, so we use the apply() method again:

.apply(TextIO.Write.to("gs://my-bucket/counts.txt"));

Here we are: our pipeline is done and ready to run.

Running it is pretty simple: just call the run() method on the pipeline:

pipeline.run();
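
Putting all the steps together (a recap of the snippets above; the input and output locations are just examples), the complete Wordcount pipeline looks like:

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("Dataflow-Wordcount");

Pipeline pipeline = Pipeline.create(options);

pipeline.apply(TextIO.Read.from("/path/to/test.txt"))            // step 1: source
    .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
        @Override
        public void processElement(ProcessContext c) {           // step 2: split into words
            for (String word : c.element().split("[^a-zA-Z']+")) {
                if (!word.isEmpty()) {
                    c.output(word);
                }
            }
        }
    }))
    .apply(Count.<String>perElement())                           // step 3: count per word
    .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
        @Override
        public String apply(KV<String, Long> element) {          // step 4: format as String
            return element.getKey() + ": " + element.getValue();
        }
    }))
    .apply(TextIO.Write.to("gs://my-bucket/counts.txt"));        // step 5: sink

pipeline.run();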

What’s next

We are thinking about some new features for Dataflow.

NB: these improvements and changes are still under review and discussion.

New runners, sinks, and sources

First, we will extend the provided runners and sinks/sources to be much more “open” and turnkey.

For instance, why not have a runner for Ignite, for Karaf, or for a simple JVM (like the DirectPipelineRunner)?

In terms of sinks and sources, I’m planning to make it possible to use Camel components, or at least to provide more and more out-of-the-box I/O support, especially for the Data Integration Dataflow layer.

DSL

While the usage of the apply() method on a pipeline is very flexible, it may appear “complex” to new users. We are thinking about providing a DSL. Basically, instead of defining a pipeline as a chain of apply calls, you would be able to use specialized functions like the following (of course, it would still be possible to use the apply method):

from("file:...").apply().count().to("file:...");

Data Integration

Dataflow is a great abstraction layer for data processing (batch and streaming), but it could also become a very flexible integration platform. In addition to the processing DSL, we will provide an integration DSL. It will bring EIPs (Enterprise Integration Patterns) to data integration.
Imagine an integration pipeline like:

from("http:...").filter().aggregate().to("jms:...").when(...).to().otherwise().to("kafka:...");

We can mix processing and integration in a very smart and powerful way.

It means that we will have the process/stream DSL and the integration DSL on top of the core one. Furthermore, the sinks/sources will not only be data sources, but also messaging endpoints, mixing messages and data.

Talend ESB: query a database directly in the mediation perspective

August 3, 2015 Posted by jbonofre

When exposing a database as a REST or SOAP service, a lot of users use:

  • the integration perspective to create a service, but they don’t leverage Camel
  • the mediation perspective to create a route containing a cTalendJob

However, there’s an easy alternative.

Using the mediation perspective, we can directly use a datasource exposed in the runtime (Karaf) as an OSGi service, and directly use Camel components.

The advantages of this approach are:

  • The same DataSource is shared by different routes and services. It means that we can use a PooledDataSource and optimize the connections to the database.
  • We don’t use any Talend job, and directly leverage Camel native components.

Route design in the studio

We create a route in the mediation perspective of the studio.

[Screenshot: route design in the mediation perspective]

First, in the Spring tab, we add the DataSource OSGi service lookup. To do so, we add the spring-osgi namespace and use the osgi:reference element:

<beans ....
   xmlns:osgi="http://www.springframework.org/schema/osgi"
   xsi:schemaLocation="
      ...
      http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
      ...">

  <osgi:reference id="demoDS" interface="javax.sql.DataSource" filter="(osgi.jndi.service.name=jdbc/demo)"/>

</beans>

[Screenshot: Spring tab with the osgi:reference element]

You can use the same mechanism to load a JMS connection factory: you can reference a JMS ConnectionFactory OSGi service and use the camel-jms component in a cMessagingEndpoint, or use the cJMS component with an empty custom cJMSConnectionFactory.
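
For instance, a hypothetical reference to a ConnectionFactory exposed as an OSGi service could look like this (the filter value depends on how the broker registers the service):

  <osgi:reference id="jmsConnectionFactory" interface="javax.jms.ConnectionFactory" filter="(name=activemq)"/>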

Now, we can start the actual design of our route.

As we want to expose a REST service, the route starts with a CXFRS endpoint.

The CXFRS endpoint property is set to "/demo". It’s not an absolute URL: I recommend using a relative URL, as it will bind relative to the CXF servlet in the runtime, and so leverage the CXF and Jetty configuration of the runtime.

We also create an API mapping, getMembers, producing JSON output.

[Screenshot: CXFRS endpoint settings and API mapping]

NB: the methodName (here getMembers) is available as a header. You can use a Content Based Router just after the CXFRS endpoint to route the different methods/REST actions to different sub-routes/endpoints. Here, as we have only one method/action (getMembers), I route directly to a unique endpoint.
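
For illustration, a sketch of such a Content Based Router in the Camel Java DSL (assuming the operationName header set by the camel-cxf component; the direct endpoints are hypothetical):

from("cxfrs:...")
    .choice()
        // route each REST method/action to its own sub-route
        .when(header("operationName").isEqualTo("getMembers")).to("direct:getMembers")
        .otherwise().to("direct:unsupported");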

We now add a cMessagingEndpoint to use the camel-sql component. In the URI, we set the SQL query and the datasource reference:

"sql:select * from members?dataSource=demoDS"

[Screenshot: cMessagingEndpoint with the sql URI]

The dataSource property corresponds to the reference id as defined in the Spring tab.

And in the advanced settings, we define the sql component:

[Screenshot: advanced settings defining the sql component]

The SQL endpoint populates the body of the in message with the query result, as a List<Map<String, Object>>. I transform this into JSON using a marshaler (which uses camel-xstream by default).

For that, I add a cJavaDSLProcessor which does:

.marshal().json()

[Screenshot: cJavaDSLProcessor with the marshal statement]

For the demo, I directly marshal the list of maps as JSON. If you want more control over the generated JSON, you can use a cProcessor before the cJavaDSLProcessor (marshaler). In this cProcessor, you create an instance of a POJO which will be marshaled as JSON, generating exactly the JSON you want, as sketched below.
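
For illustration, a sketch of the cProcessor body (inside its process(Exchange exchange) method; the Member POJO and the column names are hypothetical):

// map the List<Map<String, Object>> rows produced by the sql endpoint
// into Member POJOs before marshaling
List<Map<String, Object>> rows = exchange.getIn().getBody(List.class);
List<Member> members = new ArrayList<Member>();
for (Map<String, Object> row : rows) {
    members.add(new Member((Integer) row.get("ID"), (String) row.get("NAME")));
}
exchange.getIn().setBody(members);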

Our route is now ready; let’s create a kar file.

[Screenshot: kar file export]

We are now ready to deploy in the runtime (Karaf).

Deployment in the runtime

Let’s start a runtime:

$ bin/trun
karaf@trun>

First, we install the jdbc feature to easily create a datasource:

karaf@trun> features:install jdbc

Now, we can create the demo datasource (as expected in our route):

karaf@trun> jdbc:create -i -t derby demo

We now have the demo datasource ready:

karaf@trun> jdbc:datasources
                Name         Product    Version                                           URL Status
    jdbc/demoxa, 418    Apache Derby 10.8.2.2 - (1181258)                               jdbc:derby:demo    OK
      jdbc/demo, 419    Apache Derby 10.8.2.2 - (1181258)                               jdbc:derby:demo    OK

We create the members table using the jdbc:execute command:

karaf@trun> jdbc:execute jdbc/demo "create table members(id int, name varchar(256))"

Now, let’s insert a record in the members table:

karaf@trun> jdbc:execute jdbc/demo "insert into members values(1, 'jbonofre')"

As our route uses JSON marshaler, we need the camel-xstream component:

karaf@trun> features:install camel-xstream

We are now ready to deploy our route. We drop the kar file in the deploy folder. We can see in the log:

2015-08-03 10:02:09,895 | INFO  | pool-10-thread-1 | OsgiSpringCamelContext           | e.camel.impl.DefaultCamelContext 1673 | 170 - org.apache.camel.camel-core - 2.13.2 | Apache Camel 2.13.2 (CamelContext: BlogDataSourceRoute-ctx) started in 0.302 seconds

If we access http://localhost:8040/services, we can see our REST service:

[Screenshot: CXF services page listing the REST service]

And the generated WADL:

[Screenshot: generated WADL]

Using a browser, you can use the REST service by accessing http://localhost:8040/services/demo/. We get the generated JSON containing the database data:

[Screenshot: JSON response with the database data]

Conclusion

When you want to manipulate a database, you don’t have to use a Talend Service or a cTalendJob. Camel provides components:

  • camel-sql
  • camel-jdbc
  • camel-jpa

Using the DataSource as an OSGi service, you can share a unique datasource across different routes, services, applications, and bundles, leveraging the pooling.

It’s an efficient way to leverage the runtime features, while keeping the design in the studio.

Talend ESB Continuous Integration, part 2: Maven and commandline

October 24, 2013 Posted by jbonofre

In the first part of the “Talend ESB Continuous Integration” series, we saw how to test the Camel routes created by the studio by leveraging the Camel Test Kit, and how to have automatic testing using Jenkins.

The Maven POM that we wrote assumes that the route has been deployed (to the local repository or to a remote repository like Apache Archiva).

But it’s not very elegant to have the Studio publish directly to the Archiva repository, especially from a continuous integration perspective.

In this second article, I will show how to use the Talend commandline with Maven, and do nightly builds using Jenkins.

Talend CommandLine

CommandLine introduction

The Talend commandline is the Talend Studio without the GUI. Thanks to the commandline, you can perform a lot of actions, like checking out, exporting a route, publishing a route, or executing a route. Actually, you can do all actions except the design itself 😉

You can find the commandline*.sh scripts directly in your Talend Studio installation, or you can launch the commandline using:

./Talend-Studio-linux-gtk-x86_64 -nosplash -application org.talend.commandline.CommandLine -consoleLog -data commandline-workspace

You can use the commandline in different modes:

  • Shell Mode:

    ./Talend-Studio-linux-gtk-x86 -nosplash -application org.talend.commandline.CommandLine -consoleLog -data commandline-workspace shell
    

    Using this mode, the commandline starts a shell. You can execute the actions directly in this shell. Type quit to exit from the commandline.

  • Script Mode:

    ./Talend-Studio-linux-gtk-x86 -nosplash -application org.talend.commandline.CommandLine -consoleLog -data commandline-workspace scriptFile /path/to/script
    

    Using this mode, the commandline starts and executes the actions (commands) listed in the script file.

  • Server Mode:

    ./Talend-Studio-linux-gtk-x86 -nosplash -application org.talend.commandline.CommandLine -consoleLog -data commandline-workspace startServer -p 8002
    

    Using this mode, the commandline starts a server. You can execute actions (commands) on the commandline using telnet (telnet localhost 8002). Type stopServer (optionally --force) to exit from the commandline.
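
    For instance, a sketch of a session against the server mode (the telnet client output is abridged):

    $ telnet localhost 8002
    Trying 127.0.0.1...
    Connected to localhost.
    talend> listCommand -a
    talend> stopServer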

The help command provides a list of all commands that you can execute in the commandline.

The first action to perform in the commandline is to init the Talend repository (containing the metadata). The repository can be local or remote.

To init a local repository, simply execute the following command in the Talend commandline:

talend> initLocal

To init a remote repository, you have to use the initRemote command, providing the location of a Talend Administration Center:

talend> initRemote http://localhost:8080/org.talend.administrator

As the commandline performs the actions asynchronously, you can see all commands (and their status) executed by the commandline using listCommand:

talend> listCommand -a
0:COMPLETED InitRemoteCommand initRemote

Once the repository is initialized, we can list the projects in the repository:

talend> listProject
CI (CI) java desc=[Continuous Integration Sample] storage=[Local]


If you don't have an existing project, you can create a new one:

talend> createProject -pn "CI" -pd "Continuous Integration Sample" -pl java -pa "jbonofre@talend.com"
talend> listCommand -a
1:COMPLETED CreateProjectCommand createProject -pn 'CI' -pd 'Continuous Integration Sample' -pl 'java' -pa 'jbonofre@talend.com'  name CI description Continuous Integration Sample language java author jbonofre@talend.com

Now, you can log on to a project:

talend> logonProject -pn CI -ul "jbonofre@talend.com" [-up "password"]
talend> listCommand -a
2:COMPLETED LogonProjectCommand log on CI

Once logged on to a project, you can list the routes, jobs, and services in this project:

talend> listRoute
talend> listJob
talend> listService

If you use a remote repository, once logged on to the project, you will have all jobs, routes, and services checked out from SVN.

If you initialized a local repository, you may want to import items (jobs, routes, services) that you exported from a studio:

talend> importItem /home/jbonofre/MyRoute.zip
talend> listCommand -a
3:COMPLETED ImportItemsCommand

Now, you can see the items that you imported:

talend> listRoute
[Samples]
  MyRoute

Now, we can use the commandline to create the route kar file:

talend> exportRoute MyRoute -dd /home/jbonofre
talend> listCommand -a
4:COMPLETED ExportRouteServerCommand exportRoute 'MyRoute' -dd '/home/jbonofre'

We have the MyRoute.kar file created:

jbonofre@vostro:~$ ls -lh|grep -i kar
-rw-r--r--  1 jbonofre jbonofre 231K Oct 24 17:29 MyRoute.kar

Using the Talend Enterprise Edition, instead of creating the kar file locally, we can publish the route features (and all dependencies) directly to a Maven repository (Apache Archiva in my case):

talend> publishRoute MyRoute -pv 0.1.0-SNAPSHOT -g net.nanthrax -a MyRoute -r "http://localhost:8082/archiva/repository/repo-snapshot" -u tadmin -p foo 

We’re going to use a combination of these commands in a commandline invoked by Maven.

Prepare the commandline

For our build, we use the commandline in script mode.

To simplify, we create a commandline-script.sh in the Talend Studio installation directory. The commandline-script.sh contains:

./Talend-Studio-linux-gtk-x86_64 -nosplash -application org.talend.commandline.CommandLine -consoleLog -data commandline-workspace scriptFile $1

Publish script

We can now create a publish script called by commandline-script.sh. This script performs the following actions:

  1. Initialize the repository (local or remote; for this example, I use a remote repository)
  2. Log on to the project
  3. Publish a route

This script uses properties that we will filter with Maven using the maven-resources-plugin.

We place the script in the src/scripts/commandline folder, named publish:

initRemote ${tac.location}
logonProject -pn ${talend.project} -ul "${tac.user}" -up ${tac.password}
publishRoute ${project.artifactId} -r "${repo.snapshot}" -u ${repo.user} -p ${repo.password} -pv ${project.version} -g ${project.groupId} -a ${project.artifactId}
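
After Maven filtering (with the property values from the POM below), the generated target/classes/publish script would look something like:

initRemote http://localhost:8080/org.talend.administrator
logonProject -pn MAIN -ul "jbonofre@talend.com" -up foobar
publishRoute MyRoute -r "http://localhost:8082/archiva/repository/repo-snapshot/" -u admin -p foobar -pv 0.1.0-SNAPSHOT -g net.nanthrax -a MyRoute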

We are now ready to call the commandline using Maven.

Maven deploy using commandline

To call the commandline with Maven, we use the exec-maven-plugin from codehaus.

Our Maven POM does:

  1. Disable the “default” deploy plugin.
  2. Use the maven-resources-plugin to filter the commandline scripts.
  3. Execute the commandline-script.sh at the deploy phase, using the filtered script files.

Finally, the Maven POM looks like:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>net.nanthrax</groupId>
    <artifactId>MyRoute</artifactId>
    <version>0.1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>MyRoute</name>

    <properties>
        <talend.project>MAIN</talend.project>
        <tac.location>http://localhost:8080/org.talend.administrator</tac.location>
        <tac.user>jbonofre@talend.com</tac.user>
        <tac.password>foobar</tac.password>
        <commandline.location>/opt/talend/commandline</commandline.location>
        <commandline.executable>./commandline-script.sh</commandline.executable>
        <repo.release>http://localhost:8082/archiva/repository/repo-release/</repo.release>
        <repo.snapshot>http://localhost:8082/archiva/repository/repo-snapshot/</repo.snapshot>
        <repo.user>admin</repo.user>
        <repo.password>foobar</repo.password>
    </properties>

    <repositories>
        <repository>
            <id>archiva.repo.release</id>
            <name>Archiva Artifact Repository (release)</name>
            <url>${repo.release}</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>archiva.repo.snapshot</id>
            <name>Archiva Artifact Repository (snapshot)</name>
            <url>${repo.snapshot}</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <resources>
            <resource>
                <directory>${project.basedir}/src/scripts/commandline</directory>
                <filtering>true</filtering>
                <includes>
                    <include>**/*</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
                <version>2.7</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <id>export</id>
                        <phase>deploy</phase>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                        <configuration>
                            <executable>${commandline.executable}</executable>
                            <workingDirectory>${commandline.location}</workingDirectory>
                            <arguments>
                                <argument>${project.build.directory}/classes/publish</argument>
                            </arguments>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

By leveraging the commandline, our Maven POM does:

  1. Checkout the Talend metadata repository.
  2. Generate the code using the metadata.
  3. Compile the generated code.
  4. Create the artifact, the Karaf features XML, and deploy on the Archiva repository.

Nightly builds using Jenkins

Now that we have our Maven POM, we can create the job in Jenkins. This way, we will have nightly builds including the latest changes performed by the developers.

Of course, we can “couple” this deploy phase with the unit tests that we did in the first article. We can merge both in the same Maven POM.

It’s interesting to note that we can leverage Maven features (like pluginManagement, profiles, etc.), and especially the reactor (multiple Maven modules), allowing us to build a set of jobs, routes, or services in a row, as sketched below.
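
For instance, a hypothetical parent POM could aggregate several route modules (MyOtherRoute is just a placeholder), each module carrying its own filtered publish script:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>net.nanthrax</groupId>
    <artifactId>routes-parent</artifactId>
    <version>0.1.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <modules>
        <module>MyRoute</module>
        <module>MyOtherRoute</module>
    </modules>
</project>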

Talend ESB Continuous Integration, part 1: Using Camel Test Kit

October 17, 2013 Posted by jbonofre

Introduction

In this series of articles, I will show how to set up a Continuous Integration solution mixing Talend ESB tools, Maven, and Jenkins.

The purpose is to decouple the design (performed in the studio), the tests (both unit and integration tests), and the deployment of the artifacts.

The developers that use the studio should never directly upload to the Maven repository (Archiva in my case).

I propose to implement the following steps:

  1. the developers use the studio to design their routes: the metadata (used to generate the code) is stored in Subversion. The studio “only” checks out and commits on Subversion: it never directly uploads to the artifact repository.
  2. a continuous integration tool (Jenkins in my case) uses Maven. The Maven POM leverages the Talend commandline (a studio without the GUI) to check out, generate the code, and publish to the artifact repository. The Maven POM is also used to execute the unit tests, possibly integration tests, and to cleanly cut the releases.
  3. the Talend runtimes (Karaf) deploy (using JMX or Talend Administration Center) the routes from the artifact repositories.

With this approach, we have a much cleaner isolation of concerns and tasks.

To demonstrate, I used Talend Enterprise edition 5.3.1, but you can do the same using the Open Studio edition.

In this first part, I will show how to use the Camel Test Kit with routes designed by the Talend studio, and how to periodically execute these tests using Jenkins.
To simplify, I will directly publish the routes to Archiva (my artifact repository) using the studio. As I said before, it should not be done this way: only the Talend commandline (called from Jenkins) should be able to upload to the artifact repository.

Camel Test Kit benefits

There are multiple reasons to use the Camel Test Kit and to write unit tests “outside” of the Talend studio:

  • it’s a step toward continuous integration: the unit tests can be executed periodically by Jenkins. Thanks to that, it’s a good way to detect regressions: some changes performed in the studio may break the routes and so the unit tests.
  • it allows you to test components that you can’t run in the studio: for instance, you can’t run routes using the vm component directly in the studio (you can, but it’s not really useful). Thanks to mocks and the producer template, we can test the route and the vm endpoints.
  • it allows you to test even if you don’t have the actual dependent systems: in your route, you will probably use endpoints like CXF (for WebServices), file, FTP, JMS/ActiveMQ, etc. It’s not always easy to test routes using such components directly in the studio: you may not want to really communicate with an FTP server, or to create a local filesystem, etc. The Camel Test Kit allows you to mock some endpoints and mimic the actual endpoints without really having them.
  • simulating errors: most of the time, in the studio, you test the “happy path”. But, especially when you use “custom” error handling, you may want to check that your error handler reacts correctly. The mock component is a good way to generate errors.

Talend Studio for the design

In the Talend Studio, using the Mediation perspective, you can design Camel routes.

The Studio should be used only for the design: not the deployment, the tests, or the releases (even if you can do it all in the studio ;)).

Using the Mediation perspective, I created a simple route:

[Screenshot: the route designed in the Talend Studio]

We have two routes and an error handler in this design:

  • from("vm:start").to("log:cLog1").to("direct:start")
  • from("direct:start").to("log:cLog2").choice().when(simple("${in.header.type} == 'region'")).to("vm:region").otherwise().to("vm:zipcode")
  • a DeadLetter error handler which catches any exception and sends to vm:errorhandling

The first step is to publish the route to the artifact repository (Apache Archiva or Sonatype Nexus, for instance). You configure the location of the artifact repository in the Talend preferences of the studio.

A right click on the route shows a menu containing the “Publish” button: it will upload (deploy) the route to the artifact repository. The “Publish” button is available only in the Enterprise edition. If you use the Open Studio edition, you have to export the route as a kar file, explode the kar file, and use the Maven deploy plugin to upload to the artifact repository.

The publish window allows you to define the Maven groupId, artifactId, version, etc.

The route jar file (which is an OSGi bundle) contains two “special” jar files that you have to upload to the artifact repository. This step has to be done only once per Talend studio version. The jar files are located in the lib folder of the route jar, so you can do:

jar xvf ShowUnitTest-0.1.0-SNAPSHOT.jar lib
mvn deploy:deploy-file -DgroupId=org.talend -DartifactId=systemRoutines -Dversion=5.3.1 -Dfile=lib/systemRoutines.jar -Dpackaging=jar -Durl=http://tadmin:tadmin@localhost:8082/archiva/repository/repo-release/
mvn deploy:deploy-file -DgroupId=org.talend -DartifactId=userBeans -Dversion=5.3.1 -Dfile=lib/userBeans.jar -Dpackaging=jar -Durl=http://tadmin:tadmin@localhost:8082/archiva/repository/repo-release/

NB: while the systemRoutines artifact doesn’t really change, the userBeans artifact should be uploaded “per route” and updated when you modify or create a new bean that you use in your route.

We have now all the artifacts on our artifact repository to create the unit tests.

Using Camel Test Kit

The Camel Test Kit (provided by camel-test.jar) provides:

  • JUnit extensions: you can create unit tests very easily by extending the CamelTestSupport and CamelSpringTestSupport abstract classes
  • Producer/consumer templates: you can “inject” exchanges/messages at any point of a route. It allows you to test a route exactly at a given point, and to create messages which mimic the actual messages
  • Mock component: you can mock actual endpoints, simulate errors, and set expectations on the mock.

Now, we can create a Maven project that will gather our unit tests. We start by creating the POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>utests</artifactId>
    <version>0.1.0-SNAPSHOT</version>

    <properties>
        <camel.version>2.10.4</camel.version>
        <talend.version>5.3.1</talend.version>
        <commandline.path>/home/jbonofre/Talend/Talend-Studio-r104014-V5.3.1</commandline.path>
    </properties>

    <repositories>
        <repository>
            <id>local.archiva.snapshot</id>
            <name>Local Maven Archiva for Snapshots</name>
            <url>http://localhost:8082/archiva/repository/repo-snapshot/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>local.archiva.release</id>
            <name>Local Maven Archiva for Releases</name>
            <url>http://localhost:8082/archiva/repository/repo-release/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.example</groupId>
            <artifactId>ShowUnitTest</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Talend dependencies -->
        <dependency>
            <groupId>org.talend</groupId>
            <artifactId>systemRoutines</artifactId>
            <version>${talend.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.talend</groupId>
            <artifactId>userBeans</artifactId>
            <version>${talend.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Camel dependencies -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-test-spring</artifactId>
            <version>${camel.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-jdk14</artifactId>
            <version>1.6.6</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

On this Maven POM, we can note:

  • We define the location of the Maven artifact repositories (in my case, Apache Archiva) in the <repositories> element.
  • The first <dependency> is the route jar file itself.
  • We define the “Talend” dependencies, especially systemRoutines and userBeans.
  • Finally, we define the “Camel” dependencies: the Camel Test Kit itself, and a slf4j provider to have the log messages during the execution of the unit tests.

We are now ready to write the unit test itself. To do so, we create the src/test/java folder, and in this folder we create the unit test class directly. In my case, I create the ShowUnitTestTest class:

package test.showunittest_0_1;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

import java.io.IOException;

/**
 * Test on the ShowUnitTest routes
 */
public class ShowUnitTestTest extends CamelTestSupport {

    @Override
    public String isMockEndpoints() {
        return "*";
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        ShowUnitTest route = new ShowUnitTest();
        route.initUriMap();
        return route;
    }

    @Test
    public void testRegionRouting() throws Exception {
        MockEndpoint regionMock = getMockEndpoint("mock:vm:region");
        MockEndpoint zipcodeMock = getMockEndpoint("mock:vm:zipcode");

        // we expect to receive one message on vm:region, and no message on vm:zipcode
        regionMock.setExpectedMessageCount(1);
        zipcodeMock.setExpectedMessageCount(0);

        // send a message with the region header
        template.sendBodyAndHeader("vm:start", "Foobar", "type", "region");

        // check the assertion
        assertMockEndpointsSatisfied();
    }

    @Test
    public void testZipCodeRouting() throws Exception {
        MockEndpoint regionMock = getMockEndpoint("mock:vm:region");
        MockEndpoint zipcodeMock = getMockEndpoint("mock:vm:zipcode");

        regionMock.setExpectedMessageCount(0);
        zipcodeMock.setExpectedMessageCount(1);

        // send a message with the zipcode header
        template.sendBodyAndHeader("vm:start", "Foobar", "type", "zipcode");

        // check the assertion
        assertMockEndpointsSatisfied();
    }

    @Test
    public void testNoHeaderRouting() throws Exception {
        MockEndpoint regionMock = getMockEndpoint("mock:vm:region");
        MockEndpoint zipcodeMock = getMockEndpoint("mock:vm:zipcode");

        regionMock.setExpectedMessageCount(0);
        zipcodeMock.setExpectedMessageCount(1);

        // send a message without the type header
        template.sendBody("vm:start", "Foobar");

        // check the assertion
        assertMockEndpointsSatisfied();
    }

    @Test
    public void testErrorHandler() throws Exception {
        MockEndpoint zipcodeMock = getMockEndpoint("mock:vm:zipcode");
        MockEndpoint errorhandlingMock = getMockEndpoint("mock:vm:errorhandling");

        // raise an exception when the vm:zipcode endpoint receives an exchange
        zipcodeMock.whenAnyExchangeReceived(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                throw new IOException("Test Error Handler");
            }
        });

        // the error handling route should have received a message
        errorhandlingMock.setExpectedMessageCount(1);

        // send a message, it should call the error handler
        template.sendBody("vm:start", "Foobar");

        // check the assertion
        assertMockEndpointsSatisfied();
    }

}

In this class, we tell Camel to mock any endpoint (by overriding the isMockEndpoints() method). To find the Camel URIs generated by the studio, you can switch to the source tab in the studio and take a look at the initUriMap() method: this method contains all the URIs of the route endpoints.

We also override the createRouteBuilder() method to load the route designed in the studio. To do so, we create the route object, call the initUriMap() method, and finally return this object.

Of course, we created four different tests:

  • testRegionRouting() tests the route, and especially the content based router, when setting the header ‘type’ to ‘region’. We mock the vm:region and vm:zipcode endpoints. We use the producer template to send a message at the vm:start endpoint step.
  • testZipCodeRouting() tests the route, and especially the content based router, when setting the header ‘type’ to ‘zipcode’.
  • testNoHeaderRouting() tests the route, and especially the content based router, when the header ‘type’ is not set.
  • testErrorHandler() tests the route, simulating an error to check that the error handler reacts correctly.

Special cases: JMS, context variables, cTalendJob,…

Depending on the components that you use, Talend Studio manipulates the CamelContext for you. For instance, when you use the cJMS component, you have to create a cJMSConnectionFactory.

The Talend Studio generates the code to handle the CamelContext and “inject” the JMS connection factory into the Camel JMS component.

Unfortunately, it’s done in a private method, so it’s not callable directly from the test’s createRouteBuilder() method (as we do with the initUriMap() method).

The workaround is to create the CamelContext in the test and copy the code generated by the studio there. Here’s an example of how to use the “custom” JMS component (as the Studio does):

    @Override
    protected CamelContext createCamelContext() throws Exception {
        DefaultCamelContext camelContext = (DefaultCamelContext) super.createCamelContext();

        RouteName_Registry contextRegister = new RouteName_Registry(camelContext.getRegistry());
        camelContext.setRegistry(contextRegister);

        javax.jms.ConnectionFactory jmsConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        camelContext.addComponent("cJMSConnectionFactory1", org.apache.camel.component.jms.JmsComponent.jmsComponent(jmsConnectionFactory));

        return camelContext;
    }

Another typical use case is the Talend context variables. Thanks to the Talend Studio, you can define context variables that you can use anywhere in your route.

In the route definition (in the studio), you can create multiple contexts.

In the unit test, you can decide which context you want to use for the test. To do so, you can use the readContextValues() method when you instantiate the route:

    @Override
    public RouteBuilder createRouteBuilder() throws Exception {
        RouteToTestName route = new RouteToTestName();
        route.readContextValues("Default");
        route.initUriMap();
        return route;
    }

Another feature provided in Talend ESB is that you can call Data Integration jobs in your Camel routes. To do so, Talend ESB registers a Camel component with “talend:” as the URI prefix.
You have to load this component in the test CamelContext:

        TalendComponent talendComponent = new TalendComponent();
        camelContext.addComponent("talend", talendComponent);

Complete test

To summarize, if we take a look at the required resources, we need two things.

The first thing is a Maven POM containing all the resources and artifacts required for the route execution. Here’s a complete example:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>test</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>My Route Test</name>

  <properties>
    <talend.version>5.3.1</talend.version>
    <camel.version>2.10.4</camel.version>
  </properties>

  <dependencies>
    <!-- Route itself -->
    <dependency>
      <groupId>org.example</groupId>
      <artifactId>MyRoute</artifactId>
      <version>1.0-SNAPSHOT</version>
      <scope>test</scope>
    </dependency>
    <!-- Optionally, a job used in the route (via cTalendJob) -->
    <dependency>
      <groupId>org.example</groupId>
      <artifactId>MyRouteJob</artifactId>
      <version>1.0-SNAPSHOT</version>
      <scope>test</scope>
    </dependency>

    <!-- Camel components possibly used in the route -->
    <!-- camel-ftp -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-ftp</artifactId>
      <version>${camel.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- camel-http -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-http</artifactId>
      <version>${camel.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- camel-xmljson -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-xmljson</artifactId>
      <version>${camel.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- camel-cxf -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-cxf</artifactId>
      <version>${camel.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- camel-jms and dependencies -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-jms</artifactId>
      <version>${camel.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.geronimo.specs</groupId>
      <artifactId>geronimo-jms_1.1_spec</artifactId>
      <version>1.1.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.7.0</version>
      <scope>test</scope>
    </dependency>
    <!-- camel-mail and mock-javamail -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-mail</artifactId>
      <version>${camel.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.jvnet.mock-javamail</groupId>
      <artifactId>mock-javamail</artifactId>
      <version>1.7</version>
      <scope>test</scope>
      <exclusions>
        <exclusion>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- Talend dependencies -->
    <dependency>
      <groupId>org.talend</groupId>
      <artifactId>systemRoutines</artifactId>
      <version>${talend.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.talend</groupId>
      <artifactId>userBeans</artifactId>
      <version>${talend.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.talend.camel</groupId>
      <artifactId>camel-talendjob</artifactId>
      <version>${talend.version}</version>
      <scope>test</scope>
    </dependency>

    <!-- Camel dependencies -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-test-spring</artifactId>
      <version>${camel.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>1.6.6</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

</project>

The second resource is the unit test itself (in src/test/java). Here’s a complete example, including the registration of the “custom” JMS component, the Talend component, and some custom beans:

package main.myroute_1_0;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.talend.camel.TalendComponent;

import java.util.*;

public class MyRoute_Test extends CamelTestSupport {

    @Override
    public String isMockEndpoints() {
        return "cJMSConnectionFactory1:*";
    }

    @Override
    public RouteBuilder createRouteBuilder() throws Exception {
        MyRoute route = new MyRoute();
        route.readContextValues("Default");
        route.initUriMap();
        return route;
    }

    @Override
    public CamelContext createCamelContext() throws Exception {
        DefaultCamelContext camelContext = (DefaultCamelContext) super.createCamelContext();
        MyRoute_Registry contextRegister = new MyRoute_Registry(camelContext.getRegistry());

        // custom MyBean
        beans.MyBean myBean = new beans.MyBean();
        contextRegister.register("myBean", myBean);

        // CXF_PAYLOAD_HEADER_FILTER bean required by cxf endpoint generated by the Studio
        CxfConsumerSoapHeaderFilter cxfConsumerSoapHeaderFilter = new CxfConsumerSoapHeaderFilter();
        contextRegister.register("CXF_PAYLOAD_HEADER_FILTER", cxfConsumerSoapHeaderFilter);

        camelContext.setRegistry(contextRegister);

        // "custom" JMS component as generated by the Studio
        javax.jms.ConnectionFactory jmsConnectionFactory = new  org.apache.activemq.ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        camelContext.addComponent("cJMSConnectionFactory1", org.apache.camel.component.jms.JmsComponent.jmsComponent(jmsConnectionFactory));

        // Talend component
        TalendComponent talendComponent = new TalendComponent();
        camelContext.addComponent("talend", talendComponent);        

        return camelContext;
    }

    @Test
    public void testRouteWithMyHeader() throws Exception {
        MockEndpoint queueMock = getMockEndpoint("mock:cJMSConnectionFactory1:queue:OUTPUT_QUEUE");

        queueMock.setMinimumExpectedMessageCount(1);

        String testHeader = "MyHeader";

        // construct the body
        List<String> body = new ArrayList<String>();
        body.add("foo");
        body.add("bar");

        Map<String, Object> camelHeaders = new HashMap<String, Object>();
        camelHeaders.put("MyHeader", testHeader);
        camelHeaders.put("CamelFileName", "/tmp/foobar.csv");
        template.sendBodyAndHeaders("cJMSConnectionFactory1:queue:INPUT_QUEUE", body, camelHeaders);

        assertMockEndpointsSatisfied();

        // instanceof can't be used with a generic type, so we check the raw List type
        assertTrue(queueMock.getExchanges().get(0).getIn().getBody() instanceof List);
    }

    class CxfConsumerSoapHeaderFilter extends org.apache.camel.component.cxf.common.header.CxfHeaderFilterStrategy {
        public boolean applyFilterToCamelHeaders(String headerName, Object headerValue, org.apache.camel.Exchange exchange) {
            if (org.apache.cxf.headers.Header.HEADER_LIST.equals(headerName)) {
                return true;
            }
            return super.applyFilterToCamelHeaders(headerName, headerValue,
                    exchange);
        }

        public boolean applyFilterToExternalHeaders(String headerName, Object headerValue, org.apache.camel.Exchange exchange) {
            if (org.apache.cxf.headers.Header.HEADER_LIST.equals(headerName)) {
                return true;
            }
            return super.applyFilterToExternalHeaders(headerName, headerValue,
                    exchange);
        }
    }

}

Integration with Jenkins

Now, we can periodically execute these unit tests.

To do so, I installed Jenkins in a Tomcat, and set up the Maven job:

[Screenshots: Jenkins job configuration for the unit tests]

Next step

Unit testing is the first step toward a complete continuous integration process using Talend.

In the next article, I will cover the usage of the Talend commandline via Maven, and integrate this into Jenkins.