Posts Tagged: ‘camel’

Talend ESB: query a database directly in the mediation perspective

August 3, 2015 Posted by jbonofre

When exposing a database as a REST or SOAP service, lot of users use:

  • the integration perspective to create a service, but they don’t leverage Camel
  • the mediation perspective to create a route containing a cTalendJob

However, there’s an easy alternative.

Using the mediation perspective, we can use directly a datasource exposed in the runtime (Karaf) as an OSGi service, and directly use Camel components.

The advantages of this approach are:

  • The same DataSource is shared by different routes and services. It means that we can use a PooledDataSource and optimize the connections to the database.
  • We don’t use any Talend job, and directly leverage Camel native components.

Route design in the studio

We create a route in the mediation perspective of the studio.

screen1

First, in the Spring tab, we add the DataSource OSGi service lookup. To do so, we add the spring-osgi namespace and use the osgi:reference element:

<beans ....
   xmlns:osgi="http://www.springframework.org/schema/osgi"
   xsi:schemaLocation="
      ...
      http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
      ...">

  <osgi:reference id="demoDS" interface="javax.sql.DataSource" filter="(osgi.jndi.service.name=jdbc/demo)"/>

</beans>

screen2

You can use the same mechanism to load a JMS connection factory: you can reference a JMS ConnectionFactory OSGi service, and use the camel-jms component in a cMessagingEndpoint, or use cJMS component with an empty custom cJMSConnectionFactory.

Now, we can start the actual design of our route.

As we want to expose a REST service, the route starts with a CXFRS endpoint.

The CXFRS endpoint property is set to "/demo". It’s not an abolute URL: I recommend to use a relative URL as it will bind relatively to the CXF servlet in the runtime, and so leverage CXF and Jetty configuration of the runtime.

We also create an API mapping getMembers producing a JSON output.

screen3

NB: the methodName (here getMembers) is available as header. You can use a Content Base Router just after the CXFRS endpoint to route the different methods/REST action to different sub-routes/endpoints. Here, as we have only one method/action (getMembers), I directly route to an unique endpoint.

We now add a cMessagingEndpoint to use the camel-sql component. In the URI, we set the SQL query and the datasource reference:

"sql:select * from members?dataSource=demoDS"

screen4

The dataSource property corresponds to the reference id as defined in the Spring tab.

And in the advanced settings, we define the sql component:

screen5

The SQL endpoint populate the body of the in message with the query result, as a List>. I transform this as JSON using a marshaler (which use camel-xstream by default).

For that, I add a cJavaDSLProceddor which does:

.marshal().json()

screen6

For the demo, I directly marshal the list of map as JSON. If you want to have more control in the generated JSON, you can use a cProcessor before the cJavaDSLProcessor (marshaler). In this cProcessor, you create a simple instance of a POJO which will be marshaled as JSON, generating the JSON as you want.

Our route is now ready, let’s create a kar file.

screen7

We are now ready to deploy in the runtime (Karaf).

Deployment in the runtime

Let’s start a runtime:

$ bin/trun
karaf@trun>

First, we install the jdbc feature to easily create a datasource:

karaf@trun> features:install jdbc

Now, we can create the demo datasource (as expected in our route):

karaf@trun> jdbc:create -i -t derby demo

We now have the demo datasource ready:

karaf@trun> jdbc:datasources
                Name         Product    Version                                           URL Status
    jdbc/demoxa, 418    Apache Derby 10.8.2.2 - (1181258)                               jdbc:derby:demo    OK
      jdbc/demo, 419    Apache Derby 10.8.2.2 - (1181258)                               jdbc:derby:demo    OK

We create the members table using the jdbc:execute command:

karaf@trun> jdbc:execute jdbc/demo "create table members(id int, name varchar(256))"

Now, let’s insert a record in the members table:

karaf@trun> jdbc:execute jdbc/demo "insert into members values(1, 'jbonofre')"

As our route uses JSON marshaler, we need the camel-xstream component:

karaf@trun> features:install camel-xstream

We are now ready to deploy our route. We drop the kar file in the deploy folder. We can see in the log:

2015-08-03 10:02:09,895 | INFO  | pool-10-thread-1 | OsgiSpringCamelContext           | e.camel.impl.DefaultCamelContext 1673 | 170 - org.apache.camel.camel-core - 2.13.2 | Apache Camel 2.13.2 (CamelContext: BlogDataSourceRoute-ctx) started in 0.302 seconds

If we access to http://localhost:8040/services and see our REST service:

screen8

And the generated WADL:

screen10

Using a browse, you can use the REST service accessing http://localhost:8040/services/demo/. We have the JSON generated containing the database data:

screen11

Conclusion

When you want to manipulate a database, you don’t have to use a Talend Service or a cTalendJob. Camel provides components:

  • camel-sql
  • camel-jdbc
  • camel-jpa

Using the DataSource as an OSGi service, you can share an unique datasource from different routes, services, applications, and bundles, leveraging the pooling.

It’s an efficient way to leverage some runtime feature, with design in the studio.

Monitoring and alerting with Apache Karaf Decanter

July 28, 2015 Posted by jbonofre

Some months ago, I proposed Decanter on the Apache Karaf Dev mailing list.

Today, Apache Karaf Decanter 1.0.0 first release is now on vote.

It’s the good time to do a presentation 😉

Overview

Apache Karaf Decanter is complete monitoring and alerting solution for Karaf and the applications running on it.

It’s very flexible, providing ready to use features, and also very easy to extend.

Decanter 1.0.0 release works with any Karaf version, and can also be used to monitor applications outside of Karaf.

Decanter provides collectors, appenders, and SLA.

Collectors

Decanter Collectors are responsible of harvesting the monitoring data.

