Category: ‘Apache Camel’

Use Camel, CXF and Karaf to implement batches

August 23, 2011 Posted by jbonofre

Introduction

Apache Camel has not be designed to be used for implementing batch tasks.

For instance, if your Camel route has a consumer endpoint polling files in a directory, Camel will periodically and indefinitely monitor the folder and poll any new incoming files.
It’s not a batch behavior: in batch mode, we want to run the file polling on demand, at a certain time, launched by a batch scheduler like ControlM, $Universe or Tivoli Worksheet Scheduler.

However, there are several interesting points to use Camel for batch implementation. First, Camel provides a large set of components. A lot of batches read/write files, read from a JMS queues, write into JMS queues, etc. Usage of Camel components in a batch way is really valuable.
Second, Camel uses a DSL to describe the process executed by the routes. Especially, it supports “human readable” DSL like Spring XML or Blueprint XML. It means that it’s easy to review what the batch is doing, eventually change an endpoint definition, etc. Most of the time, batches are “black box”: you run it, and you only get a status code to know if it’s OK or not. With Camel, you have a look on the batch process.
Third, Camel is a highly plug and play framework. It means that it’s easy to replace an endpoint by another one. For instance, if your batch polls files in a folder currently, it’s very easy to change this to poll messages from a JMS queue. You don’t really have to re-implement the whole batch.

More over, tools like Talend ESB Studio provide an IDE to create and design your Camel routes.

In this article, we are going to see how to use Camel in a “batch way”.

Design

In fact, we are going to have two Camel routes:
– the first one is called “control”. This route will “expose” a REST service to start the batch. A bean in this route will be responsible to start the “batch” route.
– the second one is called “batch”. It’s the core implementation of our batch. It’s a “standard” route, but at the end, we have a processor that “stop” the route (to avoid to have the route up indefinitely). This route is not auto started as it will be controller by the first one.

It means that a simple HTTP client (like a browser or REST client) will start the batch, on-demand. Most of enterprise batch schedulers ship a component to make HTTP requests.

POM

