在協同程式中,「資料流」是一種可依序發出多個值的類型,而不是僅傳回單一值的「暫停函式」。例如,您可以使用資料流從資料庫接收即時更新。
資料流建構於協同程式之上,可提供多個值。從概念來看,資料流是可透過非同步方式計算的「資料串流」。發出的值必須屬於相同類型。例如,Flow<Int> 是發出整數值的資料流。
資料流與產生值序列的 Iterator 非常類似,但會使用暫停函式,以非同步方式產生和取用值。這意味著,資料流可以安全地發出網路要求,並在不封鎖主執行緒的情況下產生下一個值。
資料串流涉及三個實體:
- 生產端會產生加入串流的資料。藉助協同程式,資料流也可以非同步產生資料。
- (不一定存在) 中繼端可修改發至串流的值或串流本身。
- 取用端會從串流取用值。
在 Android 中,「存放區」通常是使用者介面資料的生產端,並將使用者介面 (UI) 做為最終顯示資料的取用端。在其他時候,使用者介面層是使用者輸入內容事件的生產端,而階層的其他層會取用這些事件。生產端和取用端之間的層,通常是中繼端。這些中繼端可以修改資料串流,使其符合下一層的要求。
建立資料流
如要建立資料流,請使用資料流建構工具 API。flow 建構工具函式會建立新的資料流。在該資料流中,您可以使用 emit 函式,手動將新值發至資料串流內。
在以下範例中,資料來源會以固定的時間間隔自動擷取最新消息。由於暫停函式無法傳回多個連續的值,所以資料來源會建立並傳回資料流,以滿足此要求。在此情況下,資料來源會做為生產端。
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
系統會在協同程式中執行 flow 建構工具。因此,它會受益於相同的非同步 API,但會受到一些限制:
- 資料流會「依序」。由於生產端位於協同程式內,所以呼叫暫停函式時,生產端會暫停,直到暫停函式傳回為止。在這個範例中,生產端會暫停,直到
fetchLatestNews網路請求完成為止。也只有到那時,結果才會發至串流。 - 在使用
flow建構工具時,生產端不能從其他CoroutineContext發出emit值。因此,請勿透過建立新的協同程式,或使用withContext程式碼區塊,呼叫另一個CoroutineContext中的emit。在這些情況下,您可以使用其他資料流建構工具,例如callbackFlow。
修改串流
中繼端可以使用「中繼運算子」修改資料串流,而不必取用值。這些運算子是函式,在套用至資料串流時,會設定一連串在未來取用值時才執行的運算子。如要進一步瞭解中繼運算子,請參閱資料流參考說明文件。
在以下範例中,存放區層使用中繼運算子 map 轉換將在 View 上顯示的資料:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
項目發至資料流時,中繼運算子可以依序套用,串連成延遲執行的運算子。請注意,如果只將中繼運算子套用至串流,則不會啟動資料流收集作業。
從資料流收集
使用「終端運算子」觸發資料流,開始監聽值。如要在發出值時取得串流中的所有值,請使用 collect。如要進一步瞭解終端運算子,請參閱官方資料流說明文件。
由於 collect 是暫停函式,所以需要在協同程式中執行。該函式將 lambda 視為在每個新值上呼叫的參數。由於它是暫停函式,呼叫 collect 的協同程式可能會暫停,直到資料流關閉為止。
延續上一個範例,以下說明如何簡單實作從存放區層取用資料的 ViewModel:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
收集資料流,會觸發生產端重新整理最新消息,並於固定的時間間隔發出網路請求結果。由於生產端持續使用 while(true) 迴圈,當清除 ViewModel 和取消 viewModelScope 時,資料串流將會關閉。
由於以下原因,系統可能會停止收集資料流:
- 取消了收集資料流的協同程式,如上一個範例所示。這樣做也會停止基礎生產端。
- 生產端已發完所有項目。在此情況中,資料串流會關閉,而
collect協同程式會繼續執行。
除非指定與其他中繼運算子一起使用,否則資料流會處於「冷」和「延遲」狀態。這意味著,每次在資料流中呼叫終端運算子時,都會執行生產端程式碼。在上一個範例中,由於設定了多個資料流收集器,所以資料來源多次以不同的固定時間間隔擷取最新消息。如果有多個取用端同時收集資料流,為對資料流進行最佳化調整,並共用資料流,請使用 shareIn 運算子。
擷取非預期的例外狀況
第三方程式庫可能會實作生產端。這意味著,此做法可能會擲回非預期的例外狀況。如要處理這些例外狀況,請使用 catch 中繼運算子。
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
在上一個範例中,在發生例外狀況時,由於尚未收到新項目,所以沒有呼叫 collect lambda。
catch 也可以向資料流執行對項目的 emit 作業。範例存放區層可以emit快取值:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
在此範例中,在發生例外狀況時,由於出現例外狀況,系統向串流發出新項目,所以會呼叫 collect lambda。
在另一個 CoroutineContext 中執行
根據預設,flow 建構工具的生產端會在協同程式 (從該生產端收集資料流) 的 CoroutineContext 中執行,且如前所述,它不能從另一個 CoroutineContext 執行 emit 作業。在某些情況下,這樣的行為未必是您想要的。例如,在本主題使用的範例中,存放區層不應在 viewModelScope 使用的 Dispatchers.Main 上執行作業。
如要變更資料流的 CoroutineContext,請使用中繼運算子 flowOn。flowOn 會變更「上游資料流」的 CoroutineContext,也就是在 flowOn「之前」(或之上)的生產端和任何中繼運算子。「下游資料流」(flowOn 「之後」的中繼運算子,伴隨取用端) 不會受到影響,並會在從資料流執行 collect 作業的 CoroutineContext 執行。如果有多個 flowOn 運算子,每個運算子都會從其目前位置變更上游。
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
藉助這個程式碼,onEach 和 map 運算子會使用 defaultDispatcher,而 catch 運算子和取用端會在 viewModelScope 使用的 Dispatchers.Main 上執行。
由於資料來源層執行 I/O 工作,建議您使用針對 I/O 作業進行最佳化調整的調派程式:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Jetpack 程式庫內的資料流
資料流已整合至許多 Jetpack 程式庫中,在 Android 第三方程式庫中也廣受歡迎。資料流非常適合即時資料更新和無限資料串流。
您可以使用 Flow with Room,接收資料庫變更的通知。使用資料存取物件 (DAO) 時,傳回 Flow 類型,即可取得即時更新。
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
每當 Example 資料表發生變更時,都會發出包含資料庫中新項目的新清單。
將回呼式 API 轉換為資料流
callbackFlow 是一個資料流建構工具,可將回呼式 API 轉換為資料流。例如,Firebase Firestore Android API 就使用了回呼。
如要將這些 API 轉換為資料流,並監聽 Firestore 資料庫更新,可以使用下方的程式碼:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
offer(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
與 flow 建構工具不同,callbackFlow 可讓您從包含 send 函式的另一個 CoroutineContext 發出值,或在包含 trySend 函式的協同程式以外的位置發出值。
callbackFlow 在內部使用管道。從概念來看,管道與封鎖的佇列非常類似。管道設定了容量,也就是可緩衝的元素數量上限。在 callbackFlow 中建立的管道,其預設容量為 64 個元素。當您嘗試將新元素新增至飽和的管道時,send 會暫停生產端,直到有空間存放新元素為止,同時 offer 不會將元素新增到管道中,並立即傳回 false。