Some solutions are to:
1. Wrap the exception in an AmqpRejectAndDontRequeueException.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Override | |
public void onMessage(Message message) { | |
//this is where you consume the message | |
try{ | |
consumeMessage(message); | |
}catch(Exception exception){ | |
//if any exception is thrown and you never want the message to requeue | |
//this is dangerous because at this point this message is dropped | |
//at least do some minor logging | |
System.out.println(e.getMessage()); | |
throw new AmqpRejectAndDontRequeueException(e); | |
} | |
} |
2. Change the default to not requeue. If you change the default to not requeue, then you need to set up a dead letter exchange and queue; otherwise the message is dropped on the floor.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class RabbitMQListenerContainerNoRequeue implements MessageListener{ | |
@Autowired | |
private ConnectionFactory connectionFactory; | |
@Autowired | |
private AmqpAdmin amqpAdmin; | |
protected SimpleMessageListenerContainer simpleMessageListenerContainer; | |
/* | |
* Setup the SimpleMessageListenerContainer to not requeue messages that throw exceptions by default. If a message | |
* throws an exception it will be thrown out. | |
*/ | |
public SimpleMessageListenerContainer createSimpleMessageListenerContainer() { | |
amqpAdmin.declareQueue("yourQueueName"); | |
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); | |
simpleMessageListenerContainer.setQueueNames("yourQueueName"); | |
simpleMessageListenerContainer.setMessageListener(this); | |
simpleMessageListenerContainer.setDefaultRequeueRejected(false); | |
return simpleMessageListenerContainer; | |
} | |
@Override | |
public void onMessage(Message message) { | |
//this is where you consume the message | |
consumeMessage(message); | |
} | |
} |
3. A more sophisticated solution is to set up a retry threshold and then dead-letter the message.
Example:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* The DeadLetterProducer will be responsible for creating the DeadLetter queue, creating the DeadLetter Exchange, | |
* and binding them together according to the routing key. The routing key is the queue name for simplicity. | |
*/ | |
@Service | |
public class DeadLetterProducer{ | |
public static String EXCHANGE_NAME = "DeadLetter.Exchange"; | |
public static String QUEUE_NAME = "DeadLetter.Queue"; | |
public Queue createQueue() { | |
//the deadletter queue arguement must be provided to allow rabbitmq to treat it as such. | |
//https://www.rabbitmq.com/dlx.html | |
Map<String, Object> args = new HashMap<>(); | |
args.put("x-dead-letter-exchange", QUEUE_NAME); | |
return new Queue(QUEUE_NAME, false, false, false, args); | |
} | |
@PostConstruct | |
public void postConstruct() { | |
bind(); | |
} | |
public Queue createQueue() { | |
return new Queue(QUEUE_NAME); | |
} | |
public String getRoutingKey() { | |
return QUEUE_NAME; | |
} | |
public void bind() { | |
//create the exchange | |
DirectExchange exchange = new DirectExchange(EXCHANGE_NAME); | |
amqpAdmin.declareExchange(exchange); | |
//create the queue | |
Queue queue = createQueue(); | |
amqpAdmin.declareQueue(queue); | |
//create the binding between the two | |
Binding binding = BindingBuilder.bind(queue).to(exchange).with(getRoutingKey()); | |
amqpAdmin.declareBinding(binding); | |
} | |
public void send(T message) { | |
rabbitTemplate.convertAndSend(getExchangeName(), getRoutingKey(), message); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class SampleRabbitmqConsumerWithDeadletterSupport implements MessageListener{ | |
@Autowired | |
private ConnectionFactory connectionFactory; | |
@Autowired | |
private AmqpAdmin amqpAdmin; | |
/* Autowire in the RetryOperationsInteceptor that is setup in the configuration so that it can be attached to the | |
* SimpleMessageListenerContainer. Now when the container throws an expception, the new retry operations for the | |
* deadletter queue will be invoked. | |
*/ | |
@Autowired | |
private RetryOperationsInterceptor retryOperationsInterceptor; | |
protected SimpleMessageListenerContainer simpleMessageListenerContainer; | |
public SimpleMessageListenerContainer createSimpleMessageListenerContainer() { | |
amqpAdmin.declareQueue("yourQueueName"); | |
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); | |
//attach the retryOperationsInterceptor as part of the adviceChain | |
simpleMessageListenerContainer.setAdviceChain(new Advice[] { retryOperationsInterceptor }); | |
simpleMessageListenerContainer.setQueueNames("yourQueueName"); | |
simpleMessageListenerContainer.setMessageListener(this); | |
return simpleMessageListenerContainer; | |
} | |
@Override | |
public void onMessage(Message message) { | |
//this is where you consume the message | |
consumeMessage(message); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* The main point of this Spring configuration is to demonstrate the retryTemplate and the deadLetterInteceptor that needs | |
* to be coded to requeue messages on their appropriate queues until the retry threshold is exceeded and then | |
* the message is placed on the deadletter queue. | |
* The amqpConnectionFactory with deadletter retry support. | |
* The retry template uses an ExponentialBackOffPolicy with some basic settings assumed. | |
* The connection factory also assumes basic settings. | |
*/ | |
@Configuration | |
class SpringRabbitmqDeadletterConfiguration{ | |
//Basic connectionFactory setup, there is no deadletter or retry specific settings here | |
@Bean | |
ConnectionFactory amqpConnectionFactory() { | |
CachingConnectionFactory connectionFactory = null; | |
connectionFactory = new CachingConnectionFactory("localhost"); | |
connectionFactory.setUsername("guest"); | |
connectionFactory.setPassword("guest"); | |
connectionFactory.setChannelCacheSize(25); | |
connectionFactory.setCloseTimeout(30000); | |
connectionFactory.setConnectionCacheSize(1); | |
connectionFactory.setRequestedHeartBeat(0); | |
return connectionFactory; | |
} | |
/** | |
* Sets up the RetryTemplate, ExponentialBackOffPolicy with some basic settings, attaches that to the template | |
* as a SimpleRetryPolicy | |
* | |
* @return RetryTemplate | |
*/ | |
@Bean | |
public RetryTemplate retryTemplate() { | |
//create retryTemplate and attach the backoffPolicy | |
RetryTemplate retryTemplate = new RetryTemplate(); | |
retryTemplate.setBackOffPolicy(exponentialBackOffPolicy()); | |
//create a SimpleRetryPolicy and attach to the retryTemplate | |
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); | |
retryTemplate.setRetryPolicy(retryPolicy); | |
return retryTemplate; | |
} | |
public ExponentialBackOffPolicy exponentialBackOffPolicy(){ | |
//setup ExponentialBackOffPolicy | |
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); | |
backOffPolicy.setInitialInterval(1000); | |
backOffPolicy.setMultiplier(10.0); | |
backOffPolicy.setMaxInterval(30000); | |
return backoffPolicy; | |
} | |
/** | |
* Creates an interceptor that is can be attached to queues that will try to requeue a failed message a number of | |
* times and then it will place on the deadletter queue. | |
*/ | |
@Bean | |
RetryOperationsInterceptor deadLetterInterceptor() { | |
return RetryInterceptorBuilder.stateless().maxAttempts(5).recoverer( | |
new RepublishMessageRecoverer(rabbitTemplate(), DeadLetterProducer.EXCHANGE_NAME, | |
DeadLetterProducer.QUEUE_NAME)) | |
.backOffPolicy(exponentialBackOffPolicy()).build(); | |
} | |
/** | |
* A message queue manager that applies a retry template | |
* | |
* @return RabbitTemplate | |
*/ | |
@Bean | |
public RabbitTemplate rabbitTemplate() { | |
RabbitTemplate messageQueueManager = new RabbitTemplate(amqpConnectionFactory()); | |
messageQueueManager.setRetryTemplate(this.retryTemplate()); | |
return messageQueueManager; | |
} | |
} |