Wednesday, March 30, 2016

RabbitMQ DeadLetter Queue with a Requeueing Threshold

When an exception is thrown on a Spring consumer for RabbitMQ, Spring's default behavior is to requeue the message on the head of the queue. This usually results in a message being infinitely requeued if the exception is thrown again. Not only does this cause an infinite loop in requeueing, since the message is placed at the head of the queue, all other messages are stopped from being processed if there is only one consumer.

Some solutions are to:

1. Wrap the Exception with a AmqpRejectAndDontRequeueException.
@Override
public void onMessage(Message message) {
//this is where you consume the message
try{
consumeMessage(message);
}catch(Exception exception){
//if any exception is thrown and you never want the message to requeue
//this is dangerous because at this point this message is dropped
//at least do some minor logging
System.out.println(e.getMessage());
throw new AmqpRejectAndDontRequeueException(e);
}
}

2. Change the default to not requeue. If you change the default to not requeue, then you need to setup a deadletter exchange and queue otherwise the message is dropped on the floor.
public class RabbitMQListenerContainerNoRequeue implements MessageListener{
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private AmqpAdmin amqpAdmin;
protected SimpleMessageListenerContainer simpleMessageListenerContainer;
/*
* Setup the SimpleMessageListenerContainer to not requeue messages that throw exceptions by default. If a message
* throws an exception it will be thrown out.
*/
public SimpleMessageListenerContainer createSimpleMessageListenerContainer() {
amqpAdmin.declareQueue("yourQueueName");
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
simpleMessageListenerContainer.setQueueNames("yourQueueName");
simpleMessageListenerContainer.setMessageListener(this);
simpleMessageListenerContainer.setDefaultRequeueRejected(false);
return simpleMessageListenerContainer;
}
@Override
public void onMessage(Message message) {
//this is where you consume the message
consumeMessage(message);
}
}

3. A more sophisticated solution is to setup a retry threshold and then deadletter the message.
Example: 
/**
* The DeadLetterProducer will be responsible for creating the DeadLetter queue, creating the DeadLetter Exchange,
* and binding them together according to the routing key. The routing key is the queue name for simplicity.
*/
@Service
public class DeadLetterProducer{
public static String EXCHANGE_NAME = "DeadLetter.Exchange";
public static String QUEUE_NAME = "DeadLetter.Queue";
public Queue createQueue() {
//the deadletter queue arguement must be provided to allow rabbitmq to treat it as such.
//https://www.rabbitmq.com/dlx.html
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", QUEUE_NAME);
return new Queue(QUEUE_NAME, false, false, false, args);
}
@PostConstruct
public void postConstruct() {
bind();
}
public Queue createQueue() {
return new Queue(QUEUE_NAME);
}
public String getRoutingKey() {
return QUEUE_NAME;
}
public void bind() {
//create the exchange
DirectExchange exchange = new DirectExchange(EXCHANGE_NAME);
amqpAdmin.declareExchange(exchange);
//create the queue
Queue queue = createQueue();
amqpAdmin.declareQueue(queue);
//create the binding between the two
Binding binding = BindingBuilder.bind(queue).to(exchange).with(getRoutingKey());
amqpAdmin.declareBinding(binding);
}
public void send(T message) {
rabbitTemplate.convertAndSend(getExchangeName(), getRoutingKey(), message);
}
}
public class SampleRabbitmqConsumerWithDeadletterSupport implements MessageListener{
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private AmqpAdmin amqpAdmin;
/* Autowire in the RetryOperationsInteceptor that is setup in the configuration so that it can be attached to the
* SimpleMessageListenerContainer. Now when the container throws an expception, the new retry operations for the
* deadletter queue will be invoked.
*/
@Autowired
private RetryOperationsInterceptor retryOperationsInterceptor;
protected SimpleMessageListenerContainer simpleMessageListenerContainer;
public SimpleMessageListenerContainer createSimpleMessageListenerContainer() {
amqpAdmin.declareQueue("yourQueueName");
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//attach the retryOperationsInterceptor as part of the adviceChain
simpleMessageListenerContainer.setAdviceChain(new Advice[] { retryOperationsInterceptor });
simpleMessageListenerContainer.setQueueNames("yourQueueName");
simpleMessageListenerContainer.setMessageListener(this);
return simpleMessageListenerContainer;
}
@Override
public void onMessage(Message message) {
//this is where you consume the message
consumeMessage(message);
}
}
/**
* The main point of this Spring configuration is to demonstrate the retryTemplate and the deadLetterInteceptor that needs
* to be coded to requeue messages on their appropriate queues until the retry threshold is exceeded and then
* the message is placed on the deadletter queue.
* The amqpConnectionFactory with deadletter retry support.
* The retry template uses an ExponentialBackOffPolicy with some basic settings assumed.
* The connection factory also assumes basic settings.
*/
@Configuration
class SpringRabbitmqDeadletterConfiguration{
//Basic connectionFactory setup, there is no deadletter or retry specific settings here
@Bean
ConnectionFactory amqpConnectionFactory() {
CachingConnectionFactory connectionFactory = null;
connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setChannelCacheSize(25);
connectionFactory.setCloseTimeout(30000);
connectionFactory.setConnectionCacheSize(1);
connectionFactory.setRequestedHeartBeat(0);
return connectionFactory;
}
/**
* Sets up the RetryTemplate, ExponentialBackOffPolicy with some basic settings, attaches that to the template
* as a SimpleRetryPolicy
*
* @return RetryTemplate
*/
@Bean
public RetryTemplate retryTemplate() {
//create retryTemplate and attach the backoffPolicy
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(exponentialBackOffPolicy());
//create a SimpleRetryPolicy and attach to the retryTemplate
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
public ExponentialBackOffPolicy exponentialBackOffPolicy(){
//setup ExponentialBackOffPolicy
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(30000);
return backoffPolicy;
}
/**
* Creates an interceptor that is can be attached to queues that will try to requeue a failed message a number of
* times and then it will place on the deadletter queue.
*/
@Bean
RetryOperationsInterceptor deadLetterInterceptor() {
return RetryInterceptorBuilder.stateless().maxAttempts(5).recoverer(
new RepublishMessageRecoverer(rabbitTemplate(), DeadLetterProducer.EXCHANGE_NAME,
DeadLetterProducer.QUEUE_NAME))
.backOffPolicy(exponentialBackOffPolicy()).build();
}
/**
* A message queue manager that applies a retry template
*
* @return RabbitTemplate
*/
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate messageQueueManager = new RabbitTemplate(amqpConnectionFactory());
messageQueueManager.setRetryTemplate(this.retryTemplate());
return messageQueueManager;
}
}