跳至主要內容

Android-EventBus修改纪实(一)-必达事件

guodongAndroid大约 14 分钟Androidandroideventbus

Android-EventBus修改纪实

背景

笔者在使用 EventBus 的过程中发现有时只能收到最后一次的黏性 Event ,导致业务逻辑出现混乱,下面是笔者的使用示例:

// Event.java
public final class Event {

    private final int code;

    public Event(int code) {
        this.code = code;
    }

    public int getCode() {
        return code;
    }
}
// Example.java
public class Example {

    // 调用多次
    public void test(int code) {
        EventBus.getDefault().postSticky(new Event(code));
    }
    
    // 调用多次 `test(int code)` 后再注册订阅者
    public void register() {
        EventBus.getDefault().register(this);
    }

    @Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
    public void receiveEvent(Event event) {
        // 发现只能收到最后一次的黏性事件
        System.out.println(event.getCode());
    }
}

所以去查看了 EventBus 的源码,接下来我们分析下 EventBus 发送黏性事件的流程。

分析

黏性事件

以下源码基于 EventBus 3.3.1 版本

下面是发送黏性事件的源码:

private final Map<Class<?>, Object> stickyEvents;

public void postSticky(Object event) {
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    // Should be posted after it is putted, in case the subscriber wants to remove immediately
    post(event);
}

postSticky 代码比较简单,首先对 stickyEvents 进行加锁,接下来把 event 事件的 Class 对象作为 Key,event 事件本身作为 value 放进 Map 中,其中stickyEvents 是 Map 对象,实例是 ConcurrentHashMap, 其 Key 和 Value 的泛型形参分别是 Class<?>Object, 它的作用就是用来存储黏性事件;然后调用 post(event) 把黏性事件当作普通事件发送一下。

首先我们看下最后为什么要调用下 post(event)

虽然 post(evnet) 上面有注释,简单翻译下:"在放进 Map 后应该再发送一次,以防止订阅者想立即删除此事件",读完注释后,可能还是不太明白,这里笔者认为:在前面存储完黏性事件后,这里调用 post 把黏性事件当作普通事件发送出去,或许是因为现在已经有注册的黏性事件订阅者,此时把已经注册的黏性事件订阅者当作普通事件的订阅者,这样已经注册的黏性事件订阅者可以立即收到相应的事件,只是此时事件不再是黏性的。

postSticky 中我们并没有看到黏性事件是在哪里发送的,想一想我们使用黏性事件的目的是什么?当注册订阅者时可以收到之前发送的事件,这样来看,黏性事件的发送是在注册订阅者时,下面是注册订阅者的源码,删除了一些无关代码:

public void register(Object subscriber) {
    
    // 省略无关代码
    
    Class<?> subscriberClass = subscriber.getClass();
    
    // 查找订阅者所有的Event接收方法
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

register 代码也比较简单,首先通过订阅者的 Class 对象查找订阅者所有的Event事件接收方法,然后对 EventBus 对象加锁,遍历所有的Event事件接收方法 subscriberMethods 调用 subscribe 方法,以下是 subscribe 方法源码:

// Key 为 Event Class 对象,Value 为存储 Event 的订阅者和接收 Event 方法对象的集合 
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;

// Key 为订阅者对象,Value 为订阅者中的 Event Class对象集合
private final Map<Object, List<Class<?>>> typesBySubscriber;

// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    // Event Class对象
    Class<?> eventType = subscriberMethod.eventType;
    
    // 订阅者和接收 Event 方法对象
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    
    // 根据 Event Class对象,获取订阅者和接收 Event 方法对象的集合
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    
    // 判断订阅者和接收 Event 方法对象是否为空
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        // 判断是否已经包含了新的订阅者和接收 Event 方法对象,若是包含则认为是重复注册
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                                        + eventType);
        }
    }

    // 这里是按优先级排序插入到集合中
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    // 这里是把 Event Class对象添加进对应订阅者的 Event Class对象集合中
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    
    // 上面已经判断了是否重复注册,所以这里直接添加
    subscribedEvents.add(eventType);

    // 接下来就是黏性事件的发送逻辑了
    // 判断 Event 接收方法是否可以处理黏性事件
    if (subscriberMethod.sticky) {
        // 这里判断是否考虑 Event 事件类的继承关系,默认为 Ture
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventType have to be considered.
            // Note: Iterating over all events may be inefficient with lots of sticky events,
            // thus data structure should be changed to allow a more efficient lookup
            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}

