Skip to content

协程

介绍

Kotlin 协程提供了一种全新处理并发的方式,你可以在 Android 平台上使用它来简化异步执行的代码。协程从 Kotlin 1.3 版本开始引入,但这一概念在编程世界诞生的黎明之际就有了,最早使用协程的编程语言可以追溯到 1967 年的 Simula 语言。在过去几年间,协程这个概念发展势头迅猛,现已经被诸多主流编程语言采用,比如 Javascript、C#、Python、Ruby 以及 Go 等。Kotlin 协程是基于来自其他语言的既定概念

Google 官方推荐将 Kotlin 协程作为在 Android 上进行异步编程的解决方案,值得关注的功能点包括:

  • 轻量:可以在单个线程上运行多个协程,因为协程支持挂起,不会使正在运行协程的线程阻塞。挂起比阻塞节省内存,且支持多个并行操作
  • 内存泄露更少:使用结构化并发机制在一个作用域内执行多个操作
  • 内置取消支持:取消功能会自动通过正在运行的协程层次结构传播
  • Jetpack 集成:许多 Jetpack 库都包含提供全面协程支持的扩展。某些库还提供自己的协程作用域,可供你用于结构化并发

协程的导入:

kotlin
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.5.2")

创建协程

kotlin
    GlobalScope.launch(context = Dispatchers.IO) {
        delay(1000)
        log("launch")
    }
    Thread.sleep(2000)
    log("end")

我们通过GlobalScope来启动了一个协程,在1000ms后输出,启动的协程是运行在协程内部的线程池中,类似于用单线程的形式实现了多线程,可以极大的提高线程的并发效率,避免以往的嵌套回调地狱,极大提高了代码的可读性。

suspend

像delay这种是一个挂起函数,只能有协程或者其他挂起函数来调用。delay() 函数就使用了 suspend 进行修饰,用 suspend 修饰的函数就是挂起函数

suspend挂起与恢复

协程在常规函数的基础上添加了两项操作用于处理长时间运行的任务,在invoke(或 call)和return之外,协程添加了suspendresume

  • suspend 用于暂停执行当前协程,并保存所有局部变量
  • resume 用于让已暂停的协程从暂停处继续执行

suspend 函数只能由其它 suspend 函数调用,或者是由协程来调用。如:

kotlin
suspend fun fetchDocs() {                             // Dispatchers.Main
    val result = get("https://developer.android.com") // Dispatchers.IO for `get`
    show(result)                                      // Dispatchers.Main
}

suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }

在上面的示例中,get() 仍在主线程上被调用,但它会在启动网络请求之前暂停协程。get() 主体内通过调用 withContext(Dispatchers.IO) 创建了一个在 IO 线程池中运行的代码块,在该块内的任何代码都始终通过 IO 调度器执行。当网络请求完成后,get() 会恢复已暂停的协程,使得主线程协程可以直接拿到网络请求结果而不用使用回调来通知主线程。Retrofit 就是以这种方式来实现对协程的支持。

我们可以发现虽然看上去是普通的阻塞式的请求,但是协程能确保这个过程不会阻塞主线程。

CoroutineScope

CoroutineScope是协程作用域,用于对协程进行追踪。

所有的协程都需要通过 CoroutineScope 来启动,它会跟踪通过 launchasync 创建的所有协程,你可以随时调用 scope.cancel() 取消正在运行的协程。CoroutineScope 本身并不运行协程,它只是确保你不会失去对协程的追踪,即使协程被挂起也是如此。在 Android 中,某些 ktx 库为某些生命周期类提供了自己的 CoroutineScope,例如,ViewModel 有 viewModelScope,Lifecycle 有 lifecycleScope

GlobalScope

GlobalScope属于全局作用域,GlobalScope 启动的协程的生命周期只受整个应用程序的生命周期的限制,只要整个应用程序还在运行且协程的任务还未结束,协程就可以一直运行。

kotlin
    GlobalScope.launch {
        launch {
            delay(400)
            log("launch A")
        }
        launch {
            delay(300)
            log("launch B")
        }
        log("GlobalScope")
    }
    log("end")
    Thread.sleep(500)

