跳至主要內容

Android-EventBus修改纪实(二)-线程模型

guodongAndroid大约 12 分钟Androidandroideventbus

Android-EventBus修改纪实(二)

前言

在上一篇 Android-EventBus修改纪实open in new window 中笔者分析了 EventBus 黏性事件的发送流程并对 EventBus 进行增强实现了必达事件的支持,上一篇文章最后笔者没有实现以下方法:

  • 获取必达事件,
  • 移除必达事件,
  • 编写必达事件单元测试

本篇文章补全上一篇文章未实现的部分,最后应上一篇文章中掘友的需求对 EventBus 的线程切换做下分析。

纪实

getRendezvousEvent

// 选择List存储必达事件,使用Pair封装必达事件的Key和Value
private final List<Pair<Class<?>, Object>> rendezvousEvents;

public <T> T getRendezvousEvent(Class<T> eventType) {
    // 对`rendezvousEvents`加锁
    synchronized (rendezvousEvents) {
        // 遍历必达事件集合
        for (Pair<Class<?>, Object> next : rendezvousEvents) {
            
            // 取出Key
            Class<?> first = next.first;
            
            // 与入参进行比较
            if (eventType.equals(first)) {
                
                // 把Value转换为入参的类型
                return eventType.cast(next.second);
            }
        }
        return null;
    }
}

通过仿照 EventBus 中获取黏性事件的实现来写出获取必达事件的实现,在源码实现中首先对存储必达事件的集合 rendezvousEvents 进行加锁,防止并发修改必达事件集合,其次遍历必达集合,取出必达事件的 Key,即必达事件的 Class 对象,与入参的 Class 对象进行比较,如果相等,则取出必达事件的 Value,强制转换为入参类型。

removeRendezvousEvent

有两个移除必达事件的方法,一个根据必达事件的 Class 对象进行移除,另外一个根据必达事件的实例进行移除,下面我们一一实现:

removeRendezvousEvent(Class eventType)

我们先实现根据必达事件的 Class 对象进行移除的方法,实现源码如下:

// 选择List存储必达事件,使用Pair封装必达事件的Key和Value
private final List<Pair<Class<?>, Object>> rendezvousEvents;

// 入参必达事件的 Class 对象
public <T> T removeRendezvousEvent(Class<T> eventType) {
    // 对`rendezvousEvents`加锁
    synchronized (rendezvousEvents) {
        int size = rendezvousEvents.size();
        
        // 使用普通的 for 循环,避免产生 ConcurrentModificationException
        for (int i = 0; i < size; i++) {
            Pair<Class<?>, Object> pair = rendezvousEvents.get(i);
            Class<?> first = pair.first;
            // 比较必达事件的 Class 对象与入参是否相等
            if (eventType.equals(first)) {
                rendezvousEvents.remove(i);
                return eventType.cast(pair.second);
            }
            i--;
            size--;
        }
        return null;
    }
}

在上面的源码中首先对 rendezvousEvents 进行加锁,在临界区内使用普通的 for 循环,而不使用增强的 forEach 循环遍历 rendezvousEvents 是避免发生同步修改异常 ConcurrentModificationException,虽然 rendezvousEvents 的实例是 CopyOnWriteArrayList

在循环体内,取出存储必达事件的 Class 对象与入参比较,判断是否相等,相等即移除此必达事件,同时返回必达事件的实例。

不知读者有没有发现这里移除必达事件有什么问题?

我们的 rendezvousEvents 是 List 集合结构,上面的实现中,我们是不是只移除了第一个必达事件方法就返回了,如何实现根据必达事件的 Class 对象移除必达事件相信读者可以自行修改实现,笔者这里就不实现了。

removeRendezvousEvent(Object event)

接下来我们实现根据必达事件的实例进行移除,实现源码如下:

// 选择List存储必达事件,使用Pair封装必达事件的Key和Value
private final List<Pair<Class<?>, Object>> rendezvousEvents;

