Category: ‘Apache Hadoop’

Introducing Apache Beam (Dataflow)

January 20, 2016 Posted by jbonofre

As part of the Google Cloud ecosystem, Google created the Dataflow SDK. Now, in a joint effort by Google, Talend, Cask, data Artisans, PayPal, and Cloudera, we are proposing Dataflow to the Apache Incubator.

I’m proud, glad, and excited to be the champion of the Apache Dataflow proposal.

But first, I would like to thank James Malone and Frances Perry from Google for their help and the always open-minded and interesting discussions. It’s really great to work with them!

Let’s take a quick tour of what Apache Dataflow will be.

Architecture and Programming Model

Imagine you have a Hadoop cluster where you run MapReduce jobs. Now you want to “migrate” these jobs to Spark: you have to refactor all your jobs, which requires a lot of work and costs a lot. And after that, consider the effort and cost if you want to switch to a new platform like Flink: you have to refactor your jobs again.

Dataflow aims to provide an abstraction layer between your code and the execution runtime.

The SDK allows you to use a unified programming model: you implement your data processing logic using the Dataflow SDK, and the same code will run on different backends. You don’t need to refactor or change the code anymore!

If your target backend is not yet supported by Dataflow, you can implement your own runner for this backend; again, the code using the Dataflow SDK doesn’t change.

Dataflow is able to deal with batch processing jobs, but also with streaming jobs.

Architecture: pipelines, translators, runners

Using the SDK, your jobs are actually designed as pipelines. A pipeline is a chain of processes applied to the data.

It’s basically the only part that you have to write.

Dataflow reads the pipeline definitions and translates them for a target runner. A translator is responsible for adapting the pipeline code to the runner. For instance, the MapReduce translator will transform pipelines into MapReduce jobs, the Spark translator will transform pipelines into Spark jobs, etc.

The runners are the “execution” layer. Once a pipeline has been “translated” by a translator, it can be run directly on a runner. The runner “hides” the actual backend: a MapReduce/YARN cluster, a Spark cluster (running on YARN or Mesos), etc.

While Dataflow comes with ready-to-use translators and runners, you can also create your own.

For instance, you can implement your own runner by creating a class extending PipelineRunner. You will have to implement the different runner behaviours (the transform evaluators, the supported options, the apply main transform hook method, etc.).
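
To give an idea, here is a minimal sketch of such a runner, assuming the Dataflow SDK 1.x API (package names may differ depending on the SDK version); MyBackendRunner and its body are hypothetical:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;

public class MyBackendRunner extends PipelineRunner<PipelineResult> {

  @Override
  public PipelineResult run(Pipeline pipeline) {
    // Walk the pipeline graph, translate each transform for the target
    // backend, submit the translated job, and return a result handle.
    throw new UnsupportedOperationException("translation not implemented yet");
  }
}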

SDK

The SDK is composed of four parts:

  • Pipelines are the streaming and processing logic that you want to implement. A pipeline is a chain of processes: basically, you read data from a source, you apply transformations to the data, and eventually you send the data to a destination (named a sink in Dataflow wording).
  • PCollection is the object transported inside a pipeline. It’s the container of the data, flowing between each step of the pipeline.
  • Transform is a step of a pipeline. It takes an incoming PCollection and creates an outgoing PCollection. You can implement your own transform function (see the sketch after this list).
  • Sink and Source are used to retrieve data as input (first step) of a pipeline, and eventually to send the data outside of the pipeline.
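
To illustrate, a custom composite transform could be sketched like this, assuming the Dataflow SDK 1.x API; TrimWords is a hypothetical transform that trims whitespace around each element:

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class TrimWords extends PTransform<PCollection<String>, PCollection<String>> {

  @Override
  public PCollection<String> apply(PCollection<String> input) {
    // delegate to a ParDo: each element is emitted trimmed
    return input.apply(ParDo.of(new DoFn<String, String>() {
      @Override
      public void processElement(ProcessContext c) {
        c.output(c.element().trim());
      }
    }));
  }
}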

In action

Let’s implement a very simple Wordcount job using the Dataflow SDK.

First, we create a pipeline:

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("Dataflow-Wordcount");
Pipeline pipeline = Pipeline.create(options);

We can see in the options that we define the pipeline runner. Of course, it’s not good practice to hardcode the runner in the pipeline code: you can externalize the runner definition using command-line arguments, as shown below.
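
For instance, a sketch using the SDK’s PipelineOptionsFactory.fromArgs() helper (the flag values in the comment are just examples):

public static void main(String[] args) {
  // e.g. --runner=BlockingDataflowPipelineRunner --project=Dataflow-Wordcount
  DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(DataflowPipelineOptions.class);
  Pipeline pipeline = Pipeline.create(options);
  // ... define the pipeline steps, then run it
}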

We have a pipeline; we can now define the source of the data for this pipeline. It’s basically what we call a source in Dataflow:

pipeline.apply(TextIO.Read.from("/path/to/test.txt"))

Basically, the apply() method allows you to define the steps of a pipeline. A step can be:

  • a source (input)
  • a transform
  • a sink (output)

So, actually, you can see a pipeline as a chain of apply calls: pipeline.apply().apply().apply()….

Now, we have the data coming from a file (step 1). The purpose of the next step (step 2) is to identify the words in the input data. This step is a transformation step: it takes a PCollection as input (coming from the file source) and produces a resulting PCollection. To implement this transformation step, we use the ParDo function in the apply method. The ParDo function allows you to apply a DoFn (DoFunction) to the PCollection:

  .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
     @Override
     public void processElement(ProcessContext c) {
       for (String word : c.element().split("[^a-zA-Z']+")) {
         if (!word.isEmpty()) {
           c.output(word);
         }
       }
     }
  }))

The PCollections (input and output) are wrapped in a ProcessContext.

Now that we have identified the words, we are ready to count them. It’s a new step (step 3): it transforms the words into a map containing the count for each word.
To implement this step, as always, we use the apply method. We could use the same logic as in step 2 (using ParDo and DoFn). Fortunately, Dataflow provides ready-to-use functions. In particular, in our case, Dataflow provides a Count function that we can use directly in the apply method:

.apply(Count.<String>perElement())

After this step (step 3), we have a Map containing word => count entries. We want to format each entry as a String, ready to be written to a file.
So, we add a new transformation step (step 4), again using the apply method on the pipeline. In this apply, we provide a SimpleFunction implementation with our own apply() method:

  .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
     @Override
     public String apply(KV<String, Long> element) {
       return element.getKey() + ": " + element.getValue();
     }
  }))

After step 4, we have a String with the word and its count, for each word. We are ready to send these Strings to an output file (a sink): it’s a new (and final) step of the pipeline, so we use the apply() method again:

.apply(TextIO.Write.to("gs://my-bucket/counts.txt"));

Here we are: our pipeline is done and ready to run.

Running it is pretty simple: just call the run() method on the pipeline:

pipeline.run();
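
For reference, here is the whole Wordcount pipeline, simply assembling the snippets above end-to-end:

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("Dataflow-Wordcount");
Pipeline pipeline = Pipeline.create(options);

pipeline.apply(TextIO.Read.from("/path/to/test.txt"))
        // step 2: extract the words from the input lines
        .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
            for (String word : c.element().split("[^a-zA-Z']+")) {
              if (!word.isEmpty()) {
                c.output(word);
              }
            }
          }
        }))
        // step 3: count the occurrences of each word
        .apply(Count.<String>perElement())
        // step 4: format each word => count entry as a String
        .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
          @Override
          public String apply(KV<String, Long> element) {
            return element.getKey() + ": " + element.getValue();
          }
        }))
        // final step: write to the sink
        .apply(TextIO.Write.to("gs://my-bucket/counts.txt"));

pipeline.run();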

What’s next

We are thinking about some new features for Dataflow.

NB: these improvements and changes are still under review and discussion.

New runners, sinks, and sources

First, we will extend the provided runners and sinks/sources to be much more “open” and turnkey.

For instance, why not have a runner for Ignite, for Karaf, for a simple JVM (like the DirectPipelineRunner), etc.?

In terms of sinks and sources, I plan to make it possible to use Camel components, or at least to provide more and more out-of-the-box I/O support, especially for the Data Integration Dataflow layer.

DSL

While the usage of the apply() method on a pipeline is very flexible, it may appear “complex” to new users. We are thinking about providing a DSL. Basically, instead of defining a pipeline as a chain of apply calls, you would be able to use specialized functions like the following (of course, it would still be possible to use the apply method):

from("file:...").apply().count().to("file:...");

Data Integration

Dataflow is a great abstraction layer for data processing (batch and streaming), but it could also become a very flexible integration platform. In addition to the processing DSL, we will provide an integration DSL. It will bring EIP (Enterprise Integration Patterns) to data integration.
Imagine an integration pipeline like:

from("http:...").filter().aggregate().to("jms:...").when(...).to().otherwise().to("kafka:...");

We can mix processing and integration in a very smart and powerful way.

It means that we will have the process/stream DSL and the integration DSL on top of the core one. Furthermore, the sinks/sources will not only be data sources, but also a mix of messages and data.

Hadoop CDC and processes notification with Apache Falcon, Apache ActiveMQ, and Apache Camel

March 19, 2014 Posted by jbonofre

Some weeks (months? ;)) ago, I started to work on Apache Falcon. First of all, I would like to thank all the Falcon guys: they are really awesome and do a great job (special thanks to Srikanth, Venkatesh, and Swetha).

This blog post is a preparation for a set of “recipe documentation” that I will propose in Apache Falcon.

Falcon is in incubation at Apache. Its purpose is to provide a data processing and management solution for Hadoop, designed for data motion, coordination of data pipelines, lifecycle management, and data discovery. Falcon enables end consumers to quickly onboard their data and its associated processing and management tasks on Hadoop clusters.

An interesting feature provided by Falcon is notification of the activities in the Hadoop cluster “outside” of the cluster 😉
In this article, we will see how to get two kinds of notifications in Camel routes “outside” of the Hadoop cluster:

  • a Camel route will be notified and triggered when a process is executed in the Hadoop cluster
  • a Camel route will be notified and triggered when an HDFS location changes (a first CDC feature)

Requirements

If you already have your Hadoop cluster, or you know how to install/prepare one, you can skip this section.

In this section, I will create a “real fake” Hadoop cluster on one machine. It’s not really a pseudo-distributed setup, as I will use multiple datanodes and tasktrackers, but all on one machine (of course, it doesn’t make sense in production, but it’s just for demo purposes ;)).

In addition to the common Hadoop components (HDFS namenode/datanodes and M/R jobtracker/tasktrackers), Falcon requires Oozie (for scheduling) and ActiveMQ.

By default, Falcon embeds ActiveMQ, but for the demo (and to provide screenshots of the ActiveMQ web console), I will use a standalone ActiveMQ instance.

Hadoop “fake” cluster preparation

For the demo, I will “fake” three machines.

I create a demo folder on my machine, and I uncompress the hadoop-1.1.2-bin.tar.gz tarball into node1, node2, and node3 folders:

$ mkdir demo
$ cd demo
$ tar zxvf ~/hadoop-1.1.2-bin.tar.gz
$ cp -r hadoop-1.1.2 node1
$ cp -r hadoop-1.1.2 node2
$ cp -r hadoop-1.1.2 node3
$ mkdir storage

I also create a storage folder where I will put the nodes’ files. This folder is just for convenience: it’s easier to restart from scratch, just by deleting the storage folder content.

Node1 will host:

  • the HDFS namenode
  • a HDFS datanode
  • the M/R jobtracker
  • a M/R tasktracker

So, the node1/conf/core-site.xml file contains the location of the namenode:

<?xml version="1.0">
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node1 conf/core-site.xml -->
<configuration>
  <property>
     <name>fs.default.name</name>
     <value>hdfs://localhost</value>
  </property>
</configuration>

In the node1/conf/hdfs-site.xml file, we define the storage location for the namenode and the datanode (in the storage folder), and the default replication:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node1 conf/hdfs-site.xml -->
<configuration>
  <property>
    <name>dfs.name.dir</name>
    <value>/home/jbonofre/demo/storage/node1/namenode</value>
  </property>
  <property>
    <name>dfs.data.dir</name>
    <value>/home/jbonofre/demo/storage/node1/datanode</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
</configuration>

Finally, in the node1/conf/mapred-site.xml file, we define the location of the jobtracker:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node1 conf/mapred-site.xml -->
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:8021</value>
  </property>
</configuration>

Node1 is now ready.

Node2 hosts a datanode and a tasktracker. As for node1, the node2/conf/core-site.xml file contains the location of the namenode:

<?xml version="1.0">
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node2 conf/core-site.xml -->
<configuration>
  <property>
     <name>fs.default.name</name>
     <value>hdfs://localhost</value>
  </property>
</configuration>

The node2/conf/hdfs-site.xml file contains:

  • the storage location of the datanode
  • the network location of the namenode (from node1)
  • the port numbers used by the datanode (core, IPC, and HTTP in order to be able to start multiple datanodes on the same machine)
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsd"?>
<!-- node2 conf/hdfs-site.xml -->
<configuration>
  <property>
    <name>dfs.data.dir</name>
    <value>/home/jbonofre/demo/storage/node2/datanode</value>
  </property>
  <property>
    <name>dfs.datanode.address</name>
    <value>localhost:50110</value>
  </property>
  <property>
    <name>dfs.datanode.ipc.address</name>
    <value>localhost:50120</value>
  </property>
  <property>
    <name>dfs.datanode.http.address</name>
    <value>localhost:50175</value>
  </property>
</configuration>

The node2/conf/mapred-site.xml file contains the network location of the jobtracker, and the HTTP port number used by the tasktracker (in order to be able to run multiple tasktrackers on the same machine):

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node2 conf/mapred-site.xml -->
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:8021</value>
  </property>
  <property>
    <name>mapred.task.tracker.http.address</name>
    <value>localhost:50160</value>
  </property>
</configuration>

Node3 is very similar to node2: it hosts a datanode and a tasktracker. So the configuration is very similar to node2’s (only the storage location and the datanode and tasktracker port numbers differ).

Here’s the node3/conf/core-site.xml file:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node3 conf/core-site.xml -->
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost</value>
  </property>
</configuration>

Here’s the node3/conf/hdfs-site.xml:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node3 conf/hdfs-site.xml -->
<configuration>
  <property>
    <name>dfs.data.dir</name>
    <value>/home/jbonofre/demo/storage/node3/datanode</value>
  </property>
  <property>
    <name>dfs.datanode.address</name>
    <value>localhost:50210</value>
  </property>
  <property>
    <name>dfs.datanode.ipc.address</name>
    <value>localhost:50220</value>
  </property>
  <property>
    <name>dfs.datanode.http.address</name>
    <value>localhost:50275</value>
  </property>
</configuration>

Here’s the node3/conf/mapred-site.xml file:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node3 conf/mapred-site.xml -->
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:8021</value>
  </property>
  <property>
    <name>mapred.task.tracker.http.address</name>
    <value>localhost:50260</value>
  </property>
</configuration>

Our “fake” cluster configuration is now ready.

We can format the namenode on node1:

$ cd node1/bin
$ ./hadoop namenode -format
14/03/06 17:26:38 INFO namenode.NameNode: STARTUP_MSG: 
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = vostro/127.0.0.1
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 1.1.2
STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.1 -r 1440782; compiled by 'hortonfo' on Thu Jan 31 02:03:24 UTC 2013
************************************************************/
14/03/06 17:26:39 INFO util.GSet: VM type       = 64-bit
14/03/06 17:26:39 INFO util.GSet: 2% max memory = 17.78 MB
14/03/06 17:26:39 INFO util.GSet: capacity      = 2^21 = 2097152 entries
14/03/06 17:26:39 INFO util.GSet: recommended=2097152, actual=2097152
14/03/06 17:26:39 INFO namenode.FSNamesystem: fsOwner=jbonofre
14/03/06 17:26:39 INFO namenode.FSNamesystem: supergroup=supergroup
14/03/06 17:26:39 INFO namenode.FSNamesystem: isPermissionEnabled=true
14/03/06 17:26:39 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100
14/03/06 17:26:39 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
14/03/06 17:26:39 INFO namenode.NameNode: Caching file names occuring more than 10 times 
14/03/06 17:26:40 INFO common.Storage: Image file of size 114 saved in 0 seconds.
14/03/06 17:26:40 INFO namenode.FSEditLog: closing edit log: position=4, editlog=/home/jbonofre/demo/storage/node1/namenode/current/edits
14/03/06 17:26:40 INFO namenode.FSEditLog: close success: truncate to 4, editlog=/home/jbonofre/demo/storage/node1/namenode/current/edits
14/03/06 17:26:40 INFO common.Storage: Storage directory /home/jbonofre/demo/storage/node1/namenode has been successfully formatted.
14/03/06 17:26:40 INFO namenode.NameNode: SHUTDOWN_MSG: 
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at vostro/127.0.0.1
************************************************************/

We are now ready to start the namenode on node1:

$ cd node1/bin
$ ./hadoop namenode &

We start the datanode on node1:

$ cd node1/bin
$ ./hadoop datanode &

We start the jobtracker on node1:

$ cd node1/bin
$ ./hadoop jobtracker &

We start the tasktracker on node1:

$ cd node1/bin
$ ./hadoop tasktracker &

Node1 is fully started with the namenode, a datanode, the jobtracker, and a tasktracker.

We start a datanode and a tasktracker on node2:

$ cd node2/bin
$ ./hadoop datanode &
$ ./hadoop tasktracker &

And finally, we start a datanode and a tasktracker on node3:

$ cd node3/bin
$ ./hadoop datanode &
$ ./hadoop tasktracker &

We access the HDFS web console (http://localhost:50070) to verify that the namenode can see the 3 live datanodes:
(screenshot: hdfs1)
We also access the MapReduce web console (http://localhost:50030) to verify that the jobtracker can see the 3 live tasktrackers:
(screenshot: mapred1)

Oozie

Falcon delegates the scheduling of jobs (planning, re-execution, etc.) to Oozie.

Oozie is a workflow scheduler system to manage Hadoop jobs, using Quartz internally.

Falcon uses a “custom” Oozie distribution: it adds some additional EL extensions on top of a “regular” Oozie.

Falcon provides a script to create this custom Oozie distribution: we provide the Hadoop and Oozie versions that we need.

We can clone Falcon sources from git and call the src/bin/package.sh with the Hadoop and Oozie target versions that we want:

$ git clone https://git-wip-us.apache.org/repos/asf/incubator-falcon falcon
$ cd falcon
$ src/bin/package.sh 1.1.2 4.0.0

The package.sh script creates target/oozie-4.0.0-distro.tar.gz in the Falcon sources folder.

In the demo folder, I uncompress the oozie-4.0.0-distro.tar.gz tarball:

$ cp ~/oozie-4.0.0-distro.tar.gz .
$ tar zxvf oozie-4.0.0-distro.tar.gz

We now have an oozie-4.0.0-falcon folder.

Oozie requires a special configuration on the namenode (so on node1). We have to update the node1/conf/core-site.xml file to define the system user “proxied” by Oozie:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- node1 conf/core-site.xml -->
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost</value>
  </property>
  <property>
    <name>hadoop.proxyuser.jbonofre.hosts</name>
    <value>localhost</value>
  </property>
  <property>
    <name>hadoop.proxyuser.jbonofre.groups</name>
    <value>localhost</value>
  </property>
</configuration>

NB: don’t forget to restart the namenode to include these changes.

Now, we can prepare the Oozie web application. Due to license restrictions, it’s up to you to add the ExtJS library for the Oozie web console. To enable it, we first create an oozie-4.0.0-falcon/libext folder and put the ext-2.2.zip archive in it:

$ cd oozie-4.0.0-falcon
$ mkdir libext
$ cd libext
$ wget "http://extjs.com/deploy/ext-2.2.zip"

We have to populate the libext folder with different additional jar files:

  • the Hadoop jar files:

    $ cp node1/hadoop-core-1.1.2.jar oozie-4.0.0-falcon/libext
    $ cp node1/hadoop-client-1.1.2.jar oozie-4.0.0-falcon/libext
    $ cp node1/hadoop-tools-1.1.2.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-beanutils-1.7.0.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-beanutils-core-1.8.0.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-codec-1.4.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-collections-3.2.1.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-configuration-1.6.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-digester-1.8.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-el-1.0.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-io-2.1.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-lang-2.4.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-logging-api-1.0.4.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-logging-1.1.1.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-math-2.1.jar oozie-4.0.0-falcon/libext
    $ cp node1/lib/commons-net-3.1.jar oozie-4.0.0-falcon/libext
    
  • the Falcon Oozie extender:

    $ cp falcon-0.5-incubating-SNAPSHOT/oozie/libext/falcon-oozie-el-extension-0.5-incubating-SNAPSHOT.jar oozie-4.0.0-falcon/libext
    
  • the jar files required for HCatalog and Pig, from oozie-sharelib-4.0.0-falcon.tar.gz:

    $ cd oozie-4.0.0-falcon
    $ tar zxvf oozie-sharelib-4.0.0-falcon.tar.gz
    $ cp share/lib/hcatalog/hcatalog-core-0.5.0-incubating.jar libext
    $ cp share/lib/hcatalog/hive-* libext
    $ cp share/lib/pig/hsqldb-1.8.0.7.jar libext
    $ cp share/lib/pig/jackson-* libext
    $ cp share/lib/hcatalog/libfb303-0.7.0.jar libext
    $ cp share/lib/hive/log4j-1.2.16.jar libext
    $ cp libtools/oro-2.0.8.jar libext
    $ cp share/lib/hcatalog/webhcat-java-client-0.5.0-incubating.jar libext
    $ cp share/lib/pig/xmlenc-0.52.jar libext
    $ cp share/lib/pig/guava-11.0.2.jar libext
    $ cp share/lib/hcatalog/oozie-* libext
    

We are now ready to setup Oozie.

First, we “assemble” the Oozie web application (war):

$ cd oozie-4.0.0-falcon/bin
$ ./oozie-setup.sh prepare-war
  setting CATALINA_OPTS="$CATALINA_OPTS -Xmx1024m"

INFO: Adding extension: /home/jbonofre/demo/oozie-4.0.0-falcon/libext/ant-1.6.5.jar
...

New Oozie WAR file with added 'ExtJS library, JARs' at /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server/webapps/oozie.war


INFO: Oozie is ready to be started

Now, we “upload” the Oozie shared libraries to our HDFS, including the Falcon shared lib:

$ cd oozie-4.0.0-falcon/bin
$ ./oozie-setup.sh sharelib create -fs hdfs://localhost -locallib ../oozie-sharelib-4.0.0-falcon.tar.gz
  setting CATALINA_OPTS="$CATALINA_OPTS -Xmx1024m"
the destination path for sharelib is: /user/jbonofre/share/lib

If we browse the HDFS, we can see the folders created by Oozie.
(screenshot: hdfs2)
Finally, we create the Oozie database (where Oozie stores the job definitions, etc.):

$ cd oozie-4.0.0-falcon/bin
$ ./oozie-setup.sh db create -run
  setting CATALINA_OPTS="$CATALINA_OPTS -Xmx1024m"

Validate DB Connection
DONE
Check DB schema does not exist
DONE
Check OOZIE_SYS table does not exist
DONE
Create SQL schema
DONE
Create OOZIE_SYS table
DONE

Oozie DB has been created for Oozie version '4.0.0'


The SQL commands have been written to: /tmp/ooziedb-4527318150729236810.sql

The Oozie configuration is done; we can start it:

$ cd oozie-4.0.0-falcon/bin
$ ./oozied.sh start

Setting OOZIE_HOME:          /home/jbonofre/demo/oozie-4.0.0-falcon
Setting OOZIE_CONFIG:        /home/jbonofre/demo/oozie-4.0.0-falcon/conf
Sourcing:                    /home/jbonofre/demo/oozie-4.0.0-falcon/conf/oozie-env.sh
  setting CATALINA_OPTS="$CATALINA_OPTS -Xmx1024m"
Setting OOZIE_CONFIG_FILE:   oozie-site.xml
Setting OOZIE_DATA:          /home/jbonofre/demo/oozie-4.0.0-falcon/data
Setting OOZIE_LOG:           /home/jbonofre/demo/oozie-4.0.0-falcon/logs
Setting OOZIE_LOG4J_FILE:    oozie-log4j.properties
Setting OOZIE_LOG4J_RELOAD:  10
Setting OOZIE_HTTP_HOSTNAME: vostro.nanthrax.net
Setting OOZIE_HTTP_PORT:     11000
Setting OOZIE_ADMIN_PORT:     11001
Setting OOZIE_HTTPS_PORT:     11443
Setting OOZIE_BASE_URL:      http://vostro.nanthrax.net:11000/oozie
Setting CATALINA_BASE:       /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server
Setting OOZIE_HTTPS_KEYSTORE_FILE:     /home/jbonofre/.keystore
Setting OOZIE_HTTPS_KEYSTORE_PASS:     password
Setting CATALINA_OUT:        /home/jbonofre/demo/oozie-4.0.0-falcon/logs/catalina.out
Setting CATALINA_PID:        /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server/temp/oozie.pid

Using   CATALINA_OPTS:        -Xmx1024m -Dderby.stream.error.file=/home/jbonofre/demo/oozie-4.0.0-falcon/logs/derby.log
Adding to CATALINA_OPTS:     -Doozie.home.dir=/home/jbonofre/demo/oozie-4.0.0-falcon -Doozie.config.dir=/home/jbonofre/demo/oozie-4.0.0-falcon/conf -Doozie.log.dir=/home/jbonofre/demo/oozie-4.0.0-falcon/logs -Doozie.data.dir=/home/jbonofre/demo/oozie-4.0.0-falcon/data -Doozie.config.file=oozie-site.xml -Doozie.log4j.file=oozie-log4j.properties -Doozie.log4j.reload=10 -Doozie.http.hostname=vostro.nanthrax.net -Doozie.admin.port=11001 -Doozie.http.port=11000 -Doozie.https.port=11443 -Doozie.base.url=http://vostro.nanthrax.net:11000/oozie -Doozie.https.keystore.file=/home/jbonofre/.keystore -Doozie.https.keystore.pass=password -Djava.library.path=

Using CATALINA_BASE:   /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server
Using CATALINA_HOME:   /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server
Using CATALINA_TMPDIR: /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server/temp
Using JRE_HOME:        /opt/jdk/1.7.0_51
Using CLASSPATH:       /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server/bin/bootstrap.jar
Using CATALINA_PID:    /home/jbonofre/demo/oozie-4.0.0-falcon/oozie-server/temp/oozie.pid

We access the Oozie web console at http://localhost:11000/oozie/:
(screenshot: oozie1)

ActiveMQ

By default, Falcon embeds ActiveMQ, so generally speaking, you don’t have to install ActiveMQ. However, for the demo, I would like to show how to use an external and standalone ActiveMQ.

I uncompress the apache-activemq-5.7.0-bin.tar.gz tarball in the demo folder:

$ cd demo
$ tar zxvf ~/apache-activemq-5.7.0-bin.tar.gz

The default ActiveMQ configuration is fine; we can just start the broker on the default port (61616):

$ cd demo/apache-activemq-5.7.0/bin
$ ./activemq console

All the Falcon prerequisites are now in place.

Falcon installation

Falcon can be deployed:

  • standalone: the “regular” deployment mode when you have only one Hadoop cluster. It’s the deployment mode that I will use for this CDC demo.
  • distributed: the deployment mode to use when you have multiple Hadoop clusters, especially if you want to use the Falcon replication feature.

For the installation, we uncompress the falcon-0.5-incubating-SNAPSHOT-bin.tar.gz tarball in the demo folder:

$ cd demo
$ tar zxvf ~/falcon-0.5-incubating-SNAPSHOT-bin.tar.gz

Before starting Falcon, we disable the default embedded ActiveMQ broker in the conf/falcon-env.sh file:

# conf/falcon-env.sh
...
export FALCON_OPTS="-Dfalcon.embeddedmq=false"
...

We start the falcon server:

$ cd falcon-0.5-incubating-SNAPSHOT/bin
$ ./falcon-start 
Could not find installed hadoop and HADOOP_HOME is not set.
Using the default jars bundled in /home/jbonofre/demo/falcon-0.5-incubating-SNAPSHOT/hadooplibs/
/home/jbonofre/demo/falcon-0.5-incubating-SNAPSHOT/bin
falcon started using hadoop version:  Hadoop 1.1.2

The Falcon server actually starts a Jetty container with Jersey to expose the Falcon REST API.

You can check that the Falcon server started correctly using bin/falcon-status or bin/falcon:

$ bin/falcon-status
Falcon server is running (on http://localhost:15000/)
$ bin/falcon admin -status
Falcon server is running (on http://localhost:15000/)
$ bin/falcon admin -version
Falcon server build version: {"properties":[{"key":"Version","value":"0.5-incubating-SNAPSHOT-r5445e109bc7fbfea9295f3411a994485b65d1477"},{"key":"Mode","value":"embedded"}]}

Falcon usage: the entities

In Falcon, the configuration is defined by “entities”. Falcon supports three types of entities:

  • a cluster entity defines the Hadoop cluster (location of the namenode, location of the jobtracker), the related Falcon modules (Oozie, ActiveMQ), and the location of the Falcon working directories (on HDFS)
  • a feed entity defines a location on HDFS
  • a process entity defines a Hadoop job scheduled by Oozie

An entity is described using XML. You can perform different actions on an entity:

  • Submit: registers an entity in Falcon. Submitted entities are not scheduled; they simply sit in the Falcon configuration store.
  • List: provides the list of all entities registered in the Falcon configuration store.
  • Dependency: provides the dependencies of an entity. For example, a feed would show the processes that depend on it and the clusters that it depends on.
  • Schedule: feeds or processes that are already submitted and present in the configuration store can be scheduled. Upon schedule, Falcon wraps the required repeatable action as a bundle of Oozie coordinators and executes them on the Oozie scheduler.
  • Suspend: this action is applicable only to scheduled entities. It triggers a suspend on the Oozie bundle that was scheduled earlier through the schedule action. No further instances are executed on a suspended process/feed.
  • Resume: puts a suspended process/feed back to active, which in turn resumes the applicable Oozie bundle.
  • Status: displays the current status of an entity.
  • Definition: dumps the entity definition from the configuration store.
  • Delete: removes an entity from the Falcon configuration store.
  • Update: allows an already submitted/scheduled entity to be updated. Cluster update is currently not allowed. A feed update can cause a cascading update of all the processes already scheduled. The following set of actions is performed in Oozie to realize an update:
    • Suspend the previously scheduled Oozie coordinator. This prevents any new action from being triggered.
    • Update the coordinator to set the end time to “now”.
    • Resume the suspended coordinators.
    • Schedule as per the new process/feed definition, with the start time as “now”.

Cluster

The cluster entity defines the configuration of the hadoop cluster and components used by Falcon.

We will store the entity descriptors in the entity folder:

$ mkdir entity

For the cluster, we create entity/local.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<cluster colo="local" description="Local cluster" name="local" xmlns="uri:falcon:cluster:0.1">
  <interfaces>
    <interface type="readonly" endpoint="hftp://localhost:50010" version="1.1.2"/>
    <interface type="write" endpoint="hdfs://localhost:8020" version="1.1.2"/>
    <interface type="execute" endpoint="localhost:8021" version="1.1.2"/>
    <interface type="workflow" endpoint="http://localhost:11000/oozie/" version="4.0.0"/>
    <interface type="messaging" endpoint="tcp://localhost:61616" version="5.7.0"/>
  </interfaces>
  <locations>
    <location name="staging" path="/falcon/staging"/>
    <location name="temp" path="/falcon/temp"/>
    <location name="working" path="/falcon/working"/>
  </locations>
  <properties></properties>
</cluster>

A cluster contains the different interfaces and locations used by Falcon. A cluster is referenced by feed and process entities (via the cluster name). A cluster can’t be scheduled (it wouldn’t make sense).

The colo specifies a kind of cluster grouping. It’s used in the distributed deployment mode, so it’s not useful in our demo (as we have only one cluster).
The readonly interface specifies Hadoop’s HFTP protocol, only used in the case of feed replication between clusters (again, not used in our demo).
The write interface specifies the write access to HDFS, containing the fs.default.name value. Falcon uses this interface to write system data to HDFS, and feeds referencing this cluster are written to HDFS using this interface.
The execute interface specifies the location of the jobtracker, containing the mapred.job.tracker value. Falcon uses this interface to submit the processes as jobs to the jobtracker defined here.
The workflow interface specifies the interface for the workflow engine (the Oozie URL). Falcon uses this interface to schedule the processes referencing this cluster on the workflow engine defined here.
Optionally, you can have a registry interface (defining a Thrift URL) to specify the metadata catalog, such as the Hive Metastore (or HCatalog). We don’t use it in our demo.
The messaging interface specifies the interface for sending feed availability messages. It’s the URL of the ActiveMQ broker.

A cluster has a list of locations with a name (working, temp, staging) and a path on HDFS. Falcon uses these locations for intermediate processing of entities, and hence Falcon should have read/write/execute permission on them.

Optionally, a cluster may have a list of properties: key-value pairs used by Falcon and propagated to the workflow engine. For instance, you can specify the JMS broker connection factory:

<property name="brokerImplClass" value="org.apache.activemq.ActiveMQConnectionFactory" />

Now that we have the XML description, we can register our cluster in Falcon. We use the Falcon client command line to submit our cluster definition:

$ cd falcon-0.5-incubating-SNAPSHOT
$ bin/falcon entity -submit -type cluster -file ~/demo/entity/local.xml
default/Submit successful (cluster) local

We can check that our local cluster is actually present in the Falcon configuration store:

$ bin/falcon entity -list -type cluster
(cluster) local(null)

We can see our cluster “local”, for now without any dependency (null).

If we take a look at HDFS, we can see that the falcon directory has been created:

$ cd node1
$ bin/hadoop fs -ls /
Found 3 items
drwxr-xr-x   - jbonofre supergroup          0 2014-03-08 07:48 /falcon
drwxr-xr-x   - jbonofre supergroup          0 2014-03-06 17:32 /tmp
drwxr-xr-x   - jbonofre supergroup          0 2014-03-06 18:05 /user

Feed

A feed entity is a location on the cluster. It also defines additional attributes like frequency, late-arrival handling, and retention policies. A feed can be scheduled, meaning that Falcon will create processes to deal with retention and replication on the cluster.

Like any other entity, a feed is described using XML. We create the entity/output.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<feed description="RandomProcess output feed" name="output" xmlns="uri:falcon:feed:0.1">
  <group>output</group>
 
  <frequency>minutes(1)</frequency>
  <timezone>UTC</timezone>
  <late-arrival cut-off="minutes(5)"/>

  <clusters>
    <cluster name="local">
       <validity start="2012-07-20T03:00Z" end="2099-07-16T00:00Z"/>
       <retention limit="hours(10)" action="delete"/>
    </cluster>
  </clusters>

  <locations>
    <location type="data" path="/data/output"/>
  </locations>

  <ACL owner="jbonofre" group="supergroup" permission="0x644"/>

  <schema location="none" provider="none"/>
</feed>

The locations element defines the feed storage: paths on HDFS, or table names for Hive. A location is defined on a cluster, identified by name. In our example, we use the “local” cluster that we submitted before.

The group element defines a list of comma-separated groups. A group is a logical grouping of feeds. A group is said to be available if all the feeds belonging to it are available. The frequency of all the feeds which belong to the same group must be the same.

The frequency element specifies the frequency at which this feed is generated (for instance, it can be generated every hour, every 5 minutes, daily, weekly, etc.). Falcon uses this frequency to check if the feed has changed or not (if its size has changed). In our example, we define a frequency of one minute. Falcon creates a job in Oozie to monitor the feed.

Falcon can handle late arrival of input data and appropriately re-trigger processing for the affected instance. From the late-handling perspective, two main configuration parameters are central: the late-arrival cut-off and the late-inputs section in the feed and process entity definitions. These configurations govern how and when the late processing happens. In the current (Oozie-based) implementation, late handling is very simple and basic: Falcon looks at all the dependent input feeds for a process and computes the max late cut-off period. It then uses a scheduled messaging framework, like the one available in Apache ActiveMQ, to schedule a message with the cut-off period. After the cut-off period, the message is dequeued, and Falcon checks for changes in the feed data, which is recorded in HDFS in a late data file by Falcon’s “record-size” action. If it detects any change, the workflow is rerun with the new set of feed data.

The retention element specifies how long the feed is retained on the cluster, and the action to be taken on the feed after the expiration of the retention period. In our example, we delete the feed after a retention of 10 hours.

The validity of a feed on a cluster specifies the duration for which this feed is valid on this cluster (i.e., considered for scheduling by Falcon).

The ACL defines the permission on the feed (owner/group/permission).

The schema element allows you to specify the “format” of the feed (for instance, csv). In our case, we don’t define any schema.

We can now submit the feed (register it) in Falcon:

$ cd falcon-0.5-incubating-SNAPSHOT
$ bin/falcon entity -submit -type feed -file ~/demo/entity/output.xml
default/Submit successful (feed) output

Process

A process entity defines a job in the cluster.

Like other entities, a process is described with XML (entity/process.xml):

<?xml version="1.0" encoding="UTF-8"?>
<process name="my-process" xmlns="uri:falcon:process:0.1">
    <clusters>
        <cluster name="local">
            <validity start="2013-11-15T00:05Z" end="2030-11-15T01:05Z"/>
        </cluster>
    </clusters>

    <parallel>1</parallel>
    <order>FIFO</order>
    <frequency>minutes(5)</frequency>
    <timezone>UTC</timezone>

    <inputs>
        <!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
        <input name="inpaths" feed="input" start="now(0,-5)" end="now(0,-1)"/>
    </inputs>

    <outputs>
        <!-- In the workflow, the output path will be available in a variable 'outpath' -->
        <output name="outpath" feed="output" instance="now(0,0)"/>
    </outputs>

    <properties>
        <!-- In the workflow, these properties will be available with variable - key -->
        <property name="queueName" value="default"/>
        <!-- The schedule time available as a property in workflow -->
        <property name="time" value="${instanceTime()}"/>
    </properties>

    <workflow engine="oozie" path="/app/mr"/>

    <late-process policy="periodic" delay="minutes(1)">
       <late-input input="inpaths" workflow-path="/app/mr"/>
    </late-process>

</process>

The cluster element defines where the process will be executed. Each cluster has a validity period, defining the time window during which the job should run on the cluster. For the demo, we set a large validity period.

The parallel element defines how many instances of the process can run concurrently. We set a value of 1 here to ensure that only one instance of the process can run at a time.

The order element defines the order in which the ready instances are picked up. The possible values are FIFO (First In First Out), LIFO (Last In First Out), and ONLYLAST (Last Only). It’s not really relevant in our case.

The frequency element defines how frequently the process should run. In our case, minutes(5) means that the job will run every 5 minutes.

The inputs element defines the input data for the process. The process job starts executing only after the schedule time and when all the inputs are available. There can be 0 or more inputs, and each input maps to a feed. The path and frequency of the input data are picked up from the feed definition. Each input should also define start and end instances in terms of EL expressions, and can optionally specify a specific partition of the input that the process requires. The components in the partition should be a subset of the partitions defined in the feed.
For each input, Falcon creates a property with the input name that contains the comma-separated list of input paths. This property can be used in process actions, like pig scripts and so on.

The outputs element defines the output data generated by the process. A process can define 0 or more outputs. Each output is mapped to a feed, and the output path is picked up from the feed definition. The output instance that should be generated is specified in terms of an EL expression.
For each output, Falcon creates a property with the output name that contains the path of the output data. This can be used in workflows to store data in that path.

The properties element contains key value pairs that are passed to the process. These properties are optional and can be used to parameterize the process.

The workflow element defines the workflow engine that should be used and the path to the workflow on HDFS. The workflow definition on HDFS contains the actual job that should run, and it should conform to the workflow specification of the specified engine. The libraries required by the workflow should be in the lib folder inside the workflow path.
The properties defined in the cluster entity and the cluster properties (nameNode and jobTracker) are also available to the workflow.
Currently, Falcon supports three workflow engines:

  • oozie enables users to provide an Oozie workflow definition (in XML).
  • pig enables users to embed a Pig script as a process.
  • hive enables users to embed a Hive script as a process. This enables users to create materialized queries in a declarative way.

NB: I proposed to support a new type of workflow, MapReduce, to be able to directly execute MapReduce jobs.

In this demo, we use the oozie workflow engine.

We create an Oozie workflow.xml:

<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.2" name="map-reduce-wf">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${outpath}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.hadoop.mapred.lib.IdentityMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.hadoop.mapred.lib.IdentityReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inpaths}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outpath}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

This workflow is very simple: it uses the IdentityMapper and IdentityReducer (provided in Hadoop core) to copy the input data to the output.

We upload this workflow.xml to HDFS (in the location specified in the workflow element of the Falcon process):

$ cd node1
$ bin/hadoop fs -mkdir /app/mr
$ bin/hadoop fs -put ~/demo/workflow.xml /app/mr

The late-process element allows the process to react to input feed changes and trigger an action (here, we re-execute the Oozie workflow).

We are now ready to submit the process in Falcon:

$ cd falcon-*
$ bin/falcon entity -submit -type process -file ~/demo/entity/process.xml

The process is ready to be scheduled.

Before scheduling the process, we create the input data. The input data is a simple file (containing a string) that we upload to HDFS:

$ cat > file1
This is a test file
$ node1/bin/hadoop fs -mkdir /data/input
$ node1/bin/hadoop fs -put file1 /data/input

We can now trigger the process:

$ cd falcon*
$ bin/falcon entity -schedule -type process -name my-process
default/my-process(process) scheduled successfully

We can see the different jobs in Oozie (accessing http://localhost:11000/oozie):
(screenshots: oozie_bj, oozie_cj, oozie_wj)

On the other hand, we see new topics and queues created in ActiveMQ:
(screenshots: mq_queues, amq_topics)

In particular, in ActiveMQ, we have two topics:

  • Falcon publishes messages in the FALCON.my-process topic for each execution of the process
  • Falcon publishes messages in the FALCON.ENTITY.TOPIC topic for each change on the feeds

This is where our Camel routes subscribe.

Camel routes in Karaf

Now that we have our Falcon platform ready, we just have to create the Camel routes (hosted in a Karaf container), subscribing to the Falcon topics in ActiveMQ.

We uncompress a Karaf container, and install the Camel features (camel-spring, activemq-camel):

$ tar zxvf apache-karaf-2.3.1.tar.gz
$ cd apache-karaf-2.3.1
$ bin/karaf
karaf@root> features:chooseurl camel
adding feature url mvn:org.apache.camel.karaf/apache-camel/LATEST/xml/features
karaf@root> features:install camel-spring
karaf@root> features:chooseurl activemq
karaf@root> features:install activemq-camel

We create a falcon-routes.xml route file containing the Camel routes (using the Spring DSL):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route id="process-listener">
       <from uri="jms:topic:FALCON.my-process"/>
       <to uri="log:process-listener"/>
    </route>
    <route id="feed-listener">
       <from uri="jms:topic:FALCON.ENTITY.TOPIC"/>
       <to uri="log:feed-listener"/>
    </route>
  </camelContext>

  <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
      </bean>
    </property>
  </bean>

</beans>

In the Camel context, we create two routes, both connecting to the ActiveMQ broker and listening on the two topics.
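
As a side note, for those who prefer the Camel Java DSL over the Spring DSL, the same two routes could be sketched as follows (assuming the same "jms" component is registered in the Camel context):

import org.apache.camel.builder.RouteBuilder;

public class FalconRoutes extends RouteBuilder {

  @Override
  public void configure() throws Exception {
    // listen on the per-process topic and log each execution notification
    from("jms:topic:FALCON.my-process")
      .routeId("process-listener")
      .to("log:process-listener");

    // listen on the entity topic and log each feed change notification
    from("jms:topic:FALCON.ENTITY.TOPIC")
      .routeId("feed-listener")
      .to("log:feed-listener");
  }
}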

We drop the falcon-routes.xml in the deploy folder, and we can see it active:

karaf@root> la|grep -i falcon
[ 114] [Active     ] [            ] [Started] [   80] falcon-routes.xml (0.0.0)
karaf@root> camel:route-list 
 Context        Route              Status   
 -------        -----              ------   
 camel          feed-listener      Started  
 camel          process-listener   Started 

The routes subscribe to the topics and just send the messages to the log (it’s very, very simple).

So, we just have to take a look at the log (log:tail):

2014-03-19 11:25:43,273 | INFO  | LCON.my-process] | process-listener                 | rg.apache.camel.util.CamelLogger  176 | 74 - org.apache.camel.camel-core - 2.13.0.SNAPSHOT | Exchange[ExchangePattern: InOnly, BodyType: java.util.HashMap, Body: {brokerUrl=tcp://localhost:61616, timeStamp=2014-03-19T10:24Z, status=SUCCEEDED, logFile=hdfs://localhost:8020/falcon/staging/falcon/workflows/process/my-process/logs/instancePaths-2013-11-15-06-05.csv, feedNames=output, runId=0, entityType=process, nominalTime=2013-11-15T06:05Z, brokerTTL=4320, workflowUser=null, entityName=my-process, feedInstancePaths=hdfs://localhost:8020/data/output, operation=GENERATE, logDir=null, workflowId=0000026-140319105443372-oozie-jbon-W, cluster=local, brokerImplClass=org.apache.activemq.ActiveMQConnectionFactory, topicName=FALCON.my-process}]
2014-03-19 11:25:43,693 | INFO  | ON.ENTITY.TOPIC] | feed-listener                    | rg.apache.camel.util.CamelLogger  176 | 74 - org.apache.camel.camel-core - 2.13.0.SNAPSHOT | Exchange[ExchangePattern: InOnly, BodyType: java.util.HashMap, Body: {brokerUrl=tcp://localhost:61616, timeStamp=2014-03-19T10:24Z, status=SUCCEEDED, logFile=hdfs://localhost:8020/falcon/staging/falcon/workflows/process/my-process/logs/instancePaths-2013-11-15-06-05.csv, feedNames=output, runId=0, entityType=process, nominalTime=2013-11-15T06:05Z, brokerTTL=4320, workflowUser=jbonofre, entityName=my-process, feedInstancePaths=hdfs://localhost:8020/data/output, operation=GENERATE, logDir=hdfs://localhost:8020/falcon/staging/falcon/workflows/process/my-process/logs/job-2013-11-15-06-05/, workflowId=0000026-140319105443372-oozie-jbon-W, cluster=local, brokerImplClass=org.apache.activemq.ActiveMQConnectionFactory, topicName=FALCON.ENTITY.TOPIC}]

And we can see our notifications:

  • on the process-listener logger, we can see that my-process (entityName) has been executed with status SUCCEEDED at 2014-03-19T10:24Z (timeStamp). We also have the location of the job execution log on HDFS.
  • on the feed-listener logger, we can see pretty much the same message. This message comes from the late-arrival handling, so it means that the input feed changed.

For sure, the Camel routes are very simple for now (just a log), but there is no limit: you can bring all the power of the ESB and BigData worlds together.
Once the Camel routes get the messages on ActiveMQ coming from Falcon, you can implement the integration process of your choice (sending e-mails, using Camel EIPs, calling beans, etc.), as sketched below.
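
For instance, since the message body is a Map (as visible in the log above), the process-listener route could filter on the status entry and notify by e-mail. A sketch (a fragment for a RouteBuilder's configure() method), assuming camel-mail is installed; the SMTP endpoint is hypothetical:

from("jms:topic:FALCON.my-process")
  // only react to successful executions (the body is the Map sent by Falcon)
  .filter(simple("${body[status]} == 'SUCCEEDED'"))
  // hypothetical SMTP endpoint: adapt the host and recipient to your setup
  .to("smtp://localhost?to=ops@example.com&subject=Falcon process succeeded");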

What’s next ?

I’m working on different enhancements to the late-arrival/CDC feature:

  1. The late-arrival messages in the FALCON.ENTITY.TOPIC should be improved: the message should identify the feed that changed, the location of the feed, and eventually the size gap.
  2. We should provide a more straightforward CDC feature which doesn’t require a process to monitor a feed: just scheduling a feed, with the late cut-off, should be enough.
  3. In addition to the oozie, pig, and hive workflow engines, we should provide a “pure” MapReduce jar workflow engine.
  4. The package.sh script should be improved to provide a more “ready-to-use” Falcon Oozie custom distribution.

I’m working on these different enhancements and improvements.

On the other hand, I will propose a set of documentation improvements, especially some kind of “recipe documentation” like this one.

Stay tuned: I’m preparing a new blog post about Falcon, this time about replication between two Hadoop clusters.

Apache Hadoop and Karaf, Article 1: Karaf as HDFS client

July 8, 2013 Posted by jbonofre

Maybe some of you remember that, a couple of months ago, I posted some messages on the Hadoop mailing list about OSGi support in Hadoop (http://mail-archives.apache.org/mod_mbox/hadoop-common-dev/201202.mbox/%3C4F3285F1.2000704@nanthrax.net%3E).

In order to move forward on this topic, instead of an important refactoring, I started to work on standalone and atomic bundles that we can deploy in Karaf. The purpose is to avoid changing Hadoop core, while providing good Hadoop support directly in Karaf.

I worked on Hadoop trunk (3.0.0-SNAPSHOT) and prepared patches (https://issues.apache.org/jira/browse/HADOOP-9706).

I also deployed the bundles on my Maven repository to give users the possibility to directly deploy hadoop-karaf in a running Karaf instance.

The purpose is to explain what you can do and the value of this approach, and maybe you will vote to “include” it in Hadoop directly 😉

To explain exactly what you can do, I prepared a series of blog posts:

  • Article 1: Karaf as HDFS client. This is the first post. We will see the hadoop-karaf bundle installation, the hadoop and hdfs Karaf shell commands, and how you can use HDFS to store bundles or features using the HDFS URL handler.
  • Article 2: Karaf as MapReduce job client. We will see how to run MapReduce jobs directly from Karaf, and the “hot-deploy-and-run” of MapReduce jobs using the Hadoop deployer.
  • Article 3: Exposing Hadoop, HDFS, Yarn, and MapReduce features as OSGi services. We will see how to use Hadoop features programmatically thanks to OSGi services.
  • Article 4: Karaf as a HDFS datanode (and eventually namenode). Here, more than using Karaf as a simple HDFS client, Karaf will be part of HDFS acting as a datanode, and/or namenode.
  • Article 5: Karaf, Camel, Hadoop all together. In this article, we will use the Hadoop OSGi services now available in Karaf inside Camel routes (plus the camel-hdfs component).
  • Article 6: Karaf as complete Hadoop container. I will explain here what I did in Hadoop to add a complete support of OSGi and Karaf.

Karaf as HDFS client

Just a reminder about HDFS (Hadoop Distributed FileSystem).

HDFS is composed of:
– a namenode, hosting the metadata of the filesystem (directories, block locations, file permissions or modes, …). There is only one namenode per HDFS, and the metadata is stored in memory by default.
– a set of datanodes, hosting the file blocks. Files are composed of blocks (like in all filesystems). The blocks are located on different datanodes. The blocks can be replicated.

An HDFS client connects to the namenode to execute actions on the filesystem (ls, rm, mkdir, cat, …).
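
Programmatically, a client does the same thing through Hadoop’s FileSystem API; a minimal sketch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsClientDemo {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://localhost/");

    // the client talks to the namenode for metadata operations
    FileSystem fs = FileSystem.get(conf);
    fs.mkdirs(new Path("/bundles"));

    // list the filesystem content (like hadoop fs -ls /)
    for (FileStatus status : fs.listStatus(new Path("/"))) {
      System.out.println(status.getPath());
    }
  }
}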

Preparing HDFS

The first step is to set up the HDFS filesystem.

I’m going to use a “pseudo-cluster”: an HDFS with the namenode and only one datanode on a single machine.
To do so, I configure the $HADOOP_INSTALL/etc/hadoop/core-site.xml file like this:


<configuration>

  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost/</value>
  </property>

</configuration>

For a pseudo-cluster, we set up only one replica per block (as we have only one datanode) in the $HADOOP_INSTALL/etc/hadoop/hdfs-site.xml file:

<configuration>

  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>

</configuration>

Now, we can format the namenode:


$HADOOP_INSTALL/bin/hdfs namenode -format

and start the HDFS (both namenode and datanode):


$HADOOP_INSTALL/sbin/start-dfs.sh

Now, we can connect to the HDFS and create a first folder:


$HADOOP_INSTALL/bin/hadoop fs -mkdir /bundles
$HADOOP_INSTALL/bin/hadoop fs -ls /
Found 1 items
drwxr-xr-x - jbonofre supergroup 0 2013-07-07 22:18 /bundles

Our HDFS is up and running.

Configuration and installation of hadoop-karaf

I created the hadoop-karaf bundle as a standalone bundle. It means that it embeds a lot of dependencies internally (directly in the bundle classloader).

The purpose is to:

  1. avoid altering anything in Hadoop core. Thanks to this approach, I can provide the hadoop-karaf bundle for different Hadoop versions, and I don’t need to alter Hadoop itself.
  2. ship all dependencies in the same bundle classloader. Of course it’s not ideal in terms of OSGi, but to provide a very easy and ready-to-use bundle, I gather most of the dependencies in the hadoop-karaf bundle.

I worked on trunk directly: Hadoop 3.0.0-SNAPSHOT (for now; if you are interested, I can provide hadoop-karaf for existing Hadoop releases).

Before deploying the hadoop-karaf bundle, we have to prepare the Hadoop configuration. In order to integrate with Karaf, I implemented a mechanism to create and populate the Hadoop configuration from OSGi ConfigAdmin.
The only requirement for the user is to create an org.apache.hadoop PID in the Karaf etc folder containing the Hadoop properties. Actually, it just means creating a $KARAF_INSTALL/etc/org.apache.hadoop.cfg file containing:


fs.default.name = hdfs://localhost/
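
Conceptually, such a mechanism boils down to an OSGi ManagedService registered with the org.apache.hadoop PID, copying the properties into a Hadoop Configuration. Here is a simplified, hypothetical sketch of the idea (not the actual hadoop-karaf code):

import java.util.Dictionary;
import java.util.Enumeration;

import org.apache.hadoop.conf.Configuration;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.cm.ManagedService;

// registered with the service property service.pid = "org.apache.hadoop", so
// ConfigAdmin feeds it the content of etc/org.apache.hadoop.cfg
public class HadoopConfigurationService implements ManagedService {

  private final Configuration configuration = new Configuration();

  @Override
  public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
    if (properties == null) {
      return;
    }
    Enumeration<String> keys = properties.keys();
    while (keys.hasMoreElements()) {
      String key = keys.nextElement();
      configuration.set(key, String.valueOf(properties.get(key)));
    }
  }

  public Configuration getConfiguration() {
    return configuration;
  }
}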

If you don’t want to compile hadoop-karaf bundle yourself, you can use the artifact that I deployed on my Maven repository (http://maven.nanthrax.net/org/apache/hadoop/hadoop-karaf/3.0.0-SNAPSHOT/hadoop-karaf-3.0.0-20130708.050912-1.jar).

To do this, you have to edit etc/org.ops4j.pax.url.mvn.cfg and add my repository to the org.ops4j.pax.url.mvn.repositories property:


org.ops4j.pax.url.mvn.repositories = \
  http://maven.nanthrax.net/@snapshots@id=maven, \
  http://repo1.maven.org/maven2@id=central, \
  ...

Now, we can start Karaf as usual:


$KARAF_INSTALL/bin/karaf

NB: I use Karaf 2.3.1.

We can now install the hadoop-karaf bundle:


karaf@root> osgi:install -s mvn:org.apache.hadoop/hadoop-karaf/3.0.0-SNAPSHOT
karaf@root> la|grep -i hadoop
[ 54] [Active ] [Created ] [ 80] Apache Hadoop Karaf (3.0.0.SNAPSHOT)

hadoop:* and hdfs:* commands

The hadoop-karaf bundle comes with new Karaf shell commands.

For this first blog post, we are going to use only one command: hadoop:fs.

The hadoop:fs command allows you to use HDFS directly in Karaf (it’s a wrapper around hadoop fs):


karaf@root> hadoop:fs -ls /
Found 1 items
drwxr-xr-x - jbonofre supergroup 0 2013-07-07 22:18 /bundles
karaf@root> hadoop:fs -df
Filesystem Size Used Available Use%
hdfs://localhost 5250875392 307200 4976799744 0%

HDFS URL handler

Another thing provided by the hadoop-karaf bundle is a URL handler to directly support hdfs URLs.

It means that you can use hdfs URLs in Karaf commands, such as osgi:install, features:addurl, etc.
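
In OSGi, such a handler is typically registered as a URLStreamHandlerService for the hdfs protocol. Here is a simplified, hypothetical sketch of the idea (not the actual hadoop-karaf code):

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.osgi.service.url.AbstractURLStreamHandlerService;

// registered with the service property url.handler.protocol = "hdfs", so any
// hdfs: URL used in Karaf (osgi:install, features:addurl, ...) goes through it
public class HdfsUrlHandler extends AbstractURLStreamHandlerService {

  @Override
  public URLConnection openConnection(final URL url) throws IOException {
    return new URLConnection(url) {
      @Override
      public void connect() {
        // nothing to do: the stream is opened lazily
      }

      @Override
      public InputStream getInputStream() throws IOException {
        // read the resource bytes from HDFS via the FileSystem API
        FileSystem fs = FileSystem.get(new Configuration());
        return fs.open(new Path(url.getPath()));
      }
    };
  }
}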

It also means that you can use HDFS to store your Karaf bundles, features, or configuration files.

For instance, we can copy an OSGi bundle into HDFS:


$HADOOP_INSTALL/bin/hadoop fs -copyFromLocal ~/.m2/repository/org/apache/servicemix/bundles/org.apache.servicemix.bundles.commons-lang/2.4_6/org.apache.servicemix.bundles.commons-lang-2.4_6.jar /bundles/org.apache.servicemix.bundles.commons-lang-2.4_6.jar

The commons-lang bundle is now available in HDFS. We can check that directly from Karaf using the hadoop:fs command:


karaf@root> hadoop:fs -ls /bundles
Found 1 items
-rw-r--r-- 1 jbonofre supergroup 272039 2013-07-07 22:18 /bundles/org.apache.servicemix.bundles.commons-lang-2.4_6.jar

Now, we can install the commons-lang bundle in Karaf directly from HDFS, using an hdfs URL:


karaf@root> osgi:install hdfs:/bundles/org.apache.servicemix.bundles.commons-lang-2.4_6.jar
karaf@root> la|grep -i commons-lang
[ 55] [Installed ] [ ] [ 80] Apache ServiceMix :: Bundles :: commons-lang (2.4.0.6)

If we list the bundle locations, we can see the hdfs URL support:


karaf@root> la -l
...
[ 53] [Active ] [Created ] [ 30] mvn:org.apache.karaf.management.mbeans/org.apache.karaf.management.mbeans.dev/2.3.1
[ 54] [Active ] [Created ] [ 80] mvn:org.apache.hadoop/hadoop-karaf/3.0.0-SNAPSHOT
[ 55] [Installed ] [ ] [ 80] hdfs:/bundles/org.apache.servicemix.bundles.commons-lang-2.4_6.jar

Conclusion

This first blog post shows how to use Karaf as an HDFS client. The big advantage is that the hadoop-karaf bundle doesn’t change anything in Hadoop core, so I can provide it for Hadoop 0.20.x, 1.x, 2.x, or trunk (3.0.0-SNAPSHOT).
In Article 3, you will see how to leverage HDFS directly as OSGi services (and so use it in your bundles, Camel routes, …).

Again, if you think that this article series is interesting, and you would like to see Karaf support in Hadoop, feel free to post a comment, send a message on the Hadoop mailing list, or whatever else to promote it 😉