Basically, a collector harvest the data, create an OSGi EventAdmin Event event send to decanter/collect/* topic.

A Collector can be:

  • Event Driven, meaning that it will automatically react to an internal event
  • Polled, meaning that it’s periodically executed by the Decanter Scheduler

You can install multiple Decanter Collectors in the same time. In the 1.0.0 release, Decanter provides the following collectors:

  • log is an event-driven collector. It’s actually a Pax Logging PaxAppender that listens for any log messages and send the log details into the EventAdmin topic.
  • jmx is a polled collector. Periodically, the Decanter Scheduler executes this collector. It retrieves all attributes of all MBeans in the MBeanServer, and send the JMX metrics into the EventAdmin topic.
  • camel (jmx) is a specific JMX collector configuration, that retrieves the metrics only for the Camel routes MBeans.
  • activemq (jmx) is a specific JMX collector configuration, that retrieves the metrics only for the ActiveMQ MBeans.
  • camel-tracer is a Camel Tracer TraceEventHandler. In your Camel route definition, you can set this trace event handler to the default Camel tracer. Thanks to that, all tracing details (from URI, to URI, exchange with headers, body, etc) will be send into the EventAdmin topic.

Appenders

The Decanter Appenders receives the data harvested by the collectors. They consume OSGi EventAdmin Events from the decanter/collect/* topics.

They are responsible of storing the monitoring data into a backend.

You can install multiple Decanter Appenders in the same time. In the 1.0.0 release, Decanter provides the following appenders:

  • log creates a log message with the monitoring data
  • elasticsearch stores the monitoring data into an Elasticsearch instance
  • jdbc stores the monitoring data into a database
  • jms sends the monitoring data to a JMS broker
  • camel sends the monitoring data to a Camel route

SLA and alerters

Decanter also provides an alerting system when some data doesn’t validate a SLA.

For instance, you can define the maximum acceptable number of threads running in Karaf. If the current number of threads is over the limit, Decanter calls alerters.

Decanter Alerters are a special kind of appenders, consuming events from the OSGi EventAdmin decanter/alert/* topics.

As for the appenders, you can have multiple alerters active at the same time. Decanter 1.0.0 release provides the following alerters:

  • log to create a log message for each alert
  • e-mail to send an e-mail for each alert
  • camel to execute a Camel route for each alert

Let see Decanter in action to have details how to install and use it !

Quick start

Decanter is pretty easy to install and provide “key turn” functionalities.

The first thing to do is to register the Decanter features repository in the Karaf instance:

karaf@root()> feature:repo-add mvn:org.apache.karaf.decanter/apache-karaf-decanter/1.0.0/xml/features

NB: for the next Karaf releases, I will add Decanter features repository in etc/org.apache.karaf.features.repos.cfg, allowing to easily register Decanter features simply using feature:repo-add decanter 1.0.0.

We now have the Decanter features available:

karaf@root()> feature:list |grep -i decanter
decanter-common                 | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter API                                
decanter-simple-scheduler       | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter Simple Scheduler                   
decanter-collector-log          | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter Log Messages Collector             
decanter-collector-jmx          | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter JMX Collector                      
decanter-collector-camel        | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter Camel Collector                    
decanter-collector-activemq     | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter ActiveMQ Collector                 
decanter-collector-camel-tracer | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter Camel Tracer Collector             
decanter-collector-system       | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter OS Collector                       
decanter-appender-log           | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter Log Appender                       
decanter-appender-elasticsearch | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter Elasticsearch Appender             
decanter-appender-jdbc          | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter JDBC Appender                      
decanter-appender-jms           | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter JMS Appender                       
decanter-appender-camel         | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter Camel Appender                     
decanter-sla                    | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter SLA support                        
decanter-sla-log                | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter SLA log alerter                    
decanter-sla-email              | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter SLA email alerter                  
decanter-sla-camel              | 1.0.0            |           | karaf-decanter-1.0.0     | Karaf Decanter SLA Camel alerter                  
elasticsearch                   | 1.6.0            |           | karaf-decanter-1.0.0     | Embedded Elasticsearch node                       
kibana                          | 3.1.1            |           | karaf-decanter-1.0.0     | Embedded Kibana dashboard

For a quick start, we will use elasticsearch embedded to store the monitoring data. Decanter provides a ready to use elasticsearch feature, starting an embedded elasticsearch node:

karaf@root()> feature:install elasticsearch

The elasticsearch feature installs the elasticsearch configuration: etc/elasticsearch.yml.

We now have a ready to use elasticsearch node, where we will store the monitoring data.

Decanter also provides a kibana feature, providing a ready to use set of kibana dashboards:

karaf@root()> feature:install kibana 


We can now install the Decanter Elasticsearch appender: this appender will get the data harvested by the collectors, and store it in elasticsearch:


karaf@root()> feature:install decanter-appender-elasticsearch

The decanter-appender-elasticsearch feature also installs etc/org.apache.karaf.decanter.appender.elasticsearch.cfg file. You can configure the location of the Elasticsearch node there. By default, it uses a local elasticsearch node, especially the one embedded that we installed with the elasticsearch feature.

The etc/org.apache.karaf.decanter.appender.elasticsearch.cfg file contains hostname, port and clusterName of the elasticsearch instance to use:

################################################
# Decanter Elasticsearch Appender Configuration
################################################

# Hostname of the elasticsearch instance
host=localhost
# Port number of the elasticsearch instance
port=9300
# Name of the elasticsearch cluster
clusterName=elasticsearch

Now, our Decanter appender and elasticsearch node are ready.

It's now time to install some collectors to harvest the data.

Karaf monitoring

First, we install the log collector:

karaf@root()> feature:install decanter-collector-log 

This collector is event-driven and will automatically listen for log events, and send into the EventAdmin collect topic.

We install a second collector: the JMX collector.

karaf@root()> feature:install decanter-collector-jmx

The JMX collector is a polled collector. So, it also installs and starts the Decanter Scheduler.

You can define the call execution period of the scheduler in etc/org.apache.karaf.decanter.scheduler.simple.cfg configuration file. By default, the Decanter Scheduler calls the polled collectors every 5 seconds.

The JMX collector is able to retrieve all metrics (attributes) from multiple MBeanServers.

By default, it uses the etc/org.apache.karaf.decanter.collector.jmx-local.cfg configuration file. This file polls the local MBeanServer.

You can create new configuration files (for instance etc/org.apache.karaf.decanter.collector.jmx-mystuff.cfg configuration file), to poll other remote or local MBeanServers.

The etc/org.apache.karaf.decanter.collector.jmx-*.cfg configuration file contains:

type=jmx-mystuff
url=service:jmx:rmi:///jndi/rmi://hostname:1099/karaf-root
username=karaf
password=karaf
object.name=*.*:*

The type property is a free field allowing you to identify the source of the metrics.

The url property allows you to define the JMX URL. You can also use the local keyword to poll the local MBeanServer.
The username and password allows you to define the username and password to connect to the MBeanServer.

The object.name property is optional. By default, the collector harvests all the MBeans in the server. But you can filter to harvest only some MBeans (for instance org.apache.camel:context=*,type=routes,name=* to harvest only the Camel routes metrics).

Now, we can go in the Decanter Kibana to see the dashboards using the harvested data.

You can access to the Decanter Kibana using http://localhost:8181/kibana.

You have the Decanter Kibana welcome page:

Decanter Kibana

Decanter provides ready to use dashboard. Let see the Karaf Dashboard.

Decanter Kibana Karaf 1

These histograms use the metrics harvested by the JMX collector.

You can also see the log details harvested by the log collector:

Decanter Karaf 2

As Kibana uses Lucene, you can extract exactly the data that you need using filtering or queries.

You can also define the time range to get the metrics and logs.

For instance, you can create the following query to filter only the message coming from Elasticsearch:

loggerName:org.elasticsearch*

Camel monitoring and tracing

We can also use Decanter for the monitoring of the Camel routes that you deploy in Karaf.

For instance, we add Camel in our Karaf instance:

karaf@root()> feature:repo-add camel 2.13.2
Adding feature url mvn:org.apache.camel.karaf/apache-camel/2.13.2/xml/features
karaf@root()> feature:install camel-blueprint

In the deploy, we create the following very simple route (using the route.xml file):

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">

    <camelContext xmlns="http://camel.apache.org/schema/blueprint">
        <route id="test">
            <from uri="timer:fire?period=10000"/>
            <setBody><constant>Hello World</constant></setBody>
            <to uri="log:test"/>
        </route>
    </camelContext>

</blueprint>

Now, in Decanter Kibana, we can go in the Camel dashboard:

Decanter Kibana Camel 1

We can see the histograms here, using the JMX metrics retrieved on the Camel MBeans (especially, we can see for our route the exchanges completed, failed, the last processing time, etc).

You can also see the log messages related to Camel.

Another feature provided by Decanter is a Camel Tracer collector: you can enable the Decanter Camel Tracer to log all exchange state in the backend.

For that, we install the Decanter Camel Tracer feature:

karaf@root()> feature:install decanter-collector-camel-tracer

We update our route.xml in the deploy folder like this:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">

    <reference id="eventAdmin" interface="org.osgi.service.event.EventAdmin"/>

    <bean id="traceHandler" class="org.apache.karaf.decanter.collector.camel.DecanterTraceEventHandler">
        <property name="eventAdmin" ref="eventAdmin"/>
    </bean>

    <bean id="tracer" class="org.apache.camel.processor.interceptor.Tracer">
        <property name="traceHandler" ref="traceHandler"/>
        <property name="enabled" value="true"/>
        <property name="traceOutExchanges" value="true"/>
        <property name="logLevel" value="OFF"/>
    </bean>

    <camelContext trace="true" xmlns="http://camel.apache.org/schema/blueprint">
        <route id="test">
            <from uri="timer:fire?period=10000"/>
            <setBody><constant>Hello World</constant></setBody>
            <to uri="log:test"/>
        </route>
    </camelContext>

</blueprint>

Now, in Decanter Kibana Camel dashboard, you can see the details in the tracer panel:

Decanter Kibana Camel 2

Decanter Kibana also provides a ready to use ActiveMQ dashboard, using the JMX metrics retrieved from an ActiveMQ broker.

SLA and alerting

Another Decanter feature is the SLA (Service Level Agreement) checking.

The purpose is to check if a harvested data validate a check condition. If not, an alert is created and send to SLA alerters.

We want to send the alerts to two alerters:

  • log to create a log message for each alert (warn log level for serious alerts, error log level for critical alerts)
  • camel to call a Camel route for each alert.

First, we install the decanter-sla-log feature:

karaf@root()> feature:install decanter-sla-log

The SLA checker uses the etc/org.apache.karaf.decanter.sla.checker.cfg configuration file.

Here, we want to throw an alert when the number of threads in Karaf is greater to 60. So in the checker configuration file, we set:

ThreadCount.error=range:[0,60]

The syntax in this file is:

attribute.level=check

where:

  • attribute is the name of the attribute in the harvested data (coming from the collectors).
  • level is the alert level. The two possible values are: warn or error.
  • check is the check expression.

The check expression can be:

  • range for numeric attribute, like range:[x,y]. The alert is thrown if the attribute is out of the range.
  • equal for numeric attribute, like equal:x. The alert is thrown if the attribute is not equal to the value.
  • notequal for numeric attribute, like notequal:x. The alert is thrown if the attribute is equal to the value.
  • match for String attribute, like match:regex. The alert is thrown if the attribute doesn't match the regex.
  • notmatch for String attribute, like nomatch:regex. The alert is thrown if the attribute match the regex.

So, in our case, if the number of threads is greater than 60 (which is probably the case ;)), we can see the following messages in the log:

2015-07-28 22:17:11,950 | ERROR | Thread-44        | Logger                           | 119 - org.apache.karaf.decanter.sla.log - 1.0.0 | DECANTER SLA ALERT: ThreadCount out of pattern range:[0,60]
2015-07-28 22:17:11,951 | ERROR | Thread-44        | Logger                           | 119 - org.apache.karaf.decanter.sla.log - 1.0.0 | DECANTER SLA ALERT: Details: hostName:service:jmx:rmi:///jndi/rmi://localhost:1099/karaf-root | alertPattern:range:[0,60] | ThreadAllocatedMemorySupported:true | ThreadContentionMonitoringEnabled:false | TotalStartedThreadCount:5639 | alertLevel:error | CurrentThreadCpuTimeSupported:true | CurrentThreadUserTime:22000000000 | PeakThreadCount:225 | AllThreadIds:[J@6d9ad2c5 | type:jmx-local | ThreadAllocatedMemoryEnabled:true | CurrentThreadCpuTime:22911917003 | ObjectName:java.lang:type=Threading | ThreadContentionMonitoringSupported:true | ThreadCpuTimeSupported:true | ThreadCount:221 | ThreadCpuTimeEnabled:true | ObjectMonitorUsageSupported:true | SynchronizerUsageSupported:true | alertAttribute:ThreadCount | DaemonThreadCount:198 | event.topics:decanter/alert/error | 

Let's now extend the range, add a new check on the thread, and add a new check to throw alerts when we have errors in the log:

ThreadCount.error=range:[0,600]
ThreadCount.warn=range:[0,300]
loggerLevel.error=match:ERROR

Now, we want to call a Camel route to deal with the alerts.

We create the following Camel route, using the deploy/alert.xml:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">

        <camelContext xmlns="http://camel.apache.org/schema/blueprint">
                <route id="alerter">
                        <from uri="direct-vm:decanter-alert"/>
                        <to uri="log:alert"/>
                </route>
        </camelContext>

</blueprint>

Now, we can install the decanter-sla-camel feature:

karaf@root()> feature:install decanter-sla-camel

This feature also installs a etc/org.apache.karaf.decanter.sla.camel.cfg configuration file. In this file, you can define the Camel endpoint URI where you want to send the alert:

alert.destination.uri=direct-vm:decanter-alert

Now, let's decrease the thread range in etc/org.apache.karaf.decanter.sla.checker.cfg configuration file to throw some alerts:

ThreadCount.error=range:[0,600]
ThreadCount.warn=range:[0,60]
loggerLevel.error=match:ERROR

Now, in the log, we can see the alerts.

From the SLA log alerter:

2015-07-28 22:39:09,268 | WARN  | Thread-43        | Logger                           | 119 - org.apache.karaf.decanter.sla.log - 1.0.0 | DECANTER SLA ALERT: ThreadCount out of pattern range:[0,60]
2015-07-28 22:39:09,268 | WARN  | Thread-43        | Logger                           | 119 - org.apache.karaf.decanter.sla.log - 1.0.0 | DECANTER SLA ALERT: Details: hostName:service:jmx:rmi:///jndi/rmi://localhost:1099/karaf-root | alertPattern:range:[0,60] | ThreadAllocatedMemorySupported:true | ThreadContentionMonitoringEnabled:false | TotalStartedThreadCount:6234 | alertLevel:warn | CurrentThreadCpuTimeSupported:true | CurrentThreadUserTime:193150000000 | PeakThreadCount:225 | AllThreadIds:[J@28f0ef87 | type:jmx-local | ThreadAllocatedMemoryEnabled:true | CurrentThreadCpuTime:201484424892 | ObjectName:java.lang:type=Threading | ThreadContentionMonitoringSupported:true | ThreadCpuTimeSupported:true | ThreadCount:222 | ThreadCpuTimeEnabled:true | ObjectMonitorUsageSupported:true | SynchronizerUsageSupported:true | alertAttribute:ThreadCount | DaemonThreadCount:198 | event.topics:decanter/alert/warn | 

but also from the SLA Camel alerter:

2015-07-28 22:39:15,293 | INFO  | Thread-41        | alert                            | 114 - org.apache.camel.camel-core - 2.13.2 | Exchange[ExchangePattern: InOnly, BodyType: java.util.HashMap, Body: {hostName=service:jmx:rmi:///jndi/rmi://localhost:1099/karaf-root, alertPattern=range:[0,60], ThreadAllocatedMemorySupported=true, ThreadContentionMonitoringEnabled=false, TotalStartedThreadCount=6236, alertLevel=warn, CurrentThreadCpuTimeSupported=true, CurrentThreadUserTime=193940000000, PeakThreadCount=225, AllThreadIds=[J@408db39f, type=jmx-local, ThreadAllocatedMemoryEnabled=true, CurrentThreadCpuTime=202296849879, ObjectName=java.lang:type=Threading, ThreadContentionMonitoringSupported=true, ThreadCpuTimeSupported=true, ThreadCount=222, event.topics=decanter/alert/warn, ThreadCpuTimeEnabled=true, ObjectMonitorUsageSupported=true, SynchronizerUsageSupported=true, alertAttribute=ThreadCount, DaemonThreadCount=198}]

Decanter also provides the SLA e-mail alerter to send the alerts by e-mail.

Now, you can play with the SLA checker, and add the checks on the attributes that you need. The Decanter Kibana dashboards help a lot there: in the "Event Monitoring" table, you can see all raw harvested data, allowing you to find the attributes.

What's next

It's just the first Decanter release, but I think it's an interesting one.

Now, we are in the process of adding:

  • a new Decanter CXF interceptor collector, thanks to this collector, you will be able to send details about the request/response on CXF endpoints (SOAP-Request, SOAP-Response, REST message, etc).
  • a new Decanter Redis appender, to send the harvested data to Redis
  • a new Decanter Cassandra appender, to send the harvested data to Cassandra
  • a Decanter WebConsole, allowing to easily manipulate the SLA
  • improvement the SLA support with "recovery" support to send only one alert when the check failed, and another alert when the value "recovered"

Anyway, if you have ideas and want to see new features in Decanter, please let us know.

I hope you like Decanter and see interest in this new Karaf project !

MDC logging with Apache Karaf and Camel

August 31, 2014 Posted by jbonofre

MDC (Mapped Diagnostic Context) logging is an interesting feature to log contextual messages.

It’s classic to want to log contextual messages in your application. For instance, we want to log the actions performed by an user (identified by an username or user id). As you have a lot of simultaneous users on your application, it’s easier to “follow” the log.

MDC is supported by several logging frameworks, like log4j or slf4j, and so by Karaf (thanks to pax-logging) as well.
The approach is pretty simple:

  1. You define the context using a key ID and a value for the key:
    MDC.put("userid", "user1");
    
  2. You use the logger as usual, the log messages to this logger will be contextual to the context:
    logger.debug("my message");
    
  3. After that, we can change the context by overriding the key:
    MDC.put("userid", "user2");
    logger.debug("another message");
    

    Or you can remove the key, so to remove the context, and the log will be “global” (not local to a context):

    MDC.remove("userid"); // or MDC.clear() to remove all
    logger.debug("my global message");
    
  4. In the configuration, we can use pattern with %X{key} to log context. A pattern like %X{userid} - %m%n will result to a log file looking like:
    user1 - my message
    user2 - another message
    

In this blog, we will see how to use MDC in different cases (directly in your bundle, generic Karaf OSGi, and in Camel routes.

The source code of the blog post are available on my github: http://github.com/jbonofre/blog-mdc.

Using MDC in your application/bundle

The purpose here is to use slf4j MDC in our bundle and configure Karaf to create one log file per context.

To illustrate this, we will create multiple threads in the bundle, given a different context key for each thread:

package net.nanthrax.blog.mdc;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MdcExampleBean {

    private Logger logger = LoggerFactory.getLogger(MdcExampleBean.class);

    public void init() throws Exception {
        CycleThread thread1 = new CycleThread("thread1");
        CycleThread thread2 = new CycleThread("thread2");
        CycleThread thread3 = new CycleThread("thread3");
        thread1.start();
        thread2.start();
        thread3.start();
    }

    class CycleThread extends Thread {
        private String mdcContext;
        public CycleThread(String mdcContext) {
            this.mdcContext = mdcContext;
        }
        public void run() {
            MDC.put("threadId", mdcContext);
            for (int i = 0; i < 20; i++) {
                logger.info("Cycle {}", i);
            }
        }
    }

}

After deploying this bundle in Karaf 3.0.1, we can see the log messages:

karaf@root()> bundle:install mvn:net.nanthrax.blog/mdc-bundle/1.0-SNAPSHOT
karaf@root()> log:display
...
2014-08-30 09:44:25,594 | INFO  | Thread-15        | MdcExampleBean                   | 78 - net.nanthrax.blog.mdc-bundle - 1.0.0.SNAPSHOT | Cycle 17
2014-08-30 09:44:25,594 | INFO  | Thread-13        | MdcExampleBean                   | 78 - net.nanthrax.blog.mdc-bundle - 1.0.0.SNAPSHOT | Cycle 19
2014-08-30 09:44:25,594 | INFO  | Thread-15        | MdcExampleBean                   | 78 - net.nanthrax.blog.mdc-bundle - 1.0.0.SNAPSHOT | Cycle 18
2014-08-30 09:44:25,595 | INFO  | Thread-15        | MdcExampleBean                   | 78 - net.nanthrax.blog.mdc-bundle - 1.0.0.SNAPSHOT | Cycle 19

Now, we can setup the Karaf etc/org.ops4j.pax.logging.cfg file to use our MDC. For that, we add a MDCSiftingAppender, providing the threadId as MDC key, and displaying the threadId in the log message pattern. We will create one log file per key (threadId in our case), and finally, we add this appender to the rootLogger:

...
log4j.rootLogger=INFO, out, mdc-bundle, osgi:*
...
# MDC Bundle appender
log4j.appender.mdc-bundle=org.apache.log4j.sift.MDCSiftingAppender
log4j.appender.mdc-bundle.key=threadId
log4j.appender.mdc-bundle.default=unknown
log4j.appender.mdc-bundle.appender=org.apache.log4j.FileAppender
log4j.appender.mdc-bundle.appender.layout=org.apache.log4j.PatternLayout
log4j.appender.mdc-bundle.appender.layout.ConversionPattern=%d | %-5.5p | %X{threadId} | %m%n
log4j.appender.mdc-bundle.appender.file=${karaf.data}/log/mdc-bundle-$\\{threadId\\}.log
log4j.appender.mdc-bundle.appender.append=true
...

Now, in the Karaf data/log folder, we can see:

mdc-bundle-thread1.log
mdc-bundle-thread2.log
mdc-bundle-thread3.log

each file containing the log messages contextual to the thread:

$ cat data/log/mdc-bundle-thread1.log
2014-08-30 09:54:48,287 | INFO  | thread1 | Cycle 0
2014-08-30 09:54:48,298 | INFO  | thread1 | Cycle 1
2014-08-30 09:54:48,298 | INFO  | thread1 | Cycle 2
2014-08-30 09:54:48,299 | INFO  | thread1 | Cycle 3
2014-08-30 09:54:48,299 | INFO  | thread1 | Cycle 4
...
$ cat data/log/mdc-bundle-thread2.log
2014-08-30 09:54:48,287 | INFO  | thread2 | Cycle 0
2014-08-30 09:54:48,298 | INFO  | thread2 | Cycle 1
2014-08-30 09:54:48,298 | INFO  | thread2 | Cycle 2
2014-08-30 09:54:48,299 | INFO  | thread2 | Cycle 3
2014-08-30 09:54:48,299 | INFO  | thread2 | Cycle 4
2014-08-30 09:54:48,299 | INFO  | thread2 | Cycle 5
...

In addition, Karaf “natively” provides OSGi MDC data that we can use.

Using Karaf OSGi MDC

So, in Karaf, you can use directly some OSGi headers for MDC logging, especially the bundle name.

We can use this MDC key to create one log file per bundle.

Karaf already provides a pre-defined appender configuration in etc/org.ops4j.pax.logging.cfg:

...
# Sift appender
log4j.appender.sift=org.apache.log4j.sift.MDCSiftingAppender
log4j.appender.sift.key=bundle.name
log4j.appender.sift.default=karaf
log4j.appender.sift.appender=org.apache.log4j.FileAppender
log4j.appender.sift.appender.layout=org.apache.log4j.PatternLayout
log4j.appender.sift.appender.layout.ConversionPattern=%d{ISO8601} | %-5.5p | %-16.16t | %-32.32c{1} | %m%n
log4j.appender.sift.appender.file=${karaf.data}/log/$\\{bundle.name\\}.log
log4j.appender.sift.appender.append=true
...

The only thing that we have to do is to add this appender to the rootLogger:

log4j.rootLogger=INFO, out, sift, osgi:*

Now, in the Karaf data/log folder, we can see one file per bundle:

data/log$ ls -1
karaf.log
net.nanthrax.blog.mdc-bundle.log
org.apache.aries.blueprint.core.log
org.apache.aries.jmx.core.log
org.apache.felix.fileinstall.log
org.apache.karaf.features.core.log
org.apache.karaf.region.persist.log
org.apache.karaf.shell.console.log
org.apache.sshd.core.log

Especially, we can see our mdc-bundle, containing the log messages “local” to the bundle.

However, if this approach works great, it doesn’t always create interesting log file. For instance, when you use Camel, using OSGi headers for MDC logging will gather most of the log messages into the camel-core bundle log file, so, not really contextual to something or easy to read/seek.

The good news is that Camel also provides MDC logging support.

Using Camel MDC

If Camel provides MDC logging support, it’s not enabled by default. It’s up to you to enable it on the camel context.

Once enabled, Camel provides the following MDC logging properties:

  • camel.exchangeId providing the exchange ID
  • camel.messageId providing the message ID
  • camel.routeId providing the route ID
  • camel.contextId providing the Camel Context ID
  • camel.breadcrumbId providing an unique id used for tracking messages across transports
  • camel.correlationId providing the correlation ID of the exchange (if it’s correlated, for instance like in Splitter EIP)
  • camel.trasactionKey providing the ID of the transaction (for transacted exchange).

To enable the MDC logging, you have to:

  • if you use the Blueprint or Spring XML DSL:
    <camelContext xmlns="http://camel.apache.org/schema/blueprint" useMDCLogging="true">
    
  • if you use the Java DSL:
    CamelContext context = ...
    context.setUseMDCLogging(true);
    
  • using the Talend ESB studio, you have to use a cConfig component from the palette:
    studio1

So, let say, we create the following route using the Blueprint DSL:

<?xml version="1.0" encoding="UTF-8"?> 
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"> 

   <camelContext xmlns="http://camel.apache.org/schema/blueprint" useMDCLogging="true"> 
      <route id="my-route"> 
         <from uri="timer:fire?period=5000"/> 
         <setBody> 
            <constant>Hello Blog</constant> 
         </setBody> 
         <to uri="log:net.nanthrax.blog?level=INFO"/>
      </route>
   </camelContext>
 
</blueprint>

We want to create one log file per route (using the routeId). So, we update the Karaf etc/org.ops4j.pax.logging.cfg file to add a MDC sifting appender using the Camel MDC properties, and we add this appender to the rootLogger:

...
log4j.rootLogger=INFO, out, camel-mdc, osgi:*
...
# Camel MDC appender
log4j.appender.camel-mdc=org.apache.log4j.sift.MDCSiftingAppender
log4j.appender.camel-mdc.key=camel.routeId
log4j.appender.camel-mdc.default=unknown 
log4j.appender.camel-mdc.appender=org.apache.log4j.FileAppender
log4j.appender.camel-mdc.appender.layout=org.apache.log4j.PatternLayout
log4j.appender.camel-mdc.appender.layout.ConversionPattern=%d{ISO8601} | %-5.5p | %-16.16t | %-32.32c{1} | %X{camel.exchangeId} | %m%n
log4j.appender.camel-mdc.appender.file=${karaf.data}/log/camel-$\\{camel.routeId\\}.log
log4j.appender.camel-mdc.appender.append=true
...

The camel-mdc appender will create one log file by route (named camel-(routeId).log). The log messages will contain the exchange ID.

We start Karaf, and after the installation of the camel-blueprint feature, we can drop our route.xml directly in the deploy folder:

karaf@root()> feature:repo-add camel 2.12.1
Adding feature url mvn:org.apache.camel.karaf/apache-camel/2.12.1/xml/features
karaf@root()> feature:install camel-blueprint
cp route.xml apache-karaf-3.0.1/deploy/

Using log:display command in Karaf, we can see the messages for our route:

karaf@root()> log:display

2014-08-31 08:58:24,176 | INFO | 0 – timer://fire | blog | 85 – org.apache.camel.camel-core – 2.12.1 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: Hello Blog]
2014-08-31 08:58:29,176 | INFO | 0 – timer://fire | blog | 85 – org.apache.camel.camel-core – 2.12.1 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: Hello Blog]

Now, if we go into the Karaf data/log folder, we can see the log file for our route:

$ ls -1 data/log
camel-my-route.log
...

If we take a look in the camel-my-route.log file, we can see the messages contextual to the route, including the exchange ID:

2014-08-31 08:58:19,196 | INFO  | 0 - timer://fire | blog                             | ID-latitude-57336-1409468297774-0-2 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: Hello Blog]
2014-08-31 08:58:24,176 | INFO  | 0 - timer://fire | blog                             | ID-latitude-57336-1409468297774-0-4 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: Hello Blog]
2014-08-31 08:58:29,176 | INFO  | 0 - timer://fire | blog                             | ID-latitude-57336-1409468297774-0-6 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: Hello Blog]
2014-08-31 08:58:34,176 | INFO  | 0 - timer://fire | blog                             | ID-latitude-57336-1409468297774-0-8 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: Hello Blog]

Testing (utest and itest) Apache Camel Blueprint route

August 28, 2014 Posted by jbonofre

In any integration project, testing is vital for multiple reasons:

  • to guarantee that the integration logic matches the expectations
  • to quickly identify some regression issues
  • to test some special cases, like the errors for instance
  • to validate the succesful provisioning (deployment) on a runtime as close as possible to the target platform

We distinguish two kinds of tests:

  • the unit tests (utest) aim to test the behaviors of integration logic, and define the expectations that the logic has to match
  • the integration tests (itest) aim to provision the integration logic artifact to a runtime, and check the behaviors on the actual platform

Camel is THE framework to implement your integration logic (mediation).

It provides the Camel Test Kit, based on JUnit to implement utest. In combinaison with Karaf and Pax Exam, we can cover both utest and itest.

In this blog, we will:

  • create an OSGi service
  • create a Camel route using the Blueprint DSL, using the previously created OSGi service
  • implement the utest using the Camel Blueprint Test
  • implement the itest using Pax Exam and Karaf

You can find the whole source code used for this blog post on my github: https://github.com/jbonofre/blog-camel-blueprint.

Blueprint Camel route and features

We create a project (using Maven) containing the following modules:

  • my-service is the OSGi bundle providing the service that we will use in the Camel route
  • my-route is the OSGi bundle providing the Camel route, using the Blueprint DSL. This route uses the OSGi service provided by my-service. It’s where we will implement the utest.
  • features packages the OSGi bundles as a Karaf features XML, ready to be deployed.
  • itests contains the integration test (itest) leveraging Karaf and Pax Exam.

It means we have the following parent pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>net.nanthrax.blog</groupId>
    <artifactId>net.nanthrax.blog.camel.route.blueprint</artifactId>
    <name>Nanthrax Blog :: Camel :: Blueprint</name>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <modules>
        <module>my-service</module>
        <module>my-route</module>
        <module>features</module>
        <module>itests</module>
    </modules>

</project>

OSGi service

The my-service Maven module provides an OSGi bundle providing an echo service.

It uses the following Maven POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>net.nanthrax.blog</groupId>
        <artifactId>net.nanthrax.blog.camel.route.blueprint</artifactId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>net.nanthrax.blog.camel.route.blueprint.service</artifactId>
    <name>Nanthrax Blog :: Camel :: Blueprint :: Service</name>
    <packaging>bundle</packaging>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.felix</groupId>
                <artifactId>maven-bundle-plugin</artifactId>
                <version>2.4.0</version>
                <extensions>true</extensions>
                <configuration>
                    <instructions>
                        <Export-Package>
                            net.nanthrax.blog.service
                        </Export-Package>
                        <Import-Package>
                            org.slf4j*;resolution:=optional
                        </Import-Package>
                        <Private-Package>
                            net.nanthrax.blog.service.internal
                        </Private-Package>
                    </instructions>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

The echo service is described by the net.nanthrax.blog.service.EchoService interface:

package net.nanthrax.blog.service;

public interface EchoService {

    public String echo(String message);

}

We expose the package containing this interface using OSGi Export-Package header.

The implementation of the EchoService is hidden using the OSGi Private-Package header. This implementation is very simple, it gets a message and return the same message with the “Echoing ” prefix:

package net.nanthrax.blog.service.internal;

import net.nanthrax.blog.service.EchoService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EchoServiceImpl implements EchoService {

    private final static Logger LOGGER = LoggerFactory.getLogger(EchoServiceImpl.class);

    public String echo(String message) {
        return "Echoing " + message;
    }

}

To expose this service in OSGi, we use blueprint. We create the blueprint descriptor in src/main/resources/OSGI-INF/blueprint/blueprint.xml:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">

    <service interface="net.nanthrax.blog.service.EchoService">
        <bean class="net.nanthrax.blog.service.internal.EchoServiceImpl"/>
    </service>

</blueprint>

The Camel route will use this Echo service.

Camel route and utest

We use the Camel Blueprint DSL to design the route.

The route is packaged as an OSGi bundle, in the my-route Maven module, using the following pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>net.nanthrax.blog</groupId>
        <artifactId>net.nanthrax.blog.camel.route.blueprint</artifactId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>net.nanthrax.blog.camel.route.blueprint.myroute</artifactId>
    <name>Nanthrax Blog :: Camel :: Blueprint :: My Route</name>
    <packaging>bundle</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-test-blueprint</artifactId>
            <version>2.12.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>net.nanthrax.blog</groupId>
            <artifactId>net.nanthrax.blog.camel.route.blueprint.service</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.felix</groupId>
                <artifactId>maven-bundle-plugin</artifactId>
                <version>2.4.0</version>
                <extensions>true</extensions>
                <configuration>
                    <instructions>
                        <Import-Package>
                            net.nanthrax.blog.service
                        </Import-Package>
                    </instructions>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

The src/main/resources/OSGI-INF/blueprint/route.xml contains the route definition:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">

    <reference id="myService" interface="net.nanthrax.blog.service.EchoService"/>

    <camelContext xmlns="http://camel.apache.org/schema/blueprint">
        <route>
            <from uri="timer:fire?period=5000"/>
            <setBody>
                <constant>Hello Blog</constant>
            </setBody>
            <to uri="bean:myService"/>
            <to uri="log:net.nanthrax.blog.route"/>
            <to uri="file:camel-output"/>
        </route>
    </camelContext>

</blueprint>

This route:

  • creates an exchange every 5 secondes, using a Camel timer
  • we set the body of the “in” message in the exchange to “Hello Blog”
  • the message is sent to the EchoService, which prefix the message with “Echoing”, resulting to an updated message containing “Echoing Hello Blog”
  • we log the exchange
  • we create a file for each exchange, in the camel-output folder, using the Camel file component

We are now to create the utest for this route.

As this route uses Blueprint, and Blueprint is an OSGi specific technology, normally, we would have to deploy the route on Karaf to test it.

However, thanks to Camel Blueprint Test and the use of PojoSR, we can test the Blueprint route “outside” of OSGi. Camel Blueprint Test also supports a mock of the OSGi service registry, allowing to mock the OSGi service as well.

Basically, in the unit test, we:

  • load the route Blueprint XML by overridding the getBlueprintDescriptor() method
  • mock the timer and file endpoints by overridding the isMockEndpointsAndSkip() method (skip means that we don’t send the message to the actual endpoint)
  • mock the Echo OSGi service by overriding the addServicesOnStartup() method
  • finally implement a test in the testMyRoute() method

The test itself get the mocked file endpoint, and define the expectations on this endpoint: we expect one message containing “Echoing Hello Blog” on the file endpoint.
Instead of using the actual timer endpoint, we mock it and we use the producer template to send an exchange (in order to control the number of created exchange).
Finally, we check if the expectations are satisfied on the mocked file endpoint.

package net.nanthrax.blog;

import net.nanthrax.blog.service.EchoService;
import net.nanthrax.blog.service.internal.EchoServiceImpl;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.language.ConstantExpression;
import org.apache.camel.test.blueprint.CamelBlueprintTestSupport;
import org.apache.camel.util.KeyValueHolder;
import org.junit.Test;

import java.util.Dictionary;
import java.util.Map;

public class MyRouteTest extends CamelBlueprintTestSupport {

    @Override
    protected String getBlueprintDescriptor() {
        return "OSGI-INF/blueprint/route.xml";
    }

    @Override
    public String isMockEndpointsAndSkip() {
        return "((file)|(timer)):(.*)";
    }

    @Override
    protected void addServicesOnStartup(Map<String, KeyValueHolder<Object, Dictionary>> services) {
        KeyValueHolder serviceHolder = new KeyValueHolder(new EchoServiceImpl(), null);
        services.put(EchoService.class.getName(), serviceHolder);
    }

    @Test
    public void testMyRoute() throws Exception {

        // mocking the file endpoint and define the expectation
        MockEndpoint mockEndpoint = getMockEndpoint("mock:file:camel-output");
        mockEndpoint.expectedMessageCount(1);
        mockEndpoint.expectedBodiesReceived("Echoing Hello Blog");

        // send a message at the timer endpoint level
        template.sendBody("mock:timer:fire", "empty");

        // check if the expectation is satisfied
        assertMockEndpointsSatisfied();
    }

}

We can see that we mock the Echo OSGi service using the actual EchoServiceImpl. However, of course, it’s possible to use your own local test implementation of the EchoService. It’s interesting to test some use cases, or to simulate errors.

We can note that we use a regex (((file)|(timer)):(.*)) to mock both timer and file endpoints.

We load the route.xml blueprint descriptor directly from the bundle location (OSGI-INF/blueprint/route.xml).

We can run mvn to test the route:

my-route$ mvn clean install
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Nanthrax Blog :: Camel :: Blueprint :: My Route 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.4.1:clean (default-clean) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 1 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/jbonofre/Workspace/blog-camel-blueprint/my-route/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 1 source file to /home/jbonofre/Workspace/blog-camel-blueprint/my-route/target/test-classes
[WARNING] /home/jbonofre/Workspace/blog-camel-blueprint/my-route/src/test/java/net/nanthrax/blog/MyRouteTest.java: /home/jbonofre/Workspace/blog-camel-blueprint/my-route/src/test/java/net/nanthrax/blog/MyRouteTest.java uses unchecked or unsafe operations.
[WARNING] /home/jbonofre/Workspace/blog-camel-blueprint/my-route/src/test/java/net/nanthrax/blog/MyRouteTest.java: Recompile with -Xlint:unchecked for details.
[INFO]
[INFO] --- maven-surefire-plugin:2.17:test (default-test) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[INFO] Surefire report directory: /home/jbonofre/Workspace/blog-camel-blueprint/my-route/target/surefire-reports

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running net.nanthrax.blog.MyRouteTest
[main] INFO org.apache.camel.test.blueprint.CamelBlueprintHelper - Using Blueprint XML file: /home/jbonofre/Workspace/blog-camel-blueprint/my-route/target/classes/OSGI-INF/blueprint/route.xml
Aug 28, 2014 2:57:43 PM org.ops4j.pax.swissbox.tinybundles.core.metadata.RawBuilder run
INFO: Copy thread finished.
[main] INFO org.apache.camel.impl.osgi.Activator - Camel activator starting
[main] INFO org.apache.camel.impl.osgi.Activator - Camel activator started
[main] INFO org.apache.aries.blueprint.container.BlueprintExtender - No quiesce support is available, so blueprint components will not participate in quiesce operations
[main] INFO net.nanthrax.blog.MyRouteTest - ********************************************************************************
[main] INFO net.nanthrax.blog.MyRouteTest - Testing: testMyRoute(net.nanthrax.blog.MyRouteTest)
[main] INFO net.nanthrax.blog.MyRouteTest - ********************************************************************************
[main] INFO net.nanthrax.blog.MyRouteTest - Skipping starting CamelContext as system property skipStartingCamelContext is set to be true.
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - Apache Camel 2.12.1 (CamelContext: 23-camel-3) is starting
[main] INFO org.apache.camel.management.DefaultManagementStrategy - JMX is disabled
[main] INFO org.apache.camel.impl.InterceptSendToMockEndpointStrategy - Adviced endpoint [timer://fire?period=5000] with mock endpoint [mock:timer:fire]
[main] INFO org.apache.camel.impl.InterceptSendToMockEndpointStrategy - Adviced endpoint [file://camel-output] with mock endpoint [mock:file:camel-output]
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - Route: route1 started and consuming from: Endpoint[timer://fire?period=5000]
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - Total 1 routes, of which 1 is started.
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - Apache Camel 2.12.1 (CamelContext: 23-camel-3) started in 0.069 seconds
[main] INFO org.apache.camel.component.mock.MockEndpoint - Asserting: Endpoint[mock://file:camel-output] is satisfied
[Camel (23-camel-3) thread #0 - timer://fire] INFO net.nanthrax.blog.route - Exchange[ExchangePattern: InOnly, BodyType: String, Body: Echoing Hello Blog]
[main] INFO org.apache.camel.component.mock.MockEndpoint - Asserting: Endpoint[mock://timer:fire] is satisfied
[main] INFO net.nanthrax.blog.MyRouteTest - ********************************************************************************
[main] INFO net.nanthrax.blog.MyRouteTest - Testing done: testMyRoute(net.nanthrax.blog.MyRouteTest)
[main] INFO net.nanthrax.blog.MyRouteTest - Took: 1.094 seconds (1094 millis)
[main] INFO net.nanthrax.blog.MyRouteTest - ********************************************************************************
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - Apache Camel 2.12.1 (CamelContext: 23-camel-3) is shutting down
[main] INFO org.apache.camel.impl.DefaultShutdownStrategy - Starting to graceful shutdown 1 routes (timeout 10 seconds)
[Camel (23-camel-3) thread #1 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Route: route1 shutdown complete, was consuming from: Endpoint[timer://fire?period=5000]
[main] INFO org.apache.camel.impl.DefaultShutdownStrategy - Graceful shutdown of 1 routes completed in 0 seconds
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - Apache Camel 2.12.1 (CamelContext: 23-camel-3) uptime 1.117 seconds
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - Apache Camel 2.12.1 (CamelContext: 23-camel-3) is shutdown in 0.021 seconds
[main] INFO org.apache.aries.blueprint.container.BlueprintExtender - Destroying BlueprintContainer for bundle MyRouteTest
[main] INFO org.apache.aries.blueprint.container.BlueprintExtender - Destroying BlueprintContainer for bundle net.nanthrax.blog.camel.route.blueprint.service
[main] INFO org.apache.aries.blueprint.container.BlueprintExtender - Destroying BlueprintContainer for bundle org.apache.aries.blueprint
[main] INFO org.apache.aries.blueprint.container.BlueprintExtender - Destroying BlueprintContainer for bundle org.apache.camel.camel-blueprint
[main] INFO org.apache.camel.impl.osgi.Activator - Camel activator stopping
[main] INFO org.apache.camel.impl.osgi.Activator - Camel activator stopped
[main] INFO org.apache.camel.test.blueprint.CamelBlueprintHelper - Deleting work directory target/bundles/1409230663118
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.581 sec - in net.nanthrax.blog.MyRouteTest

Results :

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

[INFO] 
[INFO] --- maven-bundle-plugin:2.4.0:bundle (default-bundle) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[WARNING] Bundle net.nanthrax.blog:net.nanthrax.blog.camel.route.blueprint.myroute:bundle:1.0-SNAPSHOT : Unused Private-Package instructions, no such package(s) on the class path: [!*]
[INFO] 
[INFO] --- maven-install-plugin:2.5.1:install (default-install) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[INFO] Installing /home/jbonofre/Workspace/blog-camel-blueprint/my-route/target/net.nanthrax.blog.camel.route.blueprint.myroute-1.0-SNAPSHOT.jar to /home/jbonofre/.m2/repository/net/nanthrax/blog/net.nanthrax.blog.camel.route.blueprint.myroute/1.0-SNAPSHOT/net.nanthrax.blog.camel.route.blueprint.myroute-1.0-SNAPSHOT.jar
[INFO] Installing /home/jbonofre/Workspace/blog-camel-blueprint/my-route/pom.xml to /home/jbonofre/.m2/repository/net/nanthrax/blog/net.nanthrax.blog.camel.route.blueprint.myroute/1.0-SNAPSHOT/net.nanthrax.blog.camel.route.blueprint.myroute-1.0-SNAPSHOT.pom
[INFO] 
[INFO] --- maven-bundle-plugin:2.4.0:install (default-install) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[INFO] Installing net/nanthrax/blog/net.nanthrax.blog.camel.route.blueprint.myroute/1.0-SNAPSHOT/net.nanthrax.blog.camel.route.blueprint.myroute-1.0-SNAPSHOT.jar
[INFO] Writing OBR metadata
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 6.906s
[INFO] Finished at: Thu Aug 28 14:57:47 CEST 2014
[INFO] Final Memory: 22M/557M
[INFO] ------------------------------------------------------------------------

Again, the purpose of the utest is to test the behaviors of the route: check if the message content is what we expect, if the message arrives on the expected endpoint, etc.

Karaf features and itests

The purpose of the itest is not really to test the behavior of the route: it’s more to test if the provisioning (deployment) of the route is OK, if the route starts without problem, and, when possible, if the “default” behavior is what we expect.

If it’s possible to deploy bundle per bundle (first the one providing the Echo service, and after the one providing the route), with Karaf, it’s largely easier to create a features XML.

It’s what we do in the features Maven module, grouping the bundles in two features as show in the following features XML:

<?xml version="1.0" encoding="UTF-8"?>
<features name="blog-camel-blueprint" xmlns="http://karaf.apache.org/xmlns/features/v1.0.0">

    <feature name="blog-camel-blueprint-service" version="${project.version}">
        <bundle>mvn:net.nanthrax.blog/net.nanthrax.blog.camel.route.blueprint.service/${project.version}</bundle>
    </feature>

    <feature name="blog-camel-blueprint-route" version="${project.version}">
        <feature>blog-camel-blueprint-service</feature>
        <bundle>mvn:net.nanthrax.blog/net.nanthrax.blog.camel.route.blueprint.myroute/${project.version}</bundle>
    </feature>

</features>

Now, we can use Pax Exam to implement our itests, by:

  • bootstrap a Karaf container, where we deploy the camel-blueprint, and our features
  • test if the provisioning is OK
  • create a local route to test the output of my-route

We do that in the itests Maven module, where we define the Pax Exam dependency:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>net.nanthrax.blog</groupId>
        <artifactId>net.nanthrax.blog.camel.route.blueprint</artifactId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>itests</artifactId>

    <dependencies>

        <dependency>
            <groupId>net.nanthrax.blog</groupId>
            <artifactId>camel-blueprint</artifactId>
            <version>1.0-SNAPSHOT</version>
            <classifier>features</classifier>
            <type>xml</type>
            <scope>test</scope>
        </dependency>

        <!-- Pax Exam -->
        <dependency>
            <groupId>org.ops4j.pax.exam</groupId>
            <artifactId>pax-exam-container-karaf</artifactId>
            <version>3.4.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.ops4j.pax.exam</groupId>
            <artifactId>pax-exam-junit4</artifactId>
            <version>3.4.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.ops4j.pax.exam</groupId>
            <artifactId>pax-exam-inject</artifactId>
            <version>3.4.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.geronimo.specs</groupId>
            <artifactId>geronimo-atinject_1.0_spec</artifactId>
            <version>1.0</version>
            <scope>test</scope>
        </dependency>

        <!-- Camel Test -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-test</artifactId>
            <version>2.12.1</version>
            <scope>test</scope>
        </dependency>

        <!-- Karaf -->
        <dependency>
            <groupId>org.apache.karaf</groupId>
            <artifactId>apache-karaf</artifactId>
            <version>2.3.6</version>
            <type>tar.gz</type>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.karaf</groupId>
                    <artifactId>org.apache.karaf.client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>

</project>

We create MyRouteTest in src/test/java/net/nanthrax/blog/itests:

package net.nanthrax.blog.itests;

import static org.ops4j.pax.exam.CoreOptions.maven;
import static org.ops4j.pax.exam.karaf.options.KarafDistributionOption.*;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.language.ConstantExpression;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.karaf.features.FeaturesService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.Configuration;
import org.ops4j.pax.exam.Option;
import org.ops4j.pax.exam.junit.PaxExam;
import org.ops4j.pax.exam.karaf.options.LogLevelOption;
import org.osgi.framework.BundleContext;

import javax.inject.Inject;

import java.io.File;

@RunWith(PaxExam.class)
public class MyRouteTest extends CamelTestSupport {

    @Inject
    protected FeaturesService featuresService;

    @Inject
    protected BundleContext bundleContext;

    @Configuration
    public static Option[] configure() throws Exception {
        return new Option[] {
                karafDistributionConfiguration()
                        .frameworkUrl(maven().groupId("org.apache.karaf").artifactId("apache-karaf").type("tar.gz").version("2.3.6"))
                        .karafVersion("2.3.6")
                        .useDeployFolder(false)
                        .unpackDirectory(new File("target/paxexam/unpack")),
                logLevel(LogLevelOption.LogLevel.WARN),
                features(maven().groupId("org.apache.camel.karaf").artifactId("apache-camel").type("xml").classifier("features").version("2.12.1"), "camel-blueprint", "camel-test"),
                features(maven().groupId("net.nanthrax.blog").artifactId("camel-blueprint").type("xml").classifier("features").version("1.0-SNAPSHOT"), "blog-camel-blueprint-route"),
                keepRuntimeFolder()
        };
    }

    @Test
    public void testProvisioning() throws Exception {
        // first check that the features are installed
        assertTrue(featuresService.isInstalled(featuresService.getFeature("camel-blueprint")));
        assertTrue(featuresService.isInstalled(featuresService.getFeature("blog-camel-blueprint-route")));

        // now we check if the OSGi services corresponding to the camel context and route are there

    }

    @Test
    public void testMyRoute() throws Exception {
        MockEndpoint itestMock = getMockEndpoint("mock:itest");
        itestMock.expectedMinimumMessageCount(3);
        itestMock.whenAnyExchangeReceived(new Processor() {
            public void process(Exchange exchange) {
                System.out.println(exchange.getIn().getBody(String.class));
            }
        });

        template.start();

        Thread.sleep(20000);

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            public void configure() {
                from("file:camel-output").to("mock:itest");
            }
        };
    }

}

In this test class, we can see:

  • the configure() method where we define the Karaf distribution to use, the log level, the Camel features XML location and the Camel features that we want to install (camel-blueprint and camel-test), the location of our features XML and the feature that we want to install (blog-camel-blueprint-route)
  • the testProvisioning() method where we check if the features have been correctly installed
  • the createRouteBuilder() method where we programmatically create a new route (using the Java DSL here) consuming the files created by my-route and sending to a mock endpoint
  • the testMyRoute() gets the itest mock endpoint (from the route created by the createRouteBuilder() method) and check that it receives at least 3 messages, during an update of 20 secondes (and also display the content of the message)

Running mvn, it bootstraps a Karaf instance, install the features, deploy our test bundle, and check the execution:

itests$ mvn clean install
...
-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running net.nanthrax.blog.itests.MyRouteTest
[org.ops4j.pax.exam.spi.DefaultExamSystem] : Pax Exam System (Version: 3.4.0) created.
[org.ops4j.store.intern.TemporaryStore] : Storage Area is /tmp/1409248259083-0
[org.ops4j.pax.exam.junit.impl.ProbeRunner] : creating PaxExam runner for class net.nanthrax.blog.itests.MyRouteTest
...
[org.ops4j.pax.exam.karaf.container.internal.KarafTestContainer] : Test Container started in 3 millis
[org.ops4j.pax.exam.karaf.container.internal.KarafTestContainer] : Wait for test container to finish its initialization [ RelativeTimeout value = 180000 ]
[org.ops4j.pax.exam.rbc.client.RemoteBundleContextClient] : Waiting for remote bundle context.. on 21414 name: 7cd8df34-0ed2-4449-8d60-d51f395cfa1d timout: [ RelativeTimeout value = 180000 ]
        __ __                  ____
       / //_/____ __________ _/ __/
      / ,<  / __ `/ ___/ __ `/ /_
     / /| |/ /_/ / /  / /_/ / __/
    /_/ |_|\__,_/_/   \__,_/_/

  Apache Karaf (2.3.6)

Hit '<tab>' for a list of available commands
and '[cmd] --help' for help on a specific command.
Hit '<ctrl-d>' or type 'osgi:shutdown' or 'logout' to shutdown Karaf.

karaf@root> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[org.ops4j.pax.exam.rbc.client.RemoteBundleContextClient] : Remote bundle context found after 5774 millis
[org.ops4j.pax.tinybundles.core.intern.RawBuilder] : make()
[org.ops4j.store.intern.TemporaryStore] : Enter store()
[org.ops4j.pax.tinybundles.core.intern.RawBuilder] : Creating manifest from added headers.
...
[org.ops4j.pax.exam.container.remote.RBCRemoteTarget] : Installed bundle (from stream) as ID: 102
[org.ops4j.pax.exam.container.remote.RBCRemoteTarget] : call [[TestAddress:PaxExam-d7899c82-74e1-445e-9fcb-ab9b18e286b4 root:PaxExam-5dfb0f4b-96d9-4226-bdea-5b057e7e7335]]
Echoing Hello Blog
Echoing Hello Blog
Echoing Hello Blog
Echoing Hello Blog
...
Results :

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ itests ---
[WARNING] JAR will be empty - no content was marked for inclusion!
[INFO] Building jar: /home/jbonofre/Workspace/blog-camel-blueprint/itests/target/itests-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-install-plugin:2.3.1:install (default-install) @ itests ---
[INFO] Installing /home/jbonofre/Workspace/blog-camel-blueprint/itests/target/itests-1.0-SNAPSHOT.jar to /home/jbonofre/.m2/repository/net/nanthrax/blog/itests/1.0-SNAPSHOT/itests-1.0-SNAPSHOT.jar
[INFO] Installing /home/jbonofre/Workspace/blog-camel-blueprint/itests/pom.xml to /home/jbonofre/.m2/repository/net/nanthrax/blog/itests/1.0-SNAPSHOT/itests-1.0-SNAPSHOT.pom
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 35.904s
[INFO] Finished at: Thu Aug 28 19:51:32 CEST 2014
[INFO] Final Memory: 28M/430M
[INFO] ------------------------------------------------------------------------

Integration in Jenkins

We can now integrate our project in Jenkins CI. We now have a complete CI covering, build of the service, packaging of the route, utest on the route, itest of the service and route in Karaf.

jenkins1

jenkins2

jenkins3

Talend ESB Continous Integration, part1: Using Camel Test Kit

October 17, 2013 Posted by jbonofre

Introduction

In this serie of articles, I will show how to setup a Continuous Integration solution mixing Talend ESB tools, Maven, and Jenkins.

The purpose is to decouple the design (performed in the studio), the tests (both unit and integration tests), and the deployment of the artifacts.

The developers that use the studio should never directly upload to the Maven repository (Archiva in my case).

I propose to implement the following steps:

  1. the developers use the studio to design their routes: the metadata (used to generate the code) are stored in the subversion. The studio “only” checkouts and commits on subversion: it never directly upload to the artifact repository.
  2. a continuous integration tool (Jenkins in my case) uses Maven. The Maven POM leverages the Talend commandline (a studio without the GUI) to checkout, generate the code, and publish to the artifact repository. The Maven POM is also used to execute unit tests, eventually integration tests, and cleanly cut off the releases.
  3. the Talend runtimes (Karaf) deploy (using JMX or Talend Administration Center) the routes from the artifact repositories.

With this approach, we have a much cleaner isolation of concerns and tasks.

To demonstrate, I used Talend Enterprise edition 5.3.1, but you can do the same using the Open Studio edition.

In this first part, I will show how to use the Camel Test Kit with routes designed by the Talend studio, and how to periodically execute these tests using Jenkins.
To simplify, I will directly publish the routes on Archiva (my artifacts repository) using the studio. As I said before, it should not be done this way: only the Talend commandline (called from Jenkins) should be able to upload to the artifacts repository.

Camel Test Kit benefits

There are multiple reaons to use the Camel Test Kit and to write unit tests “outside” of the Talend studio:

  • it’s a step forward to continuous integration: the unit tests can be periodically executed by Jenkins. Thanks to that, it’s a good way to detect regressions: some changes performed in the studio may break the routes and so the unit tests.
  • it allows you to test components that you can’t run in the studio: for instance, you can’t run routes using vm component directly in the studio (you can but it’s not really useful). Thanks to mock and producer template, we can test the route and the vm endpoints.
  • it allows you to test even if you don’t have the actual dependent systems: in your route, you will probably use endpoints like CXF (for WebServices), file, FTP, JMS/ActiveMQ, etc. It’s not always easy to test route using such components directly in the studio: you may not want to really communicate with a FTP server, or to create local filesystem, etc. The Camel Test Kit allows you to mock some endpoints and mimic the actual endpoint without having really it.
  • Simulate errors: most of the time, in the studio, you test the “happy path”. But, especially when you use “custom” error handling, you may want to see if your error hanlder reacts correctly. The mock component is a good way to generate errors.

Talend Studio for the design

In the Talend Studio, using the Mediation perspective, you can design Camel routes.

The Studio should be used only for the design: not the deployment, the tests, or the releases (even if you can do all in the studio ;)).

Using the Mediation perspective, I created a simple route:

Talend Studio screenshot

We have two routes in this design:

  • from(“vm:start”).to(“log:cLog1”).to(“direct:start”)
  • from(“direct:start”).to(“log:cLog2”).choice().when(simple(“${in.header.type} == ‘region'”)).to(“vm:region”).otherwise().to(“vm:zipcode”)
  • a DeadLetter ErrorHandler which catch any exception and send to vm:errorhandling

The first step is to publish the route on the artifact repository (Apache Archiva or Sonatype Nexus for instance). You configure the location of the artifact repository in the Talend preferences of the studio.

A right click on the route show a menu containing the “Publish” button: it will upload (deploy) the route to the artifact repository. The “Publish” button is available only in Enterprise edition. If you use the OpenStudio edition, you have to export the route as a kar file, explode the kar file and use the Maven deploy plugin to upload to the artifact repository.

The publish window allows you to define the Maven groupId, artifactId, version, etc.

The route jar file (which is an OSGi bundle) contains two “special jar files” that you have to upload to the artifact repository. This step has to be done only one time per Talend studio version. The jar files are located into the lib folder of the route jar, so you can do:

jar xvf ShowUnitTest-0.1.0-SNAPSHOT.jar lib
mvn deploy:deploy-file -DgroupId=org.talend -DartifactId=systemRoutines -Dversion=5.3.1 -Dfile=lib/systemRoutines.jar -Dpackaging=jar -Durl=http://tadmin:tadmin@localhost:8082/archiva/repository/repo-release/
mvn deploy:deploy-file -DgroupId=org.talend -DartifactId=userBeans -Dversion=5.3.1 -Dfile=lib/userBeans.jar -Dpackaging=jar -Durl=http://tadmin:tadmin@localhost:8082/archiva/repository/repo-release/

NB: if systemRoutines artifact doesn’t really change, the userBeans artifact should be uploaded “per route” and updated when you modify or create a new bean that you use in your route.

We have now all the artifacts on our artifact repository to create the unit tests.

Using Camel Test Kit

The Camel Test (provided by the camel-test.jar) provides:

  • JUnit extensions: you can create very easily unit tests by extend the CamelTestSupport and CamelSpringTestSupport abstract classes
  • Producer/Consumer template: you can “inject” exchanges/messages at any point of a route. It allows you to test exactly a route at a given point, and create messages which mimic the actual messages
  • Mock component: you can mock actual endpoints, simulate errors, and set expectations on the mock.

Now, we can create a Maven project that will gather our unit tests. We start by creating the POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>utests</artifactId>
    <version>0.1.0-SNAPSHOT</version>

    <properties>
        <camel.version>2.10.4</camel.version>
        <talend.version>5.3.1</talend.version>
        <commandline.path>/home/jbonofre/Talend/Talend-Studio-r104014-V5.3.1</commandline.path>
    </properties>

    <repositories>
        <repository>
            <id>local.archiva.snapshot</id>
            <name>Local Maven Archiva for Snapshots</name>
            <url>http://localhost:8082/archiva/repository/repo-snapshot/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>local.archiva.release</id>
            <name>Local Maven Archiva for Releases</name>
            <url>http://localhost:8082/archiva/repository/repo-release/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.example</groupId>
            <artifactId>ShowUnitTest</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Talend dependencies -->
        <dependency>
            <groupId>org.talend</groupId>
            <artifactId>systemRoutines</artifactId>
            <version>${talend.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.talend</groupId>
            <artifactId>userBeans</artifactId>
            <version>${talend.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Camel dependencies -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-test-spring</artifactId>
            <version>${camel.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-jdk14</artifactId>
            <version>1.6.6</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

On this Maven POM, we can note:

  • We define the Maven artifact repositories (in my case Apache Archiva) location in the <repositories> element.
  • The first <dependencies> is the route jar file itself.
  • We define the “Talend” dependencies, especially systemRoutines and userBeans.
  • Finally, we define the “Camel” dependencies: the Camel Test Kit itself, and a slf4j provider to have the log messages during the execution of the unit tests.

We are now ready to write the unit test itself. To do so, we create the src/test/java folder. In this folder, we create directly the unit test class. In my case, I create the ShowUnitTestTest class:

package test.showunittest_0_1;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.CamelTestSupport;
import org.junit.Test;

import java.io.IOException;

/**
 * Test on the ShowUnitTest routes
 */
