跳至主要內容

协作式与结构化

guodongAndroid大约 8 分钟

上一章的代码示例写得非常好,它们的目的是解释如何并行地为多个值赋值,并在它们准备好后将它们传递给函数。另一方面,如果出现未知情况或异步代码块中的代码有未覆盖的逻辑,这将会阻塞线程并可能使整个系统无法正常工作。比如,你的异步代码块中包含 while 循环,但是循环中的条件没有包含 跳出策略,你的函数将永远不会返回值。另外,如果你在函数中执行繁重或耗时的操作,那么在函数执行的任何时候应该可以取消执行。否则,你可能会取消它们的父 Job,但不会取消 Job 自身。最后你的代码仍在运行并可能耗尽资源,即使它的 Job 已被取消。

这就是为什么你的代码应该始终是 协作式的

在发布协程后,Jetbrains 开始围绕协程构建各种用例和库。Jetbrains 随后发布了一篇文章,说明了一些协程的细节,解释了人们对协程的最初示例和想法为何并非 100% 正确。以前面使用过的函数的扩展为例:

private suspend fun getUserByIdFromNetwork(userId: Int) =
	GlobalScope.async {
    	println("Retrieving user from network")
        
    	delay(3000)
    	println("Still in the coroutine")
        
    	User(userId, "Filip", "Babic") // we simulate the network call
	}

如果你调用这个函数,那么它将在 3 秒后返回一个 User 对象。由于它是一个挂起函数,在调用它时需要将它放在协程构造器中,比如:

fun main() {
    GlobalScope.launch {
        val deferredUser = getUserByIdFromNetwork(130)
        println(deferredUser.await())
    }
}

但是,如果你在 launch 函数返回的 Job 执行后取消它,这将会发生什么呢?

getUserByIdFromNetwork() 函数仍然会执行 3 秒并随后返回一个 User 对象,即使它的父 Job 已经取消了。这会导致时间和资源的浪费,而这些时间和资源本来可以利用在其他地方。

这就是为什么 Jetbrains 提出了 结构化并发协作式代码 的思想。这个思想指导我们编写的代码可以反映其调用者状态,并构建依赖于其父级状态的协程。简单来说,如果父 Job 被取消,它的子 Job 也应该被取消。默认情况下,这种行为确实存在于协程 API 中。

但是你仍然可以编写不遵循上诉行为的代码。如果你将上面的代码片段调整为以下内容,例如:

fun main() {
    val launch = GlobalScope.launch {
        val dataDeferred = getUserByIdFromNetwork(1312)
        println("Not cancelled")
        // do something with the data
        
        val data = dataDeferred.await()
        println(data)
    }
    
    Thread.sleep(50)
    launch.cancel()
    
    while (true) { // stops the program from finishing
    }
}

你会看到 getUserByIdFromNetwork() 会完整的执行,即使你已经取消了调用它的协程。这是因为上面的代码不是协作式的,并且不允许代码在需要时挂起和取消。它不关心调用方或其父级的作用域和上下文。构建此函数的正确方法如下:

private suspend fun getUserByIdFromNetwork(
    userId: Int,
    parentScope: CoroutineScope
) = parentScope.async {
   	if (!isActive) {
   	    return@async User(0, "", "")
   	}
   	
   	println("Retrieving user from network")
   	delay(3000)
   	
   	User(userId, "Filip", "Babic") // we simulate the network call
}

该函数现在接收父 CoroutineScope 的实例作为参数,并且使用它启动一个异步代码块。此外,它会检查来自父 JobisActive,因为如果父 Job 被取消,它就没有必要再继续执行。如果你再次启动上面的 main 并传入父作用域,代码只会打印出 Retrieving user from networkNot Cancelled,之后它不会继续等待三秒钟以将值返回,因为它已经终止了。

最后,还可以优化之前用于组合两个值的代码。修改之前的示例:

GlobalScope.launch {
    val userDeferred = getUserByIdFromNetwork(userId)
    val usersFromFileDeferred = readUsersFromFile("users.txt")
    
    println("Finding user")
    val userStoredInFile = checkUserExists(
        userDeferred.await(), usersFromFileDeferred.await()
    )
    
    if (userStoredInFile) {
        println("Found user in file!")
    }
}

你可能会看到和上面一样的问题。你提供了两个延迟值并在下面使用它们,但是没有检查 Job 的当前状态。因为它是由全局作用域启动的,所以除非你将返回的 Job 存储在某个地方,以便稍后可以取消它,否则你将会创建一个 fire-and-forget(即发即弃,或者说游离,就像Git中的游离头指针) 函数,这可能会占用宝贵的资源。你可能已经发现了一个规律。你应该始终可以把控你启动的 Job 和启动它们的协程作用域。协程作用域将协程绑定到对象的生命周期。这就是为什么你不应该广泛地使用 GlobalScope,而是提供自定义的作用域。

但是,如何确保用于启动协程的 CoroutineScope 是正确的?答案就是通过自己实现 CoroutineScope 接口!

协作式代码和结构化并发指南指出,应该将协程限制在具有明确定义的生命周期的对象中,比如 Android 应用程序环境中的 Activity。所以让我们在一个类中实现 CoroutineScope。创建一个名为 CustomScope.kt 的 Kotlin 类,然后在其中编写如下代码:

import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext

class CustomScope : CoroutineScope {
    private var parentJob = Job()
    
    override val coroutineContext: CoroutineContext
    	get() = Dispatchers.Main + parentJob
}

一旦你实现了这个接口,你就必须提供一个 CoroutineContext,这样协程在运行时就有一个上下文。建议的实现是使用默认线程的 Dispatcher 和绑定协程生命周期的 Job 的一个组合。这是一个不错的默认实现,但还可以再优化一下。在类中添加以下函数:

fun onStart() {
    parentJob = Job()
}

fun onStop() {
    parentJob.cancel()
    // You can also cancel the whole scope
    // with `cancel(cause: CancellationException)`
}

通过添加上述函数,你可以停止和取消所有以此作用域启动的协程。通过启动它,你还可以创建一个依赖其生命周期和尽可能使用 isActive 检查的新 Job。一旦你调用了 onStop(),作用域内所有的协程都将被取消,并且如果你的协程是协作式的,它们将不再占用任何资源。现在,如果你想使用新的作用域启动协程,可以使用下面的代码:

fun main() {
    val scope = CustomScope()
    
    scope.launch {
        println("Launching in custom scope")
    }
    
    scope.onStop() //cancels all the coroutines
}

代码相当优雅简洁!让我们将其应用于之前在文件中查找 User 的示例,对 main 应用以下更改:

fun main() {
    val scope = CustomScope()
    val userId = 992
    
    scope.launch { // launching using the scope
        println("Finding user")
        
        val userDeferred = getUserByIdFromNetwork(userId, scope) // pass in scope
        val usersFromFileDeferred = readUsersFromFile("users.txt", scope) // pass in scope
        
        val userStoredInFile = checkUserExists(
            userDeferred.await(), usersFromFileDeferred.await()
        )
        
        if (userStoredInFile) {
            println("Found user in file!")
        }
    }
    
	/**
	 * Cancels all the coroutines. You should only see "Finding user" printed out before the program ends.
	 */
    scope.onStop()
}

正如你所见,你将使用你创建的 scope 来使代码响应取消事件,而不是使用 GlobalScope。现在对两个数据获取函数也做下同样的修改:

private suspend fun getUserByIdFromNetwork(
    userId: Int,
    scope: CoroutineScope // Expose the scope as a parameter
) = scope.async { // Apply the scope
    // The rest of the code...
}

// Apply the same changes
private fun readUsersFromFile(
    filePath: String,
    scope: CoroutineScope
) = scope.async {
    // The rest of the code...
}

在上面的代码中,你只是将作用域作为参数对外暴露,并且使用它来启动获取数据的操作。重新编译和运行上述代码,将得到以下输出结果:

Finding user

太棒了。因为你使用的是可以手动取消的自定义作用域,所以通过它启动的协程,你不必担心在取消后还有任何操作在运行。

上述代码是优化后的,因为它会在你对 CoroutineScope 调用取消后尽快关闭协程。通常,这意味着它将在第一个挂起点或第一次检查 isActive 时取消协程。

你要解决的第二个问题是可能永远不会返回值的异步代码块。假设有以下代码:

fun <T> produceValue(scope: CoroutineScope): Deferred<T> =
	scope.async {
	    var data: T? = null
        
	    requestData<T> { value ->
	    	data = value
	    }
        
	    while (data == null) {
	        // dummy loop to keep the function alive
	    }
        
	    data!!
	}

这个函数允许你通过 async 包装一个有回调的现有函数来生成一个任意类型的值。requestData 的实现并不重要,你只要认为它可以获取任何类型的数据并提供回调将数据传递过来即可。

如果没有意外发生,这个函数将工作的很好。但是,如果在下面的调用代码中,回调没有被触发:

fun main() {
    GlobalScope.launch(context = Dispatchers.Main) {
        val data = produceValue<Int>(this)
        data.await()
    }
}

上述代码最终可能会无限期地挂起代码块,阻塞函数。即使你取消了从 launch 返回的 Job,如果使用 UnconfinedMain 调度程序调用 await,可能会导致系统和用户界面冻结。这就是为什么你应该再次将父 JobisActive 作为关键条件跳出策略。如果把代码修改为下面这样:

fun <T> produceValue(scope: CoroutineScope): Deferred<T> =
	scope.async {
	    var data: T? = null
        
	    requestData<T> { value ->
	    	data = value
	    }
        
	    while (data == null && scope.isActive) {
	        // loop for data, while the scope is active
	    }
        
	    data!!
	}

你不仅依赖内部函数条件,还依赖 scope.isActive,从而允许你取消最外层的协程和异步代码块。请注意,Jetbrains 团队已经添加了更好的协程取消功能,因此 UI 的冻结不应该发生,但安全总比后悔好。但是,取消有时无法传播。如果你的代码没有创建一个可以取消的挂起点,那么你只是在调用常规的阻塞代码,你仍然会阻塞协程所在的线程。

所以,请编写 协作式 的协程代码。

提示

注意:这段代码只是距离,并不能在项目中使用,因为 requestDataproduceValue 是抽象的,并且取决于你在应用程序中获取数据的方式。它可以使用数据库、后端、文件系统或其他信息源来提供用户所需的数据。

但它仍然是一个很好的例子,用来说明基于重试或循环的代码是如何协作的,并且可以依赖 CoroutineScope 来知道何时需要取消它。