// 入参必达事件的实例
public boolean removeRendezvousEvent(Object event) {
    
    // 对`rendezvousEvents`加锁
    synchronized (rendezvousEvents) {
        boolean result = false;
        int size = rendezvousEvents.size();
        
        // 使用普通的 for 循环,避免产生 ConcurrentModificationException
        for (int i = 0; i < size; i++) {
            Pair<Class<?>, Object> pair = rendezvousEvents.get(i);
            Object second = pair.second;
            
            // 比较必达事件的实例与入参是否相等
            if (event.equals(second)) {
                rendezvousEvents.remove(i);
                result = true;
            }
            i--;
            size--;
        }
        return result;
    }
}

我们根据 removeRendezvousEvent(Class<T> eventType) 的实现,很容易就可以实现根据必达事件的实例进行移除的逻辑,与 removeRendezvousEvent(Class<T> eventType) 的实现类似,首先对 rendezvousEvents 进行加锁,在临界区内使用普通的 for 循环,在循环体内,取出存储必达事件的实例与入参比较,判断是否相等,相等即移除此必达事件,同时把移除结果置为 True ,最后返回移除结果。

至此,我们终于完成了对 EventBus 增加必达事件的所有逻辑。

单元测试

接下来让我们编写单元测试逻辑来测试我们必达事件的逻辑正确性吧。

EventBusStickyEventTest

我们先看看 EventBus 中对黏性事件的单元测试用例,用例较多,笔者挑选几个:

public class EventBusStickyEventTest extends AbstractEventBusTest {

    // 测试发送黏性事件的逻辑
    // 先发送黏性事件,再注册订阅者,然后断言最后一个事件与发送的黏性事件相等,最后断言最后一个订阅方法的线程是当前线程
    @Test
    public void testPostSticky() throws InterruptedException {
        eventBus.postSticky("Sticky");
        eventBus.register(this);
        assertEquals("Sticky", lastEvent);
        assertEquals(Thread.currentThread(), lastThread);
    }

    // 测试没有黏性事件订阅方法时的逻辑
    @Test
    public void testPostNonStickyRegisterSticky() throws InterruptedException {
        eventBus.post("NonSticky");
        eventBus.register(this);
        assertNull(lastEvent);
        assertEquals(0, eventCount.intValue());
    }

    // 测试注册与反注册订阅者对黏性事件的影响
    @Test
    public void testPostStickyWithRegisterAndUnregister() throws InterruptedException {
        eventBus.register(this);
        eventBus.postSticky("Sticky");
        assertEquals("Sticky", lastEvent);

        eventBus.unregister(this);
        eventBus.register(this);
        assertEquals("Sticky", lastEvent);
        assertEquals(2, eventCount.intValue());

        eventBus.postSticky("NewSticky");
        assertEquals(3, eventCount.intValue());
        assertEquals("NewSticky", lastEvent);

        eventBus.unregister(this);
        eventBus.register(this);
        assertEquals(4, eventCount.intValue());
        assertEquals("NewSticky", lastEvent);
    }

    // 测试获取黏性事件
    @Test
    public void testPostStickyAndGet() throws InterruptedException {
        eventBus.postSticky("Sticky");
        assertEquals("Sticky", eventBus.getStickyEvent(String.class));
    }

    // 测试移除黏性事件
    @Test
    public void testPostStickyRemoveEvent() throws InterruptedException {
        eventBus.postSticky("Sticky");
        assertTrue(eventBus.removeStickyEvent("Sticky"));
        assertNull(eventBus.getStickyEvent(String.class));
        eventBus.register(this);
        assertNull(lastEvent);
        assertEquals(0, eventCount.intValue());
    }
}

EventBusRendezvousEventTest

我们可以根据 EventBusStickyEventTest 编写必达事件的测试用例,以下为笔者实现的测试用例:

public class EventBusRendezvousEventTest extends AbstractEventBusTest {

    @Test
    public void testRendezvousSticky() throws InterruptedException {
        eventBus.postRendezvous("Rendezvous");
        eventBus.register(this);
        assertEquals("Rendezvous", lastEvent);
        assertEquals(Thread.currentThread(), lastThread);
    }

    @Test
    public void testPostRendezvousTwoEvents() throws InterruptedException {
        eventBus.postRendezvous("Rendezvous");
        eventBus.postRendezvous(new IntTestEvent(7));
        eventBus.register(this);
        assertEquals(2, eventCount.intValue());
    }