public class ShowUnitTestTest extends CamelTestSupport {

    @Override
    public String isMockEndpoints() {
        return "*";
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        ShowUnitTest route = new ShowUnitTest();
        route.initUriMap();
        return route;
    }

    @Test
    public void testRegionRouting() throws Exception {
        MockEndpoint regionMock = getMockEndpoint("mock:vm:region");
        MockEndpoint zipcodeMock = getMockEndpoint("mock:vm:zipcode");

        // we expect to receive one message on the JMS queue:region, and no message on the JMS queue:zipcode
        regionMock.setExpectedMessageCount(1);
        zipcodeMock.setExpectedMessageCount(0);

        // send a message with the region header
        template.sendBodyAndHeader("vm:start", "Foobar", "type", "region");

        // check the assertion
        assertMockEndpointsSatisfied();
    }

    @Test
    public void testZipCodeRouting() throws Exception {
        MockEndpoint regionMock = getMockEndpoint("mock:vm:region");
        MockEndpoint zipcodeMock = getMockEndpoint("mock:vm:zipcode");

        regionMock.setExpectedMessageCount(0);
        zipcodeMock.setExpectedMessageCount(1);

        // send a message with the region header
        template.sendBodyAndHeader("vm:start", "Foobar", "type", "zipcode");

        // check the assertion
        assertMockEndpointsSatisfied();
    }