在上面的源码中,增加了不少注释有助于我们读懂源码,在源码的最后就是黏性事件的发送逻辑了,其中有两个分支,其中一个分支根据 Event 事件的继承关系发送事件,另外一个分支根据接收 Event 方法中的 Event Class 对象从 stickyEvents 中直接查找黏性事件,最后两个分支殊途同归,都调用了 checkPostStickyEventToSubscription 方法:

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
    if (stickyEvent != null) {
        // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
        // --> Strange corner case, which we don't take care of here.
        postToSubscription(newSubscription, stickyEvent, isMainThread());
    }
}

checkPostStickyEventToSubscription 方法很简单,对黏性事件做下判空处理,继续调用 postToSubscription 方法,传入订阅者与接收 Event 方法对象,黏性事件和是否是主线程布尔值:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster not decoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;A
                case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

postToSubscription 方法比较长,但是比较好理解,就是根据接收 Event 方法上的 @Subscribe 注解中传入的线程模型进行事件的分发,具体的事件分发流程,有空再分析,本文就先不分析了,现在我们只需知道最后都会调用 invokeSubscriber(Subscription subscription, Object event) 方法即可:

void invokeSubscriber(Subscription subscription, Object event) {
    try {
        // 反射调用 Event 接收方法传入 Event 事件
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

终于在 invokeSubscriber 方法中找到调用 Event 接收方法的地方了,原来 EventBus 最后是通过反射调用 Event 接收方法并传入相应 Event 事件的。

分析完 Event 事件的发送流程,好像没有发现为什么有时收不到黏性事件。

我们回过头来再看下笔者的使用示例,为了方便查看,下面贴出使用示例代码:

// Example.java
public class Example {

    // 调用多次
    public void test(int code) {
        EventBus.getDefault().postSticky(new Event(code));
    }
    
    // 调用多次 `test(int code)` 后再注册订阅者
    public void register() {
        EventBus.getDefault().register(this);
    }

    @Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
    public void receiveEvent(Event event) {
        // 发现只能收到最后一次的黏性事件
        System.out.println(event.getCode());
    }
}

可能细心的读者已经发现 test 方法调用了,问题应该出在 postSticky 方法中,让我们再次查看 postSticky 方法:

private final Map<Class<?>, Object> stickyEvents;

public void postSticky(Object event) {
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    // Should be posted after it is putted, in case the subscriber wants to remove immediately
    post(event);
}

根据前面分析 postSticky 方法的结果,stickyEvents 用于存储黏性事件,它是个 Map 结构,而 stickyEvents 的 Key 正是 Event 的 Class 对象,根据 Map 结构的存储原理:如果存在相同的 Key,则覆盖 Value 的值,而 stickyEvents 的 Value 正是 Event 本身。

终于真相大白,多次调用 test 方法发送黏性事件,EventBus 只会存储最后一次的黏性事件。

小结

EventBus 针对同一个黏性 Event 事件只会存储最后一次发送的黏性事件。

EventBus 的上述实现可能是因为多次发送同一个黏性事件,则认为之前的事件是过期事件应该抛弃,因此只传递最新的黏性事件。

EventBus 的这种实现无法满足笔者的业务逻辑需求,笔者希望多次发送的黏性事件,订阅者都能接收到,而不是只接收最新的黏性事件,可以理解为黏性事件必达订阅者,下面让我们修改 EventBus 的源码来满足需求吧。

修改

上一节我们分析了黏性事件的发送流程,为了满足黏性事件必达的需求,基于现有黏性事件流程,我们可以仿照黏性事件的发送来提供一个发送必达消息的方法。

Subscribe

首先我们定义 Event 接收方法可以接收黏性事件是在 @Subscribesticky = true , 所以我们可以修改 Subscribe 注解,增加黏性事件必达的方法:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
    ThreadMode threadMode() default ThreadMode.POSTING;

    /**
     * If true, delivers the most recent sticky event (posted with
     * {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
     */
    boolean sticky() default false;

    // 增加消息必达的方法
    boolean rendezvous() default false;

    /** Subscriber priority to influence the order of event delivery.
     * Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
     * others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
     * delivery among subscribers with different {@link ThreadMode}s! */
    int priority() default 0;
}

rendezvous 以为约会、约定的意思,可以理解为不见不散,在这里它有两层作用,其一是标记方法可以接收黏性事件,其二是标记方法接收的事件是必达的。

findSubscriberMethods

接下来就需要解析 rendezvous 了,我们先看看 sticky 是如何解析的,在上一节我们分析了 register 方法,方便查看,下面再贴出 register 方法源码:

public void register(Object subscriber) {
    
    // 省略无关代码
    
    Class<?> subscriberClass = subscriber.getClass();
    
    // 查找订阅者所有的Event接收方法
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

上一节分析中,我们没有分析查找订阅者中所有的 Event 接收方法 findSubscriberMethods ,接下来我们分析下在 findSubscriberMethods 方法是如何查找 Event 接收方法的:

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    // 先从缓存中查找
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    if (subscriberMethods != null) {
        return subscriberMethods;
    }

    // 是否忽略生成索引,默认为False,所以这里走else分支
    if (ignoreGeneratedIndex) {
        subscriberMethods = findUsingReflection(subscriberClass);
    } else {
        // 查找Event接收方法
        subscriberMethods = findUsingInfo(subscriberClass);
    }
    
    // 如果订阅者和订阅者父类中没有Event接收方法则抛出异常
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass
                                    + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
        // 添加进缓存中
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;
    }
}