Our batch will be packaged as an OSGi bundle. It will allow us to deploy the batch in an Apache Karaf OSGi container:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>net.nanthrax.examples</groupId>
  <artifactId>camel-batch</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>bundle</packaging>

  <properties>
    <camel.version>2.8.0</camel.version>
    <cxf.version>2.4.1</cxf.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-core</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-spring</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-cxf</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-rt-frontend-jaxrs</artifactId>
      <version>${cxf.version}</version>
    </dependency>
    <,dependency>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-rt-transports-http</artifactId>
      <version>${cxf.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-rt-transports-http-jetty</artifactId>
      <version>${cxf.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.felix</groupId>
        <artifactId>maven-bundle-plugin</artifactId>
        <version>2.3.4</version>
        <extensions>true</extensions>
        <configuration>
          <instructions>
            <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
            <Require-Bundle>org.apache.cxf.bundle,org.apache.camel.camel-cxf,org.springframework.beans</Require-Bundle>
          </instructions>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

In this POM, we can see:
– the packaging is an OSGi bundle. That’s why we use the Apache Felix maven-bundle-plugin. We name the bundle with the project artifactId, and we define Camel and CXF bundles as dependencies (Require-Bundle).
– in the dependency sets, we define the Camel components that we use (camel-core, camel-spring to use the Camel Spring XML DSL, and camel-cxf to use the CXF JAX-RS implementation) and the CXF dependency to be able to create a JAX-RS server.

Control route

The first Camel route is the “control” one. This route will bind a JAX-RS server, listening HTTP requests (consumer) and will start the “batch” route on-demand.

The route definition will be located in the META-INF/spring/routes.xml folder of our bundle. We use the Camel Spring XML DSL in this file:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:cxf="http://camel.apache.org/schema/cxf"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
    http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd
  ">

  <cxf:rsServer id="rsServer" address="http://localhost:9090/batch"
serviceClass="net.nanthrax.examples.camel.batch.impl.ControllerService"/&t;

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="control">
      <from uri="cxfrs:bean:rsServer"/>
      <to uri="log:net.nanthrax.examples.camel.batch"/>
      <to uri="controllerBean"/>
    </route>
  </camelContext>

  <bean id="controllerBean" class="net.nanthrax.examples.camel.batch.impl.ControllerBean">
    <property name="routeId" value="batch"/>
  </bean>

</beans>

We use the Camel CXF to create the JAX-RS server (using <cxf:rsServer/gt;). This JAX-RS server will listen on the local machine on the 9090 port, and the context path is /batch.
To “describe” the REST service behavior, we define the ControllerService class in the serviceClass attribute.
The ControllerService class is just an “empty container”. The purpose is just to “describe” the REST service, not to process it:


package net.nanthrax.examples.camel.batch.impl;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

/**
* REST service implementation of the Camel batch service.
*/
@Path("/")
public class ControllerService {

  @GET
  @Path("/start")
  @Produces("text/plain")
  public String startRoute() throws Exception {
    // nothing to do, it's just a wrapper
    return null;
  }

}

We can see the JAX-RS annotations:
– the ControllerService REST Path is /, it means directly bound to the JAX-RS server context path.
– the startRoute() method will accept GET HTTP method, on the context path /start and it will produce pure text (text/plain).

The process itself will be performed in the controllerBean:


package net.nanthrax.examples.camel.batch.impl;

import org.apache.camel.CamelContext;
import org.apache.camel.Handler;

/**
* Camel controller bean involved in the starting routed
*/
public class ControllerBean {

  private String routeId;

  public String getRouteId() {
    return this.routeId;
  }

  public void setRouteId(String routeId) {
    this.routeId = routeId;
  }

  @Handler
  public String startRoute(CamelContext camelContext) throws Exception {
    camelContext.startRoute(routeId);
    return "Batch " + routeId + " started.";
  }

}

We inject the Camel route ID of the batch route: “batch”. The CamelContext is automatically injected by Camel. This bean is quite simple, as it only starts the “batch” route.

Batch route

This route contains the “batch logic”. You can use any kind of routes, components, Enterprise Integration Patterns, etc provided by Camel. The only specific parts are:
– the autoStartup attribute set to false, to avoid to start the route automatically at context bootstrap
– the final processor which stop the route after processing.

We gather the two routes in the same META-INF/spring/routes.xml file:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:cxf="http://camel.apache.org/schema/cxf"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
    http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd
  ">

  <cxf:rsServer id="rsServer" address="http://localhost:9090/batch"
  serviceClass="net.nanthrax.examples.camel.batch.impl.ControllerService"/>

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="control">
      <from uri="cxfrs:bean:rsServer"/>
      <to uri="log:net.nanthrax.examples.camel.batch"/>
      <to uri="controllerBean"/>
    </route>
    <route id="batch" autoStartup="false">
      <from uri="file:/tmp"/>
      <to uri="file:output"/>
      <process ref="stopProcessor"/>
    </route>
  </camelContext>

  <bean id="controllerBean" class="net.nanthrax.examples.camel.batch.impl.ControllerBean">
    <property name="routeId" value="batch"/>
  </bean>

  <bean id="stopProcessor" class="net.nanthrax.examples.camel.batch.impl.StopProcessor">
    <property name="routeId" value="batch"/>
  </bean>

</beans>

In this example, the batch polls files in the /tmp folder, and copies into the output folder.

The StopProcessor is Camel processor (aka, it implements the Camel Processor interface). It stops the route after processing the incoming message (we inject the “batch” route ID using Spring):


package net.nanthrax.examples.camel.batch.impl;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

/**
* A Camel processor which stop routes.
*/
public class StopProcessor implements Processor {

  private String routeId;

  public String getRouteId() {
    return this.routeId;
  }

  public void setRouteId(String routeId) {
    this.routeId = routeId;
  }

  public void process(Exchange exchange) throws Exception {
    CamelContext camelContext = exchange.getContext();
    // remove myself from the in flight registry so we can stop this route without trouble
    camelContext.getInflightRepository().remove(exchange);
    // stop the route
    camelContext.stopRoute(routeId);
  }

}

Deployment and execution

Now, we can build our OSGi bundle, simply using:


mvn clean install

In a fresh Apache Karaf instance, we have first to install the CXF and Camel features:


karaf@root> features:addurl mvn:org.apache.cxf.karaf/apache-cxf/2.4.1/xml/features
karaf@root> features:install cxf
karaf@root> features:addurl mvn:org.apache.camel.karaf/apache-camel/2.8.0/xml/features
karaf@root> features:install camel-spring
karaf@root> features:install camel-cxf

Now, we can install our bundle:


karaf@root> osgi:install -s mvn:net.nanthrax.examples/camel-batch/1.0-SNAPSHOT

Our bundle appears as “created”:


karaf@root> la|grep -i batch
[ 134] [Active ] [ ] [Started] [ 60] camel-batch (1.0.0.SNAPSHOT)

Using a simple browser, we can access to http://localhost:9090/batch/start. The route is started (as a batch) and we can see in the browser:


Batch batch started.

Conclusion

Even if the first Camel route purpose is to be up and running all the time, we can use it in a more “batch” way. It allows developers to use the large set of Camel components, and be able to use all Enterprise Integration Patterns. For instance, the batch needs to copy a file, and after send an e-mail and a message into a JMS queue, it’s very easy using a recipient list. You have to send to a target endpoint depending of the content of the message, no problem using a Content Based Router.

You can run such kind of batches in Talend ESB. It’s an interesting addition to the Talend Data Integration products (ETL jobs, MDM, DQ, etc).

Use a “remote” EJB in Camel routes

August 9, 2011 Posted by jbonofre

Introduction

You have an existing application, let say developed using J2EE, including EJB (Session).
The application is running into a J2EE application server like JBoss, WebSphere or Weblogic.

This application “exposes” EJBs to perform some business services.

Now, you can to use these “remote” EJBs into Camel routes.

Context

We want to “expose” the EJB using WebService.

As for all EJBs, we have two interfaces for our EJB: the local and remote interfaces.
Let assume that we have:

* ejb.MyEjbSession
* ejb.MyEjbSessionHome

We assume that the MyEjbSession EJB provides a businessMethod() method, with a String in argument, and returning a String.

The first thing to do is to define an interface containing the WebService annotation. This interface will define the operations and will be used to generate the WSDL on the fly:


package net.nanthrax.blog.camel;

@WebService(targetNamespace = "http://www.nanthrax.net/blog", name = "MyEjbService")
public interface MyEjbService {

    public String businessService(String message);

}

Now, we can create a bean implementing this interface:


package net.nanthrax.blog.camel;

import ejb.MyEjbSession;

@WebService(serviceName = "myEjbService", targetNamespace = "http://www.nanthrax.net/blog", endpointInterface = "net.nanthrax.blog.camel.MyEjbService")
public class MyEjbServiceImpl implements MyEjbService {

&nbps;   private MyEjbSession proxy = null;

    public String businessService(String message) {
        return proxy.businessMethod(message);
    }

    public void setProxy(MyEjbSession proxy) {
        this.proxy = proxy;
    }

    public MyEjbSession getProxy() {
        return this.proxy;
    }

}

Camel routes

Now, we have a bean that we can use in a route. We use Spring Camel DSL. We also use Spring classes to connect to the J2EE application server and to inject the EJB proxy. In this example, we use JBoss application server:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:cxf="http://camel.apache.org/schema/cxf"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd
">

    <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
      <props>
        <prop key="java.naming.factory.initial">org.jnp.interfaces.NamingContextFactory </prop>
        <prop key="java.naming.provider.url">jnp://host:1099</prop>
      </props>
    </property>
    </bean>

    <bean id="ejbProxy" class="org.springframework.ejb.access.SimpleRemoteStatelessSessionProxyFactoryBean">
        <property name="jndiName" value="ejb/jndi/name/MyEjbSession" />
        <property name="businessInterface" value="ejb.MyEjbSession />
        <property name="homeInterface" value="ejb.MyEjbSessionHome" />
        <property name="refreshHomeOnConnectFailure" value="true" />
        <property name="cacheHome" value="true" />
        <property name="lookupHomeOnStartup" value="false" />
        <property name="resourceRef" value="false" />
        <property name="jndiTemplate" ref="jndiTemplate" />
    </bean>

    <bean id="ejbService" class="net.nanthrax.blog.camel.MyEjbServiceImpl">
        <property name="proxy" ref="ejbProxy"/>
    </bean>

    <import resource="classpath:META-INF/cxf/cxf.xml"/>
    <import resource="classpath:META-INF/cxf/cxf-extension-soap.xml"/>
    <import resource="classpath:META-INF/cxf/cxf-extension-http-jetty.xml"/>

    <cxf:cxfEndpoint id="cxfEndpoint"
serviceClass="net.nanthrax.blog.camel.MyEjbService"
address="http://0.0.0.0:9090/blog/ejb-service/"/>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="cxf:bean:assetServiceCxfEndpoint"/>
            <to uri="assetServiceBean"/>
        </route>
    </camelContext>

</beans>

Website mashup with Apache Camel

July 22, 2011 Posted by jbonofre

Mashup ?

You are browsing on some websites and you see an interesting information, that you want to poll to be used into your system.

Unfortunately, you don’t know the website provider, and you don’t know if a “plug” is provided, for instance a WebService.

So you have to find a way to get the information.

Using Camel

You can create a Camel route looking like:

  <route>
    <from uri="timer:fire?period=2000"/>
    <setHeaders>
        <constant>POST</constant>
    </setHeaders>
    <to uri="http:blog.nanthrax.net?param=value"/>
    <unmarshal>
        <tidyMarkup/>
    </unmarshal>
    <setBody><xpath>//span[@class='date']</xpath></setBody>
    <to uri="log:blog"/>
  </route>

Here, every 2 seconds, we access to blog.nanthrax.net to get the HTML source. We can eventually provide some parameters (with POST method here).
On this HTML, we use tidy markup (from camel-tagsoup component) to cleanup the HTML code and format it in XML.
After that, we extract from the source, only the content of element.
Finally, we send the filtered content into the log.

Camel 2.8.0 new features for Karaf/ServiceMix

June 29, 2011 Posted by jbonofre

Camel provides Karaf features descriptor since quite a long time now. But Camel 2.8.0 will include new Karaf features very useful and turning Karaf and ServiceMix as the main container to run Camel.

Install Camel in Karaf

Installing Camel in Karaf is very simple as a features descriptor is provided. It means that you can register the Camel features descriptor in your running Karaf instance:

karaf@root> features:addurl mvn:org.apache.camel.karaf/apache-camel/2.8.0/xml/features

Now, you have the Camel features available:

karaf@root> features:list|grep -i camel

[uninstalled] [2.8.0 ] camel repo-0

[uninstalled] [2.8.0 ] camel-core repo-0

[uninstalled] [2.8.0 ] camel-spring repo-0

[uninstalled] [2.8.0 ] camel-blueprint repo-0

Deploy Camel features

To start using Camel in Karaf, you have to install at least the camel feature:

root@karaf> features:install camel

Depending of your requirements, you will certainly install others Camel features.

For instance, if you use Blueprint Camel DSL, you have to install the camel-blueprint feature:

root@karaf> features:install camel-blueprint

or, if you use stream component in an endpoint (for instance “stream:out”), you will install the camel-stream feature:

root@karaf> features:install camel-stream

Camel and OSGi

When Camel is used in an OSGi environment, it automatically  exposes CamelContexts as OSGi services. It means that, when you deploy a route, the associated CamelContext is available for OSGi bundles.

You can look up for CamelContext OSGi services with this small code snippet:


ServiceReference[] references = bundleContext.getServiceReferences(CamelContext.class.getName(), null);
if (references != null) {
  for (ServiceReference reference : references) {
    if (reference != null) {
      CamelContext camelContext = (CamelContext) bundleContext.getService(reference);
      if (camelContext != null) {
        // do what you want on the CamelContext
      }
    }
  }
}

Camel Karaf commands

Camel 2.8.0 provides a set of Karaf commands which allow you to get information, start, stop about Camel contexts and routes.

  • camel:list-contexts displays the list of Camel context currently available in the running Karaf/ServiceMix instance
  • camel:list-routes displays the list of Camel routes currently available in the running Karaf/ServiceMix instance
  • camel:start-context starts a Camel context
  • camel:stop-context stops a Camel context
  • camel:info-context display detailed information about a Camel context
  • camel:start-route starts a Camel route
  • camel:stop-route stops a Camel route
  • camel:show-route displays the XML rendering of a Camel route (whatever your route DSL is)
  • camel:info-route displays detailed information about a Camel route including statistics