    @Test
    public void testNoHeaderRouting() throws Exception {
        MockEndpoint regionMock = getMockEndpoint("mock:vm:region");
        MockEndpoint zipcodeMock = getMockEndpoint("mock:vm:zipcode");

        regionMock.setExpectedMessageCount(0);
        zipcodeMock.setExpectedMessageCount(1);

        // send a message with the region header
        template.sendBody("vm:start", "Foobar");

        // check the assertion
        assertMockEndpointsSatisfied();
    }

    @Test
    public void testErrorHandler() throws Exception {
        MockEndpoint zipcodeMock = getMockEndpoint("mock:vm:zipcode");
        MockEndpoint errorhandlingMock = getMockEndpoint("mock:vm:errorhandling");

        // raise an exception at the cLog processor step
        zipcodeMock.whenAnyExchangeReceived(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                throw new IOException("Test Error Handler");
            }
        });

        // the error handling route should have received a message
        errorhandlingMock.setExpectedMessageCount(1);

        // send a message, it should call the error handler
        template.sendBody("vm:start", "Foobar");

        // check the assertion
        assertMockEndpointsSatisfied();
    }

}

In this class, we said to Camel to be able to mockup any endpoint (overriding the isMockEndpoints() method). To find the Camel URI generated by the studio, you can switch to the source tab in the studio and take a look on the initUriMap() method: this method contains all URI of the route endpoints.

