EventBus源码解析

2020/5/26 23:25:32

本文主要是介绍EventBus源码解析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

前言

EventBus是Square公司公司开源的一款通讯组件。他最大的功能就是可以在Activity与Activity、Activity与Fragment中进行消息的通讯传输。而且他的使用比广播要容易,也更轻便,广受好评。

使用方式

在gradle中添加引用

implementation 'org.greenrobot:eventbus:3.0.0'
复制代码

之后在Activity的生命周期onCreate()中添加注册,在onDestroy()中进行注销操作,然后再写一个订阅方法用来接收消息和处理消息即可。

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_demo);
        //注册EventBus
        EventBus.getDefault().register(this);


    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        //注销EventBus
        EventBus.getDefault().unregister(this);
    }

    @Subscribe
    public void onMessage(Object object){
        //TODO...
    }


复制代码

EventBus2.0和EventBus3.0的区别?

这是在面试过程中,面试官最常问的一个问题。

EventBus2.0和3.0最大的区别有两点:

1.EventBus2.0中我们在书写订阅方法时的名字必须是onEvent开头,然后通过命名不同来区别不同的线程模式。例如对应posting则命名为onEvent(),onEventMainThread()则对应main等。而3.0则可以用任何名字作为方法名称,只需要在方法名的前面用@Subscribe注解来进行注释,然后使用threadMode来设置在哪里线程中接收事件和处理事件

2.EventBus2.0使用的是反射的方式来查找所有的订阅方法,而3.0则是在编译时通过注解处理器的方式来查找所有的订阅方法。性能上来说,3.0比2.0要高的多。

下面开始分析源码。

我们先从EventBus的构造方法说起。

构造方法

    EventBus(EventBusBuilder builder) {
   1     subscriptionsByEventType = new HashMap<>();
   2     typesBySubscriber = new HashMap<>();
   3     stickyEvents = new ConcurrentHashMap<>();
   4     mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
   5     backgroundPoster = new BackgroundPoster(this);
   6     asyncPoster = new AsyncPoster(this);
   7     indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
   8     subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
   9             builder.strictMethodVerification, builder.ignoreGeneratedIndex);
   10     logSubscriberExceptions = builder.logSubscriberExceptions;
   11     logNoSubscriberMessages = builder.logNoSubscriberMessages;
   12     sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
   13     sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
   14     throwSubscriberException = builder.throwSubscriberException;
   15     eventInheritance = builder.eventInheritance;
   16     executorService = builder.executorService;
    }

复制代码

构造方法中我们可以看到里面实例化了很多的集合有一些类。我们先看前三行。

   1     subscriptionsByEventType = new HashMap<>();
   2    typesBySubscriber = new HashMap<>();
   3     stickyEvents = new ConcurrentHashMap<>();

复制代码

我们看到这三行都是在创建一个hashMap。

第一行的hashMap是一个叫做subscriptionsByEventType的hashMap。这个hashMap的主要作用就是存储以Event发送事件为key,subscriptions订阅事件为value的hashMap。也就是说所有通过post发送出去的事件和对应事件接收的被@subscribe修饰的方法都是用这个来存储。

第二行的hashMap为typesBySubscriber的hashMap。这个hashMap和上面那个相反,是以订阅方法@subscribe修饰的为key,post发送的event事件为value的hashMap。

第三行是stickyEvents,这是个黏性事件,所有被postSticky发送的事件都是用这个来存储的。

这三个方法在后面都会有用到,这里只是先做一下了解。

之后的三行是实例化三个线程调度器mainThreadPoster、backgroundPoster和asyncPoster

        mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
        backgroundPoster = new BackgroundPoster(this);
        asyncPoster = new AsyncPoster(this);

复制代码

这三个调度器的作用就是处理threadMode选取不同模式的时候做相应的处理。我们来看下这几个的源码,先来看HandlerPoster

final class HandlerPoster extends Handler {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}


复制代码