    @Test
    public void testPostRendezvousTwoSubscribers() throws InterruptedException {
        eventBus.postRendezvous("Rendezvous");
        eventBus.postRendezvous(new IntTestEvent(7));
        eventBus.register(this);
        RendezvousIntTestSubscriber subscriber2 = new RendezvousIntTestSubscriber();
        eventBus.register(subscriber2);
        assertEquals(3, eventCount.intValue());

        eventBus.postRendezvous("Rendezvous");
        assertEquals(4, eventCount.intValue());

        eventBus.postRendezvous(new IntTestEvent(8));
        assertEquals(6, eventCount.intValue());
    }

    @Test
    public void testPostRendezvousRegisterNonRendezvous() throws InterruptedException {
        eventBus.postRendezvous("Rendezvous");
        eventBus.register(new NonRendezvousSubscriber());
        assertNull(lastEvent);
        assertEquals(0, eventCount.intValue());
    }

    @Test
    public void testPostNonRendezvousRegisterRendezvous() throws InterruptedException {
        eventBus.post("NonRendezvous");
        eventBus.register(this);
        assertNull(lastEvent);
        assertEquals(0, eventCount.intValue());
    }

    @Test
    public void testPostRendezvousTwice() throws InterruptedException {
        eventBus.postRendezvous("Rendezvous");
        eventBus.postRendezvous("NewRendezvous");
        eventBus.register(this);
        assertEquals(2, eventCount.intValue());
        assertEquals("NewRendezvous", lastEvent);
    }

    @Test
    public void testPostRendezvousThenPostNormal() throws InterruptedException {
        eventBus.postRendezvous("Rendezvous");
        eventBus.post("NonRendezvous");
        eventBus.register(this);
        assertEquals(1, eventCount.intValue());
        assertEquals("Rendezvous", lastEvent);
    }

    @Test
    public void testPostRendezvousWithRegisterAndUnregister() throws InterruptedException {
        eventBus.register(this);
        eventBus.postRendezvous("Rendezvous");
        assertEquals("Rendezvous", lastEvent);

        eventBus.unregister(this);
        eventBus.register(this);
        assertEquals("Rendezvous", lastEvent);
        assertEquals(2, eventCount.intValue());

        eventBus.postRendezvous("NewRendezvous");
        assertEquals(3, eventCount.intValue());
        assertEquals("NewRendezvous", lastEvent);

        eventBus.unregister(this);
        eventBus.register(this);
        assertEquals(5, eventCount.intValue());
        assertEquals("NewRendezvous", lastEvent);
    }

    @Test
    public void testPostRendezvousAndGet() throws InterruptedException {
        eventBus.postRendezvous("Rendezvous");
        assertEquals("Rendezvous", eventBus.getRendezvousEvent(String.class));
    }

    @Test
    public void testPostRendezvousRemoveClass() throws InterruptedException {
        eventBus.postRendezvous("Rendezvous");
        eventBus.removeRendezvousEvent(String.class);
        assertNull(eventBus.getRendezvousEvent(String.class));
        eventBus.register(this);
        assertNull(lastEvent);
        assertEquals(0, eventCount.intValue());
    }

    @Test
    public void testPostRendezvousRemoveEvent() throws InterruptedException {
        eventBus.postRendezvous("Rendezvous");
        assertTrue(eventBus.removeRendezvousEvent("Rendezvous"));
        assertNull(eventBus.getRendezvousEvent(String.class));
        eventBus.register(this);
        assertNull(lastEvent);
        assertEquals(0, eventCount.intValue());
    }

    @Test
    public void testPostRendezvousRemoveAll() throws InterruptedException {
        eventBus.postRendezvous("Rendezvous");
        eventBus.postRendezvous(new IntTestEvent(77));
        eventBus.removeAllRendezvousEvents();
        assertNull(eventBus.getRendezvousEvent(String.class));
        assertNull(eventBus.getRendezvousEvent(IntTestEvent.class));
        eventBus.register(this);
        assertNull(lastEvent);
        assertEquals(0, eventCount.intValue());
    }

    @Test
    public void testRemoveRendezvousEventInSubscriber() throws InterruptedException {
        eventBus.register(new RendezvousStickySubscriber());
        eventBus.postRendezvous("Rendezvous");
        eventBus.register(this);
        assertNull(lastEvent);
        assertEquals(0, eventCount.intValue());
        assertNull(eventBus.getRendezvousEvent(String.class));
    }