We also override the createRouteBuilder() method to load the route designed in the studio. To do it, we create the route object, call the initUriMap() method, and finally return this object.

Of course, we created four different tests:

  • the testRegionRouting() tests the route, and especially the content base router when setting the header ‘type’ to ‘region’. We mock up the vm:region and vm:zipcode endpoints. We use the producer template to send at the vm:start endpoint step.
  • the testZipCodeRouting() tests the route, and especially the content base router when setting the header ‘type’ to ‘zipcode’.
  • the testNoHeaderRouting() tests the route, and especially the content base router when the header ‘type’ is not set.
  • the testErrorHandler() tests the route, simulate an error to check if the error handler reacts correctly.

Special cases: JMS, context variables, cTalendJob,…

Depending of components that you use, Talend Studio manipulates the CamelContext for you. For instance, when you use the cJMS component, you have to create a cJMSConnectionFactory.

The Talend Studio generates the code to handle the CamelContext and “inject” the JMS connection factory into the Camel JMS component.

Unfortunately, it’s a done in a private method, so not callable directly from the test createRouteBuilder method (as we do with the initUriMap() method).

The workaround is to create the CamelContext in the test and copy the code generated by the studio here. Here’s an example how to use the “custom” JMS component (as the Studio does):

    @Override
    protected CamelContext createCamelContext() throws Exception {
        DefaultCamelContext camelContext = (DefaultCamelContext) super.createCamelContext();

        RouteName_Registry contextRegister = new RouteName_Registry(camelContext.getRegistry());
        camelContext.setRegistry(contextRegister);

        javax.jms.ConnectionFactory jmsConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        camelContext.addComponent("cJMSConnectionFactory1", org.apache.camel.component.jms.JmsComponent.jmsComponent(jmsConnectionFactory));

        return camelContext;
    }

