使用协程和 Flow 简化 API 设计

使用协程和 Flow 简化 API 设计

如果您是库作者,您也许希望用户在使用 Kotlin 协程与 Flow 时可以更加轻松地调用您基于 Java 或回调的 API。另外,如果您是 API 的使用者,则可能愿意将第三方 API 界面适配协程,以使它们对 Kotlin 更友好。

本文将会介绍如何使用协程和 Flow 简化 API,以及如何使用 suspendCancellableCoroutine 和 callbackFlow API 创建您自己的适配器。针对那些富有好奇心的读者,本文还会对这些 API 进行剖析,以帮您了解它们底层的工作原理。

如果您更喜欢观看视频,可以 点击这里

检查现有协程适配器

在您为现有 API 编写自己的封装之前,请检查是否已经存在针对您的用例的适配器或者 扩展方法。下面是一些包含常见类型协程适配器的库。

Future 类型

对于 future 类型,Java 8 集成了 CompletableFuture,而 Guava 集成了 ListenableFuture。这里提到的并不是全部,您可以在线搜索以确定是否存在适用于您的 future 类型的适配器。

// 等待 CompletionStage 的执行完成而不阻塞线程
suspend fun <T> CompletionStage<T>.await(): T 
 
// 等待 ListenableFuture 的执行完成而不阻塞线程
suspend fun <T> ListenableFuture<T>.await(): T

使用这些函数,您可以摆脱回调并挂起协程直到 future 的结果被返回。

Reactive Stream

对于响应式流的库,有针对 RxJavaJava 9 API响应式流库 的集成:

// 将给定的响应式 Publisher 转换为 Flow
fun <T : Any> Publisher<T>.asFlow(): Flow<T>

这些函数将响应式流转换为了 Flow。

Android 专用 API

对于 Jetpack 库或 Android 平台 API,您可以参阅 Jetpack KTX 库 列表。现有超过 20 个库拥有 KTX 版本,构成了您所熟悉的 Java API。其中包括 SharedPreferences、ViewModels、SQLite 以及 Play Core。

回调

回调是实现异步通讯时非常常见的做法。事实上,我们在 后台线程任务运行指南 中将回调作为 Java 编程语言的默认解决方案。然而,回调也有许多缺点: 这一设计会导致令人费解的回调嵌套。同时,由于没有简单的传播方式,错误处理也更加复杂。在 Kotlin 中,您可以简单地使用协程调用回调,但前提是您必须创建您自己的适配器。

创建您自己的适配器

如果没有找到适合您用例的适配器,更直接的做法是自己编写适配器。对于一次性异步调用,可以使用 suspendCancellableCoroutine API;而对于流数据,可以使用 callbackFlow API。

作为练习,下面的示例将会使用来自 Google Play Services 的 Fused Location Provider API 来获取位置数据。此 API 界面十分简单,但是它使用回调来执行异步操作。当逻辑变得复杂时,这些回调容易使代码变得不可读,而我们可以使用协程来摆脱它们。

如果您希望探索其它解决方案,可以通过上面函数所链接的源代码为您带来启发。

一次性异步调用

Fused Location Provider API 提供了 getLastLocation 方法来获得 最后已知位置。对于协程来说,理想的 API 是一个直接返回确切结果的挂起函数。

注意: 这一 API 返回值为 Task,并且已经有了对应的 适配器。出于学习的目的,我们用它作为范例。

我们可以通过为 FusedLocationProviderClient 创建扩展函数来获得更好的 API:

suspend fun FusedLocationProviderClient.awaitLastLocation(): Location

由于这是一个一次性异步操作,我们使用 suspendCancellableCoroutine 函数: 一个用于从协程库创建挂起函数的底层构建块。

