Apache Beam: easily implement a backoff policy in your DoFn

In Apache Beam, DoFn is your Swiss Army knife: when the SDK doesn’t provide an existing PTransform or CompositeTransform that fits your need, you can create your own function.

DoFn?

A DoFn applies your logic to each element of the input PCollection and lets you populate the elements of an output PCollection. To be included in your pipeline, it’s wrapped in a ParDo PTransform.

For instance, you can transform elements using a DoFn:

pipeline.apply("ReadFromJms", JmsIO.read().withConnectionFactory(CF).withQueue("city"))
         .apply("TransformJmsRecordAsPojo", ParDo.of(new DoFn<JmsRecord, MyCityPojo>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                       String payload = c.element().getPayload();
                       MyCityPojo city = new MyCityPojo(payload);
                       c.output(city); // emit the converted element to the output PCollection
                   }
         }));

We can see here the core method of DoFn: processElement() annotated with @ProcessElement.

If you have to deal with resources, other annotated methods allow you to tweak the DoFn lifecycle:

  1. @Setup is called when the DoFn is created on the worker.
  2. @StartBundle is called when the runner starts a bundle of data. The bundle size depends on the runner.
  3. @FinishBundle is called when all elements in the bundle have been processed.
  4. @Teardown is called when the DoFn is stopped on the worker.
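
The four lifecycle methods above can be sketched as follows. This is only a minimal illustration: the class name BufferingDoFn and the "client" field are hypothetical, and the client is a plain list standing in for a real resource such as a connection.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

// Sketch only: "client" is a stand-in list, not a real connection.
class BufferingDoFn extends DoFn<String, String> {

  transient List<String> client; // hypothetical resource, one per DoFn instance
  transient List<String> buffer; // elements collected during the current bundle

  @Setup
  public void setup() {
    // Called once when the DoFn instance is created on the worker.
    client = new ArrayList<>();
  }

  @StartBundle
  public void startBundle() {
    // Called before the first element of each bundle.
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    buffer.add(c.element());
    c.output(c.element());
  }

  @FinishBundle
  public void finishBundle() {
    // Called after the last element of the bundle: "flush" the buffer.
    client.addAll(buffer);
    buffer = null;
  }

  @Teardown
  public void teardown() {
    // Called when the DoFn instance is discarded: release the resource.
    client = null;
  }
}
```

Opening the resource in @Setup rather than in @ProcessElement avoids recreating it for every element, which is usually the reason to use these methods at all.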

Adding Backoff Policy

Now, if an exception is thrown in your @ProcessElement method, the whole current bundle fails and none of its elements are considered processed.

So you might want to implement a retry strategy that replays the processing logic for the failing element.

Apache Beam provides utilities that let you easily implement this retry policy: it’s what we call a backoff policy.

FluentBackoff is a convenient utility class where you can define a retry policy with:

  • an initial backoff delay between retries (increased at each attempt)
  • a maximum number of retries
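
To get a feel for what "increased at each attempt" means, here is a small plain-Java sketch that computes the delay before each retry. It is not Beam's FluentBackoff implementation: the class BackoffSchedule, its method, and the growth factor of 2 are assumptions chosen for illustration, and the real class may also randomize and cap the delays.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only: this is not Beam's FluentBackoff implementation.
final class BackoffSchedule {

  // Returns the delay (in milliseconds) to wait before each retry.
  // The delay starts at initialMillis and is multiplied by factor after each attempt.
  static List<Long> delaysMillis(long initialMillis, double factor, int maxRetries) {
    List<Long> delays = new ArrayList<>();
    double current = initialMillis;
    for (int attempt = 0; attempt < maxRetries; attempt++) {
      delays.add(Math.round(current));
      current *= factor;
    }
    return delays;
  }

  public static void main(String[] args) {
    // 1 second initial delay, growth factor 2 (assumed), 4 retries
    System.out.println(delaysMillis(1000, 2.0, 4)); // prints [1000, 2000, 4000, 8000]
  }
}
```

Once the list is exhausted, the caller gives up and lets the exception propagate.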

To illustrate this, let’s create a DoFn using this:

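// FluentBackoff, BackOff, BackOffUtils, Sleeper: org.apache.beam.sdk.util (recent Beam versions); Duration: org.joda.time
public class DoFnWithBackoff extends DoFn<JmsRecord, MyCityPojo> {

   // Example values only (assumed): tune the retry count and initial delay to your use case.
   private static final FluentBackoff BACKOFF =
       FluentBackoff.DEFAULT
           .withMaxRetries(5)
           .withInitialBackoff(Duration.standardSeconds(1));

   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
     Sleeper sleeper = Sleeper.DEFAULT;
     BackOff backoff = BACKOFF.backoff();
     while (true) {
       try {
          String payload = c.element().getPayload();
          MyCityPojo city = new MyCityPojo(payload); // can raise an exception
          c.output(city);
          return; // success: stop retrying
       } catch (Exception e) {
         if (e instanceof BusinessException) {
           // in case of custom business exception, we don't retry
           throw e;
         }
         // retry, changing the content if required
         if (!BackOffUtils.next(sleeper, backoff)) {
           // we retried the max number of times
           throw e;
         }
       }
     }
   }
}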

The Apache Beam backoff utilities leave the retry loop in your hands, so you keep complete control and can implement your own retry policy.

About the Author: jbonofre

ASF Member, PMC for Apache Karaf, PMC for Apache ServiceMix, PMC for Apache Archiva, PMC for Apache Felix, PMC for Apache Camel, PMC for Apache Syncope, PMC for Apache Beam, PMC for Apache CarbonData, PMC for Apache Bahir, PMC for Apache Brooklyn, PMC for Apache Falcon, PMC for Apache Guacamole, PMC for Apache Lens, Committer for Apache ActiveMQ and much more ! Twitter: jbonofre IRC: jbonofre on #servicemix,#karaf,#camel,#cxf on Freenode