When you use Apache Camel routes for your integration and a failure occurs, a classic pattern is to retry the message. That’s especially true for recoverable errors: for instance, during a network outage, you can replay the messages, hoping the network recovers in the meantime.
In Apache Camel, this redelivery policy is configured in the error handler. Both the default error handler and the dead letter error handler support such a policy.
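For reference, here is a minimal sketch of what such an in-memory redelivery policy can look like in the Blueprint DSL. The error handler id, the dead letter queue name, and the retry values are assumptions chosen for illustration only:

```xml
<camelContext xmlns="http://camel.apache.org/schema/blueprint" errorHandlerRef="retryErrorHandler">
  <!-- Hypothetical error handler: retry 3 times, 1 second apart, then move the exchange to a dead letter queue -->
  <errorHandler id="retryErrorHandler" type="DeadLetterChannel"
                deadLetterUri="jms:queue:dead?connectionFactory=connectionFactory">
    <redeliveryPolicy maximumRedeliveries="3" redeliveryDelay="1000"/>
  </errorHandler>
  <!-- routes go here -->
</camelContext>
```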
However, by default, the exchange being redelivered is kept in memory, and new exchanges don’t come through, since the first exchange (the one with the exception) is not flagged as “handled”.
This approach can be an issue: if you restart the container hosting the Camel route (like Apache Karaf), the exchange is lost. Moreover, in terms of performance, we might still want the other exchanges to go through.
There are several solutions to achieve this. In this blog, I will illustrate a possible implementation of a persistent redelivery policy with backoff support.
Apache Camel route
Let’s start with routes to illustrate the use case. I’m using the Camel Blueprint DSL in this example as my preferred container is Apache Karaf (of course 😉 ).
The first route is pretty simple: it listens for incoming HTTP requests (using the Camel Jetty component), converts the body to a String, and sends it to a JMS queue.
The second route consumes messages from the JMS queue, prepares a JSON payload using a processor (registered as a service in Karaf), and calls a REST service.
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
           xmlns:cxf="http://camel.apache.org/schema/blueprint/cxf">

  <!-- JMS connection factory and JSON processor, both looked up as OSGi services -->
  <reference id="connectionFactory" interface="javax.jms.ConnectionFactory"/>
  <reference id="jsonProcessor" interface="org.apache.camel.Processor" filter="(name=json)"/>

  <cxf:rsClient id="rsClient" address="http://localhost:8181/cxf/test/"/>

  <camelContext xmlns="http://camel.apache.org/schema/blueprint">
    <!-- HTTP in, copy of the body to the JMS queue, "OK" back to the caller -->
    <route id="first">
      <from uri="jetty:http://0.0.0.0:9090/first"/>
      <convertBodyTo type="java.lang.String"/>
      <wireTap uri="jms:queue:second?connectionFactory=connectionFactory"/>
      <setBody><constant>OK</constant></setBody>
    </route>
    <!-- JMS queue in (client ack), JSON preparation, REST call -->
    <route id="second">
      <from uri="jms:queue:second?connectionFactory=connectionFactory&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;cacheLevelName=CACHE_CONSUMER"/>
      <process ref="jsonProcessor"/>
      <setHeader headerName="operationName"><constant>updateResource</constant></setHeader>
      <setHeader headerName="CamelCxfRsUsingHttpAPI"><constant>false</constant></setHeader>
      <setHeader headerName="CamelAcceptContentType"><constant>application/json</constant></setHeader>
      <to uri="cxfrs://bean://rsClient?synchronous=true"/>
    </route>
  </camelContext>

</blueprint>
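The jsonProcessor reference above is resolved from the OSGi service registry using the (name=json) filter. As a sketch of the other side, the bundle providing the processor could register it like this. The class name com.example.JsonProcessor and the bean id are hypothetical; only the interface and the name=json property are implied by the reference:

```xml
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">

  <!-- Hypothetical implementation class of org.apache.camel.Processor -->
  <bean id="jsonProcessorBean" class="com.example.JsonProcessor"/>

  <!-- Expose it as a service; the "name" property is what the (name=json) filter matches -->
  <service ref="jsonProcessorBean" interface="org.apache.camel.Processor">
    <service-properties>
      <entry key="name" value="json"/>
    </service-properties>
  </service>

</blueprint>
```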
By default, when you create a Camel route, Camel automatically enables the default error handler. This error handler is an interceptor on the Camel channel (the “wire” between two processors in the route), and reacts when an exchange contains an exception.
In that case, it stops the exchange from going through (waiting for the handled flag) and retries the exchange.
Here, we want to:
- persist the message to be able to resume even in case of a platform restart or failure
- actually remove the message from the JMS queue only when it has been processed successfully
- replay the message a given number of times, or indefinitely
Point 1 is already covered by Camel: by default, Camel sends JMS messages as persistent. The messages are therefore stored in the ActiveMQ KahaDB persistent store, and even if we restart ActiveMQ, they are not lost.
For point 2, we change the JMS acknowledgement mode. By default, Camel uses auto ack: the ack is sent to the broker as soon as the message is consumed, so just after the jms endpoint in the Camel route. If a failure/exception occurs later in the Camel route, you have to deal with it yourself, because the message has already been removed from the queue.
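Changing the ack mode to client ack lets us send the ack only at the OnCompletion time of the exchange. So the message is removed from the queue only if it has been processed completely and successfully. To do so, we set the ack mode on the jms endpoint URI, as in the second route above:
<from uri="jms:queue:second?connectionFactory=connectionFactory&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;cacheLevelName=CACHE_CONSUMER"/>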
For point 3, we are going to change the broker URL in the ActiveMQ connection factory. By default, the ActiveMQ connection factory redelivers a message at most 6 times (so 7 delivery attempts in total, counting the first one).
The number of redelivery attempts can be changed on the URL used by the connection factory. For instance, to set the maximum number of redeliveries to 10, you can do:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=10" />
</bean>
You can also redeliver forever (actually, until the message expires) using:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=-1" />
</bean>
In the previous section, we used the ActiveMQ redelivery policy mechanism. That’s pretty convenient, but to optimize the number of attempts, we may want to introduce some delay between redeliveries. And the more redeliveries we have, the more we want to increase this delay. That’s what we call backoff redelivery.
This can also be configured on the URL of the connection factory, using the following options (each prefixed with jms.redeliveryPolicy.):
- useExponentialBackOff enables an exponential backoff. It’s disabled by default.
- backOffMultiplier defines how much the delay grows. By default, each “new” delay is 5 times longer than the “previous” one.
- initialRedeliveryDelay is the delay before the very first redelivery. It’s the “startup” delay of the backoff policy. By default, it’s 1000L, meaning 1 second.
- maximumRedeliveryDelay is the maximum delay we can reach. By default, it’s -1, meaning there’s no maximum.
So, let’s update the connection factory URL to enable backoff redelivery:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=-1&amp;jms.redeliveryPolicy.useExponentialBackOff=true&amp;jms.redeliveryPolicy.initialRedeliveryDelay=2000&amp;jms.redeliveryPolicy.backOffMultiplier=2" />
</bean>
Here we have a redelivery policy with backoff that doubles the delay after each attempt, starting from 2 seconds: 2 seconds, then 4, 8, 16, and so on.
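With infinite redeliveries and no maximum delay, a doubling delay keeps growing for as long as the failure lasts. A possible variant, assuming a 60-second upper bound suits your use case (the value is purely illustrative), caps the delay with maximumRedeliveryDelay:

```xml
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <!-- Same backoff policy as before, with the delay capped at 60000 ms (assumed value) -->
  <property name="brokerURL" value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=-1&amp;jms.redeliveryPolicy.useExponentialBackOff=true&amp;jms.redeliveryPolicy.initialRedeliveryDelay=2000&amp;jms.redeliveryPolicy.backOffMultiplier=2&amp;jms.redeliveryPolicy.maximumRedeliveryDelay=60000" />
</bean>
```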
Thanks to that, we keep our Camel route pretty simple, and we get consistent, persistent redelivery with a backoff policy.