suspendCancellableCoroutine 会执行作为参数传入的代码块,然后在等待继续信号期间挂起协程的执行。当协程 Continuation 对象中的 resumeresumeWithException 方法被调用时,协程会被恢复执行。有关 Continuation 的更多信息,请参阅: Kotlin Vocabulary | 揭秘协程中的 suspend 修饰符

我们使用可以添加到 getLastLocation 方法中的回调来在合适的时机恢复协程。参见下面的实现:

// FusedLocationProviderClient 的扩展函数,返回最后已知位置
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location =

  // 创建新的可取消协程
  suspendCancellableCoroutine<Location> { continuation ->

    // 添加恢复协程执行的监听器
    lastLocation.addOnSuccessListener { location ->
      // 恢复协程并返回位置
      continuation.resume(location)
    }.addOnFailureListener { e ->
      // 通过抛出异常来恢复协程
      continuation.resumeWithException(e)
    }

    // suspendCancellableCoroutine 块的结尾。这里会挂起协程
    //直到某个回调调用了 continuation 参数
  }

注意: 尽管协程库中同样包含了不可取消版本的协程构建器 (即 suspendCoroutine),但最好始终选择使用 suspendCancellableCoroutine 处理协程作用域的取消及从底层 API 传播取消事件。

suspendCancellableCoroutine 原理

在内部,suspendCancellableCoroutine 使用 suspendCoroutineUninterceptedOrReturn 在挂起函数的协程中获得 Continuation。这一 Continuation 对象会被一个 CancellableContinuation 对象拦截,后者会从此时开始控制协程的生命周期 (其 实现 具有 Job 的功能,但是有一些限制)。

接下来,传递给 suspendCancellableCoroutine 的 lambda 表达式会被执行。如果该 lambda 返回了结果,则协程将立即恢复;否则协程将会在 CancellableContinuation 被 lambda 手动恢复前保持挂起状态。

您可以通过我在下面代码片段 (原版实现) 中的注释来了解发生了什么:

public suspend inline fun <T> suspendCancellableCoroutine(
  crossinline block: (CancellableContinuation<T>) -> Unit
): T =
  // 获取运行此挂起函数的协程的 Continuation 对象 
  suspendCoroutineUninterceptedOrReturn { uCont ->

    // 接管协程。Continuation 已经被拦截,
    // 接下来将会遵循 CancellableContinuationImpl 的生命周期
    val cancellable = CancellableContinuationImpl(uCont.intercepted(), ...)
    /* ... */
 
    // 使用可取消 Continuation 调用代码块
    block(cancellable)
        
    // 挂起协程并且等待 Continuation 在 “block” 中被恢复,或者在 “block” 结束执行时返回结果 
    cancellable.getResult()
  }

想了解更多有关挂起函数的工作原理,请参阅这篇: Kotlin Vocabulary | 揭秘协程中的 suspend 修饰符

流数据

如果我们转而希望用户的设备在真实的环境中移动时,周期性地接收位置更新 (使用 requestLocationUpdates 函数),我们就需要使用 Flow 来创建数据流。理想的 API 看起来应该像下面这样:

fun FusedLocationProviderClient.locationFlow(): Flow<Location>

为了将基于回调的 API 转换为 Flow,可以使用 callbackFlow 流构建器来创建新的 flow。callbackFlow 的 lambda 表达式的内部处于一个协程的上下文中,这意味着它可以调用挂起函数。不同于 flow 流构建器,channelFlow 可以在不同的 CoroutineContext 或协程之外使用 offer 方法发送数据。

通常情况下,使用 callbackFlow 构建流适配器遵循以下三个步骤:

  1. 创建使用 offer 向 flow 添加元素的回调;
  2. 注册回调;
  3. 等待消费者取消协程,并注销回调。

将上述步骤应用于当前用例,我们得到以下实现:

// 发送位置更新给消费者
fun FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
  // 创建了新的 Flow。这段代码会在协程中执行。
  // 1. 创建回调并向 flow 中添加元素
  val callback = object : LocationCallback() {
    override fun onLocationResult(result: LocationResult?) {
      result ?: return  // 忽略为空的结果
      for (location in result.locations) {
        try {
          offer(location)  // 将位置发送到 flow
        } catch (t: Throwable) {
          // 位置无法发送到 flow
        }
      }
    }
  }
  
  // 2. 注册回调并通过调用 requestLocationUpdates 获取位置更新。
  requestLocationUpdates(
    createLocationRequest(),
    callback,
    Looper.getMainLooper()
  ).addOnFailureListener { e ->
    close(e)  // 在出错时关闭 flow
  }
  
  // 3. 等待消费者取消协程并注销回调。这一过程会挂起协程,直到 Flow 被关闭。
  awaitClose {
    // 在这里清理代码
    removeLocationUpdates(callback)
  }
}

callbackFlow 内部原理

在内部,callbackFlow 使用了一个 channel。channel 在概念上很接近阻塞 队列 —— 它在配置时需要指定容量 (capacity): 即可以缓冲的元素个数。在 callbackFlow 中创建的 channel 默认容量是 64 个元素。如果将新元素添加到已满的 channel,由于 offer 不会将元素添加到 channel 中,并且会立即返回 false,所以 send 会暂停生产者,直到频道 channel 中有新元素的可用空间为止。

awaitClose 内部原理

有趣的是,awaitClose 内部使用的是 suspendCancellableCoroutine。您可以通过我在以下代码片段中的注释 (查看 原始实现) 一窥究竟:

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
  ...
  try {
    // 使用可取消 continuation 挂起协程
    suspendCancellableCoroutine<Unit> { cont ->
      // 仅在 Flow 或 Channel 关闭时成功恢复协程,否则保持挂起
      invokeOnClose { cont.resume(Unit) }
    }
  } finally {
    // 总是会执行调用者的清理代码
    block()
  }
}

复用 Flow

除非额外使用中间操作符 (如: conflate),否则 Flow 是冷且惰性的。这意味着每次调用 flow 的终端操作符时,都会执行构建块。对于我们的用例来说,由于添加一个新的位置监听器开销很小,所以这一特性不会有什么大问题。然而对于另外的一些实现可就不一定了。

您可以使用 shareIn 中间操作符在多个收集器间复用同一个 flow,并使冷流成为热流。

val FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
  ...
}.shareIn(
  // 让 flow 跟随 applicationScope
  applicationScope,
  // 向新的收集器发送上一次发送的元素
  replay = 1,
  // 在有活跃的订阅者时,保持生产者的活跃状态
  started = SharingStarted.WhileSubscribed()
)

您可以通过文章《协程中的取消和异常 | 驻留任务详解》来了解更多有关在应用中使用 applicationScope 的最佳实践。

您应当考虑通过创建协程适配器使您的 API 或现存 API 简洁、易读且符合 Kotlin 的使用习惯。首先检查是否已经存在可用的适配器,如果没有,您可以使用 suspendCancellableCoroutine 针对一次性调用;或使用 callbackFlow 针对流数据,来创建您自己的适配器。

您可以通过 codelab: 创建 Kotlin 扩展库,来上手本文所介绍的话题。

版权声明

禁止一切形式的转载-禁止商用-禁止衍生 申请授权

脉脉不得语
脉脉不得语
Zhengzhou Website
Android Developer | https://androiddevtools.cn and https://androidweekly.io WebMaster | GDG Zhengzhou Funder & Ex Organizer | http://Toast.show(∞) Podcast Host

你已经成功订阅到 Android 开发技术周报
太棒了!接下来,完成检验以获得全部访问权限 Android 开发技术周报
欢迎回来!你已经成功登录了。
Unable to sign you in. Please try again.
成功!您的帐户已完全激活,您现在可以访问所有内容。
Error! Stripe checkout failed.
Success! Your billing info is updated.
Error! Billing info update failed.