Spring AMQP 错误处理策略详解
2021/5/30 10:23:00
本文主要是介绍Spring AMQP 错误处理策略详解,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
1.介绍
异步消息传递是一种松耦合的分布式通信,在事件驱动体系结构实现中越来越受欢迎。幸运的是,Spring框架提供了Spring AMQP项目,可以帮助我们构建基于AMQP的消息传递解决方案。
另一方面,在这种环境中处理错误并不简单。本文将讨论错误处理策略。
2.配置环境
这里使用RabbitMQ实现AMQP标准。此外,Spring AMQP还提供了spring-rabbit模块,让集成更容易。
RabbitMQ作为独立服务器运行。执行下面命令,在Docker容器中运行:
docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management
了解更多配置信息和项目依赖项设置,请参阅Spring AMQP教程。
3.失败场景
由于分布式特性,相比一个独立整体的应用程序,通常基于消息传递的系统发生的错误类型会更多。
下面列举了一些异常类型:
网络或I/O相关故障:网络连接和I/O操作故障;
协议或基础架构相关错误:消息传递基础架构配置错误;
代理(Broker)相关故障:客户端与AMQP代理之间配置不正确。例如,达到定义的限制或阈值、身份验证或策略配置无效。
应用程序和消息相关的异常:表现为违反某些业务或应用程序规则的异常;
当然,上述故障列表并不全面,但包含了最常见的错误类型。
应该注意到,Spring AMQP默认处理了与连接有关的问题和底层问题,开箱即用。例如,重试机制、重排队策略等。此外,大多数故障和错误都会转换为AmqpException或它的子类。
接下来,我们会主要关注应用错误和高级别错误,然后介绍全局错误处理策略。
4.配置项目
首先,定义一个简单队列并进行exchange配置:
public static final String QUEUE_MESSAGES = "baeldung-messages-queue"; public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange"; @Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) .build(); } @Bean DirectExchange messagesExchange() { return new DirectExchange(EXCHANGE_MESSAGES); } @Bean Binding bindingMessages() { return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); }
接下来,创建一个简单的producer:
public void sendMessage() { rabbitTemplate .convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES, SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:"+ messageNumber++); }
最后,consumer会抛出一个异常:
@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES) public void receiveMessage(Message message) throws BusinessException { throw new BusinessException(); }
默认情况下,所有失败的消息会马上不断添加到目标队列。
执行Maven命令运行示例程序:
mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.errorhandling.ErrorHandlingApp
现在应该看到类似下面的输出:
WARN 22260 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. Caused by: com.baeldung.springamqp.errorhandling.errorhandler.BusinessException: null
因此,默认情况下会在输出中看到无数这样的消息。
处理这种情况有两种选择:
为listener设置default-requeue-rejected参数,false-spring.rabbitmq.listener.simple.default-requeue-rejected = false。
抛出AmqpRejectAndDontRequeueException,对于将来没有意义的消息可以直接丢掉。
下面讨论如何更智能地处理失败消息。
5.死信队列
死信队列(Dead Letter Queue,DLQ)用来保存未送达或失败的消息。DLQ能帮助我们处理错误消息或不良消息,监视故障模式并从系统异常中恢复。
更重要的是,这样能够防止队列陷入处理不良消息的死循环,造成系统性能下降。
这里有两个主要概念:死信交换(DLX)和死信队列(DLQ)。实际上,DLX是一种普通exchange,可以把它定义为一种常见类型:direct, topic 或者 fanout。
producer对队列一无所知,了解这一点很重要。它只知道exchange,并且所有产生的消息都根据exchange配置和message routing key进行路由。
接下来,让我们看看如何通过应用“死信队列”处理异常。
5.1.基础配置
为了配置DLQ,需要在定义队列时指定其他参数:
@Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) .withArgument("x-dead-letter-exchange", "") .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ) .build(); } @Bean Queue deadLetterQueue() { return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build(); }
上面的示例中,增加了两个参数:x-dead-letter-exchange 和 x-dead-letter-routing-key。x-dead-letter-exchange使用空字符串,通知代理使用默认exchange。
第二个参数与简单消息设置routing key一样重要。该选项修改了消息初始routing key,为后面通过DLX路由做准备。
5.2.失败消息路由
当消息传递失败时,将被路由到Dead Letter Exchange(DLE)。但是正如我们已经指出的那样,DLX是一种正常的交换方式。因此,如果失败消息的routing key与exchange不匹配,那么不会传给DLQ。
Exchange: (AMQP default) Routing Key: baeldung-messages-queue.dlq
如果上面的示例忽略x-dead-letter-routing-key,那么失败消息将会无限循环重试。
此外,消息元信息可以在x-death header中找到:
x-death: count: 1 exchange: baeldung-messages-exchange queue: baeldung-messages-queue reason: rejected routing-keys: baeldung-messages-queue time: 1571232954
上面的信息可以在RabbitMQ管理控制台中看到,通常在本地端口15672上运行。
除了上面这样配置,如果使用Spring Cloud Stream,还可以利用republishToDlq和。autoBindDlq属性简化配置过程。
5.3.Dead Letter Exchange
上一节中已经看到,消息路由到DLE时更改了routing key。但是这种处理并不适用于所有情况。可以自己配置DLX并使用fanout类型定义:
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; @Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) .build(); } @Bean FanoutExchange deadLetterExchange() { return new FanoutExchange(DLX_EXCHANGE_MESSAGES); } @Bean Queue deadLetterQueue() { return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build(); } @Bean Binding deadLetterBinding() { return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()); }
这次我们使用fanout类型自定义exchange,因此消息将被发送到所有bind队列。此外,我们把x-dead-letter-exchange参数设为DLX的名字。同时,删除了x-dead-letter-routing-key参数。
现在运行示例时,无需更改初始routing key就能把失败消息发送到DLQ:
Exchange: baeldung-messages-queue.dlx Routing Key: baeldung-messages-queue
5.4.处理死信队列消息
当然,把消息移到“死信队列”中的原因是为了在其他时间可以重新处理。
为“死信队列”定义一个listener:
@RabbitListener(queues = QUEUE_MESSAGES_DLQ) public void processFailedMessages(Message message) { log.info("Received failed message: {}", message.toString()); }
现在运行示例代码,应该能够看到log输出:
WARN 11752 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. INFO 11752 --- [ntContainer#1-1] c.b.s.e.consumer.SimpleDLQAmqpContainer : Received failed message:
收到了一条失败消息,接下来该怎么办?根据不同的系统要求、异常类型或者消息类型,答案也各有不同。
例如,可以让消息重新排队发送到原目的地址:
@RabbitListener(queues = QUEUE_MESSAGES_DLQ) public void processFailedMessagesRequeue(Message failedMessage) { log.info("Received failed message, requeueing: {}", failedMessage.toString()); rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); }
但是这种异常处理逻辑与默认的重试策略没有区别:
INFO 23476 --- [ntContainer#0-1] c.b.s.e.c.RoutingDLQAmqpContainer : Received message: WARN 23476 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. INFO 23476 --- [ntContainer#1-1] c.b.s.e.c.RoutingDLQAmqpContainer : Received failed message, requeueing:
通常采用的策略可能需要重试n次然后拒绝该消息。让我们利用消息头实现这种策略:
public void processFailedMessagesRetryHeaders(Message failedMessage) { Integer retriesCnt = (Integer) failedMessage.getMessageProperties() .getHeaders().get(HEADER_X_RETRIES_COUNT); if (retriesCnt == null) retriesCnt = 1; if (retriesCnt > MAX_RETRIES_COUNT) { log.info("Discarding message"); return; } log.info("Retrying message for the {} time", retriesCnt); failedMessage.getMessageProperties() .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); }
首先获取x-retries-count header值,然后与重试最大次数进行比较。如果计数器达到重试最大次数,则丢弃该消息:
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer : Retrying message for the 1 time WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer : Retrying message for the 2 time WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer : Discarding message
补充一点,还可以利用x-message-ttl header设置消息丢弃时间,超过该时间消息将被丢弃。这样可以防止队列无限增长。
5.5.停车场队列
还有一种情况需要考虑,即不能简单把消息丢弃,因为它可能是一项银行交易。有时可能需要手动处理某条消息,或者仅仅需要记录失败n次以上的消息。
对于这种情况,有一概念称作停车场队列。可以将DLQ中失败次数超过允许值的所有消息转发到停车场队列,以便进一步处理。
下面展示了对应的实现:
public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot"; public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot"; @Bean FanoutExchange parkingLotExchange() { return new FanoutExchange(EXCHANGE_PARKING_LOT); } @Bean Queue parkingLotQueue() { return QueueBuilder.durable(QUEUE_PARKING_LOT).build(); } @Bean Binding parkingLotBinding() { return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange()); }
接下来重构listener逻辑,把消息发送到停车场队列:
@RabbitListener(queues = QUEUE_MESSAGES_DLQ) public void processFailedMessagesRetryWithParkingLot(Message failedMessage) { Integer retriesCnt = (Integer) failedMessage.getMessageProperties() .getHeaders().get(HEADER_X_RETRIES_COUNT); if (retriesCnt == null) retriesCnt = 1; if (retriesCnt > MAX_RETRIES_COUNT) { log.info("Sending message to the parking lot queue"); rabbitTemplate.send(EXCHANGE_PARKING_LOT, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); return; } log.info("Retrying message for the {} time", retriesCnt); failedMessage.getMessageProperties() .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); }
最后,处理转发到停车场队列的消息:
@RabbitListener(queues = QUEUE_PARKING_LOT) public void processParkingLotQueue(Message failedMessage) { log.info("Received message in parking lot queue"); // Save to DB or send a notification. }
现在可以把失败的消息保存到数据库,或者发送email通知。
运行程序进行测试:
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer : Retrying message for the 1 time WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer : Retrying message for the 2 time WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer : Sending message to the parking lot queue INFO 14768 --- [ntContainer#2-1] c.b.s.e.c.ParkingLotDLQAmqpContainer : Received message in parking lot queue
从输出中可以看到,几次尝试失败后,该消息已发送到停车场队列。
6.自定义错误处理
在上一节中,我们已经了解了如何使用专用队列和exchange处理故障。但有时可能需要捕获所有错误,记录到日志或持久化到数据库中。
6.1.全局ErrorHandler
目前为止,我们使用的是默认SimpleRabbitListenerContainerFactory,并且该工厂类默认调用ConditionalRejectingErrorHandler。handler会捕获各种不同的异常并将其转换为AmqpException继承树中的异常。
值得一提的是,如果想要处理连接错误,需要实现ApplicationListener接口。
简而言之,ConditionalRejectingErrorHandler负责指定是否拒绝特定消息。由异常触发的消息被拒绝后不会重新排队。
让我们自定义一个ErrorHandler,仅支持BusinessException重新排队:
public class CustomErrorHandler implements ErrorHandler { @Override public void handleError(Throwable t) { if (!(t.getCause() instanceof BusinessException)) { throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t); } } }
此外,由于是在listener方法中抛出异常,因此将可以把异常包装为ListenerExecutionFailedException。这样就需要调用getCause方法来获取异常。
6.2.FatalExceptionStrategy
handler在后台使用FatalExceptionStrategy检查异常等级是否为fatal。如果为fatal,失败消息将被reject。
默认情况下,下列为fatal异常:
MessageConversionException
MessageConversionException
MethodArgumentNotValidException
MethodArgumentTypeMismatchException
NoSuchMethodException
ClassCastException
除了实现ErrorHandler接口,还可以自定义FatalExceptionStrategy:
public class CustomFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy { @Override public boolean isFatal(Throwable t) { return !(t.getCause() instanceof BusinessException); } }
最后,需要把自定义策略传给ConditionalRejectingErrorHandler构造函数:
@Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); factory.setErrorHandler(errorHandler()); return factory; } @Bean public ErrorHandler errorHandler() { return new ConditionalRejectingErrorHandler(customExceptionStrategy()); } @Bean FatalExceptionStrategy customExceptionStrategy() { return new CustomFatalExceptionStrategy(); }
7.结论
在文讨论了使用Spring AMQP(尤其是RabbitMQ)不同的错误处理方法。
每个系统都需要指定错误处理策略。我们已经介绍了事件驱动架构中最常见的错误处理方法。此外,文章中展示了可以组合多种策略来构建更全面、更强大的解决方案。
这篇关于Spring AMQP 错误处理策略详解的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23Springboot应用的多环境打包入门
- 2024-11-23Springboot应用的生产发布入门教程
- 2024-11-23Python编程入门指南
- 2024-11-23Java创业入门:从零开始的编程之旅
- 2024-11-23Java创业入门:新手必读的Java编程与创业指南
- 2024-11-23Java对接阿里云智能语音服务入门详解
- 2024-11-23Java对接阿里云智能语音服务入门教程
- 2024-11-23JAVA对接阿里云智能语音服务入门教程
- 2024-11-23Java副业入门:初学者的简单教程
- 2024-11-23JAVA副业入门:初学者的实战指南