调用 findSubscriberMethods 方法需要传入订阅者 Class 对象,通过笔者在源码中增加的注释分析发现默认调用 findUsingInfo 方法查找 Event 接收方法,我们继续跟踪 findUsingInfo 方法:

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
    // FindState对订阅者Class对象和Event接收方法进行了一层封装
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass); // ①
    while (findState.clazz != null) {
        
        // 查找订阅者信息,包含订阅者Class对象、 订阅者父类、Event接收方法等
        findState.subscriberInfo = getSubscriberInfo(findState); // ②
        
        // 在 ① initForSubscriber中会把subscriberInfo置为null,
        // 在 ② getSubscriberInfo中没有Index对象,
        // 所以第一次时这里会走else分支
        if (findState.subscriberInfo != null) {
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
            for (SubscriberMethod subscriberMethod : array) {
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else {
            // 查找Event接收方法
            findUsingReflectionInSingleClass(findState);
        }
        
        // 查找父类的Event接收方法
        findState.moveToSuperclass();
    }
    
    // 通过findState返回Event接收方法,并回收findState
    return getMethodsAndRelease(findState);
}

根据笔者在源码中的注释分析,在 findUsingInfo 方法中使用「享元模式」对 FindState 进行回收利用,避免创建大量临时的 FindState 对象占用内存,最后再次调用 findUsingReflectionInSingleClass 方法查找 Event 接收方法,看方法名字应该是使用反射查找,findUsingReflectionInSingleClass 源码较长,删减一些不关心的代码:

private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        // This is faster than getMethods, especially when subscribers are fat classes like Activities
        // 通过反射获取当前类中声明的所有方法
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        // 删减不关心的代码
    }
    
    // 遍历所有方法
    for (Method method : methods) {
        
        // 获取方法的修饰符
        int modifiers = method.getModifiers();
        
        // 判断方法是否是public的;是否是抽象方法,是否是静态方法,是否是桥接方法,是否是合成方法
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            
            // 获取方法的形参Class对象数组
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length == 1) {
                
                // 获取方法上的Subscribe注解
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    Class<?> eventType = parameterTypes[0];
                    
                    // 检测是否已经添加了相同签名的方法,考虑子类复写父类方法的情况
                    if (findState.checkAdd(method, eventType)) {
                        
                        // 获取注解的参数
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
								subscribeAnnotation.priority(), subscribeAnnotation.sticky(),
								
								// 这里我们添加rendezvous参数 ①
								subscribeAnnotation.rendezvous()));
                    }
                }
            }
            // 删减不关心的代码
        }
        // 删减不关心的代码
    }
}