可以看到这个handlerPoster继承了一个Handler,里面主要有两个重点方法enqueue()和handleMessage(),是不是很熟悉?一个是用来将消息入队的enqueue(),一个是处理消息的handleMessage()。

在构造方法中创建了一个PendingPostQueue()的队列,然后在enqueue()方法中从一个叫PendingPost中取出一个叫PendingPost的对象放入到队列中去。

        void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

复制代码

可以看到这个添加到队列的操作的外部用了一个同步锁synchronized来修饰,证明这个入队操作只能是一个串行的方式。

我们来看下这个添加到队列中的PendingPost到底是什么?

   1     final class PendingPost {
   2     private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

   3     Object event;
   4     Subscription subscription;
   5     PendingPost next;

   6     private PendingPost(Object event, Subscription subscription) {
   7     this.event = event;
   8     this.subscription = subscription;
   9  }

   10     static PendingPost obtainPendingPost(Subscription subscription, Object event) {
   11     synchronized (pendingPostPool) {
   12         int size = pendingPostPool.size();
   13         if (size > 0) {
   14             PendingPost pendingPost = pendingPostPool.remove(size - 1);
   15             pendingPost.event = event;
   16             pendingPost.subscription = subscription;
   17             pendingPost.next = null;
   18             return pendingPost;
   19         }
   20     }
   21     return new PendingPost(event, subscription);
   22     }

   23     static void releasePendingPost(PendingPost pendingPost) {
   24     pendingPost.event = null;
   25     pendingPost.subscription = null;
   26     pendingPost.next = null;
   27     synchronized (pendingPostPool) {
   28         if (pendingPostPool.size() < 10000) {
   29             pendingPostPool.add(pendingPost);
   30         }
   31     }
   32     }

   33     }


复制代码

我们把重点放在构造方法中,看到传入了一个Event事件和一个订阅事件subsription。并给这两个对象赋值。从而可以得知PendingPost其实是一个存储发送事件和订阅方法的类。而这个类中有一个obtainPendingPost的方法,这其实有点像handler中的Message的obtain()方法,都是一个对象复用池,用来复用之前使用过的对象,从而提高对象使用效率的。

那么由此知道PendingPost是一个用来存储Event事件和订阅事件的类,我们在HandlerPoster类中通过enqueue()把订阅事件和发送的event事件添加到队列中去存储。

而我们再来看下handleMessage()方法。

@Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }


复制代码

代码虽有有点长,但是并不复杂,最主要的功能就是把刚才从enqueue()添加的pendingPost取出来然后交给eventBus.invokeSubscriber(pendingPost)去处理。

再来看下BackgroundPoster。

final class BackgroundPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}


复制代码

可以看到,BackgroundPoster是一个Runnable线程,也拥有添加enqueue()方法和执行方法。只不过他的执行方法不是handleMessage(),而是run()方法。从run()从取出PendingPost,然后做两重判断,是否为null,如果为null,则让executorRunning = false。这样在enqueue方法中就可以通过 eventBus.getExecutorService().execute(this)获取到一个线程池在线程中处理消息。

而AsyncPoster和BackgroundPoster是一样的,只不过AsyncPoster在run()方法里面可以处理多个pendingPost,而BackgroundPoster由于在run()方法里面加了一个synchronized锁使得它一次只能处理一次消息。

总结一下

到此构造方法分析完毕。我们来总结下,EventBus的构造方法里面主要是实例化了三个hashMap,subscriptionsByEventType、typesBySubscriber和stickyEvents,分别用来存储的是以Event为key,订阅事件为value的hashMap;以订阅事件为key,Event为value的hashMap以及粘性事件的hashMap。

然后EventBus又创建了三个线程调度器HandlerPoster、BackgroundPoster和AsyncPoster用来处理threadMode时候选择不同线程来做处理。

这里突发奇想想到一个问题,如何判断他的事件到底是从主线程发送出来还是从子线程发送的呐?

不知道大家有没有注意到,在创建第一个HandlerPoster调度器的时候我们传入了一个参数

mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);

复制代码

也就是Looper.getMainLooper(),把主线程的Looper传递进去进行判断,看看这个looper是不是主线程的loop,如果是则代表是主线程的,如果不是则代表不是主线程的。

getDefault()

我们跳到getDefault()方法中去看看

        public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
        }

复制代码

看到这里面是一个双重检查模式的单例。而且这个方法居然还是个public修饰,这是为什么?

我们知道EventBus通过post发送一个event到被@Subscribe修饰的方法接收到消息处理event称之为一个总线。而我们每次在使用EventBus的时候不仅仅每次只能发送一次event,可以发送很多次event的,因此也有很多个总线。所以把他设置为public代表我们可以拥有很多个总线,而绝非一个。

register()注册方法

我们从注册方法来看看。

   1     public void register(Object subscriber) {
   2     Class<?> subscriberClass = subscriber.getClass();
   3     List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
   4     synchronized (this) {
   5         for (SubscriberMethod subscriberMethod : subscriberMethods) {
   6             subscribe(subscriber, subscriberMethod);
   7         }
   8     }
   9     }

复制代码

从注册代码中可以看到,第三行通过一个findSubscriberMethods()方法查找到订阅方法的集合List< SubscriberMethod>,之后遍历这个集合将取出来的订阅方法subscriberMethod交给subscribe()方法去处理。

那么findSubscriberMethods()是如何找到订阅方法集合的呐?

我们跳到这个方法内部去看看。

   1     List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
   2     List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
   3     if (subscriberMethods != null) {
   4         return subscriberMethods;
   5     }

   6     if (ignoreGeneratedIndex) {
   7         subscriberMethods = findUsingReflection(subscriberClass);
   8     } else {
   9         subscriberMethods = findUsingInfo(subscriberClass);
   10     }
   11     if (subscriberMethods.isEmpty()) {
   12         throw new EventBusException("Subscriber " + subscriberClass
   13                 + " and its super classes have no public methods with the @Subscribe annotation");
   14     } else {
   15         METHOD_CACHE.put(subscriberClass, subscriberMethods);
   16         return subscriberMethods;
   17     }
   18     }

复制代码

首先在第二行看到,会先从METHOD_CACHE里去找集合,如果找到那么直接return返回,如果没找到那么通过findUsingInfo()方法来查找,之后判断找到的集合是否为空,如何为空则抛出一个异常,代表没有被@subscribe修饰的方法。否则把找到的集合再放入到METHOD_CACHE中,之后再把subscriberMethods给返回。

private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();

复制代码

这里我们看到METHOD_CACHE是一个HashMap,他的key为当前class类,value为存储订阅方法的集合。他的主要作用其实是起到了一个缓存的作用,用来缓存所有的订阅方法的集合。

再来看下findUsingInfo()方法

   1  private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
   2     FindState findState = prepareFindState();
   3     findState.initForSubscriber(subscriberClass);
   4     while (findState.clazz != null) {
   5         findState.subscriberInfo = getSubscriberInfo(findState);
   6         if (findState.subscriberInfo != null) {
   7             SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
   8             for (SubscriberMethod subscriberMethod : array) {
   9                 if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
   10                     findState.subscriberMethods.add(subscriberMethod);
   11                 }
   12             }
   13         } else {
   14             findUsingReflectionInSingleClass(findState);
   15         }
   16         findState.moveToSuperclass();
   17     }
   18     return getMethodsAndRelease(findState);
   19  }

复制代码

这个方法中一开始获取一个FindState的对象,然后将订阅的类subscriberClass放入到这个对象的initForSubscriber()方法中。

FindState其实是一个对象复用池,专门用来存储订阅的事件和方法的集合,代码如下:

        static class FindState {
        final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
        final Map<Class, Object> anyMethodByEventType = new HashMap<>();
        final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
        final StringBuilder methodKeyBuilder = new StringBuilder(128);
        }

复制代码

