في الكوروتينات، نوع التدفق هو نوع يمكنه إصدار قيم متعددة بالتتابع، على عكس دوال التعليق التي تعرض فقط قيمة واحدة. على سبيل المثال، يمكنك استخدام مسار لتلقّي محتوى تحديثات من قاعدة بيانات.
تعتمد التدفقات على الكوروتينات ويمكن أن توفر قيمًا متعددة.
التدفق من الناحية النظرية هو مصدر بيانات يمكن احتسابه.
بشكل غير متزامن. يجب أن تكون القيم المنبعثة من النوع نفسه. بالنسبة
Flow<Int> هو تدفق يُصدر عددًا صحيحًا.
يتشابه التدفق إلى حد كبير مع Iterator الذي يُنتج سلسلة من
ولكنها تستخدم دوال التعليق لإنتاج القيم واستهلاكها
بشكل غير متزامن. وهذا يعني، على سبيل المثال، أن التدفق يمكن أن يجعل
لإنتاج القيمة التالية دون حظر الصفحة الرئيسية
.
هناك ثلاثة كيانات متضمنة في تدفقات البيانات:
- ينتج المنتج البيانات التي تتم إضافتها إلى مصدر البيانات. بفضل الكوروتينات، يمكن أن تؤدي التدفقات أيضًا إلى إنتاج البيانات بشكل غير متزامن.
- (اختياري) يمكن للوسطاء تعديل كل قيمة يتم إطلاقها في البث أو البث نفسه
- يستهلك المستهلك القيم من مصدر البيانات.
في نظام Android، يتم إنشاء المستودع منتج عادةً لبيانات واجهة المستخدم التي تحتوي على واجهة المستخدم (UI) تعرض البيانات في النهاية. في أحيان أخرى، تكون طبقة واجهة المستخدم هي منتج وتستهلكها الأحداث التي يُدخلها المستخدم والطبقات الأخرى في التسلسل الهرمي. طبقات في بين المنتج والمستهلك عادةً كوسطاء يقومون بتعديل تدفق البيانات لضبطه وفقًا لمتطلبات الطبقة التالية.
إنشاء تدفق
لإنشاء تدفقات، استخدم
أداة إنشاء التدفق
واجهات برمجة التطبيقات. تنشئ دالة إنشاء flow مسارًا جديدًا يمكنك فيه يدويًا
تنبعث قيمًا جديدة في تدفق البيانات باستخدام
emit
الأخرى.
في المثال التالي، يسترجع مصدر البيانات آخر الأخبار تلقائيًا على فترات زمنية ثابتة. كدالة تعليق، لا يمكن بإرجاع عدة قيم متتالية، فإن مصدر البيانات ينشئ ويرجع تدفقًا لتحقيق هذا المطلب. في هذه الحالة، يعمل مصدر البيانات كمنتج.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
يتم تنفيذ أداة إنشاء flow داخل الكوروتين. وبالتالي، فإنه يستفيد
من واجهات برمجة التطبيقات غير المتزامنة نفسها، ولكن تنطبق بعض القيود:
- تكون التدفقات متسلسلة. بما أن المنتِج موجود في الكوروتين، فعند استدعاء
دالة التعليق، يتم تعليق المنتج حتى دالة التعليق
وإرجاعه. في المثال، يعلّق المنتج المنتج حتى
fetchLatestNewsيكتمل طلب الشبكة. وحينئذٍ فقط يتم إصدار النتيجة إلى ساحة المشاركات. - باستخدام أداة إنشاء
flow، لا يمكن للناتجemitقيمة من مختلفةCoroutineContext. لذلك، لا تستدعيemitبعدCoroutineContextعن طريق إنشاء كوروتين جديد أو باستخدامwithContextمجموعات الرموز البرمجية. يمكنك استخدام منصات إنشاء التدفق الأخرى مثلcallbackFlowفي هذه الحالات.
تعديل ساحة المشاركات
يمكن للوسطاء استخدام عوامل تشغيل متوسطة لتعديل تدفق البيانات دون استهلاك القيم. هذه العوامل هي دوال، عندما تطبيقها على تدفق من البيانات، وإعداد سلسلة من العمليات غير حتى يتم استهلاك القيم في المستقبل. مزيد من المعلومات حول المشغلات الوسيطة في المستندات المرجعية للتدفق
في المثال أدناه، تستخدم طبقة المستودع المشغل المتوسط
map
لتحويل البيانات التي سيتم عرضها على View:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
يمكن تطبيق العوامل المتوسطة واحدًا تلو الآخر، لتكوين سلسلة العمليات التي يتم تنفيذها ببطء عند إطلاق عنصر إلى التدفق. تجدر الإشارة إلى أنّ تطبيق عامل تشغيل متوسط على البث المباشر عدم بدء جمع التدفق.
الجمع من التدفق
استخدِم عامل تشغيل طرفي لتشغيل المسار لبدء الاستماع
القيم. للحصول على جميع القيم في ساحة المشاركات بالشكل المنبعث، استخدم
collect
يمكنك التعرف على مزيد من المعلومات حول مشغلي الوحدات الطرفية في
المستندات الرسمية حول التدفق
بما أنّ collect هي دالة تعليق، يجب تنفيذها ضمن
كوروتين. تستخدم lambda كمعلمة يتم استدعاؤها على
كل قيمة جديدة. ونظرًا لأنها دالة تعليق، فإن الكوروتين
قد يتم تعليق المكالمات collect حتى يتم إغلاق التدفق.
استكمالاً للمثال السابق، إليك تنفيذًا بسيطًا
ViewModel يستخدم البيانات من طبقة المستودع:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
عملية جمع المعلومات تحفّز المنتج على تحديث آخر الأخبار
وينتج عن طلب الشبكة على فاصل ثابت. نظرًا لأن العمود
يبقى المنتج نشطًا دائمًا في حلقة while(true)، ويمثّل البث
من البيانات سيتم إغلاقها عند محو ViewModel
تم إلغاء viewModelScope.
يمكن أن يتوقف جمع البيانات للأسباب التالية:
- يتم إلغاء الكوروتين الذي يجمع، كما هو موضّح في المثال السابق. يؤدي هذا أيضًا إلى إيقاف المنتج الأساسي.
- ينتهي المنتج من إطلاق العناصر. في هذه الحالة، فإن تدفق البيانات
مغلقة ويستأنف الكوروتين الذي يُسمى
collectالتنفيذ.
التدفقات باردة وكسولة ما لم يتم تحديدها مع وسيط آخر
والمشغلات. يعني هذا أنه يتم تنفيذ رمز المنتج في كل مرة
المشغل الطرفي على التدفق. في المثال السابق،
فإن وجود عدة جامعات للتدفقات يؤدي إلى قيام مصدر البيانات بجلب
آخر الأخبار عدة مرات على فترات زمنية ثابتة مختلفة. لتحسين
في التدفق عندما يجمع عدة مستهلكين في الوقت نفسه،
shareIn.
رصد الاستثناءات غير المتوقعة
يمكن أن تأتي عملية تنفيذ المنتج من مكتبة تابعة لجهة خارجية.
وهذا يعني أنه قد ينتج عنه استثناءات غير متوقعة. للتعامل مع هذه الأمور
باستثناءات، يمكنك استخدام
catch
الوسيط المتوسط.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
في المثال السابق، عند حدوث استثناء، لا يتم تعديل السمة collect
لم يتم استدعاء lambda، نظرًا لعدم استلام عنصر جديد.
يمكن لـ "catch" أيضًا إضافة emit عنصر إلى المسار. مثال على المستودع
يمكن أن emit القيم المخزنة مؤقتًا بدلاً من ذلك:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
في هذا المثال، عند حدوث استثناء، تكون دالة lambda collect هي
لأنّه تم إطلاق عنصر جديد إلى البث بسبب
.
التنفيذ في سياق CoroutineContext مختلف
بشكل تلقائي، ينفذ منتج أداة إنشاء flow في
CoroutineContext من الكوروتين الذي يجمع منه، وكذلك
التي ذكرناها سابقًا، لا يمكنها emit من القيم
CoroutineContext وقد يكون هذا السلوك غير مرغوب فيه في بعض الحالات.
فعلى سبيل المثال، في الأمثلة المستخدمة خلال هذا الموضوع، يمثل المستودع
طبقة معينة من المفترض ألا تقوم بإجراء عمليات على Dispatchers.Main والتي
ويستخدمه viewModelScope.
لتغيير CoroutineContext للتدفق، استخدم عامل التشغيل الوسيط
flowOn
يغيِّر flowOn CoroutineContext في التدفّق الرئيسي، ما يعني
الشركة المصنّعة وأي شركات تشغيل وسيطة تم تطبيقها قبل (أو أعلى)
flowOn تدفق التدفق (عوامل التشغيل الوسيطة بعد flowOn
مع المستهلك) لا تتأثر ويتم تنفيذه في
تم استخدام CoroutineContext على collect من المسار. إذا كانت هناك
عدة عوامل تشغيل flowOn، يؤدي كل عامل منها إلى تغيير الجزء الرئيسي من الصفحة
موقعك الجغرافي الحالي.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
ومن خلال هذا الرمز، يستخدم العاملان onEach وmap العاملَين defaultDispatcher
بينما يتم تنفيذ عامل التشغيل catch والمستهلك على
تم استخدام "Dispatchers.Main" من قِبل "viewModelScope".
يجب استخدام مرسِل لأنّ طبقة مصدر البيانات تقوم بعمل الإدخال والإخراج. بحيث يتوافق مع عمليات وحدات الإدخال والإخراج:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
مسارات في مكتبات Jetpack
تم دمج التدفق في العديد من مكتبات Jetpack، وهو شائع بين مكتبات الجهات الخارجية على Android يُعد التدفق مناسبًا جدًا لتحديثات البيانات المباشرة ومصادر لا حصر لها من البيانات.
يمكنك استخدام
Flow with Room
ليتم إشعارك بالتغييرات التي تحدث في قاعدة البيانات. عند استخدام
كائنات الوصول إلى البيانات (DAO)،
عرض النوع Flow للحصول على تحديثات مباشرة.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
كلما حدث تغيير في جدول Example، يتم إصدار قائمة جديدة.
بالعناصر الجديدة في قاعدة البيانات.
تحويل واجهات برمجة التطبيقات المستندة إلى معاودة الاتصال إلى مسارات
callbackFlow
أداة إنشاء التدفق تتيح لك تحويل واجهات برمجة التطبيقات القائمة على معاودة الاتصال إلى مسارات.
على سبيل المثال، يوفّر Firebase Firestore
تستخدم واجهات برمجة تطبيقات Android عمليات معاودة الاتصال.
لتحويل واجهات برمجة التطبيقات هذه إلى تدفقات والاستماع إلى تحديثات قاعدة بيانات Firestore، يمكنك يمكنك استخدام التعليمة البرمجية التالية:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
على عكس أداة إنشاء flow، callbackFlow
وتسمح بانبعاثات القيم من CoroutineContext مختلفة مع
send
أو خارج الكوروتين مع
trySend
الأخرى.
داخليًا، يستخدم callbackFlow
channel
وهو من الناحية النظرية يشبه إلى حد كبير
قائمة انتظار.
يتم إعداد القناة باستخدام السعة، وهو الحد الأقصى لعدد العناصر
التي يمكن تخزينها مؤقتًا. القناة التي تم إنشاؤها في callbackFlow تستخدم إعدادًا تلقائيًا
سعة 64 عنصرًا. عندما تحاول إضافة عنصر جديد إلى
قناة، تعلّق send المنتج حتى تتوفر مساحة لإجراء
العنصر، بينما لا تضيف trySend العنصر إلى القناة وتعرض
false فورًا.
يضيف trySend العنصر المحدد إلى القناة على الفور،
فقط إذا كان ذلك لا ينتهك قيود السعة، ثم يعرض
نتيجة ناجحة.
موارد التدفق الإضافية
- اختبار مسارات Kotlin على Android
StateFlowوSharedFlow- مراجع إضافية حول الكوروتينات في لغة Kotlin وتدفق البيانات