GlobalScope.launch 会创建一个顶级协程,尽管它很轻量级,但在运行时还是会消耗一些内存资源,且可以一直运行直到整个应用程序停止(只要任务还未结束),这可能会导致内存泄露,所以在日常开发中应该谨慎使用 GlobalScope。

runBlocking

kotlin
fun main() {
    log("start")
    runBlocking {
        launch {
            repeat(3) {
                delay(100)
                log("launchA - $it")
            }
        }
        launch {
            repeat(3) {
                delay(100)
                log("launchB - $it")
            }
        }
        GlobalScope.launch {
            repeat(3) {
                delay(120)
                log("GlobalScope - $it")
            }
        }
    }
    log("end")
}

runBlocking的好处是:只有当内部相同作用域的所有协程都运行结束后,声明在 runBlocking 之后的代码才能执行,即 runBlocking 会阻塞其所在线程。

相当于,runBlocking会阻塞当前外部的线程,但内部还是非阻塞的。

coroutineScope

coroutineScope 函数用于创建一个独立的协程作用域,直到所有启动的协程都完成后才结束自身。runBlockingcoroutineScope 看起来很像,因为它们都需要等待其内部所有相同作用域的协程结束后才会结束自己。两者的主要区别在于 runBlocking 方法会阻塞当前线程,而 coroutineScope不会,而是会挂起并释放底层线程以供其它协程使用。基于这个差别,runBlocking 是一个普通函数,而 coroutineScope 是一个挂起函数。

supervisorScope

supervisorScope 函数用于创建一个使用了 SupervisorJob 的 coroutineScope,该作用域的特点就是抛出的异常不会连锁取消同级协程和父协程

自定义CoroutineScope

这部分用处不是很大,给出看到的参考。

假设我们在 Activity 中先后启动了多个协程用于执行异步耗时操作,那么当 Activity 退出时,必须取消所有协程以避免内存泄漏。我们可以通过保留每一个 Job 引用然后在 onDestroy方法里来手动取消,但这种方式相当来说会比较繁琐和低效。kotlinx.coroutines 提供了 CoroutineScope 来管理多个协程的生命周期

我们可以通过创建与 Activity 生命周期相关联的协程作用域来管理协程的生命周期。CoroutineScope 的实例可以通过 CoroutineScope()MainScope() 的工厂函数来构建。前者创建通用作用域,后者创建 UI 应用程序的作用域并使用 Dispatchers.Main 作为默认的调度器

kotlin
class Activity {

    private val mainScope = MainScope()

    fun onCreate() {
        mainScope.launch {
            repeat(5) {
                delay(1000L * it)
            }
        }
    }

    fun onDestroy() {
        mainScope.cancel()
    }

}

或者,我们可以通过委托模式来让 Activity 实现 CoroutineScope 接口,从而可以在 Activity 内直接启动协程而不必显示地指定它们的上下文,并且在 onDestroy()中自动取消所有协程

kotlin
class Activity : CoroutineScope by CoroutineScope(Dispatchers.Default) {

    fun onCreate() {
        launch {
            repeat(5) {
                delay(200L * it)
                log(it)
            }
        }
        log("Activity Created")
    }

    fun onDestroy() {
        cancel()
        log("Activity Destroyed")
    }

}

fun main() = runBlocking {
    val activity = Activity()
    activity.onCreate()
    delay(1000)
    activity.onDestroy()
    delay(1000)
}

从输出结果可以看出,当回调了onDestroy()方法后协程就不会再输出日志了

kotlin
[main] Activity Created
[DefaultDispatcher-worker-1] 0
[DefaultDispatcher-worker-1] 1
[DefaultDispatcher-worker-1] 2
[main] Activity Destroyed

已取消的作用域无法再创建协程。因此,仅当控制其生命周期的类被销毁时,才应调用 scope.cancel()。例如,使用 viewModelScope 时, ViewModel 会在自身的 onCleared() 方法中自动取消作用域。

CoroutineBuilder

launch

