源码解析: Spring RabbitMQ消费者
2022/2/20 17:27:07
本文主要是介绍源码解析: Spring RabbitMQ消费者,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
从Spring RabbitMQ消费者启动,到接收消息和执行消费逻辑,一步步了解其实现。
目录
- 1. 消费者如何启动过程
- 1.1 启动配置类
- 1.2 创建消费者核心逻辑
- 1.3 PS: BeanPostPorcessor如何被Spring处理?
- 2. RabbitMQ消息如何被消费
- 2.1 SimpleMessageListenerContainer
- 2.2 BlockingQueueConsumer
1. 消费者如何启动过程
1.1 启动配置类
创建RabbitListenerAnnotationBeanPostProcessor
@Configuration public class RabbitBootstrapConfiguration { @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() { return new RabbitListenerAnnotationBeanPostProcessor(); } ..... }
1.2 创建消费者核心逻辑
核心逻辑在RabbitListenerAnnotationBeanPostProcessor,在Spring Bean初始化过程中执行。
对于每个消息监听都会创建对应的MessageListenerContainer(默认实现为SimpleMessageListenerContainer)
// 通过BeanPostProcessor在Bean创建后,创建消息监听器 public class RabbitListenerAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, SmartInitializingSingleton { ...... @Override public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { Class<?> targetClass = AopUtils.getTargetClass(bean); // 通过反射获取@RabbitListener修饰的方法 final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata); for (ListenerMethod lm : metadata.listenerMethods) { for (RabbitListener rabbitListener : lm.annotations) { // 创建MethodRabbitListenerEndpoint,并注册到RabbitListenerEndpointRegistrar processAmqpListener(rabbitListener, lm.method, bean, beanName); } } if (metadata.handlerMethods.length > 0) { processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName); } return bean; } protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) { Method methodToUse = checkProxy(method, bean); MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint(); endpoint.setMethod(methodToUse); processListener(endpoint, rabbitListener, bean, methodToUse, beanName); } // 创建RabbitMQ消费者核心逻辑 protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean, Object adminTarget, String beanName) { endpoint.setBean(bean); endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory); endpoint.setId(getEndpointId(rabbitListener)); // resolveQueues方法会处理创建队列的工作 endpoint.setQueueNames(resolveQueues(rabbitListener)); ....... // registerEndpoint()里核心创建MessageListenerContainer,其默认实现是SimpleMessageListenerContainer this.registrar.registerEndpoint(endpoint, factory); } ...... }
1.3 PS: BeanPostPorcessor如何被Spring处理?
虽然大家都很熟悉Spring Bean初始化流程里,但唠叨一下
调用链路:getBean -> doGetBean -> createBean -> initializeBean
->applyBeanPostProcessorsBeforeInitialization -> applyBeanPostProcessorsAfterInitialization
public abstract class AbstractAutowireCapableBeanFactory extends AbstractBeanFactory implements AutowireCapableBeanFactory{ // 创建一个Bean实例对象,应用post-processors protected Object createBean(String beanName, RootBeanDefinition mbd, Object[] args) throws BeanCreationException { // 各种准备工作 ...... // 最后调用doCreateBean Object beanInstance = doCreateBean(beanName, mbdToUse, args); if (logger.isDebugEnabled()) { logger.debug("Finished creating instance of bean '" + beanName + "'"); } return beanInstance; } protected Object doCreateBean(final String beanName, final RootBeanDefinition mbd, final Object[] args) throws BeanCreationException { ...... // Initialize the bean instance. Object exposedObject = bean; try { populateBean(beanName, mbd, instanceWrapper); if (exposedObject != null) { // 调用initializeBean exposedObject = initializeBean(beanName, exposedObject, mbd); } } catch (Throwable ex) { ..... } } // 初始化Bean实例 protected Object initializeBean(final String beanName, final Object bean, RootBeanDefinition mbd) { ...... if (mbd == null || !mbd.isSynthetic()) { wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName); } try { invokeInitMethods(beanName, wrappedBean, mbd); } catch (Throwable ex) { throw new BeanCreationException( (mbd != null ? mbd.getResourceDescription() : null), beanName, "Invocation of init method failed", ex); } if (mbd == null || !mbd.isSynthetic()) { wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName); } return wrappedBean; } }
2. RabbitMQ消息如何被消费
2.1 SimpleMessageListenerContainer
上面说了消费者启动会创建SimpleMessageListenerContainer,它启动时会创建一个AsyncMessageProcessingConsumer内部类的对象(实现了Runnable接口,核心属性是BlockingQueueConsumer),AsyncMessageProcessingConsumer的run()通过while循环不断接收消息并调用我们使用@RabbitListener修饰的方法实现的消费逻辑。
@Override protected void doStart() throws Exception { ...... super.doStart(); synchronized (this.consumersMonitor) { if (this.consumers != null) { throw new IllegalStateException("A stopped container should not have consumers"); } // 根据配置的并发数创建对应数量BlockingQueueConsumer int newConsumers = initializeConsumers(); ...... Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>(); for (BlockingQueueConsumer consumer : this.consumers) { AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer); processors.add(processor); // 执行AsyncMessageProcessingConsumer,轮询调用获取队列里的消息并执行消费逻辑 getTaskExecutor().execute(processor); if (getApplicationEventPublisher() != null) { getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer)); } } for (AsyncMessageProcessingConsumer processor : processors) { FatalListenerStartupException startupException = processor.getStartupException(); if (startupException != null) { throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException); } } } }
2.2 BlockingQueueConsumer
BlockingQueueConsumer扮演一个解耦消息接收和消息消费的角色,一方面负责承接Channel接收的消息并压入BlockingQueue queue,另一方面被AsyncMessageProcessingConsumer轮询调用获取队列里的消息并执行消费逻辑。
// 从队列中获取消息 public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException { ...... Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS)); if (message == null && this.cancelled.get()) { throw new ConsumerCancelledException(); } return message; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ...... try { // 如果BlockingQueueConsumer已被标记为停止,调用offer将消息入队,如果队列满了会马上返回false if (BlockingQueueConsumer.this.abortStarted > 0) { //如果offer失败,发送basic.nack命令通知服务端消息没有消费成功,然后发送basic.cancel命令通知服务端取消订阅,服务端不再发送消息到该消费者 if (!BlockingQueueConsumer.this.queue.offer( new Delivery(consumerTag, envelope, properties, body, this.queue), BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) { RabbitUtils.setPhysicalCloseRequired(getChannel(), true); // Defensive - should never happen BlockingQueueConsumer.this.queue.clear(); getChannel().basicNack(envelope.getDeliveryTag(), true, true); getChannel().basicCancel(consumerTag); try { getChannel().close(); } catch (TimeoutException e) { // no-op } } } else { // 如果BlockingQueueConsumer没有标记为停止,调用put入队,如果队列空间满了则会一直等待直到空间可用 BlockingQueueConsumer.this.queue .put(new Delivery(consumerTag, envelope, properties, body, this.queue)); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }
这篇关于源码解析: Spring RabbitMQ消费者的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-19永别了,微服务架构!
- 2024-05-15鸿蒙生态设备数量超8亿台
- 2024-05-13TiDB + ES:转转业财系统亿级数据存储优化实践
- 2024-05-09“2024鸿蒙零基础快速实战-仿抖音App开发(ArkTS版)”实战课程已上线
- 2024-05-09聊聊如何通过arthas-tunnel-server来远程管理所有需要arthas监控的应用
- 2024-05-09log4j2这么配就对了
- 2024-05-09nginx修改Content-Type
- 2024-05-09Redis多数据源,看这篇就够了
- 2024-05-09Google Chrome驱动程序 124.0.6367.62(正式版本)去哪下载?
- 2024-05-09有没有大佬知道这种数据应该怎么抓取呀?