findUsingReflectionInSingleClass 方法中通过反射获取订阅者中声明的所有方法,然后遍历所有方法:

  1. 首先判断方法的修饰符是否符合,
  2. 其次判断方法是否只有一个形参,
  3. 再次判断方法是否有 Subscribe 注解,
  4. 然后检测是否已经添加了相同签名的方法,主要是考虑子类复写父类方法这种情况,
  5. 最后获取 Subscribe 注解的参数,在这里我们解析 rendezvous,封装进 SubscriberMethod 中。

SubscriberMethod 中增加 rendezvous 字段,删除不关心的代码:

public class SubscriberMethod {
    final Method method;
    final ThreadMode threadMode;
    final Class<?> eventType;
    final int priority;
    final boolean sticky;

    // 增加 `rendezvous` 字段
    final boolean rendezvous;
    /** Used for efficient comparison */
    String methodString;

    public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, 
                            int priority, boolean sticky,
                            
                            // 增加 `rendezvous` 形参
                            boolean rendezvous) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.priority = priority;
        this.sticky = sticky;
        this.rendezvous = rendezvous;
    }
}

postRendezvous

好的,rendezvous 已经解析出来了,接下来我们对外提供发送必达事件的接口:

// 选择List存储必达事件,使用Pair封装必达事件的Key和Value
private final List<Pair<Class<?>, Object>> rendezvousEvents;

public void postRendezvous(Object event) {
    synchronized (rendezvousEvents) {
        rendezvousEvents.add(Pair.create(event.getClass(), event));
    }
    // Should be posted after it is putted, in case the subscriber wants to remove immediately
    post(event);
}

上面的源码,我们通过仿照 postSticky 方法实现了 postRendezvous 方法,在 postSticky 方法中使用 Map 存储黏性事件,不过我们在 postRendezvous 方法中使用 List 存储必达事件,保证必达事件不会因为 Key 相同而被覆盖丢失,最后也是调用 post 方法尝试先发送一次必达事件。

register

在上一节中我们分析了黏性事件是在 register 中调用 subscribe 方法进行发送的,这里我们仿照黏性事件的发送逻辑,实现必达事件的发送逻辑,我们可以在 subscribe 方法最后增加发送必达事件的逻辑,以下源码省略了一些不关心的代码:

private final List<Pair<Class<?>, Object>> rendezvousEvents;

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    // 省略不关心的代码

    // 黏性事件发送逻辑
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }

    // 新增必达事件发送逻辑
    // 判断方法是否可以接收必达事件
    if (subscriberMethod.rendezvous) {
        if (eventInheritance) {
            for (Pair<Class<?>, Object> next : rendezvousEvents) {
                Class<?> candidateEventType = next.first;
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = next.second;
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object rendezvousEvent = getRendezvousEvent(eventType);
            if (rendezvousEvent != null) {
                checkPostStickyEventToSubscription(newSubscription, rendezvousEvent);
            }
        }
    }
}

subscribe 方法中,我们通过仿照黏性事件的发送逻辑增加了必达事件的发送:

  1. 首先判断 Event 接收方法是否可以接收必达事件
  2. 其次考虑 Event 必达事件的继承关系,
  3. 最后两个分支都调用 checkPostStickyEventToSubscription 方法发送必达事件

happy~

总结

使用第三方库时,发现问题不要慌张,带着问题去查看源码总有一番收获,这也告诫我们在使用第三库时最好先搞明白它的实现原理,遇到问题时不至于束手无策。

通过分析 EventBus 的源码,我们有以下收获:

  1. 明白了我们注册订阅者时 EventBus 做了哪些事情
  2. 知晓了我们发送黏性事件时,EventBus 是如何处理及何时发送黏性事件的
  3. 了解到 EventBus 是通过反射调用 Event 事件的接收方法
  4. 学习了 EventBus 中的一些优化点,比如对 FindState 使用「享元模式」避免创建大量临时对象占用内存
  5. 进一步了解到对并发的处理

通过以上收获,我们成功修改 EventBus 源码实现了我们必达事件的需求。

到这里我们已经完成了必达事件的发送,不过我们还剩下获取必达事件,移除必达事件没有实现,最后 EventBus 中还有单元测试 module,我们还没有针对 rendezvous 编写单元测试,读者有兴趣的话,可以自己试着实现。

希望可以帮到你~