看到里面又是实例化了一堆集合来存储不同的对象,作用只有一个就是对象的复用操作。

这里有一个判断,如果findState.subscriberInfo != null则代表findState里面有订阅消息;如果为null,则代表没有找到订阅消息,那么此时则通过方法findUsingReflectionInSingleClass(findState)去查找。

我们来看下这个方法内部

   1  private void findUsingReflectionInSingleClass(FindState findState) {
   2     Method[] methods;
   3     try {
   4         // This is faster than getMethods, especially when subscribers are fat classes like Activities
   5         methods = findState.clazz.getDeclaredMethods();
   6     } catch (Throwable th) {
   7         // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
   8         methods = findState.clazz.getMethods();
   9         findState.skipSuperClasses = true;
   10     }
   11     for (Method method : methods) {
   12         int modifiers = method.getModifiers();
   13         if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
   14             Class<?>[] parameterTypes = method.getParameterTypes();
   15             if (parameterTypes.length == 1) {
   16                 Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
   17                 if (subscribeAnnotation != null) {
   18                     Class<?> eventType = parameterTypes[0];
   19                     if (findState.checkAdd(method, eventType)) {
   20                         ThreadMode threadMode = subscribeAnnotation.threadMode();
   21                         findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
   22                                 subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
   23                     }
   24                 }
   25             } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
   26                 String methodName = method.getDeclaringClass().getName() + "." + method.getName();
   27                 throw new EventBusException("@Subscribe method " + methodName +
   28                         "must have exactly 1 parameter but has " + parameterTypes.length);
   29             }
   30         } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
   31             String methodName = method.getDeclaringClass().getName() + "." + method.getName();
   32             throw new EventBusException(methodName +
   33                     " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
   34         }
   35     }
   36  }

复制代码

在第五行显示,这里先通过反射来获取到这个activity里面的一个方法的集合。之后遍历这个methods集合,剔除掉一些不符合要求的方法。

例如第13行剔除掉没有被public修饰的方法,第15行剔除掉参数不为只有一个的方法,第16行剔除掉没有被subscribe修饰的方法。

最后将筛选出来的方法和他们的threadMode线程模式、priority优先级和sticky粘性事件等信息全部添加到findState中去,方便下次使用时对对象进行复用操作。

到此就完成了findSubscriberMethods()这个方法的整体流程。我们来总结下

总结一下

findSubscriberMethods()这个方法的主要作用是为了查找到所有的订阅方法,并以集合的形式返回回来。

他的内部首先有一个HashMap的缓存METHOD_CACHE,先从这个缓存中去查找订阅方法集合,如果找到则直接返回。如果没找到则执行findUserInfo()方法查找,并把查找到的结果放入到缓存集合METHOD_CACHE中。

在findUserInfo()的内部有一个对象复用池,用来复用那些被订阅的方法信息。这样做的好处就是可以不用每次都调用反射去查找订阅方法,节约提高效率。

而如果复用池中没有订阅信息,那么则通过反射去查找所有的方法,然后通过条件进行筛选之后放入到这个对象复用池findState中。然后在把找到的订阅方法的集合返回回去。

到此,findSubscriberMethods()的流程才算结束。

subscribe()方法

找到了订阅方法信息的集合之后,我们会通过串行的方式把订阅信息subscriberMethods放到subscribe()方法里面去。