    @Subscribe(rendezvous = true)
    public void onEvent(String event) {
        trackEvent(event);
    }

    @Subscribe(rendezvous = true)
    public void onEvent(IntTestEvent event) {
        trackEvent(event);
    }

    public class RendezvousStickySubscriber {
        @SuppressWarnings("unused")
        @Subscribe(rendezvous = true)
        public void onEvent(String event) {
            eventBus.removeRendezvousEvent(event);
        }
    }

    public class NonRendezvousSubscriber {
        @Subscribe
        public void onEvent(String event) {
            trackEvent(event);
        }

        @Subscribe
        public void onEvent(IntTestEvent event) {
            trackEvent(event);
        }
    }

    public class RendezvousIntTestSubscriber {
        @Subscribe(rendezvous = true)
        public void onEvent(IntTestEvent event) {
            trackEvent(event);
        }
    }
}

在上面的测试用例源码中,笔者编写了 13 个测试用例,涉及发送一个必达事件,发送两个必达事件,发送必达事件与普通事件,注册订阅者与反注册对必达事件的影响,获取和移除必达事件等。

线程调度

线程模型

目前 EventBus 对订阅方法支持 5 种线程模型的调度,分别是:

  1. ThreadMode.POSTING // 对于普通事件来说在事件发布线程来分发事件
  2. ThreadMode.MAIN // 在主线程来分发事件,根据是否在 Android 上使用,处理逻辑不同
  3. ThreadMode.MAIN_ORDERED // 在主线程按顺序来分发事件
  4. ThreadMode.BACKGROUND // 在后台线程来分发事件,根据是否在 Android 上使用,处理逻辑不同。使用单线程处理,尽量不要进行耗时操作以免阻塞后台线程
  5. ThreadMode.ASYNC // 在异步线程来分发事件,使用线程池处理

在上一篇文章中我们分析黏性事件发送的流程时,事件的发送都会调用 postToSubscription 方法,最终调用 invokeSubscriber 方法来反射调用订阅方法,下面再贴出 postToSubscription 的源码:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        // 处理POSTING
        case POSTING:
            // 直接分发事件
            invokeSubscriber(subscription, event);
            break;
            // 处理MAIN
        case MAIN:
            // 是否是主线程,在非 Android 平台上时,isMainThread 也是 true
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                // 此时 mainThreadPoster != null,可以认为是在 Android 平台上
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        // 处理MAIN_ORDERED
        case MAIN_ORDERED:
            // 不管是否是主线程,总是先判断是否可以入队
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster not decoupled from subscriber
                // 否则直接分发事件
                invokeSubscriber(subscription, event);
            }
            break;
        // 处理BACKGROUND
        case BACKGROUND:
            // 如果是主线程则入队,backgroundPoster使用单个后台线程依次分发事件,订阅方法应尽快返回以免阻塞后台线程
            // 在非 Android 平台上时,isMainThread 也是 true,即在非 Android 平台上时,始终在后台线程分发事件
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                // 否则直接分发事件
                invokeSubscriber(subscription, event);
            }
            break;
        // 处理ASYNC
        case ASYNC:
            // 始终异步分发事件
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

POSTING

这是 @Subscribe 注解中线程模型的默认配置,对于该线程调度模型,这里需要注意:

  1. 对于普通事件来说,事件发布与订阅方法将在同一个线程,这是该线程调度模型的本意
  2. 但是对于黏性事件和必达事件来说,事件发布与订阅方法可能不在同一个线程

比如以下代码示例:

// MainActivity.kt

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    
    thread {
        // 在子线程中发布黏性事件
        EventBus.getDefault().postSticky("Event-1")
        Log.e(TAG, "onCreate: postSticky-1, ThreadName = ${Thread.currentThread().name}")

        runOnUiThread {
            // 在主线程中注册订阅者
            Log.e(TAG, "onCreate: register, ThreadName = ${Thread.currentThread().name}")
            EventBus.getDefault().register(this)

            // 再在主线程发布一个黏性事件
            Log.e(TAG, "onCreate: postSticky-2, ThreadName = ${Thread.currentThread().name}")
            EventBus.getDefault().postSticky("Event-2")
        }
    }
}

