Google的Guava包下的EventBus源码解析
2021/4/23 20:27:37
本文主要是介绍Google的Guava包下的EventBus源码解析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
EventBus解析
1、EventBus的构造方法
- 使用EventBus作为具体实现类
- 使用AsyncEventBus作为实现类
(1)使用EventBus作为实现类,其构造方法有:
public EventBus() { this("default"); } public EventBus(String identifier) { this(identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), EventBus.LoggingHandler.INSTANCE); } public EventBus(SubscriberExceptionHandler exceptionHandler) { this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler); }
(2)使用AsyncEventBus作为实现类,其构造方法为:
public AsyncEventBus(String identifier, Executor executor) { super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE); } public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) { super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler); } public AsyncEventBus(Executor executor) { super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE); }
统一调用的构造方法为:
EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) { this.subscribers = new SubscriberRegistry(this); this.identifier = (String)Preconditions.checkNotNull(identifier); this.executor = (Executor)Preconditions.checkNotNull(executor); this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher); this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler); }
参数的意义分别是:
identifier:类似当前EventBus对象的别名,可以描述该EventBus的用途,默认为“default”
executor:使用异步执行时传入的自定义线程池
dispatcher:指定分发消息的模式
exceptionHandler:处理订阅消息异常的方法
subscribers:注册订阅者的类
2、两种实现方式创建对象时的区别
(1)EventBus实现:
- 默认使用default作为identifier,
- 执行器使用MoreExecutors.directExecutor()
- dispatcher:使用Dispatcher.perThreadDispatchQueue()队列
- exceptionHandler:默认使用EventBus提供的LoggingHandler.INSTANCE,如果有传入参数,就是用参数
(2)AsyncEventBus实现:
- 默认使用default作为identifier
- 执行器使用自定义的对象
- dispatcher:使用Dispatcher.legacyAsync()类型
- exceptionHandler:默认使用LoggingHandler.INSTANCE,如果有传入参数,就是用参数
(3)Dispatcher调度器:
eventbus包下的Dispatcher类提供了三种类型的调度器,分别为:
- PerThreadQueuedDispatcher
- LegacyAsyncDispatcher
- ImmediateDispatcher
(4)Executor执行器
- EventBus默认提供的是DirectExecutor,单线程的执行器
3、注册对象到EventBus
3.1、注册对象
使用this.subscribers对象的register方法注册,此处的subscribers对象为SubscriberRegistry类。
3.2、进入SubscriberRegistry类,register方法
3.2.1、查找所有订阅的方法
使用Muitmap集合存储一个对象下有哪些方法订阅了,具体实现findAllSubscribers方法,该方法内部如下:
(1)该方法内部使用getAnnotatedMethods方法获取clazz及其多级父类以及实现的接口中所有方法上有@Subscribe注解的方法。方法具体实现如下:
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) { //获取到传递的class对象的类以及父类以及实现的接口 Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); //创建一个Map集合 Map<MethodIdentifier, Method> identifiers = Maps.newHashMap(); //遍历得到的class对象 for (Class<?> supertype : supertypes) { //获取class对象的所有方法 for (Method method : supertype.getDeclaredMethods()) { //如果方法上有Subscribe注解,并且isSynthetic表示方法不是由java编译器生成的 if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { // 获取该方法的参数类型 Class<?>[] parameterTypes = method.getParameterTypes(); //检查方法的参数只能是1个 checkArgument( parameterTypes.length == 1, "Method %s has @Subscribe annotation but has %s parameters." + "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length); //根据方法创建MethodIdentifier对象,其中包含方法名、方法的参数类型 MethodIdentifier ident = new MethodIdentifier(method); //如果map集合中不包含该对象,就将ident和method对象存储到identifiers的map集合中 if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } //返回map集合中的方法 return ImmutableList.copyOf(identifiers.values()); }
(2)遍历该对象中添加了@Subscribe注解的方法集合
(3)获取该方法的参数类型,将第0个参数类型赋值给eventType
再获取eventType时,会查找监听对象的父类以及接口,查看有没有订阅方法。
(4)将参数类型作为key,订阅者作为value,存储到Multimap集合中。value中包含被监听的bus,被监听的bus中的执行器,监听该bus的对象listener,以及监听参数类型的方法。
会去检查订阅方法上有没有注解AllowConcurrentEvents,如果有该注解,在使用create方法创建订阅者对象时,订阅者对象使用Subcriber,如果没有注解使用SynchronizedSubscriber对象。这两种对象在真正分发事件时区别才会体现出来。
示例如下:说明objA中的三个方法都有@Subcribe注解
对象继承情况:
对象实现接口,接口内使用JDK8提供的default来实现方法,并添加注解,也可以订阅。
(5)获取到该对象的所偶遇订阅者后,返回Multimap集合
3.2.2、将获取到的订阅方法进行缓存
(1)遍历得到的Multimap集合
- 获取Key值,也就是该对象内部订阅方法的参数类型
- 获取订阅者
- 根据参数类型,获取全局变量subscribers中已有的所有订阅者
全局变量:
- 判断订阅者set集合如果为空,创建CopyOnWriteArraySet集合,然后使用subscribers.putIfAbsent将订阅类型(参数类型)和set集合存储进去。使用MoreObjects的方法校验第一个参数返回如果是null,就是用第二个参数,如果第二个newSet参数还是null,就会报空指针异常,会返回一个空的set集合。
- 将该订阅类型对应的订阅者保存到eventSubscribers中,也就是保存到了全局变量subscribers中。
4、取消对象注册到EventBus
4.1、取消注册对象
当前EventBus对象中的subscribers是SubscriberRegistry类的对象,执行该类中的unregister方法
4.2、进入SubscriberRegistry类,unregister方法
4.2.1、获取该对象中的订阅方法
使用findAllSubscribers方法,与注册对象中的方法相同,都是查找该类以及其多级父类中的订阅类型和方法。
4.2.2、遍历获取到的订阅方法
(1)获取到集合中的key,也就是该对象中订阅的类型(方法上的参数类型)
(2)获取到集合中key对应的value,也就是该对象中的订阅方法。
(3)获取到全局变量subscribers中缓存的数据,赋值给currentSubscribers
(4)如果currentSubscribers为null,则抛出异常,如果移除该对象中的所有订阅者返回结果为false,也抛出异常,如果为true,则正常移除。
5、发送消息
5.1、EventBus中的post方法
5.1.1、获取该消息关联的类型所有订阅者
通过subscribers(SubscriberRegistry类)中的全局变量获取订阅该事件以及该事件父类和接口的所有方法。
获取传递的event事件的类、父类以及实现的接口。
例如:如果发送的是MsgA这个消息,那么就会找到Msg类,获取到的集合中就包含了MsgA类型和Msg类型,所有订阅了这两个类型的都会接收到该消息。接口也是一样的(MsgA和MsgB都实现了Msg接口)。
下图中就描述了发送的消息在实现了接口的情况。
如图中所示,MsgA和MsgB实现了Msg接口,如果有订阅者订阅的类型(参数类型)是Msg,那么发送的时候不管发送MsgA还是MsgB,订阅Msg的方法都可以接收到。
下图中表示发送的消息有父类的情况:
如图中所示,MsgA和MsgB继承了Msg类,如果有订阅者订阅的类型(参数类型)是Msg,那么发送的时候不管发送MsgA还是MsgB,订阅Msg的方法都可以接收到。如果发送的是Msg类,那就只有订阅了Msg类的方法可以接收到。
总结:发送消息的时候,会查找该消息的父类以及接口,查看有没有订阅的方法,如果有就会都发送一次。
5.1.2、使用dispacther向订阅者发送
根据构造方法调度器使用了两种类型,分别是:
- PerThreadQueuedDispatcher
- LegacyAsyncDispatcher
(1)默认PerThreadQueuedDispatcher调度器的实现
- 获取ThreadLocal中创建的Queue队列,如果已创建,就会获取当前线程对应的Queue队列,初始化一个ArrayDeque队列,ArrayDeque是一个双端队列,即可以实现队列的先进先出,也可以实现栈的先进后出。它是线程不安全的,而且不允许有null值。它是可以自动扩容的循环数组,每次扩容都是2的n次方,初始大小为16.
- 通过offer将事件以及订阅者存储到队列尾部
- 如果dispatching返回为false,说明没有在分发事件,将dispatching设置为true,表示正在分发事件
- 循环获取队列头部的事件(先进先出),然后再循环获取事件对应的订阅者,通过订阅者Subscriber对象的dispatchEvent方法发送event事件
- 使用executor执行器通过反射去执行监听的方法。默认执行器为MoreExecutors.directExecutor(),直接发送。
- 订阅者对象中包含了(注册的对象实例target,对象实例中监听的方法method),通过反射去invoke,因为订阅者对象根据订阅方法上有没有添加AllowConcurrentEvents注解分为两种,SynchronizedSubscriber和Subscriber,前者是后者的子类,重写了方法invokeSubscriberMethod方法。
- 在执行时,如果订阅方法标注了AllowConcurrentEvents注解,使用Subscriber中的方法,如果没有标注注解,则使用SynchronizedSubscriber中的invokeSubscriberMethod。下图为Subscriber的方法
下图为SynchronizedSubscriber中的方法,在原有基础上,使用synchronized锁住该对象,然后去执行。
两者的区别就是:如果添加了注解,那就直接使用Subscriber中的方法,如果没有添加注解,则使用加锁的方法去执行。
(2)LegacyAsyncDispatcher调度器的实现
- 在初始化时,创建了一个ConcurrentLinkedQueue队列,内部存储EventWithSubscriber对象。
- 在dispatch方法中,遍历传递的订阅者,使用event和订阅者构建EventWithSubscriber对象,存储到集合queue的尾部。
- 循环取队列头部的对象,使用订阅者的dispatchEvent方法发送事件event。
- 发送事件时,根据注册时检测到的是否添加注解,分为加锁执行方法和不加锁执行方法,内部逻辑与PerThreadQueuedDispatcher相同。
5.1.3、post->DeadEvent
在缓存中找不到订阅者并且它本身不是一个DeadEvent事件时,就会发送一个DeadEvent。如果找不到DeadEvent事件的订阅者,就会不进行处理。
6、流程总结
(1)两种创建方式对比
类型 | EventBus | AsyncEventBus |
---|---|---|
identifier | 默认“default” | 默认“default” |
executor | DirectExecutor | 自定义 |
dispatcher | PerThreadQueuedDispatcher | LegacyAsyncDispatcher |
subscribers | SubscriberRegistry | SubscriberRegistry |
exceptionHandler | 默认LoggingHandler | 默认LoggingHandler |
两种方式的相同点:
- 注册订阅者和取消订阅者逻辑都是相同的
- 根据订阅方法有没有添加注解决定执行订阅方法的方式(加锁或者不加锁)
两种方式的区别:
- EventBus使用默认的DirectExecutor,内部使用单线程去执行任务
- AsyncEventBus使用传递的Executor,去执行,如果传递的是Single线程,那和EventBus就没什么区别。
- EventBus的分发模式使用的是ArrayDeque双端队列,先存入然后再取出执行,不是线程安全的集合,通过ThredLocal来在线程内部维护ArrayDeque队列。
- AsyncEventBus使用的是ConcurrentLinkedQueue,同样是先存入然后取出,支持多线程同时访问。
(2)方法执行流程
这篇关于Google的Guava包下的EventBus源码解析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-20MongoDB教程:从入门到实践详解
- 2024-11-17执行 Google Ads API 查询后返回的是空数组什么原因?-icode9专业技术文章分享
- 2024-11-17google广告数据不同经理账户下的凭证可以获取对方的api数据吗?-icode9专业技术文章分享
- 2024-11-15SendGrid 的 Go 客户端库怎么实现同时向多个邮箱发送邮件?-icode9专业技术文章分享
- 2024-11-15SendGrid 的 Go 客户端库怎么设置header 和 标签tag 呢?-icode9专业技术文章分享
- 2024-11-12Cargo deny安装指路
- 2024-11-02MongoDB项目实战:从入门到初级应用
- 2024-11-01随时随地一键转录,Google Cloud 新模型 Chirp 2 让语音识别更上一层楼
- 2024-10-25Google Cloud动手实验详解:如何在Cloud Run上开发无服务器应用
- 2024-10-24AI ?先驱齐聚 BAAI 2024,发布大规模语言、多模态、具身、生物计算以及 FlagOpen 2.0 等 AI 模型创新成果。