Encrypt ConfigAdmin properties values in Apache Karaf

October 3, 2014 Posted by jbonofre

Apache Karaf loads all the configuration from etc/*.cfg files by default, using a mix of Felix FileInstall and Felix ConfigAdmin.

These files are regular properties files that look like:

key=value

Some values may be sensitive, and so should not be stored in plain text. This could be critical business data (credit card numbers, etc.) or technical data (passwords to other systems, such as a database).

We want to encrypt this kind of data in the etc/*.cfg files, while still being able to use it as usual in the application.

Karaf provides a nice feature for that: jasypt-encryption.

It’s very easy to use, especially with Blueprint.

The jasypt-encryption feature is optional, so you have to install it first:

karaf@root()> feature:install jasypt-encryption

This feature provides:

  • jasypt bundle
  • a namespace handler (enc:*) for blueprint

Now, we can create a cfg file containing an encrypted value. The encrypted value is “wrapped” in an ENC() function.

For instance, we can create etc/my.cfg file containing:

mydb.url=host:port
mydb.username=username
mydb.password=ENC(zRM7Pb/NiKyCalroBz8CKw==)
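As a rough illustration of the ENC() convention, here is a minimal JDK-only sketch that parses such a file and tells wrapped values from plain ones. The EncValueSketch class and its methods are hypothetical names used only for this example; in Karaf, the jasypt-encryption feature does the real detection and decryption.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Karaf or Jasypt.
final class EncValueSketch {

    private static final String PREFIX = "ENC(";
    private static final String SUFFIX = ")";

    // Parses cfg-style text ("key=value" lines) with the JDK Properties class.
    static Properties parse(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // True when the value is wrapped as ENC(...).
    static boolean isEncrypted(String value) {
        return value != null && value.startsWith(PREFIX) && value.endsWith(SUFFIX);
    }

    // Returns the text between "ENC(" and ")"; the result is still encrypted.
    static String cipherText(String value) {
        if (!isEncrypted(value)) {
            throw new IllegalArgumentException("Not an ENC(...) value: " + value);
        }
        return value.substring(PREFIX.length(), value.length() - SUFFIX.length());
    }
}
```

With the my.cfg content above, only mydb.password would be reported as encrypted; the url and username values are left as they are.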

In the Blueprint descriptor of our application (like a Camel route Blueprint XML for instance), we use the “regular” cm namespace (to load ConfigAdmin), but we add a Jasypt configuration using the enc namespace.

For instance, the blueprint XML could look like:

<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
           xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0"
           xmlns:enc="http://karaf.apache.org/xmlns/jasypt/v1.0.0">

  <cm:property-placeholder persistent-id="my" update-strategy="reload">
    <cm:default-properties>
      <cm:property name="mydb.url" value="localhost:9999"/>
      <cm:property name="mydb.username" value="sa"/>
      <cm:property name="mydb.password" value="ENC(xxxxx)"/>
    </cm:default-properties>
  </cm:property-placeholder>

  <enc:property-placeholder>
    <enc:encryptor class="org.jasypt.encryption.pbe.StandardPBEStringEncryptor">
      <property name="config">
        <bean class="org.jasypt.encryption.pbe.config.EnvironmentStringPBEConfig">
          <property name="algorithm" value="PBEWithMD5AndDES"/>
          <property name="passwordEnvName" value="ENCRYPTION_PASSWORD"/>
        </bean>
      </property>
    </enc:encryptor>
  </enc:property-placeholder>

  <bean id="dbbean" class="...">
    <property name="url" value="${mydb.url}"/>
    <property name="username" value="${mydb.username}"/>
    <property name="password" value="${mydb.password}"/>
  </bean>

  <camelContext xmlns="http://camel.apache.org/schema/blueprint">
     <route>
        ...
        <process ref="dbbean"/>
        ...
     </route>
  </camelContext>

</blueprint>
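To make the role of cm:default-properties clearer, here is a small JDK-only sketch of the idea: default values are used unless the configuration provides the same key, and ${key} placeholders are then replaced. PlaceholderSketch is a hypothetical class written for this post; the real resolution is done by the Aries Blueprint property placeholder, and leaving unknown keys untouched is just a choice made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of "defaults overridden by configuration".
final class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    private final Map<String, String> values = new HashMap<>();

    // Defaults are loaded first, then values from the configuration override them.
    PlaceholderSketch(Map<String, String> defaults, Map<String, String> config) {
        values.putAll(defaults);
        values.putAll(config);
    }

    // Replaces every ${key} with its value; unknown keys are left untouched.
    String resolve(String text) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = values.get(m.group(1));
            String replacement = (value != null) ? value : m.group();
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

With the defaults from the descriptor above and a my.cfg that only sets mydb.url, ${mydb.url} would resolve to the configured value while ${mydb.username} would fall back to sa.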

It’s also possible to use encryption outside of ConfigAdmin, by directly loading an “external” properties file using the ext blueprint namespace:

<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
           xmlns:ext="http://aries.apache.org/blueprint/xmlns/blueprint-ext/v1.0.0"
           xmlns:enc="http://karaf.apache.org/xmlns/jasypt/v1.0.0">

  <ext:property-placeholder>
    <ext:location>file:etc/db.properties</ext:location>
  </ext:property-placeholder>

  <enc:property-placeholder>
    <enc:encryptor class="org.jasypt.encryption.pbe.StandardPBEStringEncryptor">
      <property name="config">
        <bean class="org.jasypt.encryption.pbe.config.EnvironmentStringPBEConfig">
          <property name="algorithm" value="PBEWithMD5AndDES"/>
          <property name="passwordEnvName" value="ENCRYPTION_PASSWORD"/>
        </bean>
      </property>
    </enc:encryptor>
  </enc:property-placeholder>

  ...

</blueprint>

where etc/db.properties looks like:

mydb.url=host:port
mydb.username=username
mydb.password=ENC(zRM7Pb/NiKyCalroBz8CKw==)

It’s also possible to use ConfigAdmin directly in code. In that case, you have to create the Jasypt configuration programmatically:

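import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
import org.jasypt.encryption.pbe.config.EnvironmentStringPBEConfig;

StandardPBEStringEncryptor enc = new StandardPBEStringEncryptor();
EnvironmentStringPBEConfig env = new EnvironmentStringPBEConfig();
env.setAlgorithm("PBEWithMD5AndDES");
// read the password from the ENCRYPTION_PASSWORD environment variable, as in the Blueprint examples
env.setPasswordEnvName("ENCRYPTION_PASSWORD");
enc.setConfig(env);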
...
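To show what the algorithm and the password are used for, here is a JDK-only sketch of password-based encryption with PBEWithMD5AndDES. It is not Jasypt’s implementation: the PbeSketch class, the iteration count and the “salt followed by ciphertext” layout are assumptions made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.PBEParameterSpec;

// JDK-only illustration; NOT Jasypt's implementation.
final class PbeSketch {

    private static final String ALGORITHM = "PBEWithMD5AndDES";
    private static final int SALT_BYTES = 8;    // 8-byte salt, as this algorithm expects
    private static final int ITERATIONS = 1000; // assumption made for this sketch

    // Encrypts with a random salt and returns Base64(salt + ciphertext).
    static String encrypt(String plain, char[] password) {
        try {
            byte[] salt = new byte[SALT_BYTES];
            new SecureRandom().nextBytes(salt);
            byte[] encrypted = cipher(Cipher.ENCRYPT_MODE, password, salt)
                    .doFinal(plain.getBytes(StandardCharsets.UTF_8));
            byte[] out = Arrays.copyOf(salt, salt.length + encrypted.length);
            System.arraycopy(encrypted, 0, out, salt.length, encrypted.length);
            return Base64.getEncoder().encodeToString(out);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Splits the salt from the ciphertext and decrypts with the same password.
    static String decrypt(String base64, char[] password) {
        try {
            byte[] all = Base64.getDecoder().decode(base64);
            byte[] salt = Arrays.copyOfRange(all, 0, SALT_BYTES);
            byte[] encrypted = Arrays.copyOfRange(all, SALT_BYTES, all.length);
            byte[] plain = cipher(Cipher.DECRYPT_MODE, password, salt).doFinal(encrypted);
            return new String(plain, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Derives the key from the password and initialises the cipher with the salt.
    private static Cipher cipher(int mode, char[] password, byte[] salt)
            throws GeneralSecurityException {
        SecretKey key = SecretKeyFactory.getInstance(ALGORITHM)
                .generateSecret(new PBEKeySpec(password));
        Cipher cipher = Cipher.getInstance(ALGORITHM);
        cipher.init(mode, key, new PBEParameterSpec(salt, ITERATIONS));
        return cipher;
    }
}
```

In a setup like the one described in this post, the password passed to these methods would typically come from System.getenv("ENCRYPTION_PASSWORD") rather than from the code or the cfg file.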

MDC logging with Apache Karaf and Camel

August 31, 2014 Posted by jbonofre

MDC (Mapped Diagnostic Context) logging is an interesting feature to log contextual messages.

It’s common to want to log contextual messages in your application. For instance, we may want to log the actions performed by a user (identified by a username or user ID). When you have a lot of simultaneous users on your application, such a context makes it easier to “follow” the log.

MDC is supported by several logging frameworks, like log4j or slf4j, and so by Karaf (thanks to pax-logging) as well.
The approach is pretty simple:

  1. You define the context using a key ID and a value for the key:
    MDC.put("userid", "user1");
    
  2. You use the logger as usual; the messages sent to this logger are now associated with the context:
    logger.debug("my message");
    
  3. After that, we can change the context by overriding the key:
    MDC.put("userid", "user2");
    logger.debug("another message");
    

    Or you can remove the key to remove the context, so the log will be “global” (not local to a context):

    MDC.remove("userid"); // or MDC.clear() to remove all
    logger.debug("my global message");
    
  4. In the configuration, we can use a pattern with %X{key} to log the context. A pattern like %X{userid} - %m%n will result in a log file looking like:
    user1 - my message
    user2 - another message
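The steps above can be sketched with a toy MDC written with the JDK only. ToyMdc is a hypothetical class for illustration; slf4j and log4j provide the real MDC, and this simplified formatter only understands %X{key} and %m.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy model of an MDC; slf4j and log4j provide the real one.
final class ToyMdc {

    // One map per thread, so each thread sees only its own context.
    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    private static final Pattern MDC_TOKEN = Pattern.compile("%X\\{([^}]+)\\}");

    static void put(String key, String value) {
        CONTEXT.get().put(key, value);
    }

    static void remove(String key) {
        CONTEXT.get().remove(key);
    }

    static void clear() {
        CONTEXT.get().clear();
    }

    // Expands %X{key} and %m in a simplified pattern; a missing key renders as "".
    static String format(String pattern, String message) {
        Matcher m = MDC_TOKEN.matcher(pattern);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = CONTEXT.get().getOrDefault(m.group(1), "");
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString().replace("%m", message);
    }
}
```

Because the context lives in a ThreadLocal, a value put by one thread is not visible to the others, which is what makes the per-thread example later in this post work.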
    

In this blog post, we will see how to use MDC in different cases (directly in your bundle, generic Karaf OSGi, and in Camel routes).

The source code of this blog post is available on my github: http://github.com/jbonofre/blog-mdc.

Using MDC in your application/bundle

The purpose here is to use slf4j MDC in our bundle and configure Karaf to create one log file per context.

To illustrate this, we will create multiple threads in the bundle, giving a different context key to each thread:

package net.nanthrax.blog.mdc;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MdcExampleBean {

    private Logger logger = LoggerFactory.getLogger(MdcExampleBean.class);

    public void init() throws Exception {
        CycleThread thread1 = new CycleThread("thread1");
        CycleThread thread2 = new CycleThread("thread2");
        CycleThread thread3 = new CycleThread("thread3");
        thread1.start();
        thread2.start();
        thread3.start();
    }

    class CycleThread extends Thread {
        private String mdcContext;
        public CycleThread(String mdcContext) {
            this.mdcContext = mdcContext;
        }
        public void run() {
            MDC.put("threadId", mdcContext);
            for (int i = 0; i < 20; i++) {
                logger.info("Cycle {}", i);
            }
        }
    }

}

After deploying this bundle in Karaf 3.0.1, we can see the log messages:

karaf@root()> bundle:install mvn:net.nanthrax.blog/mdc-bundle/1.0-SNAPSHOT
karaf@root()> log:display
...
2014-08-30 09:44:25,594 | INFO  | Thread-15        | MdcExampleBean                   | 78 - net.nanthrax.blog.mdc-bundle - 1.0.0.SNAPSHOT | Cycle 17
2014-08-30 09:44:25,594 | INFO  | Thread-13        | MdcExampleBean                   | 78 - net.nanthrax.blog.mdc-bundle - 1.0.0.SNAPSHOT | Cycle 19
2014-08-30 09:44:25,594 | INFO  | Thread-15        | MdcExampleBean                   | 78 - net.nanthrax.blog.mdc-bundle - 1.0.0.SNAPSHOT | Cycle 18
2014-08-30 09:44:25,595 | INFO  | Thread-15        | MdcExampleBean                   | 78 - net.nanthrax.blog.mdc-bundle - 1.0.0.SNAPSHOT | Cycle 19

Now, we can set up the Karaf etc/org.ops4j.pax.logging.cfg file to use our MDC. For that, we add an MDCSiftingAppender, providing the threadId as MDC key, and displaying the threadId in the log message pattern. We will create one log file per key (threadId in our case), and finally, we add this appender to the rootLogger:

...
log4j.rootLogger=INFO, out, mdc-bundle, osgi:*
...
# MDC Bundle appender
log4j.appender.mdc-bundle=org.apache.log4j.sift.MDCSiftingAppender
log4j.appender.mdc-bundle.key=threadId
log4j.appender.mdc-bundle.default=unknown
log4j.appender.mdc-bundle.appender=org.apache.log4j.FileAppender
log4j.appender.mdc-bundle.appender.layout=org.apache.log4j.PatternLayout
log4j.appender.mdc-bundle.appender.layout.ConversionPattern=%d | %-5.5p | %X{threadId} | %m%n
log4j.appender.mdc-bundle.appender.file=${karaf.data}/log/mdc-bundle-$\\{threadId\\}.log
log4j.appender.mdc-bundle.appender.append=true
...

Now, in the Karaf data/log folder, we can see:

mdc-bundle-thread1.log
mdc-bundle-thread2.log
mdc-bundle-thread3.log

each file containing the log messages contextual to the thread:

$ cat data/log/mdc-bundle-thread1.log
2014-08-30 09:54:48,287 | INFO  | thread1 | Cycle 0
2014-08-30 09:54:48,298 | INFO  | thread1 | Cycle 1
2014-08-30 09:54:48,298 | INFO  | thread1 | Cycle 2
2014-08-30 09:54:48,299 | INFO  | thread1 | Cycle 3
2014-08-30 09:54:48,299 | INFO  | thread1 | Cycle 4
...
$ cat data/log/mdc-bundle-thread2.log
2014-08-30 09:54:48,287 | INFO  | thread2 | Cycle 0
2014-08-30 09:54:48,298 | INFO  | thread2 | Cycle 1
2014-08-30 09:54:48,298 | INFO  | thread2 | Cycle 2
2014-08-30 09:54:48,299 | INFO  | thread2 | Cycle 3
2014-08-30 09:54:48,299 | INFO  | thread2 | Cycle 4
2014-08-30 09:54:48,299 | INFO  | thread2 | Cycle 5
...
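The sifting behaviour configured above (an MDC key, a default value, and a file name containing the key) can be sketched in plain Java. SiftingSketch is a hypothetical, in-memory model written for this post; the real work is done by the MDCSiftingAppender, and the ${threadId} template syntax is simplified here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a sifting appender; "files" are kept in memory.
final class SiftingSketch {

    private final String key;          // MDC key to sift on, e.g. "threadId"
    private final String defaultValue; // used when the key is absent, e.g. "unknown"
    private final String fileTemplate; // e.g. "mdc-bundle-${threadId}.log"
    private final Map<String, List<String>> files = new LinkedHashMap<>();

    SiftingSketch(String key, String defaultValue, String fileTemplate) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.fileTemplate = fileTemplate;
    }

    // Resolves the target file name from the MDC of the current log event.
    String fileNameFor(Map<String, String> mdc) {
        String value = mdc.getOrDefault(key, defaultValue);
        return fileTemplate.replace("${" + key + "}", value);
    }

    // Appends the message to the in-memory "file" selected by the MDC value.
    void append(Map<String, String> mdc, String message) {
        files.computeIfAbsent(fileNameFor(mdc), name -> new ArrayList<>()).add(message);
    }

    Map<String, List<String>> files() {
        return files;
    }
}
```

A message logged with threadId=thread1 lands in mdc-bundle-thread1.log, while a message logged without any threadId in its context goes to mdc-bundle-unknown.log.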

In addition, Karaf “natively” provides OSGi MDC data that we can use.

Using Karaf OSGi MDC

So, in Karaf, you can directly use some OSGi headers for MDC logging, especially the bundle name.

We can use this MDC key to create one log file per bundle.

Karaf already provides a pre-defined appender configuration in etc/org.ops4j.pax.logging.cfg:

...
# Sift appender
log4j.appender.sift=org.apache.log4j.sift.MDCSiftingAppender
log4j.appender.sift.key=bundle.name
log4j.appender.sift.default=karaf
log4j.appender.sift.appender=org.apache.log4j.FileAppender
log4j.appender.sift.appender.layout=org.apache.log4j.PatternLayout
log4j.appender.sift.appender.layout.ConversionPattern=%d{ISO8601} | %-5.5p | %-16.16t | %-32.32c{1} | %m%n
log4j.appender.sift.appender.file=${karaf.data}/log/$\\{bundle.name\\}.log
log4j.appender.sift.appender.append=true
...

The only thing that we have to do is to add this appender to the rootLogger:

log4j.rootLogger=INFO, out, sift, osgi:*

Now, in the Karaf data/log folder, we can see one file per bundle:

data/log$ ls -1
karaf.log
net.nanthrax.blog.mdc-bundle.log
org.apache.aries.blueprint.core.log
org.apache.aries.jmx.core.log
org.apache.felix.fileinstall.log
org.apache.karaf.features.core.log
org.apache.karaf.region.persist.log
org.apache.karaf.shell.console.log
org.apache.sshd.core.log

In particular, we can see our mdc-bundle log file, containing the log messages “local” to the bundle.

However, even if this approach works well, it doesn’t always create useful log files. For instance, when you use Camel, using OSGi headers for MDC logging will gather most of the log messages into the camel-core bundle log file, so they are not really contextual or easy to read and search.

The good news is that Camel also provides MDC logging support.

Using Camel MDC

Although Camel provides MDC logging support, it’s not enabled by default. It’s up to you to enable it on the Camel context.

Once enabled, Camel provides the following MDC logging properties:

  • camel.exchangeId providing the exchange ID
  • camel.messageId providing the message ID
  • camel.routeId providing the route ID
  • camel.contextId providing the Camel Context ID
  • camel.breadcrumbId providing a unique ID used for tracking messages across transports
  • camel.correlationId providing the correlation ID of the exchange (if it’s correlated, for instance like in Splitter EIP)
  • camel.transactionKey providing the ID of the transaction (for transacted exchange).

To enable the MDC logging, you have to:

  • if you use the Blueprint or Spring XML DSL:
    <camelContext xmlns="http://camel.apache.org/schema/blueprint" useMDCLogging="true">
    
  • if you use the Java DSL:
    CamelContext context = ...
    context.setUseMDCLogging(true);
    
  • using the Talend ESB studio, you have to use a cConfig component from the palette.

So, let’s say we create the following route using the Blueprint DSL:

<?xml version="1.0" encoding="UTF-8"?> 
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"> 

   <camelContext xmlns="http://camel.apache.org/schema/blueprint" useMDCLogging="true"> 
      <route id="my-route"> 
         <from uri="timer:fire?period=5000"/> 
         <setBody> 
            <constant>Hello Blog</constant> 
         </setBody> 
         <to uri="log:net.nanthrax.blog?level=INFO"/>
      </route>
   </camelContext>
 
</blueprint>

We want to create one log file per route (using the routeId). So, we update the Karaf etc/org.ops4j.pax.logging.cfg file to add an MDC sifting appender using the Camel MDC properties, and we add this appender to the rootLogger:

...
log4j.rootLogger=INFO, out, camel-mdc, osgi:*
...
# Camel MDC appender
log4j.appender.camel-mdc=org.apache.log4j.sift.MDCSiftingAppender
log4j.appender.camel-mdc.key=camel.routeId
log4j.appender.camel-mdc.default=unknown 
log4j.appender.camel-mdc.appender=org.apache.log4j.FileAppender
log4j.appender.camel-mdc.appender.layout=org.apache.log4j.PatternLayout
log4j.appender.camel-mdc.appender.layout.ConversionPattern=%d{ISO8601} | %-5.5p | %-16.16t | %-32.32c{1} | %X{camel.exchangeId} | %m%n
log4j.appender.camel-mdc.appender.file=${karaf.data}/log/camel-$\\{camel.routeId\\}.log
log4j.appender.camel-mdc.appender.append=true
...

The camel-mdc appender will create one log file per route (named camel-(routeId).log). The log messages will contain the exchange ID.

We start Karaf, and after the installation of the camel-blueprint feature, we can drop our route.xml directly in the deploy folder:

karaf@root()> feature:repo-add camel 2.12.1
Adding feature url mvn:org.apache.camel.karaf/apache-camel/2.12.1/xml/features
karaf@root()> feature:install camel-blueprint
$ cp route.xml apache-karaf-3.0.1/deploy/

Using log:display command in Karaf, we can see the messages for our route:

karaf@root()> log:display

2014-08-31 08:58:24,176 | INFO | 0 - timer://fire | blog | 85 - org.apache.camel.camel-core - 2.12.1 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: Hello Blog]
2014-08-31 08:58:29,176 | INFO | 0 - timer://fire | blog | 85 - org.apache.camel.camel-core - 2.12.1 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: Hello Blog]

Now, if we go into the Karaf data/log folder, we can see the log file for our route:

$ ls -1 data/log
camel-my-route.log
...

If we take a look in the camel-my-route.log file, we can see the messages contextual to the route, including the exchange ID:

2014-08-31 08:58:19,196 | INFO  | 0 - timer://fire | blog                             | ID-latitude-57336-1409468297774-0-2 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: Hello Blog]
2014-08-31 08:58:24,176 | INFO  | 0 - timer://fire | blog                             | ID-latitude-57336-1409468297774-0-4 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: Hello Blog]
2014-08-31 08:58:29,176 | INFO  | 0 - timer://fire | blog                             | ID-latitude-57336-1409468297774-0-6 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: Hello Blog]
2014-08-31 08:58:34,176 | INFO  | 0 - timer://fire | blog                             | ID-latitude-57336-1409468297774-0-8 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: Hello Blog]

Testing (utest and itest) Apache Camel Blueprint route

August 28, 2014 Posted by jbonofre

In any integration project, testing is vital for multiple reasons:

  • to guarantee that the integration logic matches the expectations
  • to quickly identify some regression issues
  • to test some special cases, like the errors for instance
  • to validate the successful provisioning (deployment) on a runtime as close as possible to the target platform

We distinguish two kinds of tests:

  • the unit tests (utest) aim to test the behaviors of integration logic, and define the expectations that the logic has to match
  • the integration tests (itest) aim to provision the integration logic artifact to a runtime, and check the behaviors on the actual platform

Camel is THE framework to implement your integration logic (mediation).

It provides the Camel Test Kit, based on JUnit, to implement utest. In combination with Karaf and Pax Exam, we can cover both utest and itest.

In this blog, we will:

  • create an OSGi service
  • create a Camel route using the Blueprint DSL, using the previously created OSGi service
  • implement the utest using the Camel Blueprint Test
  • implement the itest using Pax Exam and Karaf

You can find the whole source code used for this blog post on my github: https://github.com/jbonofre/blog-camel-blueprint.

Blueprint Camel route and features

We create a project (using Maven) containing the following modules:

  • my-service is the OSGi bundle providing the service that we will use in the Camel route
  • my-route is the OSGi bundle providing the Camel route, using the Blueprint DSL. This route uses the OSGi service provided by my-service. It’s where we will implement the utest.
  • features packages the OSGi bundles as a Karaf features XML, ready to be deployed.
  • itests contains the integration test (itest) leveraging Karaf and Pax Exam.

It means we have the following parent pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>net.nanthrax.blog</groupId>
    <artifactId>net.nanthrax.blog.camel.route.blueprint</artifactId>
    <name>Nanthrax Blog :: Camel :: Blueprint</name>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <modules>
        <module>my-service</module>
        <module>my-route</module>
        <module>features</module>
        <module>itests</module>
    </modules>

</project>

OSGi service

The my-service Maven module provides an OSGi bundle providing an echo service.

It uses the following Maven POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>net.nanthrax.blog</groupId>
        <artifactId>net.nanthrax.blog.camel.route.blueprint</artifactId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>net.nanthrax.blog.camel.route.blueprint.service</artifactId>
    <name>Nanthrax Blog :: Camel :: Blueprint :: Service</name>
    <packaging>bundle</packaging>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.felix</groupId>
                <artifactId>maven-bundle-plugin</artifactId>
                <version>2.4.0</version>
                <extensions>true</extensions>
                <configuration>
                    <instructions>
                        <Export-Package>
                            net.nanthrax.blog.service
                        </Export-Package>
                        <Import-Package>
                            org.slf4j*;resolution:=optional
                        </Import-Package>
                        <Private-Package>
                            net.nanthrax.blog.service.internal
                        </Private-Package>
                    </instructions>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

The echo service is described by the net.nanthrax.blog.service.EchoService interface:

package net.nanthrax.blog.service;

public interface EchoService {

    public String echo(String message);

}

We expose the package containing this interface using OSGi Export-Package header.

The implementation of the EchoService is hidden using the OSGi Private-Package header. This implementation is very simple: it gets a message and returns the same message with the “Echoing ” prefix:

package net.nanthrax.blog.service.internal;

import net.nanthrax.blog.service.EchoService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EchoServiceImpl implements EchoService {

    private final static Logger LOGGER = LoggerFactory.getLogger(EchoServiceImpl.class);

    public String echo(String message) {
        return "Echoing " + message;
    }

}

To expose this service in OSGi, we use blueprint. We create the blueprint descriptor in src/main/resources/OSGI-INF/blueprint/blueprint.xml:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">

    <service interface="net.nanthrax.blog.service.EchoService">
        <bean class="net.nanthrax.blog.service.internal.EchoServiceImpl"/>
    </service>

</blueprint>

The Camel route will use this Echo service.

Camel route and utest

We use the Camel Blueprint DSL to design the route.

The route is packaged as an OSGi bundle, in the my-route Maven module, using the following pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>net.nanthrax.blog</groupId>
        <artifactId>net.nanthrax.blog.camel.route.blueprint</artifactId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>net.nanthrax.blog.camel.route.blueprint.myroute</artifactId>
    <name>Nanthrax Blog :: Camel :: Blueprint :: My Route</name>
    <packaging>bundle</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-test-blueprint</artifactId>
            <version>2.12.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>net.nanthrax.blog</groupId>
            <artifactId>net.nanthrax.blog.camel.route.blueprint.service</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.felix</groupId>
                <artifactId>maven-bundle-plugin</artifactId>
                <version>2.4.0</version>
                <extensions>true</extensions>
                <configuration>
                    <instructions>
                        <Import-Package>
                            net.nanthrax.blog.service
                        </Import-Package>
                    </instructions>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

The src/main/resources/OSGI-INF/blueprint/route.xml contains the route definition:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">

    <reference id="myService" interface="net.nanthrax.blog.service.EchoService"/>

    <camelContext xmlns="http://camel.apache.org/schema/blueprint">
        <route>
            <from uri="timer:fire?period=5000"/>
            <setBody>
                <constant>Hello Blog</constant>
            </setBody>
            <to uri="bean:myService"/>
            <to uri="log:net.nanthrax.blog.route"/>
            <to uri="file:camel-output"/>
        </route>
    </camelContext>

</blueprint>

This route:

  • creates an exchange every 5 seconds, using a Camel timer
  • sets the body of the “in” message in the exchange to “Hello Blog”
  • sends the message to the EchoService, which prefixes the message with “Echoing”, resulting in an updated message containing “Echoing Hello Blog”
  • logs the exchange
  • creates a file for each exchange, in the camel-output folder, using the Camel file component
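To follow the message body through these steps without running Camel, here is a plain-Java stand-in for the route. RouteStepsSketch is a hypothetical class written for this post: each step is a function applied to the body, and the log and file outputs are simply collected in lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Plain-Java stand-in for the route's steps; no Camel involved.
final class RouteStepsSketch {

    // Applies each step to the message body in order, like the route's pipeline.
    static String run(String initialBody, List<UnaryOperator<String>> steps) {
        String body = initialBody;
        for (UnaryOperator<String> step : steps) {
            body = step.apply(body);
        }
        return body;
    }

    // Simulates one timer event going through the steps of the route.
    static String fireOnce(List<String> log, List<String> files) {
        List<UnaryOperator<String>> steps = new ArrayList<>();
        steps.add(body -> "Hello Blog");                       // setBody with a constant
        steps.add(body -> "Echoing " + body);                  // call to the echo service
        steps.add(body -> { log.add(body); return body; });    // log endpoint
        steps.add(body -> { files.add(body); return body; });  // file endpoint
        return run(null, steps);                               // the timer starts with no body
    }
}
```

Each call to fireOnce() plays the role of one timer event: the returned body, the logged message and the content written to the “file” are all “Echoing Hello Blog”.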

We are now ready to create the utest for this route.

As this route uses Blueprint, and Blueprint is an OSGi specific technology, normally, we would have to deploy the route on Karaf to test it.

However, thanks to Camel Blueprint Test and the use of PojoSR, we can test the Blueprint route “outside” of OSGi. Camel Blueprint Test also supports a mock of the OSGi service registry, allowing us to mock the OSGi service as well.

Basically, in the unit test, we:

  • load the route Blueprint XML by overriding the getBlueprintDescriptor() method
  • mock the timer and file endpoints by overriding the isMockEndpointsAndSkip() method (skip means that we don’t send the message to the actual endpoint)
  • mock the Echo OSGi service by overriding the addServicesOnStartup() method
  • finally implement a test in the testMyRoute() method

The test itself gets the mocked file endpoint and defines the expectations on this endpoint: we expect one message containing “Echoing Hello Blog” on the file endpoint.
Instead of using the actual timer endpoint, we mock it and we use the producer template to send an exchange (in order to control the number of created exchanges).
Finally, we check if the expectations are satisfied on the mocked file endpoint.

package net.nanthrax.blog;

import net.nanthrax.blog.service.EchoService;
import net.nanthrax.blog.service.internal.EchoServiceImpl;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.language.ConstantExpression;
import org.apache.camel.test.blueprint.CamelBlueprintTestSupport;
import org.apache.camel.util.KeyValueHolder;
import org.junit.Test;

import java.util.Dictionary;
import java.util.Map;

public class MyRouteTest extends CamelBlueprintTestSupport {

    @Override
    protected String getBlueprintDescriptor() {
        return "OSGI-INF/blueprint/route.xml";
    }

    @Override
    public String isMockEndpointsAndSkip() {
        return "((file)|(timer)):(.*)";
    }

    @Override
    protected void addServicesOnStartup(Map<String, KeyValueHolder<Object, Dictionary>> services) {
        KeyValueHolder serviceHolder = new KeyValueHolder(new EchoServiceImpl(), null);
        services.put(EchoService.class.getName(), serviceHolder);
    }

    @Test
    public void testMyRoute() throws Exception {

        // mocking the file endpoint and define the expectation
        MockEndpoint mockEndpoint = getMockEndpoint("mock:file:camel-output");
        mockEndpoint.expectedMessageCount(1);
        mockEndpoint.expectedBodiesReceived("Echoing Hello Blog");

        // send a message at the timer endpoint level
        template.sendBody("mock:timer:fire", "empty");

        // check if the expectation is satisfied
        assertMockEndpointsSatisfied();
    }

}

We can see that we mock the Echo OSGi service using the actual EchoServiceImpl. However, it’s of course possible to use your own local test implementation of the EchoService. This is useful to test specific use cases or to simulate errors.
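As an example of such local test implementations, here is a minimal sketch. FailingEchoService and RecordingEchoService are hypothetical classes written for this post, and the interface is copied locally only so that the snippet compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Local copy of the interface so the sketch compiles on its own;
// in the project it is net.nanthrax.blog.service.EchoService.
interface EchoService {
    String echo(String message);
}

// Hypothetical test implementation that simulates a failing service.
final class FailingEchoService implements EchoService {
    @Override
    public String echo(String message) {
        throw new IllegalStateException("Simulated failure for: " + message);
    }
}

// Hypothetical test implementation that records the messages it receives.
final class RecordingEchoService implements EchoService {

    private final List<String> calls = new ArrayList<>();

    @Override
    public String echo(String message) {
        calls.add(message);
        return "Echoing " + message;
    }

    List<String> calls() {
        return calls;
    }
}
```

In addServicesOnStartup(), an instance of one of these classes would be passed to the KeyValueHolder in place of new EchoServiceImpl().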

We can note that we use a regex (((file)|(timer)):(.*)) to mock both timer and file endpoints.
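To check which endpoint URIs such a pattern covers, the regex can be tried with the JDK alone. MockPatternSketch is a hypothetical helper; it only covers the regular expression part and assumes the whole URI has to match, while Camel’s own endpoint matching does more than this.

```java
import java.util.regex.Pattern;

// Hypothetical helper to try the mock pattern outside of Camel.
final class MockPatternSketch {

    // Same regex as the one returned by isMockEndpointsAndSkip() in the test.
    private static final Pattern MOCK_AND_SKIP = Pattern.compile("((file)|(timer)):(.*)");

    // True when the whole endpoint URI matches the pattern.
    static boolean isMocked(String endpointUri) {
        return MOCK_AND_SKIP.matcher(endpointUri).matches();
    }
}
```

With this pattern, timer:fire?period=5000 and file:camel-output match, while bean:myService and log:net.nanthrax.blog.route do not, so the bean and log endpoints are still called for real in the test.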

We load the route.xml blueprint descriptor directly from the bundle location (OSGI-INF/blueprint/route.xml).

We can run mvn to test the route:

my-route$ mvn clean install
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Nanthrax Blog :: Camel :: Blueprint :: My Route 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.4.1:clean (default-clean) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 1 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/jbonofre/Workspace/blog-camel-blueprint/my-route/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 1 source file to /home/jbonofre/Workspace/blog-camel-blueprint/my-route/target/test-classes
[WARNING] /home/jbonofre/Workspace/blog-camel-blueprint/my-route/src/test/java/net/nanthrax/blog/MyRouteTest.java: /home/jbonofre/Workspace/blog-camel-blueprint/my-route/src/test/java/net/nanthrax/blog/MyRouteTest.java uses unchecked or unsafe operations.
[WARNING] /home/jbonofre/Workspace/blog-camel-blueprint/my-route/src/test/java/net/nanthrax/blog/MyRouteTest.java: Recompile with -Xlint:unchecked for details.
[INFO]
[INFO] --- maven-surefire-plugin:2.17:test (default-test) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[INFO] Surefire report directory: /home/jbonofre/Workspace/blog-camel-blueprint/my-route/target/surefire-reports

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running net.nanthrax.blog.MyRouteTest
[main] INFO org.apache.camel.test.blueprint.CamelBlueprintHelper - Using Blueprint XML file: /home/jbonofre/Workspace/blog-camel-blueprint/my-route/target/classes/OSGI-INF/blueprint/route.xml
Aug 28, 2014 2:57:43 PM org.ops4j.pax.swissbox.tinybundles.core.metadata.RawBuilder run
INFO: Copy thread finished.
[main] INFO org.apache.camel.impl.osgi.Activator - Camel activator starting
[main] INFO org.apache.camel.impl.osgi.Activator - Camel activator started
[main] INFO org.apache.aries.blueprint.container.BlueprintExtender - No quiesce support is available, so blueprint components will not participate in quiesce operations
[main] INFO net.nanthrax.blog.MyRouteTest - ********************************************************************************
[main] INFO net.nanthrax.blog.MyRouteTest - Testing: testMyRoute(net.nanthrax.blog.MyRouteTest)
[main] INFO net.nanthrax.blog.MyRouteTest - ********************************************************************************
[main] INFO net.nanthrax.blog.MyRouteTest - Skipping starting CamelContext as system property skipStartingCamelContext is set to be true.
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - Apache Camel 2.12.1 (CamelContext: 23-camel-3) is starting
[main] INFO org.apache.camel.management.DefaultManagementStrategy - JMX is disabled
[main] INFO org.apache.camel.impl.InterceptSendToMockEndpointStrategy - Adviced endpoint [timer://fire?period=5000] with mock endpoint [mock:timer:fire]
[main] INFO org.apache.camel.impl.InterceptSendToMockEndpointStrategy - Adviced endpoint [file://camel-output] with mock endpoint [mock:file:camel-output]
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - Route: route1 started and consuming from: Endpoint[timer://fire?period=5000]
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - Total 1 routes, of which 1 is started.
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - Apache Camel 2.12.1 (CamelContext: 23-camel-3) started in 0.069 seconds
[main] INFO org.apache.camel.component.mock.MockEndpoint - Asserting: Endpoint[mock://file:camel-output] is satisfied
[Camel (23-camel-3) thread #0 - timer://fire] INFO net.nanthrax.blog.route - Exchange[ExchangePattern: InOnly, BodyType: String, Body: Echoing Hello Blog]
[main] INFO org.apache.camel.component.mock.MockEndpoint - Asserting: Endpoint[mock://timer:fire] is satisfied
[main] INFO net.nanthrax.blog.MyRouteTest - ********************************************************************************
[main] INFO net.nanthrax.blog.MyRouteTest - Testing done: testMyRoute(net.nanthrax.blog.MyRouteTest)
[main] INFO net.nanthrax.blog.MyRouteTest - Took: 1.094 seconds (1094 millis)
[main] INFO net.nanthrax.blog.MyRouteTest - ********************************************************************************
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - Apache Camel 2.12.1 (CamelContext: 23-camel-3) is shutting down
[main] INFO org.apache.camel.impl.DefaultShutdownStrategy - Starting to graceful shutdown 1 routes (timeout 10 seconds)
[Camel (23-camel-3) thread #1 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Route: route1 shutdown complete, was consuming from: Endpoint[timer://fire?period=5000]
[main] INFO org.apache.camel.impl.DefaultShutdownStrategy - Graceful shutdown of 1 routes completed in 0 seconds
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - Apache Camel 2.12.1 (CamelContext: 23-camel-3) uptime 1.117 seconds
[main] INFO org.apache.camel.blueprint.BlueprintCamelContext - Apache Camel 2.12.1 (CamelContext: 23-camel-3) is shutdown in 0.021 seconds
[main] INFO org.apache.aries.blueprint.container.BlueprintExtender - Destroying BlueprintContainer for bundle MyRouteTest
[main] INFO org.apache.aries.blueprint.container.BlueprintExtender - Destroying BlueprintContainer for bundle net.nanthrax.blog.camel.route.blueprint.service
[main] INFO org.apache.aries.blueprint.container.BlueprintExtender - Destroying BlueprintContainer for bundle org.apache.aries.blueprint
[main] INFO org.apache.aries.blueprint.container.BlueprintExtender - Destroying BlueprintContainer for bundle org.apache.camel.camel-blueprint
[main] INFO org.apache.camel.impl.osgi.Activator - Camel activator stopping
[main] INFO org.apache.camel.impl.osgi.Activator - Camel activator stopped
[main] INFO org.apache.camel.test.blueprint.CamelBlueprintHelper - Deleting work directory target/bundles/1409230663118
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.581 sec - in net.nanthrax.blog.MyRouteTest

Results :

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

[INFO] 
[INFO] --- maven-bundle-plugin:2.4.0:bundle (default-bundle) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[WARNING] Bundle net.nanthrax.blog:net.nanthrax.blog.camel.route.blueprint.myroute:bundle:1.0-SNAPSHOT : Unused Private-Package instructions, no such package(s) on the class path: [!*]
[INFO] 
[INFO] --- maven-install-plugin:2.5.1:install (default-install) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[INFO] Installing /home/jbonofre/Workspace/blog-camel-blueprint/my-route/target/net.nanthrax.blog.camel.route.blueprint.myroute-1.0-SNAPSHOT.jar to /home/jbonofre/.m2/repository/net/nanthrax/blog/net.nanthrax.blog.camel.route.blueprint.myroute/1.0-SNAPSHOT/net.nanthrax.blog.camel.route.blueprint.myroute-1.0-SNAPSHOT.jar
[INFO] Installing /home/jbonofre/Workspace/blog-camel-blueprint/my-route/pom.xml to /home/jbonofre/.m2/repository/net/nanthrax/blog/net.nanthrax.blog.camel.route.blueprint.myroute/1.0-SNAPSHOT/net.nanthrax.blog.camel.route.blueprint.myroute-1.0-SNAPSHOT.pom
[INFO] 
[INFO] --- maven-bundle-plugin:2.4.0:install (default-install) @ net.nanthrax.blog.camel.route.blueprint.myroute ---
[INFO] Installing net/nanthrax/blog/net.nanthrax.blog.camel.route.blueprint.myroute/1.0-SNAPSHOT/net.nanthrax.blog.camel.route.blueprint.myroute-1.0-SNAPSHOT.jar
[INFO] Writing OBR metadata
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 6.906s
[INFO] Finished at: Thu Aug 28 14:57:47 CEST 2014
[INFO] Final Memory: 22M/557M
[INFO] ------------------------------------------------------------------------

Again, the purpose of the utest is to test the behavior of the route: check that the message content is what we expect, that the message arrives on the expected endpoint, etc.

Karaf features and itests

The purpose of the itest is not really to test the behavior of the route: it’s more to test if the provisioning (deployment) of the route is OK, if the route starts without problem, and, when possible, if the “default” behavior is what we expect.

Although it’s possible to deploy bundle by bundle (first the one providing the Echo service, then the one providing the route), with Karaf it’s much easier to create a features XML.

That’s what we do in the features Maven module, grouping the bundles in two features as shown in the following features XML:

<?xml version="1.0" encoding="UTF-8"?>
<features name="blog-camel-blueprint" xmlns="http://karaf.apache.org/xmlns/features/v1.0.0">

    <feature name="blog-camel-blueprint-service" version="${project.version}">
        <bundle>mvn:net.nanthrax.blog/net.nanthrax.blog.camel.route.blueprint.service/${project.version}</bundle>
    </feature>

    <feature name="blog-camel-blueprint-route" version="${project.version}">
        <feature>blog-camel-blueprint-service</feature>
        <bundle>mvn:net.nanthrax.blog/net.nanthrax.blog.camel.route.blueprint.myroute/${project.version}</bundle>
    </feature>

</features>

Now, we can use Pax Exam to implement our itests, by:

  • bootstrapping a Karaf container, where we deploy the camel-blueprint feature and our features
  • testing if the provisioning is OK
  • creating a local route to test the output of my-route

We do that in the itests Maven module, where we define the Pax Exam dependency:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>net.nanthrax.blog</groupId>
        <artifactId>net.nanthrax.blog.camel.route.blueprint</artifactId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>itests</artifactId>

    <dependencies>

        <dependency>
            <groupId>net.nanthrax.blog</groupId>
            <artifactId>camel-blueprint</artifactId>
            <version>1.0-SNAPSHOT</version>
            <classifier>features</classifier>
            <type>xml</type>
            <scope>test</scope>
        </dependency>

        <!-- Pax Exam -->
        <dependency>
            <groupId>org.ops4j.pax.exam</groupId>
            <artifactId>pax-exam-container-karaf</artifactId>
            <version>3.4.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.ops4j.pax.exam</groupId>
            <artifactId>pax-exam-junit4</artifactId>
            <version>3.4.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.ops4j.pax.exam</groupId>
            <artifactId>pax-exam-inject</artifactId>
            <version>3.4.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.geronimo.specs</groupId>
            <artifactId>geronimo-atinject_1.0_spec</artifactId>
            <version>1.0</version>
            <scope>test</scope>
        </dependency>

        <!-- Camel Test -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-test</artifactId>
            <version>2.12.1</version>
            <scope>test</scope>
        </dependency>

        <!-- Karaf -->
        <dependency>
            <groupId>org.apache.karaf</groupId>
            <artifactId>apache-karaf</artifactId>
            <version>2.3.6</version>
            <type>tar.gz</type>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.karaf</groupId>
                    <artifactId>org.apache.karaf.client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>

</project>

We create MyRouteTest in src/test/java/net/nanthrax/blog/itests:

package net.nanthrax.blog.itests;

import static org.ops4j.pax.exam.CoreOptions.maven;
import static org.ops4j.pax.exam.karaf.options.KarafDistributionOption.*;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.language.ConstantExpression;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.karaf.features.FeaturesService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.Configuration;
import org.ops4j.pax.exam.Option;
import org.ops4j.pax.exam.junit.PaxExam;
import org.ops4j.pax.exam.karaf.options.LogLevelOption;
import org.osgi.framework.BundleContext;

import javax.inject.Inject;

import java.io.File;

@RunWith(PaxExam.class)
public class MyRouteTest extends CamelTestSupport {

    @Inject
    protected FeaturesService featuresService;

    @Inject
    protected BundleContext bundleContext;

    @Configuration
    public static Option[] configure() throws Exception {
        return new Option[] {
                karafDistributionConfiguration()
                        .frameworkUrl(maven().groupId("org.apache.karaf").artifactId("apache-karaf").type("tar.gz").version("2.3.6"))
                        .karafVersion("2.3.6")
                        .useDeployFolder(false)
                        .unpackDirectory(new File("target/paxexam/unpack")),
                logLevel(LogLevelOption.LogLevel.WARN),
                features(maven().groupId("org.apache.camel.karaf").artifactId("apache-camel").type("xml").classifier("features").version("2.12.1"), "camel-blueprint", "camel-test"),
                features(maven().groupId("net.nanthrax.blog").artifactId("camel-blueprint").type("xml").classifier("features").version("1.0-SNAPSHOT"), "blog-camel-blueprint-route"),
                keepRuntimeFolder()
        };
    }

    @Test
    public void testProvisioning() throws Exception {
        // first check that the features are installed
        assertTrue(featuresService.isInstalled(featuresService.getFeature("camel-blueprint")));
        assertTrue(featuresService.isInstalled(featuresService.getFeature("blog-camel-blueprint-route")));

        // now we check if the OSGi services corresponding to the camel context and route are there

    }

    @Test
    public void testMyRoute() throws Exception {
        MockEndpoint itestMock = getMockEndpoint("mock:itest");
        itestMock.expectedMinimumMessageCount(3);
        itestMock.whenAnyExchangeReceived(new Processor() {
            public void process(Exchange exchange) {
                System.out.println(exchange.getIn().getBody(String.class));
            }
        });

        template.start();

        Thread.sleep(20000);

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            public void configure() {
                from("file:camel-output").to("mock:itest");
            }
        };
    }

}

In this test class, we can see:

  • the configure() method where we define the Karaf distribution to use, the log level, the Camel features XML location and the Camel features that we want to install (camel-blueprint and camel-test), the location of our features XML and the feature that we want to install (blog-camel-blueprint-route)
  • the testProvisioning() method where we check if the features have been correctly installed
  • the createRouteBuilder() method where we programmatically create a new route (using the Java DSL here) consuming the files created by my-route and sending to a mock endpoint
  • the testMyRoute() method, which gets the itest mock endpoint (from the route created by the createRouteBuilder() method) and checks that it receives at least 3 messages within a wait of 20 seconds (it also displays the content of each message)
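
The fixed Thread.sleep(20000) in testMyRoute() always costs the full 20 seconds, even when the three messages arrive earlier. One possible alternative, sketched here with plain JDK classes only, is to poll a condition until it holds or a deadline passes. The Poller class and its waitUntil() method are hypothetical names used for illustration; they are not part of Camel or Pax Exam. In the itest, the condition could be something like itestMock.getReceivedCounter() >= 3.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper (not part of Camel or Pax Exam): polls a condition
// instead of sleeping for a fixed amount of time.
final class Poller {

    private Poller() {
    }

    // Returns true as soon as the condition holds, or false once
    // timeoutMillis has elapsed (or the thread is interrupted)
    // without the condition becoming true.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // restore the interrupt flag and give up waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

The 20 seconds then become an upper bound rather than a fixed cost, and the test can still fail when waitUntil() returns false.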

Running mvn bootstraps a Karaf instance, installs the features, deploys our test bundle, and checks the execution:

itests$ mvn clean install
...
-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running net.nanthrax.blog.itests.MyRouteTest
[org.ops4j.pax.exam.spi.DefaultExamSystem] : Pax Exam System (Version: 3.4.0) created.
[org.ops4j.store.intern.TemporaryStore] : Storage Area is /tmp/1409248259083-0
[org.ops4j.pax.exam.junit.impl.ProbeRunner] : creating PaxExam runner for class net.nanthrax.blog.itests.MyRouteTest
...
[org.ops4j.pax.exam.karaf.container.internal.KarafTestContainer] : Test Container started in 3 millis
[org.ops4j.pax.exam.karaf.container.internal.KarafTestContainer] : Wait for test container to finish its initialization [ RelativeTimeout value = 180000 ]
[org.ops4j.pax.exam.rbc.client.RemoteBundleContextClient] : Waiting for remote bundle context.. on 21414 name: 7cd8df34-0ed2-4449-8d60-d51f395cfa1d timout: [ RelativeTimeout value = 180000 ]
        __ __                  ____
       / //_/____ __________ _/ __/
      / ,<  / __ `/ ___/ __ `/ /_
     / /| |/ /_/ / /  / /_/ / __/
    /_/ |_|\__,_/_/   \__,_/_/

  Apache Karaf (2.3.6)

Hit '<tab>' for a list of available commands
and '[cmd] --help' for help on a specific command.
Hit '<ctrl-d>' or type 'osgi:shutdown' or 'logout' to shutdown Karaf.

karaf@root> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[org.ops4j.pax.exam.rbc.client.RemoteBundleContextClient] : Remote bundle context found after 5774 millis
[org.ops4j.pax.tinybundles.core.intern.RawBuilder] : make()
[org.ops4j.store.intern.TemporaryStore] : Enter store()
[org.ops4j.pax.tinybundles.core.intern.RawBuilder] : Creating manifest from added headers.
...
[org.ops4j.pax.exam.container.remote.RBCRemoteTarget] : Installed bundle (from stream) as ID: 102
[org.ops4j.pax.exam.container.remote.RBCRemoteTarget] : call [[TestAddress:PaxExam-d7899c82-74e1-445e-9fcb-ab9b18e286b4 root:PaxExam-5dfb0f4b-96d9-4226-bdea-5b057e7e7335]]
Echoing Hello Blog
Echoing Hello Blog
Echoing Hello Blog
Echoing Hello Blog
...
Results :

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ itests ---
[WARNING] JAR will be empty - no content was marked for inclusion!
[INFO] Building jar: /home/jbonofre/Workspace/blog-camel-blueprint/itests/target/itests-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-install-plugin:2.3.1:install (default-install) @ itests ---
[INFO] Installing /home/jbonofre/Workspace/blog-camel-blueprint/itests/target/itests-1.0-SNAPSHOT.jar to /home/jbonofre/.m2/repository/net/nanthrax/blog/itests/1.0-SNAPSHOT/itests-1.0-SNAPSHOT.jar
[INFO] Installing /home/jbonofre/Workspace/blog-camel-blueprint/itests/pom.xml to /home/jbonofre/.m2/repository/net/nanthrax/blog/itests/1.0-SNAPSHOT/itests-1.0-SNAPSHOT.pom
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 35.904s
[INFO] Finished at: Thu Aug 28 19:51:32 CEST 2014
[INFO] Final Memory: 28M/430M
[INFO] ------------------------------------------------------------------------

Integration in Jenkins

We can now integrate our project in Jenkins CI. We now have a complete CI chain covering the build of the service, the packaging of the route, the utest of the route, and the itest of the service and route in Karaf.

jenkins1

jenkins2

jenkins3

Apache JMeter to test Apache ActiveMQ on CI with Maven/Jenkins

August 27, 2014 Posted by jbonofre

Apache JMeter is a great tool for testing, especially performance testing.
It provides a lot of samplers that you can use to test your web services, web applications, etc.

It also includes a couple of samplers for JMS that we can use with ActiveMQ.

The source code of this blog post is available at https://github.com/jbonofre/blog-jmeter.

Preparing JMeter for ActiveMQ

For this article, I downloaded JMeter 2.10 from http://jmeter.apache.org.

We uncompress jmeter in a folder:

$ tar zxvf apache-jmeter-2.10.tgz

We are going to create a test plan for ActiveMQ. After downloading ActiveMQ 5.9.0 from http://activemq.apache.org, we install and start an ActiveMQ broker on the machine.

$ tar zxvf apache-activemq-5.9.0-bin.tar.gz
$ cd apache-activemq-5.9.0/bin
$ ./activemq console
...
 INFO | Apache ActiveMQ 5.9.0 (localhost, ID:latitude-45782-1409139630277-0:1) started

In order to use ActiveMQ with JMeter, we have to copy the activemq-all-5.9.0.jar file provided in the ActiveMQ distribution into the JMeter lib folder:

$ cp apache-activemq-5.9.0/activemq-all-5.9.0.jar apache-jmeter-2.10/lib/

We can now start jmeter and start to create our ActiveMQ test plan:

$ cd apache-jmeter-2.10/bin
$ ./jmeter.sh

In the default test plan, we add a thread group to simulate 5 JMS clients that will perform the samplers 10 times:

jmeter1

In this thread group, we add a JMS Publisher sampler that will produce a message in ActiveMQ:

jmeter2

We can note the ActiveMQ configuration:

  • the sampler uses the ActiveMQ JNDI initial context factory (org.apache.activemq.jndi.ActiveMQInitialContextFactory)
  • the Provider URL is the ActiveMQ connection URL (tcp://localhost:61616 in my case). You can use any kind of ActiveMQ URL here, for instance failover:(tcp://host1:61616,tcp://host2:61616).
  • the connection factory is simply the default one provided by ActiveMQ: ConnectionFactory.
  • the destination is the name of the JMS queue where we want to produce the message, prefixed with dynamicQueues: dynamicQueues/MyQueue.
  • by default, ActiveMQ 5.9.0 uses the authorization plugin. So, the client has to use authentication to be able to produce a message. The default ActiveMQ username is admin, and admin is the default password.
  • finally, we set the body of the message as static using the textarea: JMeter message ...
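
The sampler settings listed above correspond to a standard JNDI environment. As a rough sketch (the ActiveMqJndiSettings class and its method names are made up for illustration), the same values could be assembled in Java as follows. The lookup itself is left out, because it needs the ActiveMQ jar on the classpath and a running broker.

```java
import java.util.Properties;
import javax.naming.Context;

// Hypothetical holder for the JNDI values entered in the JMS Publisher sampler.
final class ActiveMqJndiSettings {

    // JNDI name of the default connection factory provided by ActiveMQ
    static final String CONNECTION_FACTORY_NAME = "ConnectionFactory";

    private ActiveMqJndiSettings() {
    }

    // Builds the JNDI name of a queue, using the dynamicQueues/ prefix
    static String queueName(String queue) {
        return "dynamicQueues/" + queue;
    }

    // Builds the JNDI environment: initial context factory and provider URL
    static Properties environment(String brokerUrl) {
        Properties env = new Properties();
        env.setProperty(Context.INITIAL_CONTEXT_FACTORY,
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        env.setProperty(Context.PROVIDER_URL, brokerUrl);
        return env;
    }
}
```

With activemq-all on the classpath, passing environment("tcp://localhost:61616") to new javax.naming.InitialContext(...) and looking up CONNECTION_FACTORY_NAME and queueName("MyQueue") should give the connection factory and the queue used by the sampler.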

Now, we save the plan in a file named activemq.jmx.

For a quick test, we can add a Graph Results listener to the thread group and run the plan:

jmeter3

We can check in the ActiveMQ console (pointing a browser to http://localhost:8161/admin) that the queue MyQueue contains the messages sent by JMeter:

activemq1

activemq2

Our test plan is working, and we have some metrics about the execution in the graph (it’s really fast on my laptop ;)).

This approach is a great way to implement performance benchmarks easily and to create some load on ActiveMQ (to test some tuning and configuration, for instance).

It can make sense to do it in a continuous integration process. So, let’s see how we can run JMeter with Maven and integrate it in Jenkins.

Using jmeter maven plugin

We have two ways to call JMeter with Maven:

  • we can call the local JMeter instance using the exec-maven-plugin. JMeter can be called in “batch mode” (without the GUI) using the following command:
    $ apache-jmeter-2.10/bin/jmeter.sh -n -t activemq.jmx -l activemq.jtl -j activemq.jmx.log
    

    We use the options:

    • -n to disable the GUI
    • -t to specify the location of the test plan file (.jmx)
    • -l to specify the location of the test plan execution results
    • -j to specify the location of the test plan execution log
  • we can use the JMeter Maven plugin. It’s the one that I will use for this blog post.

The JMeter Maven plugin allows you to run a JMeter test plan directly from Maven. It doesn’t require a local JMeter instance: the plugin will download and bootstrap a JMeter instance.

The plugin will look for JMeter JMX files in the src/test/jmeter folder by default.

We create a POM to run JMeter:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>net.nanthrax.blog</groupId>
    <artifactId>jmeter</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <build>
        <plugins>
            <plugin>
                <groupId>com.lazerycode.jmeter</groupId>
                <artifactId>jmeter-maven-plugin</artifactId>
                <version>1.9.1</version>
                <executions>
                    <execution>
                        <id>jmeter-test</id>
                        <phase>verify</phase>
                        <goals>
                            <goal>jmeter</goal>
                        </goals>
                    </execution>
                </executions>
                <dependencies>
                    <dependency>
                        <groupId>org.apache.activemq</groupId>
                        <artifactId>activemq-all</artifactId>
                        <version>5.9.0</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>

</project>

We can now run the JMeter test plan:

$ mvn clean verify
...
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building jmeter 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- jmeter-maven-plugin:1.9.1:jmeter (jmeter-test) @ jmeter ---
[INFO]
[INFO] -------------------------------------------------------
[INFO]  P E R F O R M A N C E    T E S T S
[INFO] -------------------------------------------------------
[INFO]
[INFO]
[info]
[debug] JMeter is called with the following command line arguments: -n -t /home/jbonofre/Workspace/jmeter/src/test/jmeter/activemq.jmx -l /home/jbonofre/Workspace/jmeter/target/jmeter/results/20140827-activemq.jtl -d /home/jbonofre/Workspace/jmeter/target/jmeter -j /home/jbonofre/Workspace/jmeter/target/jmeter/logs/activemq.jmx.log
[info] Executing test: activemq.jmx
[info] Completed Test: activemq.jmx
[INFO]
[INFO] Test Results:
[INFO]
[INFO] Tests Run: 1, Failures: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.077s
[INFO] Finished at: Wed Aug 27 14:58:09 CEST 2014
[INFO] Final Memory: 14M/303M
[INFO] ------------------------------------------------------------------------

We can see in the ActiveMQ console that the JMeter messages have been sent.

We are now ready to integrate this build in Jenkins:

jenkins1

jenkins2

We have now included the performance tests in our Jenkins CI.

I would advise executing the performance tests in a dedicated module or profile, and configuring the Jenkins job to run once per week, for instance, or linking it to a release.

So, we still have our development-oriented nightly builds, and we can execute the performance tests periodically as well as for each release.

Webex on Ubuntu 14.04

August 22, 2014 Posted by jbonofre

Webex is a great tool but unfortunately, it doesn’t work “out of the box” on Ubuntu 14.04 (and also with previous Ubuntu releases).

For instance, the webex applet starts but it doesn’t refresh correctly, or desktop/application sharing doesn’t work.

Actually, the issue is due to:

  • some libraries required by webex are missing on the Ubuntu installation
  • webex expects to run on an i386 (not amd64) platform, so even if you have the libraries installed, you also have to install their i386 versions.

To find the required libraries, you have to go to the $HOME/.webex/1324 and $HOME/.webex/1424 folders and check the libraries with:

ldd *.so|grep -i not

For all missing libraries (not found), you have to find the package providing the library using:

apt-file search xxxxxx.so

Once you have found the package providing the library, you have to install the package for both x64 (which should be the default if your machine is 64-bit) and i386. For instance:

aptitude install libpangox-1.0-0
aptitude install libpangox-1.0-0:i386

For instance, on my laptop, I had to install:

libxtst6:i386
libxft2:i386
libpangoxft-1.0-0:i386
libpangox-1.0-0:i386
libgcj14-awt
libgcj14-awt:i386

Apache Syncope backend with Apache Karaf

August 17, 2014 Posted by jbonofre

Apache Syncope is an identity manager (IdM). It comes with a web console where you can manage users, attributes, roles, etc.
It also comes with a REST API that lets you integrate it with other applications.

By default, Syncope has its own database, but it can also “façade” another backend (LDAP, ActiveDirectory, JDBC) by using ConnId.

In the next releases (4.0.0, 3.0.2, 2.4.0, and 2.3.7), Karaf provides (by default) a SyncopeLoginModule allowing you to use Syncope as backend for users and roles.

This blog introduces this new feature and explains how to configure and use it.

Installing Apache Syncope

The easiest way to start with Syncope is to use the Syncope standalone distribution. It comes with an Apache Tomcat instance already installed with the different Syncope modules.

You can download the Syncope standalone distribution archive from http://www.apache.org/dyn/closer.cgi/syncope/1.1.8/syncope-standalone-1.1.8-distribution.zip.

Uncompress the distribution in the directory of your choice:

$ unzip syncope-standalone-1.1.8-distribution.zip

You can find the ready-to-use Tomcat instance in the extracted directory. We can start Tomcat:

$ cd syncope-standalone-1.1.8
$ cd apache-tomcat-7.0.54
$ bin/startup.sh

The Tomcat instance listens on port 9080.

You can access the Syncope console by pointing your browser to http://localhost:9080/syncope-console.

The default admin username is “admin”, and password is “password”.

The Syncope REST API is available on http://localhost:9080/syncope/cxf.

The purpose is to use Syncope as backend for Karaf users and roles (in the “karaf” default security realm).
So, first, we create the “admin” role in Syncope:

screen1

screen2

screen3

Now, we can create a user of our choice, let’s say “myuser” with “myuser01” as password.

screen4

As we want “myuser” as Karaf administrator, we define the “admin” role for “myuser”.

screen5

“myuser” has been created.

screen5

screen6

Syncope is now ready to be used by Karaf (including users and roles).

Karaf SyncopeLoginModule

Karaf provides a complete security framework allowing you to use JAAS in an OSGi-compliant way.

Karaf itself uses a realm named “karaf”: it’s the one used by SSH, JMX, WebConsole by default.

By default, Karaf uses two login modules for the “karaf” realm:

  • the PropertiesLoginModule uses the etc/users.properties as storage for users and roles (with user password)
  • the PublickeyLoginModule uses the etc/keys.properties as storage for users and roles (with user public key)

In the coming Karaf versions (3.0.2, 2.4.0, 2.3.7), a new login module is available: the SyncopeLoginModule.

To enable the SyncopeLoginModule, we just create a blueprint descriptor that we drop into the deploy folder. The configuration of the Syncope login module is pretty simple, as it only requires the address of the Syncope REST API:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
           xmlns:jaas="http://karaf.apache.org/xmlns/jaas/v1.1.0"
           xmlns:ext="http://aries.apache.org/blueprint/xmlns/blueprint-ext/v1.0.0">

    <jaas:config name="karaf" rank="1">
        <jaas:module className="org.apache.karaf.jaas.modules.syncope.SyncopeLoginModule"
                     flags="required">
           address=http://localhost:9080/syncope/cxf
        </jaas:module>
    </jaas:config>

</blueprint>

You can see that the login module is enabled for the “karaf” realm using the jaas:realm-list command:

karaf@root()> jaas:realm-list 
Index | Realm Name | Login Module Class Name                                 
-----------------------------------------------------------------------------
1     | karaf      | org.apache.karaf.jaas.modules.syncope.SyncopeLoginModule

We can now log in via SSH using “myuser”, which is configured in Syncope:

~$ ssh -p 8101 myuser@localhost
The authenticity of host '[localhost]:8101 ([127.0.0.1]:8101)' can't be established.
DSA key fingerprint is b3:4a:57:0e:b8:2c:7e:e6:1c:f1:e2:88:dc:bf:f9:8c.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added '[localhost]:8101' (DSA) to the list of known hosts.
Password authentication
Password:
        __ __                  ____      
       / //_/____ __________ _/ __/      
      / ,<  / __ `/ ___/ __ `/ /_        
     / /| |/ /_/ / /  / /_/ / __/        
    /_/ |_|\__,_/_/   \__,_/_/         

  Apache Karaf (4.0.0-SNAPSHOT)

Hit '<tab>' for a list of available commands
and '[cmd] --help' for help on a specific command.
Hit 'system:shutdown' to shutdown Karaf.
Hit '<ctrl-d>' or type 'logout' to disconnect shell from current session.

myuser@root()> 

Our Karaf instance now uses Syncope for users and roles.

Karaf SyncopeBackingEngine

In addition to the login module, Karaf also ships a SyncopeBackingEngine. The purpose of the Syncope backing engine is to manipulate the users and roles directly from Karaf. Thanks to the backing engine, you can list the users, add a new user, etc. directly from Karaf.

However, for security and consistency reasons, the SyncopeBackingEngine only supports listing the users and roles defined in Syncope: the creation/deletion of a user or role directly from Karaf is disabled, as those operations should be performed from the Syncope console.

To enable the Syncope backing engine, you have to register the backing engine factory as an OSGi service. Moreover, the SyncopeBackingEngine requires two additional options on the login module: admin.user and admin.password, corresponding to a Syncope admin user.

We have to update the blueprint descriptor like this:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
           xmlns:jaas="http://karaf.apache.org/xmlns/jaas/v1.1.0"
           xmlns:ext="http://aries.apache.org/blueprint/xmlns/blueprint-ext/v1.0.0">

    <jaas:config name="karaf" rank="5">
        <jaas:module className="org.apache.karaf.jaas.modules.syncope.SyncopeLoginModule"
                     flags="required">
           address=http://localhost:9080/syncope/cxf
           admin.user=admin
           admin.password=password
        </jaas:module>
    </jaas:config>

    <service interface="org.apache.karaf.jaas.modules.BackingEngineFactory">
        <bean class="org.apache.karaf.jaas.modules.syncope.SyncopeBackingEngineFactory"/>
    </service>

</blueprint>

With the SyncopeBackingEngineFactory registered as an OSGi service, we can, for instance, list the users (and their roles) defined in Syncope.

To do so, we select the realm with jaas:realm-manage and then use the jaas:user-list command:

myuser@root()> jaas:realm-list
Index | Realm Name | Login Module Class Name
-----------------------------------------------------------------------------
1     | karaf      | org.apache.karaf.jaas.modules.syncope.SyncopeLoginModule
myuser@root()> jaas:realm-manage --index 1
myuser@root()> jaas:user-list
User Name | Group | Role
------------------------------------
rossini   |       | root
rossini   |       | otherchild
verdi     |       | root
verdi     |       | child
verdi     |       | citizen
vivaldi   |       |
bellini   |       | managingDirector
puccini   |       | artDirector
myuser    |       | admin

We can see all the users and roles defined in Syncope, including our “myuser” with our “admin” role.

Using Karaf JAAS realms

In Karaf, you can create any number of JAAS realms that you want.
It means that existing applications or your own applications can directly use a realm to delegate authentication and authorization.

For instance, Apache CXF provides a JAASLoginInterceptor allowing you to add authentication by configuration. The following Spring or Blueprint snippet shows how to use the “karaf” JAAS realm:

<jaxws:endpoint address="/service">
 <jaxws:inInterceptors>
   <ref bean="authenticationInterceptor"/>
 </jaxws:inInterceptors>
</jaxws:endpoint>
 
<bean id="authenticationInterceptor" class="org.apache.cxf.interceptor.security.JAASLoginInterceptor">
   <property name="contextName" value="karaf"/>
</bean>

The same configuration can be applied to a jaxrs endpoint instead of a jaxws endpoint.

As Pax Web uses Jetty, you can also define your Jetty security configuration in your web application.
For instance, in the META-INF/spring/jetty-security.xml of your application, you can define the security constraints:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
  <bean id="loginService" class="org.eclipse.jetty.plus.jaas.JAASLoginService">
    <property name="name" value="karaf"/>
    <property name="loginModuleName" value="karaf"/>
  </bean>
  <bean id="constraint" class="org.eclipse.jetty.util.security.Constraint">
    <property name="name" value="BASIC"/>
    <property name="roles" value="user"/>
    <property name="authenticate" value="true"/>
  </bean>
  <bean id="constraintMapping" class="org.eclipse.jetty.security.ConstraintMapping">
    <property name="constraint" ref="constraint"/>
    <property name="pathSpec" value="/*"/>
  </bean>
  <bean id="securityHandler" class="org.eclipse.jetty.security.ConstraintSecurityHandler">
    <property name="authenticator">
      <bean class="org.eclipse.jetty.security.authentication.BasicAuthenticator"/>
    </property>
    <property name="constraintMappings">
      <list>
        <ref bean="constraintMapping"/>
      </list>
    </property>
    <property name="loginService" ref="loginService"/>
    <property name="strict" value="false"/>
  </bean>
</beans>

We can link the security constraint in the web.xml:

<?xml version="1.0" encoding="UTF-8"?>
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">
    <display-name>example_application</display-name>
    <welcome-file-list>
        <welcome-file>index.jsp</welcome-file>
    </welcome-file-list>
    <security-constraint>
        <display-name>authenticated</display-name>
        <web-resource-collection>
            <web-resource-name>All files</web-resource-name>
            <description/>
            <url-pattern>/*</url-pattern>
        </web-resource-collection>
        <auth-constraint>
            <description/>
            <role-name>user</role-name>
        </auth-constraint>
    </security-constraint>
    <login-config>
        <auth-method>BASIC</auth-method>
        <realm-name>karaf</realm-name>
    </login-config>
    <security-role>
        <description/>
        <role-name>user</role-name>
    </security-role>
</web-app>

Thanks to that, your web application will use the “karaf” JAAS realm, which can delegate the storage of users and roles to Syncope.
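
To check the BASIC authentication from the command line, it helps to know what the client actually sends: an Authorization header carrying the base64 encoding of user:password. Here is a minimal sketch; basic_auth_header is a hypothetical helper name, and it assumes that the base64 utility is available on your machine:

```shell
#!/bin/sh
# Sketch: build the HTTP header sent by a client using BASIC authentication.
# basic_auth_header is a hypothetical helper; it relies on the base64 utility.

# basic_auth_header USER PASSWORD
basic_auth_header() {
  printf 'Authorization: Basic %s\n' "$(printf '%s:%s' "$1" "$2" | base64)"
}

# Example with placeholder credentials
basic_auth_header user pass
```

You could then pass the header to an HTTP client, for instance curl -i -H "$(basic_auth_header myuser mypassword)" followed by the URL of your web application (the URL depends on your Pax Web configuration and is not shown here). Without the header, the application should answer with a 401 status; with the credentials of a user holding the “user” role, it should serve the page.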

Thanks to the Syncope Login Module, Karaf becomes even more flexible for the authentication and authorization of the users, as the users/roles backend doesn’t have to be embedded in Karaf itself (as it is with the PropertiesLoginModule): Karaf can delegate to Syncope, which is able to act as a façade for a lot of different actual backends.

Hadoop CDC and processes notification with Apache Falcon, Apache ActiveMQ, and Apache Camel

March 19, 2014 Posted by jbonofre

Some weeks (months? ;)) ago, I started to work on Apache Falcon. First of all, I would like to thank all the Falcon guys: they are really awesome and do a great job (special thanks to Srikanth, Venkatesh, Swetha).

This blog post is a preparation to a set of “recipes documentation” that I will propose in Apache Falcon.

Falcon is in incubation at Apache. The purpose is to provide a data processing and management solution for Hadoop designed for data motion, coordination of data pipelines, lifecycle management, and data discovery. Falcon enables end consumers to quickly onboard their data and its associated processing and management tasks on Hadoop clusters.

An interesting feature provided by Falcon is the notification of the activities in the Hadoop cluster “outside” of the cluster ;)
In this article, we will see how to get two kinds of notification in Camel routes “outside” of the Hadoop cluster:

  • a Camel route will be notified and triggered when a process is executed in the Hadoop cluster
  • a Camel route will be notified and triggered when a HDFS location changes (a first CDC feature)

Requirements

If you already have your Hadoop cluster, or you know how to install/prepare it, you can skip this step.

In this section, I will create a “real fake” Hadoop cluster on one machine. It’s not really a pseudo-distributed setup, as I will use multiple datanodes and tasktrackers, but all on one machine (of course, it doesn’t make sense, but it’s just for demo purposes ;)).

In addition to the Hadoop common components (HDFS namenode/datanodes and M/R jobtracker/tasktracker), Falcon requires Oozie (for scheduling) and ActiveMQ.

By default, Falcon embeds ActiveMQ, but for the demo (and to provide screenshots of the ActiveMQ WebConsole), I will use a standalone ActiveMQ instance.

Hadoop “fake” cluster preparation

For the demo, I will “fake” three machines.

I create a demo folder on my machine, uncompress the hadoop-1.1.2-bin.tar.gz tarball, and copy it into node1, node2, and node3 folders:

$ mkdir demo
$ cd demo
$ tar zxvf ~/hadoop-1.1.2-bin.tar.gz
$ cp -r hadoop-1.1.2 node1
$ cp -r hadoop-1.1.2 node2
$ cp -r hadoop-1.1.2 node3
$ mkdir storage

I also create a storage folder where I will put the nodes’ files. This folder is just for convenience, as it’s easier to restart from scratch, just by deleting the storage folder content.

Node1 will host:

  • the HDFS namenode
  • a HDFS datanode
  • the M/R jobtracker
  • a M/R tasktracker

So, the node1/conf/core-site.xml file contains the location of the namenode:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node1 conf/core-site.xml -->
<configuration>
  <property>
     <name>fs.default.name</name>
     <value>hdfs://localhost</value>
  </property>
</configuration>

In the node1/conf/hdfs-site.xml file, we define the storage location for the namenode and the datanode (in the storage folder), and the default replication:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node1 conf/hdfs-site.xml -->
<configuration>
  <property>
    <name>dfs.name.dir</name>
    <value>/home/jbonofre/demo/storage/node1/namenode</value>
  </property>
  <property>
    <name>dfs.data.dir</name>
    <value>/home/jbonofre/demo/storage/node1/datanode</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
</configuration>

Finally, in node1/conf/mapred-site.xml file, we define the location of the job tracker:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node1 conf/mapred-site.xml -->
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:8021</value>
  </property>
</configuration>

Node1 is now ready.

Node2 hosts a datanode and a tasktracker. As for node1, the node2/conf/core-site.xml file contains the location of the namenode:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node2 conf/core-site.xml -->
<configuration>
  <property>
     <name>fs.default.name</name>
     <value>hdfs://localhost</value>
  </property>
</configuration>

The node2/conf/hdfs-site.xml file contains:

  • the storage location of the datanode
  • the network location of the namenode (from node1)
  • the port numbers used by the datanode (core, IPC, and HTTP), in order to be able to start multiple datanodes on the same machine

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node2 conf/hdfs-site.xml -->
<configuration>
  <property>
    <name>dfs.data.dir</name>
    <value>/home/jbonofre/demo/storage/node2/datanode</value>
  </property>
  <property>
    <name>dfs.datanode.address</name>
    <value>localhost:50110</value>
  </property>
  <property>
    <name>dfs.datanode.ipc.address</name>
    <value>localhost:50120</value>
  </property>
  <property>
    <name>dfs.datanode.http.address</name>
    <value>localhost:50175</value>
  </property>
</configuration>

The node2/conf/mapred-site.xml file contains the network location of the jobtracker, and the HTTP port number used by the tasktracker (in order to be able to run multiple tasktrackers on the same machine):

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node2 conf/mapred-site.xml -->
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:8021</value>
  </property>
  <property>
    <name>mapred.task.tracker.http.address</name>
    <value>localhost:50160</value>
  </property>
</configuration>

Node3 is very similar to node2: it hosts a datanode and a tasktracker. So the configuration is very similar to node2 (only the storage location and the datanode and tasktracker port numbers differ).

Here’s the node3/conf/core-site.xml file:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node3 conf/core-site.xml -->
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost</value>
  </property>
</configuration>

Here’s the node3/conf/hdfs-site.xml:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node3 conf/hdfs-site.xml -->
<configuration>
  <property>
    <name>dfs.data.dir</name>
    <value>/home/jbonofre/demo/storage/node3/datanode</value>
  </property>
  <property>
    <name>dfs.datanode.address</name>
    <value>localhost:50210</value>
  </property>
  <property>
    <name>dfs.datanode.ipc.address</name>
    <value>localhost:50220</value>
  </property>
  <property>
    <name>dfs.datanode.http.address</name>
    <value>localhost:50275</value>
  </property>
</configuration>

Here’s the node3/conf/mapred-site.xml file:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node3 conf/mapred-site.xml -->
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:8021</value>
  </property>
  <property>
    <name>mapred.task.tracker.http.address</name>
    <value>localhost:50260</value>
  </property>
</configuration>

Our “fake” cluster configuration is now ready.
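
The port numbers used on node2 and node3 follow a simple convention: the Hadoop 1.x default port plus 100 for each additional node. Here is a minimal sketch of that convention, assuming the defaults 50010 (datanode), 50020 (datanode IPC), 50075 (datanode HTTP), and 50060 (tasktracker HTTP). The node_port and datanode_properties helper names are hypothetical, they are not part of Hadoop:

```shell
#!/bin/sh
# Sketch: derive per-node ports as "Hadoop 1.x default + 100 * (node - 1)".
# node_port and datanode_properties are hypothetical helper names.

# node_port BASE NODE -> BASE + 100 * (NODE - 1)
node_port() {
  echo $(( $1 + 100 * ($2 - 1) ))
}

# Print the three datanode address properties for the given node number.
datanode_properties() {
  node=$1
  for entry in "dfs.datanode.address:50010" \
               "dfs.datanode.ipc.address:50020" \
               "dfs.datanode.http.address:50075"; do
    name=${entry%%:*}   # part before the colon: the property name
    base=${entry##*:}   # part after the colon: the default port
    printf '  <property>\n    <name>%s</name>\n    <value>localhost:%s</value>\n  </property>\n' \
      "$name" "$(node_port "$base" "$node")"
  done
}

# Example: the properties used in node3/conf/hdfs-site.xml
datanode_properties 3
```

With this convention, adding a node4 would only require a new storage folder and the next port offset.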

We can format the namenode on node1:

$ cd node1/bin
$ ./hadoop namenode -format
14/03/06 17:26:38 INFO namenode.NameNode: STARTUP_MSG: 
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = vostro/127.0.0.1
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 1.1.2
STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.1 -r 1440782; compiled by 'hortonfo' on Thu Jan 31 02:03:24 UTC 2013
************************************************************/
14/03/06 17:26:39 INFO util.GSet: VM type       = 64-bit
14/03/06 17:26:39 INFO util.GSet: 2% max memory = 17.78 MB
14/03/06 17:26:39 INFO util.GSet: capacity      = 2^21 = 2097152 entries
14/03/06 17:26:39 INFO util.GSet: recommended=2097152, actual=2097152
14/03/06 17:26:39 INFO namenode.FSNamesystem: fsOwner=jbonofre
14/03/06 17:26:39 INFO namenode.FSNamesystem: supergroup=supergroup
14/03/06 17:26:39 INFO namenode.FSNamesystem: isPermissionEnabled=true
14/03/06 17:26:39 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100
14/03/06 17:26:39 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
14/03/06 17:26:39 INFO namenode.NameNode: Caching file names occuring more than 10 times 
14/03/06 17:26:40 INFO common.Storage: Image file of size 114 saved in 0 seconds.
14/03/06 17:26:40 INFO namenode.FSEditLog: closing edit log: position=4, editlog=/home/jbonofre/demo/storage/node1/namenode/current/edits
14/03/06 17:26:40 INFO namenode.FSEditLog: close success: truncate to 4, editlog=/home/jbonofre/demo/storage/node1/namenode/current/edits
14/03/06 17:26:40 INFO common.Storage: Storage directory /home/jbonofre/demo/storage/node1/namenode has been successfully formatted.
14/03/06 17:26:40 INFO namenode.NameNode: SHUTDOWN_MSG: 
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at vostro/127.0.0.1
************************************************************/

We are now ready to start the namenode on node1:

$ cd node1/bin
$ ./hadoop namenode &

We start the datanode on node1:

$ cd node1/bin
$ ./hadoop datanode &

We start the jobtracker on node1:

$ cd node1/bin
$ ./hadoop jobtracker &

We start the tasktracker on node1:

$ cd node1/bin
$ ./hadoop tasktracker &

Node1 is fully started with the namenode, a datanode, the jobtracker, and a tasktracker.

We start a datanode and a tasktracker on node2:

$ cd node2/bin
$ ./hadoop datanode &
$ ./hadoop tasktracker &

And finally, we start a datanode and a tasktracker on node3:

$ cd node3/bin
$ ./hadoop datanode &
$ ./hadoop tasktracker &
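
The start sequence above can be summarized in a small script. This is only a sketch, assuming the node1, node2, node3 layout of the demo folder: it prints the commands (a dry run) instead of executing them, and the daemons_for_node and print_start_commands names are hypothetical:

```shell
#!/bin/sh
# Sketch: print the daemon start commands for the three-node demo layout.
# daemons_for_node and print_start_commands are hypothetical helper names.

# node1 also hosts the master daemons; the other nodes only run workers.
daemons_for_node() {
  if [ "$1" -eq 1 ]; then
    echo "namenode datanode jobtracker tasktracker"
  else
    echo "datanode tasktracker"
  fi
}

# Print one start command per daemon, node1 first, so that the namenode and
# the jobtracker come before the workers of node2 and node3.
print_start_commands() {
  for node in 1 2 3; do
    for daemon in $(daemons_for_node "$node"); do
      echo "(cd node$node/bin && ./hadoop $daemon &)"
    done
  done
}

print_start_commands
```

Each printed command uses a subshell, so the working directory is left untouched between two daemons. The commands are meant to be reviewed and then run from the demo folder.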

We access the HDFS web console (http://localhost:50070) to verify that the namenode sees the 3 live datanodes.

We also access the MapReduce web console (http://localhost:50030) to verify that the jobtracker sees the 3 live tasktrackers.

Oozie

Falcon delegates the scheduling of jobs (planning, re-execution, etc.) to Oozie.

Oozie is a workflow scheduler system to manage Hadoop jobs, using Quartz internally.

Falcon uses a “custom” Oozie distribution: it adds some additional EL extensions on top of a “regular” Oozie.

Falcon provides a script to create this custom Oozie distribution: we provide the Hadoop and Oozie versions that we need.

We can clone Falcon sources from git and call the src/bin/package.sh with the Hadoop and Oozie target versions that we want:

$ git clone https://git-wip-us.apache.org/repos/asf/incubator-falcon falcon
$ cd falcon
$ src/bin/package.sh 1.1.2 4.0.0

The package.sh script creates target/oozie-4.0.0-distro.tar.gz in the Falcon sources folder.

In the demo folder, I uncompress the oozie-4.0.0-distro.tar.gz tarball:

$ cp ~/oozie-4.0.0-distro.tar.gz .
$ tar zxvf oozie-4.0.0-distro.tar.gz

We now have an oozie-4.0.0-falcon folder.

Oozie requires a special configuration on the namenode (so on node1). We have to update the node1/conf/core-site.xml file to define the system user “proxied” by Oozie:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node1 conf/core-site.xml -->
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost</value>
  </property>
  <property>
    <name>hadoop.proxyuser.jbonofre.hosts</name>
    <value>localhost</value>
  </property>
  <property>
    <name>hadoop.proxyuser.jbonofre.groups</name>
    <value>localhost</value>
  </property>
</configuration>

NB: don’t forget to restart the namenode to include these changes.

Now, we can prepare the Oozie web application. Due to license restrictions, it’s up to you to add the ExtJS library for the Oozie web console. To enable it, we first create an oozie-4.0.0-falcon/libext folder and put the ext-2.2.zip archive in it:

$ cd oozie-4.0.0-falcon
$ mkdir libext
$ cd libext
$ wget "http://extjs.com/deploy/ext-2.2.zip"

We have to populate the libext folder with different additional jar files:

  • the Hadoop jar files:

    $ cp node1/hadoop-core-1.1.2.jar oozie-4.0.0-falcon/libext
    $ cp node1/hadoop-client-1.1.2.jar oozie-4.0.0-falcon/libext
    $ cp node1/hadoop-tools-1.1.2.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-beanutils-1.7.0.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-beanutils-core-1.8.0.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-codec-1.4.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-collections-3.2.1.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-configuration-1.6.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-digester-1.8.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-el-1.0.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-io-2.1.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-lang-2.4.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-logging-api-1.0.4.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-logging-1.1.1.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-math-2.1.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-net-3.1.jar oozie-4.0.0-falcon/libext
    
  • the Falcon Oozie extender:

    $ cp falcon-0.5-incubating-SNAPSHOT/oozie/libext/falcon-oozie-el-extension-0.5-incubating-SNAPSHOT.jar oozie-4.0.0-falcon/libext
    
  • jar files required for hcatalog, pig from oozie-sharelib-4.0.0-falcon.tar.gz:

    $ cd oozie-4.0.0-falcon
    $ tar zxvf oozie-sharelib-4.0.0-falcon.tar.gz
    $ cp share/lib/hcatalog/hcatalog-core-0.5.0-incubating.jar libext
    $ cp share/lib/hcatalog/hive-* libext
    $ cp share/lib/pig/hsqldb-1.8.0.7.jar libext
    $ cp share/lib/pig/jackson-* libext
    $ cp share/lib/hcatalog/libfb303-0.7.0.jar libext
    $ cp share/lib/hive/log4j-1.2.16.jar libext
    $ cp libtools/oro-2.0.8.jar libext
    $ cp share/lib/hcatalog/webhcat-java-client-0.5.0-incubating.jar libext
    $ cp share/lib/pig/xmlenc-0.52.jar libext
    $ cp share/lib/pig/guava-11.0.2.jar libext
    $ cp share/lib/hcatalog/oozie-* libext
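
With that many jar files to copy, a missing file is easy to overlook. Here is a minimal sketch of a helper that copies a list of jars and reports the ones that are not found; copy_jars is a hypothetical name, it is not a script shipped with Oozie or Falcon:

```shell
#!/bin/sh
# Sketch: copy a list of jars into a target folder and report missing ones.
# copy_jars is a hypothetical helper, not an Oozie or Falcon script.

# copy_jars DEST FILE...
# Copies every FILE that exists into DEST (created if needed) and prints
# "missing: FILE" on stderr for the others. Returns non-zero if at least
# one file was missing.
copy_jars() {
  dest=$1
  shift
  missing=0
  mkdir -p "$dest"
  for jar in "$@"; do
    if [ -f "$jar" ]; then
      cp "$jar" "$dest"/
    else
      echo "missing: $jar" >&2
      missing=$((missing + 1))
    fi
  done
  [ "$missing" -eq 0 ]
}
```

For instance, copy_jars oozie-4.0.0-falcon/libext node1/hadoop-core-1.1.2.jar node1/lib/commons-*.jar copies the Hadoop jars listed above. A pattern that matches nothing stays as-is and is reported as missing.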
    

We are now ready to set up Oozie.

First, we “assemble” the oozie webapplication (war):

$ cd oozie-4.0.0-falcon/bin
$ ./oozie-setup.sh prepare-war
  setting CATALINA_OPTS="$CATALINA_OPTS -Xmx1024m"

INFO: Adding extension: /home/jbonofre/demo/oozie-4.0.0-falcon/libext/ant-1.6.5.jar
...

New Oozie WAR file with added 'ExtJS library, JARs' at /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server/webapps/oozie.war


INFO: Oozie is ready to be started

Now, we “upload” the Oozie shared libraries to our HDFS, including the Falcon shared lib:

$ cd oozie-4.0.0-falcon/bin
$ ./oozie-setup.sh sharelib create -fs hdfs://localhost -locallib ../oozie-sharelib-4.0.0-falcon.tar.gz
  setting CATALINA_OPTS="$CATALINA_OPTS -Xmx1024m"
the destination path for sharelib is: /user/jbonofre/share/lib

If we browse HDFS, we can see the folders created by Oozie.

Finally, we create the Oozie database (where it stores the job definitions, etc.).

$ cd oozie-4.0.0-falcon/bin
$ ./oozie-setup.sh db create -run
  setting CATALINA_OPTS="$CATALINA_OPTS -Xmx1024m"

Validate DB Connection
DONE
Check DB schema does not exist
DONE
Check OOZIE_SYS table does not exist
DONE
Create SQL schema
DONE
Create OOZIE_SYS table
DONE

Oozie DB has been created for Oozie version '4.0.0'


The SQL commands have been written to: /tmp/ooziedb-4527318150729236810.sql

The Oozie configuration is done, so we can start it:

$ cd oozie-4.0.0-falcon/bin
$ ./oozied.sh start

Setting OOZIE_HOME:          /home/jbonofre/demo/oozie-4.0.0-falcon
Setting OOZIE_CONFIG:        /home/jbonofre/demo/oozie-4.0.0-falcon/conf
Sourcing:                    /home/jbonofre/demo/oozie-4.0.0-falcon/conf/oozie-env.sh
  setting CATALINA_OPTS="$CATALINA_OPTS -Xmx1024m"
Setting OOZIE_CONFIG_FILE:   oozie-site.xml
Setting OOZIE_DATA:          /home/jbonofre/demo/oozie-4.0.0-falcon/data
Setting OOZIE_LOG:           /home/jbonofre/demo/oozie-4.0.0-falcon/logs
Setting OOZIE_LOG4J_FILE:    oozie-log4j.properties
Setting OOZIE_LOG4J_RELOAD:  10
Setting OOZIE_HTTP_HOSTNAME: vostro.nanthrax.net
Setting OOZIE_HTTP_PORT:     11000
Setting OOZIE_ADMIN_PORT:     11001
Setting OOZIE_HTTPS_PORT:     11443
Setting OOZIE_BASE_URL:      http://vostro.nanthrax.net:11000/oozie
Setting CATALINA_BASE:       /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server
Setting OOZIE_HTTPS_KEYSTORE_FILE:     /home/jbonofre/.keystore
Setting OOZIE_HTTPS_KEYSTORE_PASS:     password
Setting CATALINA_OUT:        /home/jbonofre/demo/oozie-4.0.0-falcon/logs/catalina.out
Setting CATALINA_PID:        /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server/temp/oozie.pid

Using   CATALINA_OPTS:        -Xmx1024m -Dderby.stream.error.file=/home/jbonofre/demo/oozie-4.0.0-falcon/logs/derby.log
Adding to CATALINA_OPTS:     -Doozie.home.dir=/home/jbonofre/demo/oozie-4.0.0-falcon -Doozie.config.dir=/home/jbonofre/demo/oozie-4.0.0-falcon/conf -Doozie.log.dir=/home/jbonofre/demo/oozie-4.0.0-falcon/logs -Doozie.data.dir=/home/jbonofre/demo/oozie-4.0.0-falcon/data -Doozie.config.file=oozie-site.xml -Doozie.log4j.file=oozie-log4j.properties -Doozie.log4j.reload=10 -Doozie.http.hostname=vostro.nanthrax.net -Doozie.admin.port=11001 -Doozie.http.port=11000 -Doozie.https.port=11443 -Doozie.base.url=http://vostro.nanthrax.net:11000/oozie -Doozie.https.keystore.file=/home/jbonofre/.keystore -Doozie.https.keystore.pass=password -Djava.library.path=

Using CATALINA_BASE:   /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server
Using CATALINA_HOME:   /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server
Using CATALINA_TMPDIR: /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server/temp
Using JRE_HOME:        /opt/jdk/1.7.0_51
Using CLASSPATH:       /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server/bin/bootstrap.jar
Using CATALINA_PID:    /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server/temp/oozie.pid

We can now access the Oozie web console at http://localhost:11000/oozie/.

ActiveMQ

By default, Falcon embeds ActiveMQ, so generally speaking, you don’t have to install ActiveMQ. However, for the demo, I would like to show how to use an external, standalone ActiveMQ.

I uncompress the apache-activemq-5.7.0-bin.tar.gz tarball in the demo folder:

$ cd demo
$ tar zxvf ~/apache-activemq-5.7.0-bin.tar.gz

The default ActiveMQ configuration is fine, so we can just start the broker on the default port (61616):

$ cd demo/apache-activemq-5.7.0/bin
$ ./activemq console

All the Falcon prerequisites are now in place.

Falcon installation

Falcon can be deployed:

  • standalone: it’s the “regular” deployment mode when you have only one hadoop cluster. It’s the deployment mode that I will use for this CDC demo.
  • distributed: it’s the deployment to use when you have multiple hadoop clusters, especially if you want to use the Falcon replication feature.

For the installation, we uncompress the falcon-0.5-incubating-SNAPSHOT-bin.tar.gz tarball in the demo folder:

$ cd demo
$ tar zxvf ~/falcon-0.5-incubating-SNAPSHOT-bin.tar.gz

Before starting Falcon, we disable the default embedded ActiveMQ broker in the conf/falcon-env.sh file:

# conf/falcon-env.sh
...
export FALCON_OPTS="-Dfalcon.embeddedmq=false"
...

We start the falcon server:

$ cd falcon-0.5-incubating-SNAPSHOT/bin
$ ./falcon-start 
Could not find installed hadoop and HADOOP_HOME is not set.
Using the default jars bundled in /home/jbonofre/demo/falcon-0.5-incubating-SNAPSHOT/hadooplibs/
/home/jbonofre/demo/falcon-0.5-incubating-SNAPSHOT/bin
falcon started using hadoop version:  Hadoop 1.1.2

The Falcon server actually starts a Jetty container with Jersey to expose the Falcon REST API.

You can check if the falcon server started correctly using bin/falcon-status or bin/falcon:

$ bin/falcon-status
Falcon server is running (on http://localhost:15000/)
$ bin/falcon admin -status
Falcon server is running (on http://localhost:15000/)
$ bin/falcon admin -version
Falcon server build version: {"properties":[{"key":"Version","value":"0.5-incubating-SNAPSHOT-r5445e109bc7fbfea9295f3411a994485b65d1477"},{"key":"Mode","value":"embedded"}]}
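
The version command prints a small JSON document. If you want to use one of its values in a script, here is a minimal sketch; falcon_property is a hypothetical helper, it relies on the exact {"key":"...","value":"..."} layout shown above and is not a real JSON parser:

```shell
#!/bin/sh
# Sketch: extract one value from the output of "bin/falcon admin -version".
# falcon_property is a hypothetical helper; it depends on the exact
# {"key":"...","value":"..."} layout and is not a JSON parser.

# falcon_property KEY   (reads the command output on stdin)
falcon_property() {
  sed -n 's/.*"key":"'"$1"'","value":"\([^"]*\)".*/\1/p'
}

# Example with a shortened copy of the output shown above
echo '{"properties":[{"key":"Version","value":"0.5-incubating-SNAPSHOT"},{"key":"Mode","value":"embedded"}]}' \
  | falcon_property Mode
```

Against a running server, the equivalent would be bin/falcon admin -version | falcon_property Mode.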

Falcon usage: the entities

In Falcon, the configuration is defined by “entities”. Falcon supports three types of entity:

  • cluster entity defines the Hadoop cluster (location of the namenode, location of the jobtracker), the related Falcon modules (Oozie, ActiveMQ), and the location of the Falcon working directories (on HDFS)
  • feed entity defines a location on HDFS
  • process entity defines a hadoop job scheduled by Oozie

An entity is described using XML. You can do different actions on an entity:

  • Submit: register an entity in Falcon. Submitted entities are not scheduled: they are simply stored in the Falcon configuration store.
  • List: provide the list of all entities registered in the configuration store of Falcon.
  • Dependency: provide the dependencies of an entity. For example, a feed shows the processes that depend on it and the clusters that it depends on.
  • Schedule: feeds or processes that are already submitted and present in the configuration store can be scheduled. Upon schedule, Falcon wraps the required repeatable action as a bundle of Oozie coordinators and executes them on the Oozie scheduler.
  • Suspend: this action is applicable only to a scheduled entity. It suspends the Oozie bundle that was scheduled earlier through the schedule action. No further instances are executed on a suspended process/feed.
  • Resume: put a suspended process/feed back to active, which in turn resumes the applicable Oozie bundle.
  • Status: display the current status of an entity.
  • Definition: dump the entity definition from the configuration store.
  • Delete: remove an entity from the Falcon configuration store.
  • Update: allow an already submitted/scheduled entity to be updated. Cluster update is currently not allowed. A feed update can cause a cascading update of all the processes already scheduled. The following actions are performed in Oozie to realize an update:
    • Suspend the previously scheduled Oozie coordinator. This is to prevent any new action from being triggered.
    • Update the coordinator to set the end time to “now”
    • Resume the suspended coordinators
    • Schedule as per the new process/feed definition with the start time as “now”
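
These actions map to options of the Falcon command line client. Here is a minimal sketch that only builds the command line for a given action. falcon_entity_cmd is a hypothetical helper; the -submit and -list forms are the ones used in this article, while the other flags are assumed to follow the same pattern and should be checked against the CLI documentation of your Falcon version:

```shell
#!/bin/sh
# Sketch: build the Falcon CLI command line for an entity action.
# falcon_entity_cmd is a hypothetical helper. The -submit and -list forms
# match the commands used in this article; the other flags are an assumption
# and should be verified against your Falcon version.

# falcon_entity_cmd ACTION TYPE [NAME_OR_FILE]
falcon_entity_cmd() {
  action=$1
  type=$2
  target=${3:-}
  case "$action" in
    submit)
      echo "bin/falcon entity -submit -type $type -file $target" ;;
    list)
      echo "bin/falcon entity -list -type $type" ;;
    schedule|suspend|resume|status|definition|dependency|delete)
      echo "bin/falcon entity -$action -type $type -name $target" ;;
    *)
      echo "unsupported action: $action" >&2
      return 1 ;;
  esac
}

# Example: a typical life cycle, submit first, then schedule
falcon_entity_cmd submit feed entity/myfeed.xml
falcon_entity_cmd schedule feed myfeed
```

The myfeed name and file are placeholders. Update is left out on purpose, as it needs both an entity name and a new definition file.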

Cluster

The cluster entity defines the configuration of the hadoop cluster and components used by Falcon.

We will store the entity descriptors in the entity folder:

$ mkdir entity

For the cluster, we create the entity/local.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<cluster colo="local" description="Local cluster" name="local" xmlns="uri:falcon:cluster:0.1">
  <interfaces>
    <interface type="readonly" endpoint="hftp://localhost:50010" version="1.1.2"/>
    <interface type="write" endpoint="hdfs://localhost:8020" version="1.1.2"/>
    <interface type="execute" endpoint="localhost:8021" version="1.1.2"/>
    <interface type="workflow" endpoint="http://localhost:11000/oozie/" version="4.0.0"/>
    <interface type="messaging" endpoint="tcp://localhost:61616" version="5.7.0"/>
  </interfaces>
  <locations>
    <location name="staging" path="/falcon/staging"/>
    <location name="temp" path="/falcon/temp"/>
    <location name="working" path="/falcon/working"/>
  </locations>
  <properties></properties>
</cluster>

A cluster contains different interfaces and locations used by Falcon. A cluster is referenced by feed and process entities (using the cluster name). A cluster can’t be scheduled (it doesn’t make sense).

The colo specifies a kind of cluster grouping. It’s used in distributed deployment mode, so it's not useful in our demo (as we have only one cluster).
The readonly interface specifies Hadoop’s HFTP protocol, only used in the case of feed replication between clusters (again, not used in our demo).
The write interface specifies the write access to hdfs, containing the fs.default.name value. Falcon uses this interface to write system data to hdfs and feeds referencing this cluster are written to hdfs using this interface.
The execute interface specifies the location of the jobtracker, containing the mapred.job.tracker value. Falcon uses this interface to submit the processes as jobs in the jobtracker defined here.
The workflow interface specifies the interface for the workflow engine (the Oozie URL). Falcon uses this interface to schedule the processes referencing this cluster on the workflow engine defined here.
Optionally, you can have a registry interface (defining a thrift URL) to specify the metadata catalog, such as Hive Metastore (or HCatalog). We don’t use it in our demo.
The messaging interface specifies the interface for sending feed availability messages. It’s the URL of the ActiveMQ broker.
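
As the endpoints have to match the Hadoop, Oozie, and ActiveMQ configuration, it can be handy to read them back from the entity file. Here is a minimal sketch; interface_endpoint is a hypothetical helper, and it assumes that each interface element is written on a single line with the type attribute before the endpoint attribute, as in our local.xml:

```shell
#!/bin/sh
# Sketch: read one interface endpoint from a Falcon cluster entity file.
# interface_endpoint is a hypothetical helper; it assumes one <interface>
# element per line, with type written before endpoint, as in local.xml.
# It is a text match, not an XML parser.

# interface_endpoint FILE TYPE
interface_endpoint() {
  sed -n 's/.*<interface type="'"$2"'" endpoint="\([^"]*\)".*/\1/p' "$1"
}

# Usage (from the demo folder, once entity/local.xml exists):
#   interface_endpoint entity/local.xml execute
```

For example, the value returned for the execute interface can be compared with the mapred.job.tracker value of node1/conf/mapred-site.xml.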

A cluster has a list of locations with a name (working, temp, staging) and a path on HDFS. Falcon uses these locations for the intermediate processing of entities in HDFS, so Falcon needs read/write/execute permission on them.

Optionally, a cluster may have a list of properties. It’s a list of key-value pairs used in Falcon and propagated to the workflow engine. For instance, you can specify the JMS broker connection factory:

<property name="brokerImplClass" value="org.apache.activemq.ActiveMQConnectionFactory" />

Now that we have the XML description, we can register our cluster in Falcon. We use the Falcon command line client to submit our cluster definition:

$ cd falcon-0.5-incubating-SNAPSHOT
$ bin/falcon entity -submit -type cluster -file ~/demo/entity/local.xml
default/Submit successful (cluster) local

We can check that our local cluster is actually present in the Falcon configuration store:

$ bin/falcon entity -list -type cluster
(cluster) local(null)

We can see our cluster “local”, for now without any dependency (null).

If we take a look at HDFS, we can see that the falcon directory has been created:

$ cd node1
$ bin/hadoop fs -ls /
Found 3 items
drwxr-xr-x   - jbonofre supergroup          0 2014-03-08 07:48 /falcon
drwxr-xr-x   - jbonofre supergroup          0 2014-03-06 17:32 /tmp
drwxr-xr-x   - jbonofre supergroup          0 2014-03-06 18:05 /user

Feed

A feed entity is a location on the cluster. It also defines additional attributes like frequency, late-arrival handling, and retention policies. A feed can be scheduled, meaning that Falcon will create processes to deal with retention and replication on the cluster.

Like the other entities, a feed is described using XML. We create the entity/output.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<feed description="RandomProcess output feed" name="output" xmlns="uri:falcon:feed:0.1">
  <group>output</group>
 
  <frequency>minutes(1)</frequency>
  <timezone>UTC</timezone>
  <late-arrival cut-off="minutes(5)"/>

  <clusters>
    <cluster name="local">
       <validity start="2012-07-20T03:00Z" end="2099-07-16T00:00Z"/>
       <retention limit="hours(10)" action="delete"/>
    </cluster>
  </clusters>

  <locations>
    <location type="data" path="/data/output"/>
  </locations>

  <ACL owner="jbonofre" group="supergroup" permission="0x644"/>

  <schema location="none" provider="none"/>
</feed>

The locations element defines the feed storage: paths on HDFS or table names for Hive. A location is defined on a cluster, identified by name. In our example, we use the “local” cluster that we submitted before.

The group element defines a list of comma-separated groups. A group is a logical grouping of feeds. A group is said to be available if all the feeds belonging to it are available. All the feeds which belong to the same group must have the same frequency.

The frequency element specifies the frequency at which this feed is generated (for instance, it can be generated every hour, every 5 minutes, daily, weekly, etc). Falcon uses this frequency to check if the feed has changed or not (the size has changed). In our example, we define a frequency of every minute. Falcon creates a job in Oozie to monitor the feed.

Falcon can handle late arrival of input data and re-trigger processing for the affected instance. Two configuration parameters are central to late handling: the late-arrival cut-off in the feed definition and the late-inputs section in the process definition. They govern how and when the late processing happens. In the current implementation (Oozie based), late handling is very basic. Falcon looks at all the dependent input feeds of a process and computes the maximum late cut-off period. It then uses a scheduled messaging framework, like the one available in Apache ActiveMQ, to schedule a message with that cut-off period. Once the cut-off period has elapsed, the message is dequeued and Falcon checks for changes in the feed data, whose size is recorded on HDFS in a late data file by Falcon’s “record-size” action. If it detects any change, the workflow is rerun with the new set of feed data.
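
To make the “max late cut-off” computation more concrete, here is a minimal Python sketch. It is not Falcon code: the function names are mine, and I only handle the minutes(), hours(), and days() expressions (Falcon has other units that I leave out here). It just parses the cut-off expressions of several feeds and keeps the longest one:

```python
import re
from datetime import timedelta

def parse_frequency(expr):
    """Turn an expression such as 'minutes(5)' into a timedelta.

    Hypothetical helper: only minutes(), hours() and days() are handled.
    """
    match = re.fullmatch(r"\s*(minutes|hours|days)\((\d+)\)\s*", expr)
    if match is None:
        raise ValueError("unsupported frequency expression: %r" % expr)
    unit, amount = match.group(1), int(match.group(2))
    return timedelta(**{unit: amount})

def max_late_cutoff(cutoffs):
    """Return the longest cut-off among the input feeds of a process."""
    return max(parse_frequency(c) for c in cutoffs)

# Two input feeds, one with a 5 minute cut-off and one with a 1 hour cut-off:
# the scheduled message would be delayed by the longest of the two.
print(max_late_cutoff(["minutes(5)", "hours(1)"]))
```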

The retention element specifies how long the feed is retained on the cluster and the action to be taken on the feed after the expiration of the retention period. In our example, we delete the feed after a retention of 10 hours (hours(10)).

The validity of a feed on a cluster specifies the duration for which this feed is valid on this cluster (and considered for scheduling by Falcon).

The ACL defines the permission on the feed (owner/group/permission).

The schema allows you to specify the “format” of the feed (for instance csv). In our case, we don’t define any schema.

We can now submit the feed (register the feed) into Falcon:

$ cd falcon-0.5-incubating-SNAPSHOT
$ bin/falcon entity -submit -type feed -file ~/demo/entity/output.xml
default/Submit successful (feed) output

Process

A process entity defines a job in the cluster.

Like other entities, a process is described with XML (entity/process.xml):

<?xml version="1.0" encoding="UTF-8"?>
<process name="my-process" xmlns="uri:falcon:process:0.1">
    <clusters>
        <cluster name="local">
            <validity start="2013-11-15T00:05Z" end="2030-11-15T01:05Z"/>
        </cluster>
    </clusters>

    <parallel>1</parallel>
    <order>FIFO</order>
    <frequency>minutes(5)</frequency>
    <timezone>UTC</timezone>

    <inputs>
        <!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
        <input name="inpaths" feed="input" start="now(0,-5)" end="now(0,-1)"/>
    </inputs>

    <outputs>
        <!-- In the workflow, the output path will be available in a variable 'outpath' -->
        <output name="outpath" feed="output" instance="now(0,0)"/>
    </outputs>

    <properties>
        <!-- In the workflow, these properties will be available with variable - key -->
        <property name="queueName" value="default"/>
        <!-- The schedule time available as a property in workflow -->
        <property name="time" value="${instanceTime()}"/>
    </properties>

    <workflow engine="oozie" path="/app/mr"/>

    <late-process policy="periodic" delay="minutes(1)">
       <late-input input="inpaths" workflow-path="/app/mr"/>
    </late-process>

</process>

The cluster element defines where the process will be executed. Each cluster has a validity period, telling the times between which the job should run on the cluster. For the demo, we set a large validity period.

The parallel element defines how many instances of the process can run concurrently. We set a value of 1 here to ensure that only one instance of the process can run at a time.

The order element defines the order in which the ready instances are picked up. The possible values are FIFO(First In First Out), LIFO(Last In First Out), and ONLYLAST(Last Only). It’s not really used in our case.
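
To illustrate the three policies, here is a small Python sketch (mine, not Falcon code). It assumes that ONLYLAST keeps only the most recent ready instance, which is how I read “Last Only”:

```python
def execution_order(ready_instances, order):
    """Return the ready instances in the order they would be picked up.

    ready_instances: ISO-like timestamps of the instances waiting to run.
    order: FIFO, LIFO or ONLYLAST (names as used in this post).
    """
    ordered = sorted(ready_instances)  # oldest first
    if order == "FIFO":
        return ordered
    if order == "LIFO":
        return ordered[::-1]
    if order == "ONLYLAST":
        # Assumption: only the most recent instance runs.
        return ordered[-1:]
    raise ValueError("unknown order: %r" % order)

ready = ["2013-11-15T06:05Z", "2013-11-15T06:00Z", "2013-11-15T06:10Z"]
for policy in ("FIFO", "LIFO", "ONLYLAST"):
    print(policy, execution_order(ready, policy))
```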

The frequency element defines how frequently the process should run. In our case, minutes(5) means that the job will run every 5 minutes.

The inputs element defines the input data for the process. The process job will start executing only after the schedule time and when all the inputs are available. There can be 0 or more inputs, and each input maps to a feed. The path and frequency of the input data are picked up from the feed definition. Each input should also define start and end instances in terms of EL expressions, and can optionally specify the partition of the input that the process requires. The components of the partition should be a subset of the partitions defined in the feed.
For each input, Falcon will create a property with the input name that contains the comma-separated list of input paths. This property can be used in process actions like Pig scripts and so on.
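
As an illustration of the start="now(0,-5)" and end="now(0,-1)" expressions used in our process, here is a Python sketch of how the input instances could be resolved. It is a simplification of mine: now(hours, minutes) is treated as an offset from the process instance time, the input feed is assumed to have a 1 minute frequency (the “input” feed definition is not shown in this post), and the alignment of instances on the feed validity start is ignored:

```python
from datetime import datetime, timedelta

def now_el(instance_time, hours, minutes):
    """Resolve a now(hours, minutes) offset against the process instance time."""
    return instance_time + timedelta(hours=hours, minutes=minutes)

def input_instances(instance_time, start, end, feed_frequency):
    """List the feed instance times between the start and end offsets (inclusive)."""
    current = now_el(instance_time, *start)
    last = now_el(instance_time, *end)
    result = []
    while current <= last:
        result.append(current)
        current += feed_frequency
    return result

# Process instance with nominal time 2013-11-15T06:05Z.
nominal = datetime(2013, 11, 15, 6, 5)
for t in input_instances(nominal, (0, -5), (0, -1), timedelta(minutes=1)):
    print(t.strftime("%Y-%m-%dT%H:%MZ"))
```

With these assumptions, the process instance gets the five feed instances from 06:00 to 06:04 as input.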

The outputs element defines the output data that is generated by the process. A process can define 0 or more outputs. Each output is mapped to a feed and the output path is picked up from feed definition. The output instance that should be generated is specified in terms of EL expression.
For each output, Falcon creates a property with the output name that contains the path of the output data. This can be used in workflows to store data in that path.

The properties element contains key value pairs that are passed to the process. These properties are optional and can be used to parameterize the process.

The workflow element defines the workflow engine that should be used and the path to the workflow on HDFS. The workflow definition on HDFS contains the actual job that should run, and it should conform to the workflow specification of the engine specified. The libraries required by the workflow should be in the lib folder inside the workflow path.
The properties defined in the cluster and the cluster properties (nameNode and jobTracker) will also be available for the workflow.
Currently, Falcon supports three workflow engines:

  • oozie enables users to provide an Oozie workflow definition (in XML).
  • pig enables users to embed a Pig script as a process.
  • hive enables users to embed a Hive script as a process. This would enable users to create materialized queries in a declarative way.

NB: I proposed to support a new type of workflow, MapReduce, to be able to directly execute a MapReduce job.

In this demo, we use the oozie workflow engine.

We create an Oozie workflow.xml:

<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.2" name="map-reduce-wf">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${outpath}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.hadoop.mapred.lib.IdentityMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.hadoop.mapred.lib.IdentityReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inpaths}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outpath}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

This workflow is very simple: it uses IdentityMapper and IdentityReducer (provided in Hadoop core) to copy input data as output data.

We upload this workflow.xml to HDFS (in the location specified in the Falcon process workflow element):

$ cd node1
$ bin/hadoop fs -mkdir /app/mr
$ bin/hadoop fs -put ~/demo/workflow.xml /app/mr

The late-process element allows the process to react to input feed changes and trigger an action (here, we re-execute the Oozie workflow).

We are now ready to submit the process in Falcon:

$ cd falcon-*
$ bin/falcon entity -submit -type process -file ~/demo/entity/process.xml

The process is ready to be scheduled.

Before scheduling the process, we create the input data. The input data is a simple file (containing a string) that we upload to HDFS:

$ cat > file1
This is a test file
$ node1/bin/hadoop fs -mkdir /data/input
$ node1/bin/hadoop fs -put file1 /data/input

We can now trigger the process:

$ cd falcon*
$ bin/falcon entity -schedule -type process -name my-process
default/my-process(process) scheduled successfully

We can see the different jobs in Oozie (accessing http://localhost:11000/oozie):
[Screenshots: Oozie web console showing the bundle, coordinator, and workflow jobs]

On the other hand, we see new topics and queues created in ActiveMQ:
[Screenshots: ActiveMQ queues and topics]

In particular, in ActiveMQ, we have two topics:

  • Falcon publishes messages in the FALCON.my-process topic for each execution of the process
  • Falcon publishes messages in the FALCON.ENTITY.TOPIC topic for each change on the feeds

This is where our Camel routes subscribe.

Camel routes in Karaf

Now that we have our Falcon platform ready, we just have to create Camel routes (hosted in a Karaf container), subscribing to the Falcon topics in ActiveMQ.

We uncompress a Karaf container, and install the Camel features (camel-spring, activemq-camel):

$ tar zxvf apache-karaf-2.3.1.tar.gz
$ cd apache-karaf-2.3.1
$ bin/karaf
karaf@root> features:chooseurl camel
adding feature url mvn:org.apache.camel.karaf/apache-camel/LATEST/xml/features
karaf@root> features:install camel-spring
karaf@root> features:chooseurl activemq
karaf@root> features:install activemq-camel

We create a falcon-routes.xml file containing the Camel routes (using Spring DSL):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route id="process-listener">
       <from uri="jms:topic:FALCON.my-process"/>
       <to uri="log:process-listener"/>
    </route>
    <route id="feed-listener">
       <from uri="jms:topic:FALCON.ENTITY.TOPIC"/>
       <to uri="log:feed-listener"/>
    </route>
  </camelContext>

  <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
      </bean>
    </property>
  </bean>

</beans>

In the Camel context, we create two routes, both connecting to the ActiveMQ broker, each listening on one of the two topics.

We drop the falcon-routes.xml in the deploy folder, and we can see it active:

karaf@root> la|grep -i falcon
[ 114] [Active     ] [            ] [Started] [   80] falcon-routes.xml (0.0.0)
karaf@root> camel:route-list 
 Context        Route              Status   
 -------        -----              ------   
 camel          feed-listener      Started  
 camel          process-listener   Started 

The routes subscribe to the topics and just send the messages to the log (it’s very, very simple).

So, we just have to take a look at the log (log:tail):

2014-03-19 11:25:43,273 | INFO  | LCON.my-process] | process-listener                 | rg.apache.camel.util.CamelLogger  176 | 74 - org.apache.camel.camel-core - 2.13.0.SNAPSHOT | Exchange[ExchangePattern: InOnly, BodyType: java.util.HashMap, Body: {brokerUrl=tcp://localhost:61616, timeStamp=2014-03-19T10:24Z, status=SUCCEEDED, logFile=hdfs://localhost:8020/falcon/staging/falcon/workflows/process/my-process/logs/instancePaths-2013-11-15-06-05.csv, feedNames=output, runId=0, entityType=process, nominalTime=2013-11-15T06:05Z, brokerTTL=4320, workflowUser=null, entityName=my-process, feedInstancePaths=hdfs://localhost:8020/data/output, operation=GENERATE, logDir=null, workflowId=0000026-140319105443372-oozie-jbon-W, cluster=local, brokerImplClass=org.apache.activemq.ActiveMQConnectionFactory, topicName=FALCON.my-process}]
2014-03-19 11:25:43,693 | INFO  | ON.ENTITY.TOPIC] | feed-listener                    | rg.apache.camel.util.CamelLogger  176 | 74 - org.apache.camel.camel-core - 2.13.0.SNAPSHOT | Exchange[ExchangePattern: InOnly, BodyType: java.util.HashMap, Body: {brokerUrl=tcp://localhost:61616, timeStamp=2014-03-19T10:24Z, status=SUCCEEDED, logFile=hdfs://localhost:8020/falcon/staging/falcon/workflows/process/my-process/logs/instancePaths-2013-11-15-06-05.csv, feedNames=output, runId=0, entityType=process, nominalTime=2013-11-15T06:05Z, brokerTTL=4320, workflowUser=jbonofre, entityName=my-process, feedInstancePaths=hdfs://localhost:8020/data/output, operation=GENERATE, logDir=hdfs://localhost:8020/falcon/staging/falcon/workflows/process/my-process/logs/job-2013-11-15-06-05/, workflowId=0000026-140319105443372-oozie-jbon-W, cluster=local, brokerImplClass=org.apache.activemq.ActiveMQConnectionFactory, topicName=FALCON.ENTITY.TOPIC}]

And we can see our notifications:

  • on the process-listener logger, we can see that my-process (entityName) has been executed with SUCCEEDED (status) at 2014-03-19T10:24Z (timeStamp). We also have the location of the job execution log on HDFS.
  • on the feed-listener logger, we can see almost the same message. This message comes from the late-arrival handling, so it means that the input feed changed.
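
If you want to pick these fields out of the log without reading the whole line, a few lines of Python are enough. This is only a sketch of mine working on the text printed by the log endpoint (in the Camel route itself, the body is a java.util.HashMap that you can read directly). It assumes that no value contains ", ", which is true for the messages above. The sample line is shortened to a few of the fields:

```python
def parse_body(log_line):
    """Extract the key=value pairs printed after 'Body: {' into a dict."""
    start = log_line.index("Body: {") + len("Body: {")
    end = log_line.rindex("}]")
    fields = {}
    for pair in log_line[start:end].split(", "):
        # Split on the first '=' only, the value itself may contain '='.
        key, _, value = pair.partition("=")
        fields[key] = value
    return fields

sample = ("Exchange[ExchangePattern: InOnly, BodyType: java.util.HashMap, "
          "Body: {timeStamp=2014-03-19T10:24Z, status=SUCCEEDED, "
          "entityName=my-process, operation=GENERATE, "
          "topicName=FALCON.my-process}]")

msg = parse_body(sample)
print(msg["entityName"], msg["status"], msg["timeStamp"])
```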

For sure, the Camel routes are very simple for now (just a log), but there is no limit: you bring all the power of ESB and Big Data together.
Once the Camel routes get the messages on ActiveMQ coming from Falcon, you can implement the integration process of your choice (sending e-mails, using Camel EIPs, calling beans, etc).

What’s next?

I’m working on different enhancements on the late-arrival/CDC feature:

  1. The late-arrival messages in the FALCON.ENTITY.TOPIC should be improved: the message should contain the feed that changed, the location of the feed, and possibly the size difference.
  2. We should provide a more straightforward CDC feature which doesn’t require a process to monitor a feed. Just scheduling a feed with the late cut-off should be enough.
  3. In addition to the oozie, pig, and hive workflow engines, we should provide a “pure” MapReduce jar workflow engine.
  4. The package.sh should be improved to provide a more “ready” to use Falcon Oozie custom distribution.

I’m working on these different enhancements and improvements.

On the other hand, I will propose a set of documentation improvements, especially some kind of “recipe documentation” like this one.

Stay tuned, I’m preparing a new blog about Falcon, this time about the replication between two Hadoop clusters.

Apache Karaf, Cellar, Camel, ActiveMQ monitoring with ELK (ElasticSearch, Logstash, and Kibana)

March 17, 2014 Posted by jbonofre

Apache Karaf, Cellar, Camel, and ActiveMQ provide a lot of information via JMX.

Moreover, another very useful source of information is the log files.

These two sources are very interesting, but for “real life” monitoring, we need some additional features:

  • The JMX information and log messages should be stored in order to be queried later and to keep a history. For instance, using jconsole, you can read all the JMX attributes to get the numbers, but these numbers have to be stored somewhere. It’s quite the same for the logs. Most of the time, you define a log file rotation, or you periodically clean up the logs. So the log messages should be stored as well, to be queried later.
  • Numbers are good, graphics are even better. Once the JMX “numbers” are stored somewhere, a good feature is to use these numbers to create some charts. We can also define some kind of SLA: at some point, if a number is not “acceptable” (for instance greater than a “watermark” value), we should raise an alert.
  • For high availability and scalability, most production systems use multiple Karaf instances (synchronized with Cellar for instance). It means that the log files are spread over different machines. In that case, it’s really helpful to “centralize” the log messages.

Of course, there are already open source solutions (zabbix, nagios, etc) or commercial solutions (dynatrace, etc) to cover these needs.

In this blog, I just introduce a possible solution leveraging “big data” tools: we will see how to use the ELK (Elasticsearch, Logstash, and Kibana) solution.

Topology

For this example, let’s say we have the following architecture:

  • node1 is a machine hosting a Karaf container with a set of Camel routes.
  • node2 is a machine hosting a Karaf container with another set of Camel routes.
  • node3 is a machine hosting an ActiveMQ broker (used by the Camel routes from node1 and node2).
  • monitor is a machine hosting the monitoring platform.

Local to node1, node2, and node3, we install and configure logstash with both file and JMX input plugins. This logstash will get the log messages and poll JMX MBeans attributes, and send them to a “central” Redis server (using the redis output plugin).

On monitor, we install:

  • redis server to receive the messages and events coming from logstash installed on node1, node2, and node3
  • elasticsearch to store the messages and events
  • a first logstash acting as an indexer to take the messages/events from redis and store into elasticsearch (including the update of indexes, etc)
  • a second logstash providing the kibana web console

Redis and Elasticsearch

Redis

Redis is a key-value store. But it may also act as a broker to receive the messages/events from the different logstash instances (node1, node2, and node3).

For the demo, I use Redis 2.8.7 (that you can download from http://download.redis.io/releases/redis-2.8.7.tar.gz).

We uncompress the redis tarball in the /opt/monitor folder:

cp redis-2.8.7.tar.gz /opt/monitor
cd /opt/monitor
tar zxvf redis-2.8.7.tar.gz

Now, we have to compile Redis server on the machine. To do so, we have to execute make in the Redis src folder:

cd redis-2.8.7/src
make

NB: this step requires make and gcc installed on the machine.

make created a redis-server binary in the src folder. It’s the binary that we use to start Redis:

./redis-server --loglevel verbose
[12130] 16 Mar 21:04:28.387 # Unable to set the max number of files limit to 10032 (Operation not permitted), setting the max clients configuration to 3984.
                _._                                                  
           _.-``__ ''-._                                             
      _.-``    `.  `_.  ''-._           Redis 2.8.7 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._                                   
 (    '      ,       .-`  | `,    )     Running in stand alone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 12130
  `-._    `-._  `-./  _.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |           http://redis.io        
  `-._    `-._`-.__.-'_.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |                                  
  `-._    `-._`-.__.-'_.-'    _.-'                                   
      `-._    `-.__.-'    _.-'                                       
          `-._        _.-'                                           
              `-.__.-'                                               

[12130] 16 Mar 21:04:28.388 # Server started, Redis version 2.8.7
[12130] 16 Mar 21:04:28.388 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
[12130] 16 Mar 21:04:28.388 * The server is now ready to accept connections on port 6379
[12130] 16 Mar 21:04:28.389 - 0 clients connected (0 slaves), 443376 bytes in use

The redis server is now ready to accept connections coming from the “remote” logstash.

Elasticsearch

We use elasticsearch as storage backend for all messages and events. For this demo, I use elasticsearch 1.0.1, that you can download from https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.0.1.tar.gz.

We uncompress the elasticsearch tarball in the /opt/monitor folder:

cp elasticsearch-1.0.1.tar.gz /opt/monitor
cd /opt/monitor
tar zxvf elasticsearch-1.0.1.tar.gz

We start elasticsearch with the bin/elasticsearch binary (the default configuration is OK):

cd elasticsearch-1.0.1
bin/elasticsearch
[2014-03-16 21:16:13,783][INFO ][node                     ] [Solarr] version[1.0.1], pid[12466], build[5c03844/2014-02-25T15:52:53Z]
[2014-03-16 21:16:13,783][INFO ][node                     ] [Solarr] initializing ...
[2014-03-16 21:16:13,786][INFO ][plugins                  ] [Solarr] loaded [], sites []
[2014-03-16 21:16:15,763][INFO ][node                     ] [Solarr] initialized
[2014-03-16 21:16:15,764][INFO ][node                     ] [Solarr] starting ...
[2014-03-16 21:16:15,902][INFO ][transport                ] [Solarr] bound_address {inet[/0:0:0:0:0:0:0:0:9300]}, publish_address {inet[/192.168.134.11:9300]}
[2014-03-16 21:16:18,990][INFO ][cluster.service          ] [Solarr] new_master [Solarr][V9GO0DiaT4SFmRmxgwYv0A][vostro][inet[/192.168.134.11:9300]], reason: zen-disco-join (elected_as_master)
[2014-03-16 21:16:19,010][INFO ][discovery                ] [Solarr] elasticsearch/V9GO0DiaT4SFmRmxgwYv0A
[2014-03-16 21:16:19,021][INFO ][http                     ] [Solarr] bound_address {inet[/0:0:0:0:0:0:0:0:9200]}, publish_address {inet[/192.168.134.11:9200]}
[2014-03-16 21:16:19,072][INFO ][gateway                  ] [Solarr] recovered [0] indices into cluster_state
[2014-03-16 21:16:19,072][INFO ][node                     ] [Solarr] started

Logstash

Logstash is a tool for managing events and logs.

It works with a chain of input, filter, output.

On node1, node2, and node3, we will set up logstash with:

  • a file input plugin to read the log files
  • a jmx input plugin to read the different MBeans attributes
  • a redis output to send the messages and events to the monitor machine.

For this blog, I use logstash 1.4 SNAPSHOT with a contribution that I made. You can find my modified plugin on my github: https://github.com/jbonofre/logstash-contrib.

The first thing is to check out the latest logstash codebase and build it:

git clone https://github.com/elasticsearch/logstash/
cd logstash
make tarball

It will create the logstash distribution tarball in the build folder.

We can install logstash in a folder (for instance /opt/monitor/logstash):

mkdir /opt/monitor
cp build/logstash-1.4.0.rc1.tar.gz /opt/monitor
cd /opt/monitor
tar zxvf logstash-1.4.0.rc1.tar.gz
rm logstash-1.4.0.rc1.tar.gz

JMX is not a “standard” logstash plugin. It’s a plugin from the logstash-contrib project. As I modified the logstash JMX plugin (to work “smoothly” with the Karaf MBeanServer), and until my pull request is integrated in logstash-contrib (I hope ;)), you have to clone my github fork:

git clone https://github.com/jbonofre/logstash-contrib/
cd logstash-contrib
make tarball

We can add the contrib plugins into our logstash installation (in /opt/monitor/logstash-1.4.0.rc1 folder):

cd build
tar zxvf logstash-contrib-1.4.0.beta2.tar.gz
cd logstash-contrib-1.4.0.beta2
cp -r * /opt/monitor/logstash-1.4.0.rc1

Our logstash installation is now ready, including the logstash-contrib plugins.

It means that on node1, node2, node3 and monitor, you should have the /opt/monitor/logstash-1.4.0.rc1 folder with the installation (you can use scp or rsync to install logstash on the machines).

Indexer

On the monitor machine, we have a logstash instance acting as an indexer: it gets the messages from redis and stores them in elasticsearch.

We create the /opt/monitor/logstash-1.4.0.rc1/conf/indexer.conf file containing:

input {
  redis {
    host => "localhost"
    data_type => "list"
    key => "logstash"
    codec => json
  }
}
output {
  elasticsearch {
    host => "localhost"
  }
}

We can start logstash using this configuration file:

cd /opt/monitor/logstash-1.4.0.rc1
bin/logstash -f conf/indexer.conf

Collector

On node1, node2, and node3, logstash will act as a collector:

  • the file input plugin will read the messages from the log files (you can configure multiple log files)
  • the jmx input plugin will periodically poll MBean attributes

Both will send messages to the redis server using the redis output plugin.

We create a folder /opt/monitor/logstash-1.4.0.rc1/conf. It’s where we store the logstash configuration. In this folder, we create a collector.conf file.

For node1 and node2 (both hosting a karaf container with camel routes), the collector.conf file contains:

input {
  file {
    type => "log"
    path => ["/opt/karaf/data/log/*.log"]
  }
  jmx {
    path => "/opt/monitor/logstash-1.4.0.rc1/conf/jmx"
    polling_frequency => 10
    type => "jmx"
    nb_thread => 4
  }
}
output {
  redis {
    host => "monitor"
    data_type => "list"
    key => "logstash"
  }
}

On node3 (hosting an ActiveMQ broker), the collector.conf is the same, just the location of the log file is different:

input {
  file {
    type => "log"
    path => ["/opt/activemq/data/*.log"]
  }
  jmx {
    path => "/opt/monitor/logstash-1.4.0.rc1/conf/jmx"
    polling_frequency => 10
    type => "jmx"
    nb_thread => 4
  }
}
output {
  redis {
    host => "monitor"
    data_type => "list"
    key => "logstash"
  }
}

The redis output plugin sends the messages/events to the redis server located on the “monitor” machine.

These messages and events come from two input plugins:

  • the file input plugin takes the path of the log file (using glob)
  • the jmx input plugin takes a folder. This folder contains JSON files (see later) with the MBean queries. The plugin executes the queries every 10 seconds (polling_frequency).

So, the jmx input plugin reads all files located in the /opt/monitor/logstash-1.4.0.rc1/conf/jmx folder.

On node1 and node2 (again hosting a karaf container with camel routes), for instance, we want to monitor the number of threads on the Karaf instance (using the Threading MBean), and a route named “route1” (using the Camel route MBean).
We specify this in the /opt/monitor/logstash-1.4.0.rc1/conf/jmx/karaf file:

{
  "host" : "localhost",
  "port" : 1099,
  "url" : "service:jmx:rmi:///jndi/rmi://localhost:1099/karaf-root",
  "username" : "karaf",
  "password" : "karaf",
  "alias" : "node1",
  "queries" : [
    {
      "object_name" : "java.lang:type=Threading",
      "object_alias" : "Threading"
    }, {
      "object_name" : "org.apache.camel:context=*,type=routes,name=\"route1\"",
      "object_alias" : "Route1"
    }
   ]
}

On node3, we will have a different JMX configuration file (for instance /opt/monitor/logstash-1.4.0.rc1/conf/jmx/activemq) containing the ActiveMQ MBeans that we want to query.
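
The karaf file is almost the same on node1 and node2, only the alias changes. If you have many nodes, you can generate it with a small script instead of writing it by hand. Here is a Python sketch of mine (the jmx_config function and the way I derive object_alias from the route name are just a convention for this example, not something required by the plugin):

```python
import json

def jmx_config(alias, route_names):
    """Build the JMX plugin configuration for one Karaf node."""
    queries = [{
        "object_name": "java.lang:type=Threading",
        "object_alias": "Threading",
    }]
    for name in route_names:
        queries.append({
            "object_name": 'org.apache.camel:context=*,type=routes,name="%s"' % name,
            # Convention of this sketch: "route1" gives the alias "Route1".
            "object_alias": name.capitalize(),
        })
    return {
        "host": "localhost",
        "port": 1099,
        "url": "service:jmx:rmi:///jndi/rmi://localhost:1099/karaf-root",
        "username": "karaf",
        "password": "karaf",
        "alias": alias,
        "queries": queries,
    }

# Configuration for node2, to be saved as conf/jmx/karaf on that machine.
print(json.dumps(jmx_config("node2", ["route1"]), indent=2))
```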

Now, we can start the logstash “collector”:

cd /opt/monitor/logstash-1.4.0.rc1
bin/logstash -f conf/collector.conf

We can see the clients connected in the redis log:

[12130] 17 Mar 14:33:27.041 - Accepted 127.0.0.1:46598
[12130] 17 Mar 14:33:31.267 - 2 clients connected (0 slaves), 484992 bytes in use

and the data populated in the elasticsearch log:

[2014-03-17 14:21:59,539][INFO ][cluster.service          ] [Solarr] added {[logstash-vostro-32001-2010][dhXgnFLwTHmbdsawAEJbyg][vostro][inet[/192.168.134.11:9301]]{client=true, data=false},}, reason: zen-disco-receive(join from node[[logstash-vostro-32001-2010][dhXgnFLwTHmbdsawAEJbyg][vostro][inet[/192.168.134.11:9301]]{client=true, data=false}])
[2014-03-17 14:30:59,584][INFO ][cluster.metadata         ] [Solarr] [logstash-2014.03.17] creating index, cause [auto(bulk api)], shards [5]/[1], mappings [_default_]
[2014-03-17 14:31:00,665][INFO ][cluster.metadata         ] [Solarr] [logstash-2014.03.17] update_mapping [log] (dynamic)
[2014-03-17 14:33:28,247][INFO ][cluster.metadata         ] [Solarr] [logstash-2014.03.17] update_mapping [jmx] (dynamic)

Now, we have JMX data and log messages from different containers, brokers, etc stored in one centralized place (the monitor machine).

We can now add a web application to read the data and create charts using the data.

Kibana

Kibana is the web application provided with logstash. The default configuration uses the elasticsearch default port. So, we just have to start Kibana on the monitor machine:

cd /opt/monitor/logstash-1.4.0.rc1
bin/logstash-web

We access Kibana at http://monitor:9292.

On the welcome page, we click on the “Logstash dashboard” link, and we arrive on a console looking like:
[Screenshot: default Logstash dashboard in Kibana]

It’s time to configure Kibana.

We remove the default histogram, to add a custom one to chart the thread count.

First, we create a query to isolate the thread count for node1. Kibana uses the Apache Lucene query syntax.
Our query here is very simple: metric_path:"node1.Threading.ThreadCount".

Now, we can create a histogram using this query, getting the metric_value_number:
[Screenshot: histogram of the thread count on node1]

Now, we want to chart the lastProcessingTime on the Camel route (to see for instance if the route takes more time at some point).
We create a new query to isolate the route1 lastProcessingTime on node1: metric_path:"node1.Route1.LastProcessingTime".

We can now create a histogram using this query, getting the metric_value_number:
[Screenshot: histogram of the last processing time for route1 on node1]

For the demo, we can create a histogram chart to display the exchanges completed and failed for route1 on node1. We create two queries:

  • metric_path:"node1.Route1.ExchangesFailed"
  • metric_path:"node1.Route1.ExchangesCompleted"
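
As you can see in these queries, the metric_path is made of the alias of the node, the object_alias of the query, and the name of the MBean attribute. The following Python sketch (the helper names are mine) builds the queries for a set of attributes, and wraps one of them in an Elasticsearch query_string query, in case you want to send it to elasticsearch directly instead of going through Kibana:

```python
import json

def metric_query(alias, object_alias, attribute):
    """Build the Lucene query matching one JMX metric."""
    return 'metric_path:"%s.%s.%s"' % (alias, object_alias, attribute)

def search_body(lucene_query):
    """Wrap a Lucene query string in an Elasticsearch search request body."""
    return {"query": {"query_string": {"query": lucene_query}}}

for attribute in ("ExchangesFailed", "ExchangesCompleted"):
    print(metric_query("node1", "Route1", attribute))

print(json.dumps(search_body(metric_query("node1", "Threading", "ThreadCount"))))
```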

We create a new chart in the same row:
[Screenshot: chart of the failed and completed exchanges for route1 on node1]

We clean up the events panel a bit. We create a query to display only the log messages (not the JMX queries): type:"log".
We configure the log event panel to change the name and use the log query:
[Screenshot: configuration of the log events panel]

We now have a kibana console looking like:

[Screenshot: final Kibana console]

With this very simple kibana configuration, we have:

  • a chart of the thread count on node1
  • a chart of the last processing time for route1 (on node1)
  • a chart of the exchanges (failed/completed) for route1 (on node1)
  • a view of all log messages

You can now play with Kibana and add new charts leveraging all the information that you have in elasticsearch (both log messages and JMX data).

Next

I’m working on some new Karaf, Cellar, ActiveMQ, Camel features providing “native” and “enhanced” support for logstash. The purpose is to just type feature:install monitoring to get:

  • jmx:* commands in Karaf
  • broadcast of events to elasticsearch
  • integration of redis, elasticsearch, and logstash in Karaf (to avoid installing them “externally” from Karaf), with ready-to-use configuration (pre-configured logstash jmx input plugin, pre-configured kibana console/charts, …).

If you have other ideas to enhance and improve monitoring in Karaf, Cellar, Camel, or ActiveMQ, don’t hesitate to propose them on the mailing lists! Any idea is welcome.

Coming in Karaf 3.0.0: new enterprise JPA (OpenJPA, Hibernate) and CDI (OpenWebBeans, JBoss Weld) features

December 20, 2013 Posted by jbonofre

Apache Karaf 3.0.0 is now mostly ready (I’m just polishing the documentation).

In previous posts, I introduced new enterprise features like JNDI, JDBC, and JMS.

As I said, the purpose is to provide a fully flexible, enterprise-ready container that is easy to use and extend.

Easy to use means that a simple command extends your container with features that can help you a lot.

JPA

Previous Karaf versions already provided a jpa feature. However, this feature “only” installs the Aries JPA bundles, which expose the EntityManager as an OSGi service. It doesn’t install any JPA engine. It means that, previously, users had to install all the bundles required to have a persistence engine.

Karaf 3.0.0 now provides ready-to-use features for two very popular persistence engines:

karaf@root()> feature:install openjpa

The openjpa feature brings Apache OpenJPA in Apache Karaf.

karaf@root()> feature:install hibernate

The hibernate feature brings Hibernate in Apache Karaf.

CDI

Karaf 3.0.0 now references Pax CDI. It means that you can install the pax-cdi* features in Apache Karaf.

However, Pax CDI doesn’t install any CDI container; it’s up to the users to install all the bundles required to have a CDI container.

Karaf 3.0.0 provides ready-to-use features for two very popular CDI containers:

karaf@root()> feature:repo-add pax-cdi
karaf@root()> feature:install openwebbeans

The openwebbeans feature brings Apache OpenWebBeans CDI container in Apache Karaf.

karaf@root()> feature:repo-add pax-cdi
karaf@root()> feature:install weld

The weld feature brings JBoss Weld CDI container in Apache Karaf.

EJB

As a reminder, until KarafEE is back in Karaf directly (as an ejb feature; I plan to work on it next week), you can install Apache OpenEJB in Apache Karaf:

karaf@root()> feature:repo-add openejb
karaf@root()> feature:install openejb-core
karaf@root()> feature:install openejb-server

Coming in Karaf 3.0.0: new enterprise JMS feature

December 19, 2013 Posted by jbonofre

In my previous post, I introduced the new enterprise JDBC feature.

To follow the same purpose, we introduced the new enterprise JMS feature.

JMS feature

Like the JDBC feature, the JMS feature is an optional one. It means that you have to install it first:

karaf@root()> feature:install jms

The jms feature installs the JMS service which is mostly a JMS “client”. It doesn’t install any broker.

For the rest of this post, I’m using an ActiveMQ broker embedded in my Karaf:

karaf@root()> feature:repo-add activemq 5.9.0
karaf@root()> feature:install activemq-broker

Like the JDBC feature, the JMS feature provides:

  • an OSGi service
  • jms:* commands
  • a JMX JMS MBean

The OSGi service provides a set of operations to create JMS connection factories, send JMS messages, browse a JMS queue, etc.

The commands and MBean manipulate the OSGi service.

Commands

The jms:create command allows you to create a JMS connection factory.

This command automatically creates a connectionfactory-[name].xml blueprint file in the deploy folder.

However, it doesn’t install any bundle or feature providing the JMS connection factory classes. It’s up to you to install the jar files, bundles, or features providing the actual JMS connection factory beforehand.

The jms:create command expects one argument and two options:

karaf@root()> jms:create --help
DESCRIPTION
        jms:create

        Create a JMS connection factory.

SYNTAX
        jms:create [options] name 

ARGUMENTS
        name
                The JMS connection factory name

OPTIONS
        -u, --url
                The JMS URL. NB: for WebsphereMQ type, the URL is hostname/port/queuemanager/channel
        --help
                Display this help message
        -t, --type
                The JMS connection factory type (ActiveMQ or WebsphereMQ)

  • The name argument is the JMS connection factory name. It’s used in the JNDI name given to the connection factory (e.g. /jms/[name]) and in the blueprint file name in the deploy folder.
  • The -t (--type) option is the JMS connection factory type. For now, the command supports two kinds of connection factory: ActiveMQ or WebsphereMQ. If you want to use another kind of connection factory, you can create the connection factory file yourself (using an ActiveMQ file created by the jms:create command as a template).
  • The -u (--url) option is the URL used by the connection factory. For instance, for the ActiveMQ type, the URL looks like tcp://localhost:61616. For the WebsphereMQ type, the URL looks like host/port/queuemanager/channel.
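
To make these conventions concrete, here is a small Python sketch. It is not Karaf code: the helper names (derived_names, parse_wmq_url) and the sample WebsphereMQ host, queue manager, and channel values are made up for the example. Only the /jms/[name] and connectionfactory-[name].xml patterns and the host/port/queuemanager/channel layout come from the command described above.

```python
from collections import namedtuple

# Hypothetical container for the four parts of a WebsphereMQ URL.
WmqUrl = namedtuple("WmqUrl", ["host", "port", "queue_manager", "channel"])


def derived_names(name):
    """Return the JNDI name and blueprint file name derived from the name argument."""
    return {
        "jndi_name": "/jms/" + name,
        "blueprint_file": "connectionfactory-" + name + ".xml",
    }


def parse_wmq_url(url):
    """Split a host/port/queuemanager/channel URL into its four parts."""
    parts = url.split("/")
    if len(parts) != 4 or not all(parts):
        raise ValueError("expected host/port/queuemanager/channel, got: " + url)
    host, port, queue_manager, channel = parts
    return WmqUrl(host, int(port), queue_manager, channel)


print(derived_names("default"))
# Sample values below are invented for illustration only.
print(parse_wmq_url("mqhost/1414/QM1/SYSTEM.DEF.SVRCONN"))
```

An ActiveMQ URL such as tcp://localhost:61616 does not fit the four-part layout, so parse_wmq_url rejects it with a ValueError.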

As I installed the activemq-broker feature in my Karaf, I can create the JMS connection factory for this broker:

karaf@root()> jms:create -t activemq -u tcp://localhost:61616 default

We can see the JMS connection factory file correctly deployed:

karaf@root()> la
...
151 | Active   |  80 | 0.0.0                 | connectionfactory-default.xml

The connectionfactory-default.xml file has been created in the deploy folder and contains:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">

    <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="activemqConnectionFactory" />
    </bean>

    <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
        <property name="transactionManager" ref="transactionManager" />
        <property name="connectionFactory" ref="activemqConnectionFactory" />
        <property name="resourceName" value="activemq.localhost" />
    </bean>

    <reference id="transactionManager" interface="javax.transaction.TransactionManager" />

    <service ref="pooledConnectionFactory" interface="javax.jms.ConnectionFactory">
        <service-properties>
            <entry key="name" value="default" />
            <entry key="osgi.jndi.service.name" value="/jms/default" />
        </service-properties>
    </service>

</blueprint>
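
If you write such a file by hand for another connection factory, the two places that matter most are the brokerURL property and the service properties (name and osgi.jndi.service.name). As a quick illustration, the Python sketch below reads a trimmed copy of the generated file with the standard library XML parser and extracts those values. The summarize function is a hypothetical helper, and the embedded XML keeps only the connection factory bean and the service element from the file above.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of connectionfactory-default.xml (pooling and transaction beans omitted).
BLUEPRINT = """<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
    <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>
    <service ref="pooledConnectionFactory" interface="javax.jms.ConnectionFactory">
        <service-properties>
            <entry key="name" value="default" />
            <entry key="osgi.jndi.service.name" value="/jms/default" />
        </service-properties>
    </service>
</blueprint>"""

# Blueprint elements live in this namespace, so lookups need a prefix mapping.
NS = {"bp": "http://www.osgi.org/xmlns/blueprint/v1.0.0"}


def summarize(xml_text):
    """Return (broker URL, service properties) from a connection factory blueprint."""
    root = ET.fromstring(xml_text)
    url_property = root.find(
        "bp:bean[@id='activemqConnectionFactory']/bp:property[@name='brokerURL']", NS
    )
    entries = root.findall("bp:service/bp:service-properties/bp:entry", NS)
    properties = {entry.get("key"): entry.get("value") for entry in entries}
    return url_property.get("value"), properties


broker_url, service_properties = summarize(BLUEPRINT)
print(broker_url)
print(service_properties)
```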

We can see the JMS connection factories available in Karaf (created by the jms:create command, or by hand) using the jms:connectionfactories command:

karaf@root()> jms:connectionfactories 
JMS Connection Factory
----------------------
/jms/default    

The jms:info command gives details about a JMS connection factory:

karaf@root()> jms:info /jms/default 
Property | Value   
-------------------
product  | ActiveMQ
version  | 5.9.0  

We are now ready to manipulate the JMS broker.

Let’s start by sending some messages to a queue in the JMS broker. We can use the jms:send command to do that:

karaf@root()> jms:send /jms/default MyQueue "Hello World"
karaf@root()> jms:send /jms/default MyQueue "Hello Karaf"
karaf@root()> jms:send /jms/default MyQueue "Hello ActiveMQ"

The jms:count command counts the number of messages in a JMS queue. We can check if we have our messages:

karaf@root()> jms:count /jms/default MyQueue
Messages Count
--------------
3             

When using ActiveMQ, the jms:queues and jms:topics commands can list the queues and topics available in the JMS broker:

karaf@root()> jms:queues /jms/default 
JMS Queues
----------
MyQueue   

We can see the MyQueue queue where we sent our messages.

We can also browse the messages in a queue using the jms:browse command. We can have the details of the messages:

karaf@root()> jms:browse /jms/default MyQueue
Message ID                              | Content        | Charset | Type | Correlation ID | Delivery Mode | Destination     | Expiration | Priority | Redelivered | ReplyTo | Timestamp                   
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
ID:vostro-33323-1387464670760-3:2:1:1:1 | Hello World    | UTF-8   |      |                | Persistent    | queue://MyQueue | Never      | 4        | false       |         | Thu Dec 19 15:57:06 CET 2013
ID:vostro-33323-1387464670760-3:3:1:1:1 | Hello Karaf    | UTF-8   |      |                | Persistent    | queue://MyQueue | Never      | 4        | false       |         | Thu Dec 19 15:57:10 CET 2013
ID:vostro-33323-1387464670760-3:4:1:1:1 | Hello ActiveMQ | UTF-8   |      |                | Persistent    | queue://MyQueue | Never      | 4        | false       |         | Thu Dec 19 15:57:14 CET 2013

By default, the jms:browse command displays all messages in the given queue. You can specify a selector with the -s (--selector) option to select the messages to browse.

The jms:consume command consumes messages from a queue. Consuming a message removes it from the queue.

To consume/remove the messages in MyQueue queue, we can use:

karaf@root()> jms:consume /jms/default MyQueue
3 message(s) consumed

JMX JMS MBean

All actions that we did using the jms:* commands can be performed using the JMS MBean (the object name is org.apache.karaf:type=jms,name=*).

Moreover, if you want to perform JMS operations programmatically, you can use the org.apache.karaf.jms.JmsService OSGi service.