// 订阅方法 使用 POSTING 线程模型
@Subscribe(threadMode = ThreadMode.POSTING, sticky = true)
fun eventBusTest(event: String) {
    Log.e(TAG, "eventBusTest: event = $event, ThreadName = ${Thread.currentThread().name}")
}

// Logcat
>> onCreate: postSticky-1, ThreadName = Thread-290>> onCreate: register, ThreadName = main             ②
>> eventBusTest: event = Event-1, ThreadName = main  ③
>> onCreate: postSticky-2, ThreadName = main         ④
>> eventBusTest: event = Event-2, ThreadName = main  ⑤

在上述代码示例中,有一个使用 POSTING 线程模型的订阅方法,我们分析下示例代码:

  1. 我们先在子线程中发布一个黏性事件 Event-1
  2. 然后切换到主线程去注册订阅者,
  3. 最后在主线程再发布一个黏性事件 Event-2

接下来我们分析下 Logcat 的输出,一共 5 条日志输出,我们一一分析:

  1. 第一条日志是在子线程发布第一个黏性事件时输出的,标识事件发布是在子线程:Thread-290
  2. 第二条日志是在主线程中注册订阅者时输出的,标识注册订阅者是在主线程
  3. 第三条日志是订阅方法收到第一个黏性事件时输出的,标识订阅方法此时是在主线程
  4. 第四条日志是在主线程中发布第二个黏性事件时输出的,标识事件发布是在主线程
  5. 第五条日志是订阅方法收到第二个黏性事件时输出的,标识订阅方法此时是在主线程

我们根据 Logcat 的日志输出发现订阅方法收到 Event-1 黏性事件是在 main 线程,这与 POSTING 线程模型的描述不符,再往下看收到 Event-2 黏性事件是在 main 线程,与 POSTING 线程模型的描述符合。

以上发现与上一篇 Android-EventBus修改纪实open in new window 中黏性事件的发送流程分析吻合:

  1. 黏性事件发布时如果没有订阅者,那么黏性事件的发送将在订阅者 register 时发送
  2. 黏性事件发布时有订阅者,那么已有的订阅者会收到黏性事件,发布后注册的订阅者会在 register 时收到

最后得出结论:

  1. 谨慎把 POSTING 线程模型用于黏性事件和必达事件,此时 POSTING 线程模型将失效,订阅方法将在订阅者注册的线程中调用
  2. 对于普通事件,使用 POSTING 线程模型时,最好不用进行耗时操作,以免阻塞事件发布线程

MAIN和MAIN_ORDERED

MAINMAIN_ORDERED 线程模型都会在主线程分发事件,后者有个排序作用:

  1. MAIN 线程模型首先尝试直接在主线程分发事件(这可能会阻塞事件发布线程),如果不行再切换至主线程分发,
  2. MAIN_ORDERED 线程模型不管当前线程是否是主线程,始终会切换至主线程依次分发,这保证不会阻塞事件发布线程。

不管使用哪种线程模型都不应在订阅方法中进行耗时操作。

BACKGROUND

BACKGROUND 线程模式也需要注意普通事件与黏性事件和必达事件不同的分发逻辑:

  1. 如果是普通事件,事件将始终在 EventBus 的单个后台线程中依次分发,
  2. 如果是黏性事件或必达事件,事件可能不在 EventBus 的单个后台线程中依次分发,原因见 POSTING 线程模型。

ASYNC

ASYNC 线程模型都将异步的分发事件,在此线程模型下可以执行耗时操作,但也应该避免触发大量耗时操作,以限制并发线程数。

总结

本篇文章提供了一版在上一篇文章中必达事件未实现方法的实现,并对必达事件编写了单元测试,最后通过分析 EventBus 提供的 5 种线程模型,明白了 EventBus 是如何实现线程切换的。

EventBus 对外提供了 5 种线程模型,方便使用者做事件的线程调度,但无论使用哪种线程模型,在订阅方法中都应该尽量避免进行耗时操作。

不过我们最后没有分析线程切换的具体源码,下篇一定~~

happy~~,希望可以帮你更好的使用 EventBus