launch 是一个作用于 CoroutineScope 的扩展函数,用于在不阻塞当前线程的情况下启动一个协程,并返回对该协程任务的引用,即 Job 对象。

launch 函数共包含三个参数:

  1. context。用于指定协程的上下文
  2. start。用于指定协程的启动方式,默认值为 CoroutineStart.DEFAULT,即协程会在声明的同时就立即进入等待调度的状态,即可以立即执行的状态。可以通过将其设置为CoroutineStart.LAZY来实现延迟启动,即懒加载
  3. block。用于传递协程的执行体,即希望交由协程执行的任务

job

Job 是协程的句柄。使用 launchasync 创建的每个协程都会返回一个 Job 实例,该实例唯一标识协程并管理其生命周期。Job 是一个接口类型,这里列举 Job 几个比较有用的属性和函数

kotlin
//当 Job 处于活动状态时为 true
//如果 Job 未被取消或没有失败,则均处于 active 状态
public val isActive: Boolean

//当 Job 正常结束或者由于异常结束,均返回 true
public val isCompleted: Boolean

//当 Job 被主动取消或者由于异常结束,均返回 true
public val isCancelled: Boolean

//启动 Job
//如果此调用的确启动了 Job,则返回 true
//如果 Job 调用前就已处于 started 或者是 completed 状态,则返回 false 
public fun start(): Boolean

//用于取消 Job,可同时通过传入 Exception 来标明取消原因
public fun cancel(cause: CancellationException? = null)

//阻塞等待直到此 Job 结束运行
public suspend fun join()

//当 Job 结束运行时(不管由于什么原因)回调此方法,可用于接收可能存在的运行异常
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle

async

async也是一个作用于CoroutineScope 的扩展函数,可以返回协程的执行结果。

kotlin
fun main() {
    val time = measureTimeMillis {
        runBlocking {
            val asyncA = async {
                delay(3000)
                1
            }
            val asyncB = async {
                delay(4000)
                2
            }
            log(asyncA.await() + asyncB.await())
        }
    }
    log(time)
}

CoroutineContext

CoroutineContext 使用以下元素集定义协程的行为:

  • Job:控制协程的生命周期
  • CoroutineDispatcher:将任务指派给适当的线程
  • CoroutineName:协程的名称,可用于调试
  • CoroutineExceptionHandler:处理未捕获的异常

Job

可以通过Job控制CoroutineScope的生命周期,Job可以通过corountineContext[Job]获取到。

kotlin
val job = Job()

val scope = CoroutineScope(job + Dispatchers.IO)

fun main(): Unit = runBlocking {
    log("job is $job")
    val job = scope.launch {
        try {
            delay(3000)
        } catch (e: CancellationException) {
            log("job is cancelled")
            throw e
        }
        log("end")
    }
    delay(1000)
    log("scope job is ${scope.coroutineContext[Job]}")
    scope.coroutineContext[Job]?.cancel()
}

CoroutineDispatcher

CoroutineContext 包含一个 CoroutineDispatcher(协程调度器)用于指定执行协程的目标载体,即 运行于哪个线程。CoroutineDispatcher 可以将协程的执行操作限制在特定线程上,也可以将其分派到线程池中,或者让它无限制地运行。所有的协程构造器(如 launch 和 async)都接受一个可选参数,即 CoroutineContext ,该参数可用于显式指定要创建的协程和其它上下文元素所要使用的 CoroutineDispatcher。

主要有下面四个Dispatcher用于指定在哪一类线程中执行:

  • Dispatchers.Default。默认调度器,适合用于执行占用大量 CPU 资源的任务。例如:对列表排序和解析 JSON
  • Dispatchers.IO。适合用于执行磁盘或网络 I/O 的任务。例如:使用 Room 组件、读写磁盘文件,执行网络请求
  • Dispatchers.Unconfined。对执行协程的线程不做限制,可以直接在当前调度器所在线程上执行
  • Dispatchers.Main。使用此调度程序可用于在 Android 主线程上运行协程,只能用于与界面交互和执行快速工作,例如:更新 UI、调用 LiveData.setValue

withContext