再来看subscribe()方法。

   1  private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
   2     Class<?> eventType = subscriberMethod.eventType;
   3     Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
   4     CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
   5     if (subscriptions == null) {
   6         subscriptions = new CopyOnWriteArrayList<>();
   7         subscriptionsByEventType.put(eventType, subscriptions);
   8     } else {
   9         if (subscriptions.contains(newSubscription)) {
   10             throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
   11                     + eventType);
   12         }
   13     }

   14     int size = subscriptions.size();
   15     for (int i = 0; i <= size; i++) {
   16         if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
   17             subscriptions.add(i, newSubscription);
   18             break;
   19         }
   20     }

   21     List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
   22     if (subscribedEvents == null) {
   23         subscribedEvents = new ArrayList<>();
   24         typesBySubscriber.put(subscriber, subscribedEvents);
   25     }
   26     subscribedEvents.add(eventType);

   27     if (subscriberMethod.sticky) {
   28         if (eventInheritance) {
   29             // Existing sticky events of all subclasses of eventType have to be considered.
   30             // Note: Iterating over all events may be inefficient with lots of sticky events,
   31             // thus data structure should be changed to allow a more efficient lookup
   32             // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
   33             Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
   34             for (Map.Entry<Class<?>, Object> entry : entries) {
   35                 Class<?> candidateEventType = entry.getKey();
   36                 if (eventType.isAssignableFrom(candidateEventType)) {
   37                     Object stickyEvent = entry.getValue();
   38                     checkPostStickyEventToSubscription(newSubscription, stickyEvent);
   39                 }
   40             }
   41         } else {
   42             Object stickyEvent = stickyEvents.get(eventType);
   43             checkPostStickyEventToSubscription(newSubscription, stickyEvent);
   44         }
   45     }
   46  }

复制代码

这段代码虽然长了点,但是主要做的事情并不算多。

首先判断是否有注册过,然后再按照优先级加入到 subscriptionsByEventType 的 value 的 List 中,而 subscriptionsByEventType 是事件订阅者的保存队列,找到该事件所订阅的订阅者以及订阅者的方法、参数等,然后再添加到 typesBySubscriber 的 value 的 List 中,而 typesBySubscriber 是订阅者订阅的事件列表,找到改订阅者所订阅的所有事件,最后判断一下是否是粘性事件,是的话判断事件是否需要考虑继承关系,再分发这个黏性事件。

再看看 postToSubscription 这个方法:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
	switch (subscription.subscriberMethod.threadMode) {
		case POSTING://当前线程直接调用
			invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {//如果现在是UI线程,直接调用
				invokeSubscriber(subscription, event);
            } else {//否则加入到mainThreadPoster队列中
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
		case BACKGROUND:
			if (isMainThread) {//如果现在是UI线程,加入到backgroundPoster队列中
                backgroundPoster.enqueue(subscription, event);
            } else {//否则直接调用
       	        invokeSubscriber(subscription, event);
            }
            break;
      	case ASYNC://无论如何都加入到asyncPoster队列中
            asyncPoster.enqueue(subscription, event);
            break;
         default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

复制代码

可以看到,这里就是开始调用那几个线程分发器,而这三个 Poster 在上文都已经分析过了,那么来看看 invokeSubscriber 方法吧:

void invokeSubscriber(Subscription subscription, Object event) {
	try {
		subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

复制代码

最终通过反射调用。

注销方法unregister()

     public synchronized void unregister(Object subscriber) {
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unsubscribeByEventType(subscriber, eventType);
            }
            typesBySubscriber.remove(subscriber);
        } else {
            Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }

复制代码

再看看 unsubscribeByEventType :

private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
	//subscriptionsByEventType是以eventType为key,Subscription的ArrayList为value的HashMap,事件订阅者的保存队列,找到该事件所订阅的订阅者以及订阅者的方法、参数等
  	List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions != null) {
		int size = subscriptions.size();
		for (int i = 0; i < size; i++) {
			Subscription subscription = subscriptions.get(i);
			if (subscription.subscriber == subscriber) {
				subscription.active = false;
	            //remove掉
                subscriptions.remove(i);
                i--;
                size--;
            }
        }
    }
}

复制代码

注销的流程就是将 typesBySubscriber 和 subscriptionsByEventType 中的关于该订阅者以及该订阅者中的方法、事件等 remove 掉。

发送事件post()

   1  public void post(Object event) {
   2     PostingThreadState postingState = currentPostingThreadState.get();
   3     List<Object> eventQueue = postingState.eventQueue;
   4     eventQueue.add(event);

   5     if (!postingState.isPosting) {
   6         postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
   7         postingState.isPosting = true;
   8         if (postingState.canceled) {
   9             throw new EventBusException("Internal error. Abort state was not reset");
   10         }
   11         try {
   12             while (!eventQueue.isEmpty()) {
   13                 postSingleEvent(eventQueue.remove(0), postingState);
   14             }
   15         } finally {
   16             postingState.isPosting = false;
   17             postingState.isMainThread = false;
   18         }
   19     }
   20  }

复制代码

可以看到,首先会从currentPostingThreadState中获取到一个PostingThreadState。之后通过PostingThreadState获取到事件队列,将发送的event事件放入到队列中,最后交给postSingleEvent()去处理。

currentPostingThreadState是什么?通过看代码可以发现:

     private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };

复制代码

其实是一个线程内部存储类ThreadLcoal,也就是通过这个类来获取到的PostingThreadState为线程专属,各个线程之间从而达到互不干扰的作用。

而其实现是返回一个 PostingThreadState 对象,而 PostingThreadState 类的结构是:

final static class PostingThreadState {
	final List<Object> eventQueue = new ArrayList<Object>();
    boolean isPosting;
    boolean isMainThread;
    Subscription subscription;
    Object event;
    boolean canceled;
}


复制代码

PostingThreadState 封装的是当前线程的 post 信息,包括事件队列、是否正在分发中、是否在主线程、订阅者信息、事件实例、是否取消。那么回到 post 方法中:

public void post(Object event) {
	//1.得到PostingThreadState
	PostingThreadState postingState = currentPostingThreadState.get();
  	//2.获取其中的队列
	List<Object> eventQueue = postingState.eventQueue;
  	//2.将该事件添加到队列中
	eventQueue.add(event);
	//2.如果postingState没有进行发送
	if (!postingState.isPosting) {
      	//2. 判断当前线程是否是主线程
		postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
        //2.将isPosting状态改为true,表明正在发送中
      	postingState.isPosting = true;
      	//2.如果取消掉了,抛出异常
        if (postingState.canceled) {
			throw new EventBusException("Internal error. Abort state was not reset");
		}
        try {
          	//2.循环,直至队列为空
			while (!eventQueue.isEmpty()) {
              	//2.发送事件
				postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
			postingState.isPosting = false;
			postingState.isMainThread = false;
        }
    }
}


复制代码

最后走到一个 while 循环,判断事件队列是否为空了,如果不为空,继续循环,进行 postSingleEvent 操作,从事件队列中取出一个事件进行发送。


private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
	Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {//是否查看事件的继承关系
      	//找到事件的所以继承关系的事件类型
		List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
			Class<?> clazz = eventTypes.get(h);
          	//发送事件
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
	} else {
      	//直接发送事件
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
  	
    if (!subscriptionFound) {//如果没有任何事件
		if (logNoSubscriberMessages) {
			Log.d(TAG, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) {
          	//发送一个NoSubscriberEvent的事件出去
            post(new NoSubscriberEvent(this, event));
        }
    }
}


复制代码

lookupAllEventTypes() 就是查找该事件的所有父类,返回所有的该事件的父类的 class 。它通过循环和递归一起用,将一个类的父类(接口)全部添加到全局静态变量 eventTypes 集合中。再看一下 postSingleEventForEventType 方法:

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
	CopyOnWriteArrayList<Subscription> subscriptions;
	synchronized (this) {
      	//所有订阅了event的事件集合
		subscriptions = subscriptionsByEventType.get(eventClass);
	}
	if (subscriptions != null && !subscriptions.isEmpty()) {
		for (Subscription subscription : subscriptions) {
			postingState.event = event;
			postingState.subscription = subscription;
            boolean aborted = false;
            try {
              	//这里调用的postToSubscription方法,上面有解析
				postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
             }
             if (aborted) {
                break;
             }
        }
    	return true;
    }
    return false;
}


复制代码

至此,EventBus 的解析就结束了。



这篇关于EventBus源码解析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程