Another typical use case is about the Talend context variables. Thanks to the Talend Studio, you can define context variables that you can use in any place of your route.

In the route definition (in the studio), you can create multiple contexts.

In the unit test, you can decide which context you want to use for the test. To do so, you can use the readContextValues() method when you instanciate the route:

    @Override
    public RouteBuilder createRouteBuilder() throws Exception {
        RouteToTestName route = new RouteToTestName();
        route.readContextValues("Default");
        route.initUriMap();
        return route;
    }

Another feature provided in Talend ESB is that you can call Data Integration jobs in your Camel routes. To do so, Talend ESB registers a Camel component with “talend:” as URI prefix.
You have to load this component in the test CamelContext:

        TalendComponent talendComponent = new TalendComponent();
        camelContext.addComponent("talend", talendComponent);

Complete test

To summarize, if we take a look on the required resources, we need two things.

The first thing is a Maven POM containing all the resources and artifacts required for the route execution. Here’s a complete example:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>test</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>My Route Test</name>

  <properties>
    <talend.version>5.3.1</talend.version>
    <camel.version>2.10.4</camel.version>
  </properties>

  <dependencies>
    <!-- Route itself -->
    <dependency>
      <groupId>org.example</groupId>
      <artifactId>MyRoute</artifactId>
      <version>1.0-SNAPSHOT</version>
      <scope>test</scope>
    </dependency>
    <!-- Eventually job used in the route (via cTalendJob) -->
    <dependency>
      <groupId>org.example</groupId>
      <artifactId>MyRouteJob</artifactId>
      <version>1.0-SNAPSHOT</version>
      <scope>test</scope>
    </dependency>

    <!-- Eventually Camel components used in the route -->
    <!-- camel-ftp -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-ftp</artifactId>
      <version>${camel.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- camel-http -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-http</artifactId>
      <version>${camel.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- camel-xmljson -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-xmljson</artifactId>
      <version>${camel.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- camel-cxf -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-cxf</artifactId>
      <version>${camel.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- camel-jms and dependencies -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-jms</artifactId>
      <version>${camel.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.geronimo.specs</groupId>
      <artifactId>geronimo-jms_1.1_spec</artifactId>
      <version>1.1.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.7.0</version>
      <scope>test</scope>
    </dependency>
    <!-- camel-mail and mock-javamail -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-mail</artifactId>
      <version>${camel.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.jvnet.mock-javamail</groupId>
      <artifactId>mock-javamail</artifactId>
      <version>1.7</version>
      <scope>test</scope>
      <exclusions>
        <exclusion>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- Talend dependencies -->
    <dependency>
      <groupId>org.talend</groupId>
      <artifactId>systemRoutines</artifactId>
      <version>${talend.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.talend</groupId>
      <artifactId>userBeans</artifactId>
      <version>${talend.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.talend.camel</groupId>
      <artifactId>camel-talendjob</artifactId>
      <version>${talend.version}</version>
      <scope>test</scope>
    </dependency>

    <!-- Camel dependencies -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-test-spring</artifactId>
      <version>${camel.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>1.6.6</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

</project>

The second resource is the unit test itself (in src/test/java). Here’s a complete example, including registration of “custom” JMS component, Talend component, some custom beans registration:

package main.myroute_1_0;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.talend.camel.TalendComponent;

import java.util.*;

public class MyRoute_Test extends CamelTestSupport {

    @Override
    public String isMockEndpoints() {
        return "cJMSConnectionFactory1:*";
    }

    @Override
    public RouteBuilder createRouteBuilder() throws Exception {
        MyRoute route = new MyRoute();
        route.readContextValues("Default");
        route.initUriMap();
        return route;
    }

    @Override
    public CamelContext createCamelContext() throws Exception {
        DefaultCamelContext camelContext = (DefaultCamelContext) super.createCamelContext();
        MyRoute_Registry contextRegister = new MyRoute_Registry(camelContext.getRegistry());

        // custom MyBean
        beans.MyBean myBean = new beans.MyBean();
        contextRegister.register("myBean", myBean);

        // CXF_PAYLOAD_HEADER_FILTER bean required by cxf endpoint generated by the Studio
        CxfConsumerSoapHeaderFilter cxfConsumerSoapHeaderFilter = new CxfConsumerSoapHeaderFilter();
        registry.register("CXF_PAYLOAD_HEADER_FILTER", cxfConsumerSoapHeaderFilter);

        camelContext.setRegistry(contextRegister);

        // "custom" JMS component as generated by the Studio
        javax.jms.ConnectionFactory jmsConnectionFactory = new  org.apache.activemq.ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        camelContext.addComponent("cJMSConnectionFactory1", org.apache.camel.component.jms.JmsComponent.jmsComponent(jmsConnectionFactory));

        // Talend component
        TalendComponent talendComponent = new TalendComponent();
        camelContext.addComponent("talend", talendComponent);        

        return camelContext;
    }

    @Test
    public void testRouteWithMyHeader() throws Exception {
        MockEndpoint queueMock = getMockEndpoint("mock:cJMSConnectionFactory1:queue:OUTPUT_QUEUE");

        queueMock.setMinimumExpectedMessageCount(1);

        String testHeader = "MyHeader";

        // construct the body
        List<String> body = new ArrayList<String>();
        body.add("foo");
        body.add("bar");

        Map<String, Object> camelHeaders = new HashMap<String, Object>();
        camelHeaders.put("MyHeader", testHeader);
        camelHeaders.put("CamelFileName", "/tmp/foobar.csv");
        template.sendBodyAndHeaders("cJMSConnectionFactory1:queue:INPUT_QUEUE", body, camelHeaders);

        assertMockEndpointsSatisfied();

        assertTrue(queueMock.getExchanges().get(0).getIn().getBody() instanceof List<String>);
    }

    class CxfConsumerSoapHeaderFilter extends org.apache.camel.component.cxf.common.header.CxfHeaderFilterStrategy {
        public boolean applyFilterToCamelHeaders(String headerName, Object headerValue, org.apache.camel.Exchange exchange) {
            if (org.apache.cxf.headers.Header.HEADER_LIST.equals(headerName)) {
                return true;
            }
            return super.applyFilterToCamelHeaders(headerName, headerValue,
                    exchange);
        }

        public boolean applyFilterToExternalHeaders(String headerName, Object headerValue, org.apache.camel.Exchange exchange) {
            if (org.apache.cxf.headers.Header.HEADER_LIST.equals(headerName)) {
                return true;
            }
            return super.applyFilterToExternalHeaders(headerName, headerValue,
                    exchange);
        }
    }

}

Integration with Jenkins

Now, we can periodically execute these unit tests.

To do so, I installed Jenkin in a Tomcat, and setup the Maven POM:

screen2

screen3

screen4

Next step

Unit test is the first step to a complete continuous integration process using Talend.

In the next article, I will deal with the usage of the Talend commandline via Maven, and integrate this in Jenkins.

Overview on Apache Karaf, Pax Web, and Camel archetypes

December 19, 2011 Posted by jbonofre

In my previous blog post, I introduced the Karaf Maven plugins.
The Karaf Maven plugins are really helpful, starting from an existing POM.

If you can write this POM by hand (it’s my favorite way ;)), we also provide several archetypes which create and pre-configure a Maven project for you.

Karaf Archetypes

The next Karaf release (2.2.5) provides a set of new archetypes:

Assembly Archetype

The karaf-assembly-archetype create a Maven project which create a custom Karaf distribution.

It allows you to create your own Karaf distribution. The project downloads a Karaf standard distribution (in tar.gz and zip formats), unpack it, and create a new distribution.

The easiest way to use it is to use the interactive mode:


mvn archetype:generate -DarchetypeGroupId=org.apache.karaf.archetypes -DarchetypeArtifactId=karaf-assembly-archetype -DarchetypeVersion=2.2.5-SNAPSHOT

Bundle Archetype

If basically a bundle is a jar file with some special statement in the MANIFEST, the easiest way to create a bundle is to use the Felix maven-bundle-plugin.

Karaf provides an archetype which prepare a Maven project and provide a bundle Activator (a special callback class when the bundle start/stop).

To generate this project, simply type:


mvn archetype:generate -DarchetypeGroupId=org.apache.karaf.archetypes -DarchetypeArtifactId=karaf-bundle-archetype -DarchetypeVersion=2.2.5-SNAPSHOT

NB: I think that this archetype should be in Felix (as the same level as maven-bundle-plugin), I will propose a donation to the Felix community.

Blueprint Archetype

Blueprint is IoC approach applied to OSGi (it comes from Spring DM in fact). It allows you to avoid to write bundle Activator, ServiceTracker, etc.

Karaf provides a karaf-blueprint-archetype which prepare a Maven project including the maven-bundle-plugin, and a blueprint descriptor with a sample of OSGi service definition:


mvn archetype:generate -DarchetypeGroupId=org.apache.karaf.archetypes -DarchetypeArtifactId=karaf-blueprint-archetype -DarchetypeVersion=2.2.5-SNAPSHOT

NB: I think that this archetype should be in Aries (as it’s the blueprint implementation used in Karaf), I will propose a donation to the Aries community.

Feature Archetype

In Karaf, a feature is an application descriptor, defining all bundles, configurations/configuration files, others features. This features descriptor could be generated by hand (it’s my favorite way, and I guess the recommended one), we also provide a archetype which prepare a project to generate features file regarding the POM dependencies:


mvn archetype:generate -DarchetypeGroupId=org.apache.karaf.archetypes -DarchetypeArtifactId=karaf-feature-archetype -DarchetypeVersion=2.2.5-SNAPSHOT

Kar Archetype

We saw in my previous post a new goal of the features-maven-plugin to create a kar file starting from a features XML. A KAR file is a zip file containing the features XML and all its dependencies.

We also provide an archetype to prepare a Maven project containing a features XML and generate a KAR file:


mvn archetype:generate -DarchetypeGroupId=org.apache.karaf.archetypes -DarchetypeArtifactId=karaf-kar-archetype -DarchetypeVersion=2.2.5-SNAPSHOT

Pax-Web Archetypes

We also add several archetypes in Pax Web to deal with the web bundle. Charles talk about wab-gwt archetype in his last blog entry (http://cmoulliard.blogspot.com/2011/12/run-google-web-toolkit-2-project-on.html), but I added also two others.

Web Bundle Archetype

The Web Bundle Archetype create a “special” bundle containing web resources and statements. It creates a Maven project with webapp resources and WebApp Context in the POM:


mvn archetype:generate -DarchetypeGroupId=org.ops4j.pax.web.archetypes -DarchetypeArtifactId=wab-archetype -DarchetypeVersion=1.1.2-SNAPSHOT

War Archetype

We also provide an archetype to create a Maven project which generate a standard war (that you can deploy “outside” of OSGi) but including OSGi statements in the MANIFEST (which allows you to use OSGi values):


mvn archetype:generate -DarchetypeGroupId=org.ops4j.pax.web.archetypes -DarchetypeArtifactId=war-archetype -DarchetypeVersion=1.1.2-SNAPSHOT

Camel Archetypes

Camel also provides a set of very useful archetypes, especially:

  • camel-archetype-blueprint to generate a Maven project with a blueprint XML in which you can define your routes
  • camel-archetype-component providing a template to create your own Camel component
  • camel-archetype-dataformat providing a template to create your own Camel data format
  • camel-archetype-java providing a template with a class in which you can define your routes
  • camel-archetype-spring to generate a Maven project with a Spring XML in which you can define your routes

For instance, to create a Camel blueprint Maven project, simply type:


mvn archetype:generate -DarchetypeGroupId=org.apache.camel.archetypes -DarchetypeArtifactId=camel-archetype-blueprint -DarchetypeVersion=2.9-SNAPSHOT

Use Camel, CXF and Karaf to implement batches

August 23, 2011 Posted by jbonofre

Introduction

Apache Camel has not be designed to be used for implementing batch tasks.

For instance, if your Camel route has a consumer endpoint polling files in a directory, Camel will periodically and indefinitely monitor the folder and poll any new incoming files.
It’s not a batch behavior: in batch mode, we want to run the file polling on demand, at a certain time, launched by a batch scheduler like ControlM, $Universe or Tivoli Worksheet Scheduler.

However, there are several interesting points to use Camel for batch implementation. First, Camel provides a large set of components. A lot of batches read/write files, read from a JMS queues, write into JMS queues, etc. Usage of Camel components in a batch way is really valuable.
Second, Camel uses a DSL to describe the process executed by the routes. Especially, it supports “human readable” DSL like Spring XML or Blueprint XML. It means that it’s easy to review what the batch is doing, eventually change an endpoint definition, etc. Most of the time, batches are “black box”: you run it, and you only get a status code to know if it’s OK or not. With Camel, you have a look on the batch process.
Third, Camel is a highly plug and play framework. It means that it’s easy to replace an endpoint by another one. For instance, if your batch polls files in a folder currently, it’s very easy to change this to poll messages from a JMS queue. You don’t really have to re-implement the whole batch.

More over, tools like Talend ESB Studio provide an IDE to create and design your Camel routes.

In this article, we are going to see how to use Camel in a “batch way”.

Design

In fact, we are going to have two Camel routes:
– the first one is called “control”. This route will “expose” a REST service to start the batch. A bean in this route will be responsible to start the “batch” route.
– the second one is called “batch”. It’s the core implementation of our batch. It’s a “standard” route, but at the end, we have a processor that “stop” the route (to avoid to have the route up indefinitely). This route is not auto started as it will be controller by the first one.

It means that a simple HTTP client (like a browser or REST client) will start the batch, on-demand. Most of enterprise batch schedulers ship a component to make HTTP requests.

POM

Our batch will be packaged as an OSGi bundle. It will allow us to deploy the batch in an Apache Karaf OSGi container:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>net.nanthrax.examples</groupId>
  <artifactId>camel-batch</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>bundle</packaging>

  <properties>
    <camel.version>2.8.0</camel.version>
    <cxf.version>2.4.1</cxf.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-core</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-spring</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-cxf</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-rt-frontend-jaxrs</artifactId>
      <version>${cxf.version}</version>
    </dependency>
    <,dependency>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-rt-transports-http</artifactId>
      <version>${cxf.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-rt-transports-http-jetty</artifactId>
      <version>${cxf.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.felix</groupId>
        <artifactId>maven-bundle-plugin</artifactId>
        <version>2.3.4</version>
        <extensions>true</extensions>
        <configuration>
          <instructions>
            <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
            <Require-Bundle>org.apache.cxf.bundle,org.apache.camel.camel-cxf,org.springframework.beans</Require-Bundle>
          </instructions>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

In this POM, we can see:
– the packaging is an OSGi bundle. That’s why we use the Apache Felix maven-bundle-plugin. We name the bundle with the project artifactId, and we define Camel and CXF bundles as dependencies (Require-Bundle).
– in the dependency sets, we define the Camel components that we use (camel-core, camel-spring to use the Camel Spring XML DSL, and camel-cxf to use the CXF JAX-RS implementation) and the CXF dependency to be able to create a JAX-RS server.

Control route

The first Camel route is the “control” one. This route will bind a JAX-RS server, listening HTTP requests (consumer) and will start the “batch” route on-demand.

The route definition will be located in the META-INF/spring/routes.xml folder of our bundle. We use the Camel Spring XML DSL in this file:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:cxf="http://camel.apache.org/schema/cxf"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
    http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd
  ">

  <cxf:rsServer id="rsServer" address="http://localhost:9090/batch"
serviceClass="net.nanthrax.examples.camel.batch.impl.ControllerService"/&t;

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="control">
      <from uri="cxfrs:bean:rsServer"/>
      <to uri="log:net.nanthrax.examples.camel.batch"/>
      <to uri="controllerBean"/>
    </route>
  </camelContext>

  <bean id="controllerBean" class="net.nanthrax.examples.camel.batch.impl.ControllerBean">
    <property name="routeId" value="batch"/>
  </bean>

</beans>

We use the Camel CXF to create the JAX-RS server (using <cxf:rsServer/gt;). This JAX-RS server will listen on the local machine on the 9090 port, and the context path is /batch.
To “describe” the REST service behavior, we define the ControllerService class in the serviceClass attribute.
The ControllerService class is just an “empty container”. The purpose is just to “describe” the REST service, not to process it:


package net.nanthrax.examples.camel.batch.impl;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

/**
* REST service implementation of the Camel batch service.
*/
@Path("/")
public class ControllerService {

  @GET
  @Path("/start")
  @Produces("text/plain")
  public String startRoute() throws Exception {
    // nothing to do, it's just a wrapper
    return null;
  }

}

We can see the JAX-RS annotations:
– the ControllerService REST Path is /, it means directly bound to the JAX-RS server context path.
– the startRoute() method will accept GET HTTP method, on the context path /start and it will produce pure text (text/plain).

The process itself will be performed in the controllerBean:


package net.nanthrax.examples.camel.batch.impl;

import org.apache.camel.CamelContext;
import org.apache.camel.Handler;

/**
* Camel controller bean involved in the starting routed
*/
public class ControllerBean {

  private String routeId;

  public String getRouteId() {
    return this.routeId;
  }

  public void setRouteId(String routeId) {
    this.routeId = routeId;
  }

  @Handler
  public String startRoute(CamelContext camelContext) throws Exception {
    camelContext.startRoute(routeId);
    return "Batch " + routeId + " started.";
  }

}

We inject the Camel route ID of the batch route: “batch”. The CamelContext is automatically injected by Camel. This bean is quite simple, as it only starts the “batch” route.

Batch route

This route contains the “batch logic”. You can use any kind of routes, components, Enterprise Integration Patterns, etc provided by Camel. The only specific parts are:
– the autoStartup attribute set to false, to avoid to start the route automatically at context bootstrap
– the final processor which stop the route after processing.

We gather the two routes in the same META-INF/spring/routes.xml file:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:cxf="http://camel.apache.org/schema/cxf"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
    http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd
  ">

  <cxf:rsServer id="rsServer" address="http://localhost:9090/batch"
  serviceClass="net.nanthrax.examples.camel.batch.impl.ControllerService"/>

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="control">
      <from uri="cxfrs:bean:rsServer"/>
      <to uri="log:net.nanthrax.examples.camel.batch"/>
      <to uri="controllerBean"/>
    </route>
    <route id="batch" autoStartup="false">
      <from uri="file:/tmp"/>
      <to uri="file:output"/>
      <process ref="stopProcessor"/>
    </route>
  </camelContext>

  <bean id="controllerBean" class="net.nanthrax.examples.camel.batch.impl.ControllerBean">
    <property name="routeId" value="batch"/>
  </bean>

  <bean id="stopProcessor" class="net.nanthrax.examples.camel.batch.impl.StopProcessor">
    <property name="routeId" value="batch"/>
  </bean>

</beans>

In this example, the batch polls files in the /tmp folder, and copies into the output folder.

The StopProcessor is Camel processor (aka, it implements the Camel Processor interface). It stops the route after processing the incoming message (we inject the “batch” route ID using Spring):


package net.nanthrax.examples.camel.batch.impl;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

/**
* A Camel processor which stop routes.
*/
public class StopProcessor implements Processor {

  private String routeId;

  public String getRouteId() {
    return this.routeId;
  }

  public void setRouteId(String routeId) {
    this.routeId = routeId;
  }

  public void process(Exchange exchange) throws Exception {
    CamelContext camelContext = exchange.getContext();
    // remove myself from the in flight registry so we can stop this route without trouble
    camelContext.getInflightRepository().remove(exchange);
    // stop the route
    camelContext.stopRoute(routeId);
  }

}

Deployment and execution

Now, we can build our OSGi bundle, simply using:


mvn clean install

In a fresh Apache Karaf instance, we have first to install the CXF and Camel features:


karaf@root> features:addurl mvn:org.apache.cxf.karaf/apache-cxf/2.4.1/xml/features
karaf@root> features:install cxf
karaf@root> features:addurl mvn:org.apache.camel.karaf/apache-camel/2.8.0/xml/features
karaf@root> features:install camel-spring
karaf@root> features:install camel-cxf

Now, we can install our bundle:


karaf@root> osgi:install -s mvn:net.nanthrax.examples/camel-batch/1.0-SNAPSHOT

Our bundle appears as “created”:


karaf@root> la|grep -i batch
[ 134] [Active ] [ ] [Started] [ 60] camel-batch (1.0.0.SNAPSHOT)

Using a simple browser, we can access to http://localhost:9090/batch/start. The route is started (as a batch) and we can see in the browser:


Batch batch started.

Conclusion

Even if the first Camel route purpose is to be up and running all the time, we can use it in a more “batch” way. It allows developers to use the large set of Camel components, and be able to use all Enterprise Integration Patterns. For instance, the batch needs to copy a file, and after send an e-mail and a message into a JMS queue, it’s very easy using a recipient list. You have to send to a target endpoint depending of the content of the message, no problem using a Content Based Router.

You can run such kind of batches in Talend ESB. It’s an interesting addition to the Talend Data Integration products (ETL jobs, MDM, DQ, etc).

Use a “remote” EJB in Camel routes

August 9, 2011 Posted by jbonofre

Introduction

You have an existing application, let say developed using J2EE, including EJB (Session).
The application is running into a J2EE application server like JBoss, WebSphere or Weblogic.

This application “exposes” EJBs to perform some business services.

Now, you can to use these “remote” EJBs into Camel routes.

Context

We want to “expose” the EJB using WebService.

As for all EJBs, we have two interfaces for our EJB: the local and remote interfaces.
Let assume that we have:

* ejb.MyEjbSession
* ejb.MyEjbSessionHome

We assume that the MyEjbSession EJB provides a businessMethod() method, with a String in argument, and returning a String.

The first thing to do is to define an interface containing the WebService annotation. This interface will define the operations and will be used to generate the WSDL on the fly:


package net.nanthrax.blog.camel;

@WebService(targetNamespace = "http://www.nanthrax.net/blog", name = "MyEjbService")
public interface MyEjbService {

    public String businessService(String message);

}

Now, we can create a bean implementing this interface:


package net.nanthrax.blog.camel;

import ejb.MyEjbSession;

@WebService(serviceName = "myEjbService", targetNamespace = "http://www.nanthrax.net/blog", endpointInterface = "net.nanthrax.blog.camel.MyEjbService")
public class MyEjbServiceImpl implements MyEjbService {

&nbps;   private MyEjbSession proxy = null;

    public String businessService(String message) {
        return proxy.businessMethod(message);
    }

    public void setProxy(MyEjbSession proxy) {
        this.proxy = proxy;
    }

    public MyEjbSession getProxy() {
        return this.proxy;
    }

}

Camel routes

Now, we have a bean that we can use in a route. We use Spring Camel DSL. We also use Spring classes to connect to the J2EE application server and to inject the EJB proxy. In this example, we use JBoss application server:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:cxf="http://camel.apache.org/schema/cxf"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd
">

    <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
      <props>
        <prop key="java.naming.factory.initial">org.jnp.interfaces.NamingContextFactory </prop>
        <prop key="java.naming.provider.url">jnp://host:1099</prop>
      </props>
    </property>
    </bean>

    <bean id="ejbProxy" class="org.springframework.ejb.access.SimpleRemoteStatelessSessionProxyFactoryBean">
        <property name="jndiName" value="ejb/jndi/name/MyEjbSession" />
        <property name="businessInterface" value="ejb.MyEjbSession />
        <property name="homeInterface" value="ejb.MyEjbSessionHome" />
        <property name="refreshHomeOnConnectFailure" value="true" />
        <property name="cacheHome" value="true" />
        <property name="lookupHomeOnStartup" value="false" />
        <property name="resourceRef" value="false" />
        <property name="jndiTemplate" ref="jndiTemplate" />
    </bean>

    <bean id="ejbService" class="net.nanthrax.blog.camel.MyEjbServiceImpl">
        <property name="proxy" ref="ejbProxy"/>
    </bean>

    <import resource="classpath:META-INF/cxf/cxf.xml"/>
    <import resource="classpath:META-INF/cxf/cxf-extension-soap.xml"/>
    <import resource="classpath:META-INF/cxf/cxf-extension-http-jetty.xml"/>

    <cxf:cxfEndpoint id="cxfEndpoint"
serviceClass="net.nanthrax.blog.camel.MyEjbService"
address="http://0.0.0.0:9090/blog/ejb-service/"/>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="cxf:bean:assetServiceCxfEndpoint"/>
            <to uri="assetServiceBean"/>
        </route>
    </camelContext>

</beans>

Website mashup with Apache Camel

July 22, 2011 Posted by jbonofre

Mashup ?

You are browsing on some websites and you see an interesting information, that you want to poll to be used into your system.

Unfortunately, you don’t know the website provider, and you don’t know if a “plug” is provided, for instance a WebService.

So you have to find a way to get the information.

Using Camel

You can create a Camel route looking like:

  <route>
    <from uri="timer:fire?period=2000"/>
    <setHeaders>
        <constant>POST</constant>
    </setHeaders>
    <to uri="http:blog.nanthrax.net?param=value"/>
    <unmarshal>
        <tidyMarkup/>
    </unmarshal>
    <setBody><xpath>//span[@class='date']</xpath></setBody>
    <to uri="log:blog"/>
  </route>

Here, every 2 seconds, we access to blog.nanthrax.net to get the HTML source. We can eventually provide some parameters (with POST method here).
On this HTML, we use tidy markup (from camel-tagsoup component) to cleanup the HTML code and format it in XML.
After that, we extract from the source, only the content of element.
Finally, we send the filtered content into the log.

Camel 2.8.0 new features for Karaf/ServiceMix

June 29, 2011 Posted by jbonofre

Camel provides Karaf features descriptor since quite a long time now. But Camel 2.8.0 will include new Karaf features very useful and turning Karaf and ServiceMix as the main container to run Camel.

Install Camel in Karaf

Installing Camel in Karaf is very simple as a features descriptor is provided. It means that you can register the Camel features descriptor in your running Karaf instance:

karaf@root> features:addurl mvn:org.apache.camel.karaf/apache-camel/2.8.0/xml/features

Now, you have the Camel features available:

karaf@root> features:list|grep -i camel

[uninstalled] [2.8.0 ] camel repo-0

[uninstalled] [2.8.0 ] camel-core repo-0

[uninstalled] [2.8.0 ] camel-spring repo-0

[uninstalled] [2.8.0 ] camel-blueprint repo-0

Deploy Camel features

To start using Camel in Karaf, you have to install at least the camel feature:

root@karaf> features:install camel

Depending of your requirements, you will certainly install others Camel features.

For instance, if you use Blueprint Camel DSL, you have to install the camel-blueprint feature:

root@karaf> features:install camel-blueprint

or, if you use stream component in an endpoint (for instance “stream:out”), you will install the camel-stream feature:

root@karaf> features:install camel-stream

Camel and OSGi

When Camel is used in an OSGi environment, it automatically  exposes CamelContexts as OSGi services. It means that, when you deploy a route, the associated CamelContext is available for OSGi bundles.

You can look up for CamelContext OSGi services with this small code snippet:


ServiceReference[] references = bundleContext.getServiceReferences(CamelContext.class.getName(), null);
if (references != null) {
  for (ServiceReference reference : references) {
    if (reference != null) {
      CamelContext camelContext = (CamelContext) bundleContext.getService(reference);
      if (camelContext != null) {
        // do what you want on the CamelContext
      }
    }
  }
}

Camel Karaf commands

Camel 2.8.0 provides a set of Karaf commands which allow you to get information, start, stop about Camel contexts and routes.

  • camel:list-contexts displays the list of Camel context currently available in the running Karaf/ServiceMix instance
  • camel:list-routes displays the list of Camel routes currently available in the running Karaf/ServiceMix instance
  • camel:start-context starts a Camel context
  • camel:stop-context stops a Camel context
  • camel:info-context display detailed information about a Camel context
  • camel:start-route starts a Camel route
  • camel:stop-route stops a Camel route
  • camel:show-route displays the XML rendering of a Camel route (whatever your route DSL is)
  • camel:info-route displays detailed information about a Camel route including statistics