Introducing Apache Beam (dataflow)

January 20, 2016 Posted by jbonofre

As part of the Google Cloud ecosystem, Google created the Dataflow SDK. Now, as a joint effort by Google, Talend, Cask, data Artisans, PayPal, and Cloudera, we are proposing Apache Dataflow to the Apache Incubator.

I’m proud, glad and excited to be the champion of the Apache Dataflow proposal.

But first, I would like to thank James Malone and Frances Perry from Google for their help and for our always open-minded and interesting discussions. It’s really great to work with them!

Let’s take a quick tour of what Apache Dataflow will be.

Architecture and Programming Model

Imagine you have a Hadoop cluster where you run MapReduce jobs. Now you want to “migrate” these jobs to Spark: you have to refactor all of them, which takes a lot of work and costs a lot. Then consider the effort and cost if you later want to move to yet another platform such as Flink: you have to refactor your jobs again.

Dataflow aims to provide an abstraction layer between your code and the execution runtime.

The SDK allows you to use a unified programming model: you implement your data processing logic using the Dataflow SDK, and the same code runs on different backends. You don’t need to refactor and change the code anymore!

If your target backend is not yet supported by Dataflow, you can implement your own runner for it. Again, the code using the Dataflow SDK doesn’t change.

Dataflow can handle batch processing jobs as well as streaming jobs.

Architecture: pipelines, translators, runners

Using the SDK, your jobs are designed as pipelines. A pipeline is a chain of processing steps applied to the data.

It’s basically the only part that you have to write.

Dataflow reads the pipeline definitions and translates them for a target runner. A translator is responsible for adapting the pipeline code to the runner. For instance, the MapReduce translator transforms pipelines into MapReduce jobs, the Spark translator transforms pipelines into Spark jobs, etc.

The runners are the “execution” layer. Once a pipeline has been “translated” by a translator, it can be run directly on a runner. The runner “hides” the actual backend: a MapReduce/YARN cluster, a Spark cluster (running on YARN or Mesos), etc.

Dataflow comes with ready-to-use translators and runners, but you can also create your own.

For instance, you can implement your own runner by creating a class that extends PipelineRunner. You will have to implement the different runner behaviours (such as the transform evaluators, the supported options, the apply main transform hook method, etc.).
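
To make the separation between pipeline code and runner more concrete, here is a minimal plain-Java sketch. It does not use the Dataflow SDK: ToyPipeline, ToyRunner and the two runner instances are hypothetical names invented for this illustration, and a real PipelineRunner has many more responsibilities. It only shows the idea that the user code is written once and handed to different execution strategies.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Toy illustration only: none of these types belong to the Dataflow SDK.
class ToyRunnerSketch {

    // A "pipeline" here is just a description of what to do with a stream of lines.
    interface ToyPipeline extends Function<Stream<String>, Stream<String>> {}

    // A "runner" decides how the pipeline is executed.
    interface ToyRunner {
        List<String> run(ToyPipeline pipeline, List<String> input);
    }

    // Runs the pipeline on a single thread.
    static final ToyRunner SEQUENTIAL =
        (pipeline, input) -> pipeline.apply(input.stream()).collect(Collectors.toList());

    // Runs the same pipeline on a parallel stream (a stand-in for a cluster backend).
    static final ToyRunner PARALLEL =
        (pipeline, input) -> pipeline.apply(input.parallelStream()).collect(Collectors.toList());

    // The user code: written once, unaware of which runner will execute it.
    static final ToyPipeline UPPERCASE_NON_EMPTY =
        lines -> lines.filter(line -> !line.isEmpty()).map(String::toUpperCase);

    public static void main(String[] args) {
        List<String> input = Arrays.asList("spark", "", "flink");
        System.out.println(SEQUENTIAL.run(UPPERCASE_NON_EMPTY, input)); // [SPARK, FLINK]
        System.out.println(PARALLEL.run(UPPERCASE_NON_EMPTY, input));   // [SPARK, FLINK]
    }
}
```

Switching from SEQUENTIAL to PARALLEL changes how the work is executed, but UPPERCASE_NON_EMPTY stays untouched, which is the property the runner abstraction is meant to provide.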

SDK

The SDK is composed of four parts:

  • Pipelines are the streaming and processing logic that you want to implement. A pipeline is a chain of processing steps. Basically, in a pipeline, you read data from a source, apply transformations to the data, and eventually send the data to a destination (named a sink in Dataflow wording).
  • PCollection is the object transported inside a pipeline. It’s the container of the data, flowing between the steps of the pipeline.
  • Transform is a step of a pipeline. It takes an incoming PCollection and creates an outgoing PCollection. You can implement your own transform function.
  • Source and Sink are used, respectively, to read data as the input (first step) of a pipeline and to eventually send the data outside of the pipeline.
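
The four parts above can be mimicked in a few lines of plain Java. This is only a toy model under my own assumptions: ToyCollection, source, sink and lineLengths are made-up names, not SDK types, and everything lives in memory instead of being distributed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Toy model only: ToyCollection is a made-up stand-in, not the SDK's PCollection.
class ToyModelSketch {

    // Immutable container of the elements flowing between two steps.
    static final class ToyCollection<T> {
        private final List<T> elements;

        ToyCollection(List<T> elements) {
            this.elements = Collections.unmodifiableList(new ArrayList<>(elements));
        }

        // A transform takes the incoming collection and produces a new one.
        <R> ToyCollection<R> apply(Function<List<T>, List<R>> transform) {
            return new ToyCollection<>(transform.apply(elements));
        }

        List<T> elements() {
            return elements;
        }
    }

    // Source: turns external data (here, in-memory strings) into a collection.
    static ToyCollection<String> source(String... lines) {
        return new ToyCollection<>(Arrays.asList(lines));
    }

    // Sink: sends the elements outside of the pipeline (here, into a caller-owned list).
    static <T> void sink(ToyCollection<T> collection, List<T> destination) {
        destination.addAll(collection.elements());
    }

    // An example transform: maps each line to its length.
    static List<Integer> lineLengths(List<String> lines) {
        List<Integer> lengths = new ArrayList<>();
        for (String line : lines) {
            lengths.add(line.length());
        }
        return lengths;
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        sink(source("beam", "dataflow").apply(ToyModelSketch::lineLengths), out);
        System.out.println(out); // [4, 8]
    }
}
```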

In action

Let’s implement a very simple Wordcount job using the Dataflow SDK.

First, we create a pipeline:

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("Dataflow-Wordcount");
Pipeline pipeline = Pipeline.create(options);

We can see in the options that we define the pipeline runner. Of course, it’s not good practice to hard-code the runner in the pipeline code; you can externalize the runner definition using command-line arguments.
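
The SDK has its own mechanism for reading options from the command line (PipelineOptionsFactory.fromArgs(args)). To show the underlying idea without depending on the SDK, here is a small plain-Java sketch that extracts a --runner=... argument and falls back to a default. The class and method names are hypothetical, and the parsing rules are a simplification I chose for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of "--name=value" option parsing; not the SDK's own parser.
class ToyOptionsSketch {

    // Collects every "--name=value" argument into a map; other arguments are ignored.
    static Map<String, String> parse(String[] args) {
        Map<String, String> options = new HashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            // eq > 2 guarantees a non-empty option name after the leading "--".
            if (arg.startsWith("--") && eq > 2) {
                options.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return options;
    }

    // Falls back to a default runner name when none is given on the command line.
    static String runnerName(String[] args, String defaultRunner) {
        return parse(args).getOrDefault("runner", defaultRunner);
    }

    public static void main(String[] args) {
        // Example: java ToyOptionsSketch --runner=BlockingDataflowPipelineRunner
        System.out.println(runnerName(args, "DirectPipelineRunner"));
    }
}
```

With this approach the pipeline code no longer names a runner at all; the choice is made when the job is launched.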

Now that we have a pipeline, we can define where its data comes from. This is what Dataflow calls a source:

pipeline.apply(TextIO.Read.from("/path/to/test.txt"))

Basically, the apply() method allows you to define the steps of a pipeline. A step can be:

  • a source (input)
  • a transform
  • a sink (output)

So you can see a pipeline as a chain of apply calls: pipeline.apply().apply().apply()….

Now we have the data coming from a file (step 1). The purpose of the next step (step 2) is to identify the words in the input data. This step is a transformation step: it takes a PCollection of lines as input (coming from the file source) and produces a new PCollection of words. To implement this transformation step, we use the ParDo function in the apply method. The ParDo function allows you to apply a DoFn (DoFunction) to each element of the PCollection:

  .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
     @Override
     public void processElement(ProcessContext c) {
       for (String word : c.element().split("[^a-zA-Z']+")) {
         if (!word.isEmpty()) {
           c.output(word);
         }
       }
     }
  }))

The ProcessContext gives access to the current input element (c.element()) and receives the output elements (c.output()).
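
The splitting logic inside processElement can be tried on its own in plain Java. The sketch below uses the same regular expression as the DoFn above; the class and method names are mine. It also shows why the isEmpty() check matters: String.split returns a leading empty string when the line starts with a separator.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java view of the splitting logic used in ExtractWords; no SDK types involved.
class ExtractWordsSketch {

    // Splits a line on runs of characters that are neither letters nor apostrophes.
    static List<String> extractWords(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.split("[^a-zA-Z']+")) {
            // split() yields a leading empty string when the line starts with a separator.
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(extractWords("  Hello, world: it's Beam!")); // [Hello, world, it's, Beam]
    }
}
```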

Now that we have identified the words, we are ready to count them. This is a new step (step 3): it turns the words into key/value pairs holding the count for each word.
To implement this step, as always, we use the apply method. We could use the same logic as in step 2 (using ParDo and DoFn). Fortunately, Dataflow provides ready-to-use functions. In our case, it provides a Count function that we can use directly in the apply method:

.apply(Count.<String>perElement())
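
Conceptually, counting per element amounts to grouping identical words and counting each group. The plain-Java sketch below illustrates that result on an in-memory list. It is not how Count is implemented (a runner would distribute this work), and the class and method names are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory illustration of per-element counting; not the SDK's Count implementation.
class CountPerElementSketch {

    // Returns one (word, count) entry per distinct word, sorted by word.
    static Map<String, Long> countPerElement(List<String> words) {
        Map<String, Long> counts = new TreeMap<>();
        for (String word : words) {
            // Start at 1 for a new word, otherwise add 1 to the existing count.
            counts.merge(word, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("to", "be", "or", "not", "to", "be");
        System.out.println(countPerElement(words)); // {be=2, not=1, or=1, to=2}
    }
}
```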

After this step (step 3), we have a PCollection of KV<String, Long> elements: one word => count pair per distinct word. We want to format each pair as a String, ready to be written to a file.
So we add a new transformation step (step 4), again using the apply method. In this apply, we provide a SimpleFunction with our own apply() method:

  .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
     @Override
     public String apply(KV<String, Long> element) {
       return element.getKey() + ": " + element.getValue();
     }
  }))

After step 4, we have a String with the word and its count for each word. We are ready to send these Strings to an output file (a sink): it’s a new (and final) step of the pipeline, so we again use the apply() method:

.apply(TextIO.Write.to("gs://my-bucket/counts.txt"));

Here we are: our pipeline is done and ready to run.

To run it, it’s pretty simple: just call the run() method on the pipeline:

pipeline.run();

What’s next

We are thinking about some new features for Dataflow.

NB: these improvements and changes are still under review and discussion.

New runners, sinks, and sources

First, we will extend the provided runners and sinks/sources to be much more “open” and turnkey.

For instance, why not have a runner for Ignite, for Karaf, for a simple JVM (like the DirectPipelineRunner), etc.?

In terms of sinks and sources, I’m planning to make it possible to use Camel components, or at least to provide more and more out-of-the-box I/O support, especially for the Data Integration Dataflow layer.

DSL

While the apply() method on a pipeline is very flexible, it may appear “complex” to new users. We are thinking about providing a DSL. So basically, instead of defining a pipeline as a chain of apply calls, you would be able to use specialized functions like the following (of course, it would still be possible to use the apply method):

from("file:...").apply().count().to("file:...");
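
To give a feeling for how such a fluent style could sit on top of generic steps, here is a purely hypothetical plain-Java sketch. Nothing in it is an SDK API or a decided design: ToyDslSketch and its from, apply, count and to methods are invented for the illustration, and they work on in-memory lists rather than on files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical fluent wrapper for illustration; this is not an SDK API.
class ToyDslSketch {

    private final List<String> elements;

    private ToyDslSketch(List<String> elements) {
        this.elements = elements;
    }

    // Entry point, playing the role of from("file:...") with in-memory data.
    static ToyDslSketch from(List<String> lines) {
        return new ToyDslSketch(new ArrayList<>(lines));
    }

    // Generic step: each element may produce zero or more output elements.
    ToyDslSketch apply(Function<String, List<String>> fn) {
        List<String> out = new ArrayList<>();
        for (String element : elements) {
            out.addAll(fn.apply(element));
        }
        return new ToyDslSketch(out);
    }

    // Specialized step: replaces the elements with "element: count" lines, sorted by element.
    ToyDslSketch count() {
        Map<String, Long> counts = new TreeMap<>();
        for (String element : elements) {
            counts.merge(element, 1L, Long::sum);
        }
        List<String> out = new ArrayList<>();
        counts.forEach((word, n) -> out.add(word + ": " + n));
        return new ToyDslSketch(out);
    }

    // Terminal step, playing the role of to("file:...").
    void to(List<String> destination) {
        destination.addAll(elements);
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        from(Arrays.asList("a b", "a"))
            .apply(line -> Arrays.asList(line.split(" ")))
            .count()
            .to(out);
        System.out.println(out); // [a: 2, b: 1]
    }
}
```

The specialized count() method is only a shortcut over a generic step, which is why the apply method could remain available next to the DSL.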

Data Integration

Dataflow is a great abstraction layer for data processing (batch and streaming), but it could also become a very flexible integration platform. In addition to the processing DSL, we will provide an integration DSL. It will bring EIPs (Enterprise Integration Patterns) to data integration.
Imagine an integration pipeline like:

from("http:...").filter().aggregate().to("jms:...").when(...).to().otherwise().to("kafka:...");

We can mix processing and integration in a very smart and powerful way.

It means that we will have the process/stream DSL and the integration DSL on top of the core one. Furthermore, the sinks and sources will not only be data sources, but also message and data mixes.

About jbonofre

ASF Member, PMC for Apache Karaf, PMC for Apache ServiceMix, PMC for Apache ACE, PMC for Apache Syncope, Committer for Apache ActiveMQ, Committer for Apache Archiva, Committer for Apache Camel, Contributor for Apache Falcon Twitter: jbonofre IRC: jbonofre on #servicemix,#karaf,#camel,#cxf on Freenode