对于以下代码,get方法内使用withContext(Dispatchers.IO) 创建了一个指定在 IO 线程池中运行的代码块,该区间内的任何代码都始终通过 IO 线程来执行。由于 withContext 方法本身就是一个挂起函数,因此 get 方法也必须定义为挂起函数。

kotlin
suspend fun fetchDocs() {                      // Dispatchers.Main
    val result = get("developer.android.com")  // Dispatchers.Main
    show(result)                               // Dispatchers.Main
}

suspend fun get(url: String) =                 // Dispatchers.Main
    withContext(Dispatchers.IO) {              // Dispatchers.IO (main-safety block)
        /* perform network IO here */          // Dispatchers.IO (main-safety block)
    }                                          // Dispatchers.Main
}

corountineName

CoroutineName 用于为协程指定一个名字,方便调试和定位问题。

Flow

基本使用

Flow能返回多个异步计算的值,如:

kotlin
        flow {
            for(i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect {
            println(it)
        }

简单理解,可以将collect()理解为Rxjava的subscribe(),emit对应onNext()。

创建Flow

flowOf()

kotlin
        flowOf(1,2,3,4,5) //== listOf(1,2,3,4,5).asFlow()
            .onEach {
                delay(100)
            }
            .collect {
                println(it)
            }

asFlow()

即将上面的flowOf()替换成listOf().asFlow()

channelFlow()

channelFlow()和普通的flow()区别是:

  • flow在没切换线程时,生产者和消费者是同步非阻塞的。
  • channel Flow实现了异步非阻塞。

切换线程

相比于RxJava,Flow切换线程十分简单,只需要使用flowOn(...)就可以了。如:

kotlin
        val time = measureTimeMillis {
            flow {
                for (i in 1..5) {
                    delay(100)
                    emit(i)
                }
            }.map {
                it * it
            }.flowOn(Dispatchers.IO)
                .collect {
                    delay(100)
                    println(it)
                }
        }
        print("cost $time")

Tips: 这里的flowOn是一个上游操作符,即上面代码flowOn影响的是flowBuilder和map,而collect仍执行在原线程。

取消flow

如果flow在挂起函数内被挂起了,则可以取消,否则不能取消。

flow的生命周期

flow有onStart,onCompletion来监听flow的创建和结束。

其他的更多的flow的知识可以去《kotlin进阶实战》上看。

例子:使用paging3实现分页加载

Jetpack库中,有一个paging3的分页加载库,能非常轻松简单的实现分页功能,且不需要像之前那样自己来写rv的监听事件。

官方推荐使用flow和协程来完成。

先导入依赖:

kotlin
dependencies {
    ...
    implementation ("androidx.paging:paging-runtime:3.3.6")
    implementation ("com.squareup.retrofit2:retrofit:2.9.0")
    implementation ("com.squareup.retrofit2:converter-gson:2.9.0")
}

然后建立网络请求的数据类:

kotlin
data class Repo(
    @SerializedName("id") val id: Int,
    @SerializedName("name") val name: String,
    @SerializedName("description") val description: String?,
    @SerializedName("stargazers_count") val starCount: Int
)

data class RepoResponse(
    @SerializedName("items") val items: List<Repo> = emptyList()
)

然后定义请求接口:

kotlin
interface GithubService {

    @GET("search/repositories?sort=stars&q=Android")
    suspend fun searchRepos(@Query("page") page: Int, @Query("per_page") perPage: Int) : RepoResponse

    companion object {
        private const val BASE_URL = "https://api.github.com/"

        fun create(): GithubService {
            return Retrofit.Builder()
                .baseUrl(BASE_URL)
                .addConverterFactory(GsonConverterFactory.create())
                .build()
                .create(GithubService::class.java)
        }
    }
}

这里在实际中应该分开,把retrofit实例的创建单独写成一个工具类,与接口分开。

然后我们开始实现paging3的分页功能。

首先最重要的组件就是 PagingSource,我们需要自定义一个子类去继承 PagingSource,然后重写 load() 函数,并在这里提供对应当前页数的数据。

kotlin
class RepoPagingSource(private val githubService: GithubService) : PagingSource<Int, Repo>() {
    override fun getRefreshKey(state: PagingState<Int, Repo>): Int? = null

    override suspend fun load(params: LoadParams<Int>): LoadResult<Int, Repo> {
        return try {
            val page = params.key ?: 1
            val pageSize = params.loadSize
            val repoResponse = githubService.searchRepos(page, pageSize)
            val repoItems = repoResponse.items
            val preKey = if(page > 1) page - 1 else null
            val nextKey = if(repoItems.isNotEmpty()) page + 1 else null
            LoadResult.Page(repoItems, preKey, nextKey)
        } catch (e: Exception) {
            LoadResult.Error(e)
        }
    }
}

在继承 PagingSource 时需要声明两个泛型类型,第一个类型表示页数的数据类型,我们使用整型。第二个类型表示每一项数据(注意不是每一页)所对应的对象类型,这里使用刚才定义的 Repo。

然后在 load() 函数当中,先通过 params 参数得到 key,这个 key 就是代表着当前的页数。注意 key 是可能为 null 的,如果为 null 的话,我们就默认将当前页数设置为第一页。另外还可以通过 params 参数得到 loadSize,表示每一页包含多少条数据,这个数据的大小我们可以在稍后设置。

接下来调用刚才在 GitHubService 中定义的 searchRepos() 接口,并把 page 和 pageSize 传入,从服务器获取当前页所对应的数据。

最后需要调用 LoadResult.Page() 函数,构建一个 LoadResult 对象并返回。注意 LoadResult.Page() 函数接收 3 个参数,第一个参数传入从响应数据解析出来的 Repo 列表即可,第二和第三个参数分别对应着上一页和下一页的页数。针对于上一页和下一页,我们还额外做了个判断,如果当前页已经是第一页或最后一页,那么它的上一页或下一页就为 null。

然后有个getRefreshKey的方法,我们目前不用管他。

然后就是MVVM架构中,应该需要一个Repository层来返回flow,这里直接省略了,放在了ViewModel层。

kotlin
class MainViewModel : ViewModel() {

    private val _pagingDataLiveData: MutableLiveData<PagingData<Repo>> = MutableLiveData()

    val pagingDataLiveData : LiveData<PagingData<Repo>> get() = _pagingDataLiveData

    fun getPagingData() {
        viewModelScope.launch {
            val flow =  Pager(config = PagingConfig(50), pagingSourceFactory = {RepoPagingSource(
                GithubService.create()
            )}).flow
            flow.collect { pagingData ->
                _pagingDataLiveData.postValue(pagingData)
            }
        }
    }
}

这里使用的是ViewModel ktx中提供的一个viewModelScope,用于在 ViewModel 中启动协程,该作用域的生命周期和 ViewModel 相等,当 ViewModel 回调了 onCleared()方法时会自动取消该作用域,即不需要我们自己去管理协程。

kotlin
implementation("androidx.lifecycle:lifecycle-viewmodel-ktx:2.4.0")

然后将flow收集到的数据通过livedata传递给activity,这样我们就能在activity中获取pagingData。

然后我们还需要创建一个PagingDataAdapter作为Rv的Adapter:

kotlin
class RepoAdapter : PagingDataAdapter<Repo, RepoAdapter.ViewHolder>(object : DiffUtil.ItemCallback<Repo>() {
    override fun areContentsTheSame(oldItem: Repo, newItem: Repo): Boolean {
        return oldItem == newItem
    }

    override fun areItemsTheSame(oldItem: Repo, newItem: Repo): Boolean {
        return oldItem.id == newItem.id
    }

}){

    inner class ViewHolder(view: View) : RecyclerView.ViewHolder(view) {

    }

    override fun onBindViewHolder(holder: ViewHolder, position: Int) {

    }

    override fun onCreateViewHolder(parent: ViewGroup, viewType: Int): ViewHolder {

    }
}

可以发现代码写法与ListAdapter类似,不多赘述。

最后在activity中只需要通过一行代码提交pagingData即可:

kotlin
adapter.submitData(pagingData)

这样就能通过paging3+flow+协程实现分页加载了。