@Async: A Deep Dive into the Source Code

2022/1/12 1:04:05


Contents

  • 1. Flow overview
  • 2. Source code walkthrough
    • 2.1 @EnableAsync
    • 2.2 AsyncAnnotationBeanPostProcessor
    • 2.3 AnnotationAsyncExecutionInterceptor
  • 3. In-depth analysis
    • 3.1 How a class is judged to need a proxy
    • 3.2 How the proxy invokes AnnotationAsyncExecutionInterceptor


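1. Flow overview

  1. @EnableAsync turns on async annotation support and registers a BeanPostProcessor.
  2. Any bean with @Async on its class or on one of its methods is intercepted and wrapped in a proxy.
  3. The async method is ultimately handled by AnnotationAsyncExecutionInterceptor.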
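
The three steps can be imitated with nothing but the JDK. What follows is a minimal sketch, not Spring's implementation: @RunAsync, Greeter, SimpleGreeter and AsyncFlowSketch are hypothetical names invented for illustration, and a JDK dynamic proxy stands in for Spring's ProxyFactory.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for @Async; not the Spring annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RunAsync {}

interface Greeter {
    void greet(String name);
}

class SimpleGreeter implements Greeter {
    // Records which thread actually executed greet().
    volatile String greetedOn;

    @RunAsync
    public void greet(String name) {
        greetedOn = Thread.currentThread().getName();
    }
}

class AsyncFlowSketch {

    // Step 2: wrap the bean in a proxy only if one of its methods carries @RunAsync.
    static <T> T postProcess(T bean, Class<T> iface, ExecutorService executor) {
        boolean eligible = false;
        for (Method m : bean.getClass().getDeclaredMethods()) {
            eligible |= m.isAnnotationPresent(RunAsync.class);
        }
        if (!eligible) {
            return bean; // no proxy needed
        }
        // Step 3: the interceptor hands annotated calls to the executor.
        InvocationHandler interceptor = (proxy, method, args) -> {
            Method target = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
            if (!target.isAnnotationPresent(RunAsync.class)) {
                return target.invoke(bean, args); // ordinary synchronous call
            }
            Callable<Object> task = () -> target.invoke(bean, args);
            executor.submit(task);
            return null; // void method: the caller does not wait
        };
        return iface.cast(Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, interceptor));
    }

    // Runs the flow once and reports the thread that executed greet().
    static String demo() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-async"));
        SimpleGreeter target = new SimpleGreeter();
        Greeter proxy = postProcess(target, Greeter.class, executor);
        proxy.greet("world");
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return target.greetedOn;
    }
}
```

AsyncFlowSketch.demo() returns "sketch-async": greet ran on the executor's thread, not on the caller's.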

2. Source code walkthrough

Following the flow above, we go through the source one step at a time.

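2.1 @EnableAsync

Straight to the code:

  • @Import brings in a selector.
  • The selector imports the configuration class ProxyAsyncConfiguration (the default).
  • The configuration class registers AsyncAnnotationBeanPostProcessor.

// @Import brings in a selector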
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
	// ... omitted
}

// The selector imports the configuration class ProxyAsyncConfiguration
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";


	/**
	 * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
	 * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
	 * respectively.
	 */
	@Override
	@Nullable
	public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY:
				return new String[] {ProxyAsyncConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}

}


// The configuration class registers the BeanPostProcessor
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		bpp.configure(this.executor, this.exceptionHandler);
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}

}

2.2 AsyncAnnotationBeanPostProcessor

The core method:

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
	if (this.advisor == null || bean instanceof AopInfrastructureBean) {
		// Ignore AOP infrastructure such as scoped proxies.
		return bean;
	}
	
	// Ignore this branch for now
	if (bean instanceof Advised) {
		Advised advised = (Advised) bean;
		if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
			// Add our local Advisor to the existing proxy's Advisor chain...
			if (this.beforeExistingAdvisors) {
				advised.addAdvisor(0, this.advisor);
			}
			else {
				advised.addAdvisor(this.advisor);
			}
			return bean;
		}
	}
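	// Covered in detail in section 3.
	// Decide whether the bean is eligible for proxying.
	// The check goes through the Advisor's Pointcut; for now, read it as "is the Class, or any Method in it, marked with @Async" (either one is enough).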
	if (isEligible(bean, beanName)) {
		ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
		if (!proxyFactory.isProxyTargetClass()) {
			evaluateProxyInterfaces(bean.getClass(), proxyFactory);
		}
		// Add the Advisor the proxy needs (the async execution ultimately happens in its advice); covered in detail in section 3.
		proxyFactory.addAdvisor(this.advisor);
		customizeProxyFactory(proxyFactory);

		// Use original ClassLoader if bean class not locally loaded in overriding class loader
		ClassLoader classLoader = getProxyClassLoader();
		if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) {
			classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader();
		}
		// Create and return the proxy
		return proxyFactory.getProxy(classLoader);
	}

	// No proxy needed.
	return bean;
}

2.3 AnnotationAsyncExecutionInterceptor

How does a method marked with @Async run asynchronously, and how can it still return a result?

The core code:

public Object invoke(final MethodInvocation invocation) throws Throwable {
	Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
	Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
	final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

	AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
	if (executor == null) {
		throw new IllegalStateException(
				"No executor specified and no default executor set on AsyncExecutionInterceptor either");
	}

	// Build the task to hand to the executor
	Callable<Object> task = () -> {
		try {
			Object result = invocation.proceed();
			if (result instanceof Future) {
				return ((Future<?>) result).get();
			}
		}
		catch (ExecutionException ex) {
			handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
		}
		catch (Throwable ex) {
			handleError(ex, userDeclaredMethod, invocation.getArguments());
		}
		return null;
	};
	// Pass along the task, the executor, and the method's return type. Whether @Async hands back a result depends on that return type.
	return doSubmit(task, executor, invocation.getMethod().getReturnType());
}



protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
	// Dispatch on the return type; each type is submitted differently
	if (CompletableFuture.class.isAssignableFrom(returnType)) {
		return CompletableFuture.supplyAsync(() -> {
			try {
				return task.call();
			}
			catch (Throwable ex) {
				throw new CompletionException(ex);
			}
		}, executor);
	}
	else if (ListenableFuture.class.isAssignableFrom(returnType)) {
		return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
	}
	else if (Future.class.isAssignableFrom(returnType)) {
		return executor.submit(task);
	}
	else {
		executor.submit(task);
		return null;
	}
}
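
The return-type dispatch can be tried out without Spring. The sketch below is a simplified assumption-laden copy of the idea: SubmitSketch, describe and roundTrip are hypothetical names, a plain ExecutorService replaces AsyncTaskExecutor, and the ListenableFuture branch is left out because it depends on Spring types.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitSketch {

    // Same shape as doSubmit above, minus the Spring-specific branch.
    static Object doSubmit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                }
                catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
        else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        else {
            // Any other return type: the task still runs, but the caller gets null.
            executor.submit(task);
            return null;
        }
    }

    // Reports what kind of handle the caller receives for a declared return type.
    static String describe(Class<?> returnType) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Object handle = doSubmit(() -> "done", executor, returnType);
            if (handle == null) {
                return "null";
            }
            return (handle instanceof CompletableFuture ? "CompletableFuture" : "Future");
        }
        finally {
            executor.shutdown();
        }
    }

    // The user method returns an already-completed future; the task unwraps it
    // with get(), and the caller reads the value from the future doSubmit returns.
    static String roundTrip() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Object> task = () -> {
                Object result = CompletableFuture.completedFuture("hi");
                return (result instanceof Future ? ((Future<?>) result).get() : null);
            };
            CompletableFuture<?> future =
                    (CompletableFuture<?>) doSubmit(task, executor, CompletableFuture.class);
            return String.valueOf(future.join());
        }
        finally {
            executor.shutdown();
        }
    }
}
```

In this sketch, describe(String.class) yields "null": a method that declares a plain String return type still runs on the executor, but its caller receives null. That is why a result can only be read back when the declared return type is some kind of Future.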

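3. In-depth analysis

An AOP chapter will be added later.

Two questions were left open above: how is @Async detected, and why does execution end up in AnnotationAsyncExecutionInterceptor? Answering them takes some AOP background; without it the code is hard to follow.
The fundamentals may be dry, but they matter: everything else is built on them.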

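3.1 How a class is judged to need a proxy

The check goes through AopUtils, which is handed an advisor.
The Advisor interface combines an Advice and a Pointcut.
The AnnotationAsyncExecutionInterceptor that ultimately runs the method is an Advice, held by the current this.advisor object.
The Pointcut is what AOP uses to do the matching; it combines a ClassFilter and a MethodMatcher.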

protected boolean isEligible(Class<?> targetClass) {
	Boolean eligible = this.eligibleBeans.get(targetClass);
	if (eligible != null) {
		return eligible;
	}
	if (this.advisor == null) {
		return false;
	}
	// The check happens here
	eligible = AopUtils.canApply(this.advisor, targetClass);
	this.eligibleBeans.put(targetClass, eligible);
	return eligible;
}
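
The caching pattern in isEligible is worth isolating: the expensive pointcut check runs once per class, and negative answers are cached too. A minimal sketch, assuming a ConcurrentHashMap as the cache; EligibilityCacheSketch, expensiveCheck and checksRun are hypothetical names, and the predicate is a placeholder for AopUtils.canApply.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

class EligibilityCacheSketch {

    private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);

    // Placeholder for the real pointcut check.
    private final Predicate<Class<?>> expensiveCheck;

    // Counts how often the expensive check actually ran.
    final AtomicInteger checksRun = new AtomicInteger();

    EligibilityCacheSketch(Predicate<Class<?>> expensiveCheck) {
        this.expensiveCheck = expensiveCheck;
    }

    boolean isEligible(Class<?> targetClass) {
        Boolean eligible = this.eligibleBeans.get(targetClass);
        if (eligible != null) {
            return eligible; // cached answer, true or false
        }
        this.checksRun.incrementAndGet();
        eligible = this.expensiveCheck.test(targetClass);
        this.eligibleBeans.put(targetClass, eligible);
        return eligible;
    }

    // Three lookups over two distinct classes trigger only two real checks.
    static int demo() {
        EligibilityCacheSketch cache =
                new EligibilityCacheSketch(c -> c.getSimpleName().startsWith("S"));
        cache.isEligible(String.class);
        cache.isEligible(String.class);
        cache.isEligible(Integer.class);
        return cache.checksRun.get();
    }
}
```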



The matching method:

public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
	Assert.notNull(pc, "Pointcut must not be null");
	
	if (!pc.getClassFilter().matches(targetClass)) {
		return false;
	}
	// Match via the Pointcut. ClassFilterAwareUnionMethodMatcher is used here
	MethodMatcher methodMatcher = pc.getMethodMatcher();
	if (methodMatcher == MethodMatcher.TRUE) {
		// No need to iterate the methods if we're matching any method anyway...
		return true;
	}

	IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null;
	if (methodMatcher instanceof IntroductionAwareMethodMatcher) {
		introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher;
	}

	Set<Class<?>> classes = new LinkedHashSet<>();
	if (!Proxy.isProxyClass(targetClass)) {
		classes.add(ClassUtils.getUserClass(targetClass));
	}
	classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass));

	for (Class<?> clazz : classes) {
		Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
		for (Method method : methods) {
			if (introductionAwareMethodMatcher != null ?
					introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) :
					// The check happens here
					methodMatcher.matches(method, targetClass)) {
				return true;
			}
		}
	}

	return false;
}

The MethodMatcher that makes the final decision is AnnotationMethodMatcher.
It checks whether the Method carries the annotationType annotation. Is annotationType @Async? Yes, it is.

public class AnnotationMethodMatcher extends StaticMethodMatcher {

	private final Class<? extends Annotation> annotationType;
	// Checks whether the Method carries the annotationType annotation (here, @Async)
	private boolean matchesMethod(Method method) {
		return (this.checkInherited ? AnnotatedElementUtils.hasAnnotation(method, this.annotationType) :
				method.isAnnotationPresent(this.annotationType));
	}
}
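
To see why canApply walks the interfaces as well as the class itself, here is a JDK-only sketch of the same iteration. It is an approximation under stated assumptions: @Traced, Job, PlainJob, LocalJob, NoJob and MethodScanSketch are hypothetical names, and only the plain isAnnotationPresent branch is reproduced, not the checkInherited lookup through AnnotatedElementUtils.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical marker annotation used only in this sketch.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Traced {}

interface Job {
    @Traced
    void run();
}

// The annotation sits on the interface method only.
class PlainJob implements Job {
    public void run() {}
}

// The annotation sits directly on the class's own method.
class LocalJob {
    @Traced
    public void work() {}
}

class NoJob {
    public void work() {}
}

class MethodScanSketch {

    // True if any method declared on the class, its superclasses,
    // or any of their interfaces carries the given annotation.
    static boolean canApply(Class<?> targetClass, Class<? extends Annotation> type) {
        Set<Class<?>> classes = new LinkedHashSet<>();
        for (Class<?> c = targetClass; c != null; c = c.getSuperclass()) {
            classes.add(c);
            collectInterfaces(c, classes);
        }
        for (Class<?> clazz : classes) {
            for (Method method : clazz.getDeclaredMethods()) {
                if (method.isAnnotationPresent(type)) {
                    return true;
                }
            }
        }
        return false;
    }

    private static void collectInterfaces(Class<?> c, Set<Class<?>> out) {
        for (Class<?> iface : c.getInterfaces()) {
            if (out.add(iface)) {
                collectInterfaces(iface, out);
            }
        }
    }

    // Looks only at the implementation method itself.
    static boolean directlyAnnotated(Class<?> clazz, String methodName) {
        try {
            return clazz.getMethod(methodName).isAnnotationPresent(Traced.class);
        }
        catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException(ex);
        }
    }
}
```

Method annotations are not inherited in Java, so directlyAnnotated(PlainJob.class, "run") is false while canApply(PlainJob.class, Traced.class) is true: the match comes from the interface's method, which only the wider scan visits.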

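The trail leads back to AsyncAnnotationBeanPostProcessor. As noted, an Advisor combines an Advice and a Pointcut, and a Pointcut combines a ClassFilter and a MethodMatcher; the MethodMatcher makes the final call. From the source above we know that AsyncAnnotationBeanPostProcessor holds the Advisor in its advisor field, so we start from the place where that field is assigned.

AsyncAnnotationBeanPostProcessor is a BeanFactoryAware, and the field is assigned when the bean lifecycle calls back into setBeanFactory. The AsyncAnnotationAdvisor constructor initializes both the Advice and the Pointcut: the annotation passed to the Pointcut is @Async, and the Advice is the AnnotationAsyncExecutionInterceptor mentioned at the start.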

public void setBeanFactory(BeanFactory beanFactory) {
	super.setBeanFactory(beanFactory);

	AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
	if (this.asyncAnnotationType != null) {
		advisor.setAsyncAnnotationType(this.asyncAnnotationType);
	}
	advisor.setBeanFactory(beanFactory);
	this.advisor = advisor;
}


public AsyncAnnotationAdvisor(
		@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

	Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
	// Register the @Async annotation
	asyncAnnotationTypes.add(Async.class);
	try {
		asyncAnnotationTypes.add((Class<? extends Annotation>)
				ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
	}
	catch (ClassNotFoundException ex) {
		// If EJB 3.1 API not present, simply ignore.
	}
	this.advice = buildAdvice(executor, exceptionHandler);
	this.pointcut = buildPointcut(asyncAnnotationTypes);
}

// The pointcuts are combined through ComposablePointcut.union (a union matches if any member matches)
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
	ComposablePointcut result = null;
	for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
		Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
		Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
		if (result == null) {
			result = new ComposablePointcut(cpc);
		}
		else {
			result.union(cpc);
		}
		result = result.union(mpc);
	}
	return (result != null ? result : Pointcut.TRUE);
}
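
The effect of the union can be approximated with plain predicates. This is a sketch of the semantics only, not of ComposablePointcut: @Marker, WholeClassAsync, OneMethodAsync and UnionPointcutSketch are hypothetical names, with one predicate playing the class-level pointcut (cpc) and another the method-level pointcut (mpc).

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.function.BiPredicate;

// Hypothetical annotation allowed on both classes and methods.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface Marker {}

// Annotated at class level: every method should match.
@Marker
class WholeClassAsync {
    public void a() {}
}

// Annotated on one method only.
class OneMethodAsync {
    @Marker
    public void a() {}

    public void b() {}
}

class UnionPointcutSketch {

    // Plays the role of cpc: matches when the class carries the annotation.
    static BiPredicate<Class<?>, Method> classLevel(Class<? extends Annotation> type) {
        return (clazz, method) -> clazz.isAnnotationPresent(type);
    }

    // Plays the role of mpc: matches when the method carries the annotation.
    static BiPredicate<Class<?>, Method> methodLevel(Class<? extends Annotation> type) {
        return (clazz, method) -> method.isAnnotationPresent(type);
    }

    // Union: either predicate matching is enough.
    static boolean matches(Class<?> clazz, String methodName) {
        BiPredicate<Class<?>, Method> union =
                classLevel(Marker.class).or(methodLevel(Marker.class));
        try {
            return union.test(clazz, clazz.getMethod(methodName));
        }
        catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException(ex);
        }
    }
}
```

Here matches(WholeClassAsync.class, "a") and matches(OneMethodAsync.class, "a") are both true, while matches(OneMethodAsync.class, "b") is false.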



protected Advice buildAdvice(
		@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
	// Create the Advice: the AnnotationAsyncExecutionInterceptor that, as described at the start, ultimately runs the method asynchronously
	AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
	interceptor.configure(executor, exceptionHandler);
	return interceptor;
}

3.2 How the proxy invokes AnnotationAsyncExecutionInterceptor


