diff options
author | Allan Wang <me@allanwang.ca> | 2018-12-27 02:15:10 -0500 |
---|---|---|
committer | Allan Wang <me@allanwang.ca> | 2018-12-27 02:15:10 -0500 |
commit | e6dcbd7b32dc49b11184b6beca598819c3f071fd (patch) | |
tree | a02691a1eaf1b1506930c7e50c5f43f6f0fc953a | |
parent | 7d85262ada198501d2d5844e1196c9b45f4a38f5 (diff) | |
download | frost-e6dcbd7b32dc49b11184b6beca598819c3f071fd.tar.gz frost-e6dcbd7b32dc49b11184b6beca598819c3f071fd.tar.bz2 frost-e6dcbd7b32dc49b11184b6beca598819c3f071fd.zip |
Begin replacing observables with channels
11 files changed, 280 insertions, 97 deletions
diff --git a/app/build.gradle b/app/build.gradle index c7d78537..75ffed98 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -100,6 +100,8 @@ android { resValue "string", "frost_name", "Frost Debug" resValue "string", "frost_web", "Frost Web Debug" ext.enableBugsnag = false + + kotlinOptions.freeCompilerArgs += ["-Xuse-experimental=kotlin.Experimental"] } releaseTest { minifyEnabled true @@ -141,6 +143,7 @@ android { includeAndroidResources = true } } + } repositories { diff --git a/app/src/main/kotlin/com/pitchedapps/frost/activities/WebOverlayActivity.kt b/app/src/main/kotlin/com/pitchedapps/frost/activities/WebOverlayActivity.kt index b706d467..bf04c524 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/activities/WebOverlayActivity.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/activities/WebOverlayActivity.kt @@ -72,6 +72,7 @@ import com.pitchedapps.frost.views.FrostVideoViewer import com.pitchedapps.frost.views.FrostWebView import io.reactivex.android.schedulers.AndroidSchedulers import io.reactivex.disposables.Disposable +import kotlinx.coroutines.CoroutineScope import okhttp3.HttpUrl /** @@ -181,7 +182,6 @@ open class WebOverlayActivityBase(private val forceBasicAgent: Boolean) : BaseAc finish() return } - setFrameContentView(R.layout.activity_web_overlay) setSupportActionBar(toolbar) supportActionBar?.setDisplayShowHomeEnabled(true) diff --git a/app/src/main/kotlin/com/pitchedapps/frost/contracts/FrostContentContract.kt b/app/src/main/kotlin/com/pitchedapps/frost/contracts/FrostContentContract.kt index 50c2fe77..a2c2b05a 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/contracts/FrostContentContract.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/contracts/FrostContentContract.kt @@ -20,6 +20,10 @@ import android.view.View import com.pitchedapps.frost.facebook.FbItem import io.reactivex.subjects.BehaviorSubject import io.reactivex.subjects.PublishSubject +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.BroadcastChannel +import kotlinx.coroutines.channels.Channel /** * Created by Allan Wang on 20/12/17. @@ -29,7 +33,7 @@ import io.reactivex.subjects.PublishSubject * Contract for the underlying parent, * binds to activities & fragments */ -interface FrostContentContainer { +interface FrostContentContainer : CoroutineScope { val baseUrl: String @@ -45,8 +49,11 @@ interface FrostContentContainer { * Contract for components shared among * all content providers */ +@UseExperimental(ExperimentalCoroutinesApi::class) interface FrostContentParent : DynamicUiContract { + val scope: CoroutineScope + val core: FrostContentCore /** @@ -54,16 +61,22 @@ interface FrostContentParent : DynamicUiContract { */ val refreshObservable: PublishSubject<Boolean> + val refreshChannel: BroadcastChannel<Boolean> + /** * Observable to get data on refresh progress, with range [0, 100] */ val progressObservable: PublishSubject<Int> + val progressChannel: BroadcastChannel<Int> + /** * Observable to get new title data (unique values only) */ val titleObservable: BehaviorSubject<String> + val titleChannel: BroadcastChannel<String> + var baseUrl: String var baseEnum: FbItem? @@ -106,6 +119,9 @@ interface FrostContentParent : DynamicUiContract { */ interface FrostContentCore : DynamicUiContract { + val scope: CoroutineScope + get() = parent.scope + /** * Reference to parent * Bound through calling [FrostContentParent.bind] diff --git a/app/src/main/kotlin/com/pitchedapps/frost/fragments/FragmentContract.kt b/app/src/main/kotlin/com/pitchedapps/frost/fragments/FragmentContract.kt index e24e8308..95322c1c 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/fragments/FragmentContract.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/fragments/FragmentContract.kt @@ -23,6 +23,8 @@ import com.pitchedapps.frost.contracts.MainActivityContract import com.pitchedapps.frost.contracts.MainFabContract import com.pitchedapps.frost.views.FrostRecyclerView import io.reactivex.disposables.Disposable +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel /** * Created by Allan Wang on 2017-11-07. @@ -102,8 +104,9 @@ interface RecyclerContentContract { /** * Completely handle data reloading - * Optional progress emission update - * Callback returns [true] for success, [false] otherwise + * The progress function allows optional emission of progress values (between 0 and 100). + * This can be called from any thread. + * Returns [true] for success, [false] otherwise */ - fun reload(progress: (Int) -> Unit, callback: (Boolean) -> Unit) + suspend fun reload(progress: (Int) -> Unit): Boolean } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragmentBase.kt b/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragmentBase.kt index f77f83ea..98c8f750 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragmentBase.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragmentBase.kt @@ -29,16 +29,18 @@ import com.pitchedapps.frost.facebook.parsers.ParseResponse import com.pitchedapps.frost.utils.L import com.pitchedapps.frost.utils.frostJsoup import com.pitchedapps.frost.views.FrostRecyclerView -import org.jetbrains.anko.doAsync -import org.jetbrains.anko.uiThread +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext /** * Created by Allan Wang on 27/12/17. */ -abstract class RecyclerFragment : BaseFragment(), RecyclerContentContract { +abstract class RecyclerFragment<T, Item : IItem<*, *>> : BaseFragment(), RecyclerContentContract { override val layoutRes: Int = R.layout.view_content_recycler + abstract val adapter: ModelAdapter<T, Item> + override fun firstLoadRequest() { val core = core ?: return if (firstLoad) { @@ -47,23 +49,30 @@ abstract class RecyclerFragment : BaseFragment(), RecyclerContentContract { } } - final override fun reload(progress: (Int) -> Unit, callback: (Boolean) -> Unit) { - reloadImpl(progress) { - if (it) - callback(it) - else - valid = false + final override suspend fun reload(progress: (Int) -> Unit): Boolean { + val data = try { + reloadImpl(progress) + } catch (e: Exception) { + null + } + if (data == null) { + valid = false + return false + } + withContext(Dispatchers.Main) { + adapter.setNewList(data) } + return true } - protected abstract fun reloadImpl(progress: (Int) -> Unit, callback: (Boolean) -> Unit) + protected abstract suspend fun reloadImpl(progress: (Int) -> Unit): List<T>? } -abstract class GenericRecyclerFragment<T, Item : IItem<*, *>> : RecyclerFragment() { +abstract class GenericRecyclerFragment<T, Item : IItem<*, *>> : RecyclerFragment<T, Item>() { abstract fun mapper(data: T): Item - val adapter: ModelAdapter<T, Item> = ModelAdapter { this.mapper(it) } + override val adapter: ModelAdapter<T, Item> = ModelAdapter { this.mapper(it) } final override fun bind(recyclerView: FrostRecyclerView) { recyclerView.adapter = getAdapter() @@ -83,7 +92,7 @@ abstract class GenericRecyclerFragment<T, Item : IItem<*, *>> : RecyclerFragment open fun getAdapter(): FastAdapter<IItem<*, *>> = fastAdapter(this.adapter) } -abstract class FrostParserFragment<T : Any, Item : IItem<*, *>> : RecyclerFragment() { +abstract class FrostParserFragment<T : Any, Item : IItem<*, *>> : RecyclerFragment<Item, Item>() { /** * The parser to make this all happen @@ -94,7 +103,7 @@ abstract class FrostParserFragment<T : Any, Item : IItem<*, *>> : RecyclerFragme abstract fun toItems(response: ParseResponse<T>): List<Item> - val adapter: ItemAdapter<Item> = ItemAdapter() + override val adapter: ItemAdapter<Item> = ItemAdapter() final override fun bind(recyclerView: FrostRecyclerView) { recyclerView.adapter = getAdapter() @@ -113,23 +122,20 @@ abstract class FrostParserFragment<T : Any, Item : IItem<*, *>> : RecyclerFragme */ open fun getAdapter(): FastAdapter<IItem<*, *>> = fastAdapter(this.adapter) - override fun reloadImpl(progress: (Int) -> Unit, callback: (Boolean) -> Unit) { - doAsync { - progress(10) - val cookie = FbCookie.webCookie - val doc = getDoc(cookie) - progress(60) - val response = parser.parse(cookie, doc) - if (response == null) { - L.i { "RecyclerFragment failed for ${baseEnum.name}" } - return@doAsync callback(false) - } - progress(80) - val items = toItems(response) - progress(97) - uiThread { adapter.setNewList(items) } - callback(true) + override suspend fun reloadImpl(progress: (Int) -> Unit): List<Item>? = withContext(Dispatchers.IO) { + progress(10) + val cookie = FbCookie.webCookie + val doc = getDoc(cookie) + progress(60) + val response = parser.parse(cookie, doc) + if (response == null) { + L.i { "RecyclerFragment failed for ${baseEnum.name}" } + return@withContext null } + progress(80) + val items = toItems(response) + progress(97) + return@withContext items } } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragments.kt b/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragments.kt index ff37b66d..45cf2290 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragments.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragments.kt @@ -26,7 +26,7 @@ import com.pitchedapps.frost.facebook.requests.MenuFooterItem import com.pitchedapps.frost.facebook.requests.MenuHeader import com.pitchedapps.frost.facebook.requests.MenuItem import com.pitchedapps.frost.facebook.requests.MenuItemData -import com.pitchedapps.frost.facebook.requests.fbRequest +import com.pitchedapps.frost.facebook.requests.fbAuth import com.pitchedapps.frost.facebook.requests.getMenuData import com.pitchedapps.frost.iitems.ClickableIItemContract import com.pitchedapps.frost.iitems.MenuContentIItem @@ -36,8 +36,9 @@ import com.pitchedapps.frost.iitems.MenuHeaderIItem import com.pitchedapps.frost.iitems.NotificationIItem import com.pitchedapps.frost.utils.frostJsoup import com.pitchedapps.frost.views.FrostRecyclerView -import org.jetbrains.anko.doAsync -import org.jetbrains.anko.uiThread +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext /** * Created by Allan Wang on 27/12/17. @@ -71,20 +72,16 @@ class MenuFragment : GenericRecyclerFragment<MenuItemData, IItem<*, *>>() { ClickableIItemContract.bindEvents(adapter) } - override fun reloadImpl(progress: (Int) -> Unit, callback: (Boolean) -> Unit) { - doAsync { - val cookie = FbCookie.webCookie - progress(10) - cookie.fbRequest({ callback(false) }) { - progress(30) - val data = getMenuData().invoke() ?: return@fbRequest callback(false) - if (data.data.isEmpty()) return@fbRequest callback(false) - progress(70) - val items = data.flatMapValid() - progress(90) - uiThread { adapter.add(items) } - callback(true) - } - } + override suspend fun reloadImpl(progress: (Int) -> Unit): List<MenuItemData>? = withContext(Dispatchers.IO) { + val cookie = FbCookie.webCookie ?: return@withContext null + progress(10) + val auth = fbAuth.fetch(cookie) + progress(30) + val data = auth.getMenuData().invoke() ?: return@withContext null + if (data.data.isEmpty()) return@withContext null + progress(70) + val items = data.flatMapValid() + progress(90) + return@withContext items } } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/views/FrostContentView.kt b/app/src/main/kotlin/com/pitchedapps/frost/views/FrostContentView.kt index d17a424c..72b81b37 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/views/FrostContentView.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/views/FrostContentView.kt @@ -45,6 +45,13 @@ import io.reactivex.disposables.Disposable import io.reactivex.rxkotlin.addTo import io.reactivex.subjects.BehaviorSubject import io.reactivex.subjects.PublishSubject +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.BroadcastChannel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext class FrostContentWeb @JvmOverloads constructor( context: Context, @@ -66,6 +73,7 @@ class FrostContentRecycler @JvmOverloads constructor( override val layoutRes: Int = R.layout.view_content_base_recycler } +@UseExperimental(ExperimentalCoroutinesApi::class) abstract class FrostContentView<out T> @JvmOverloads constructor( context: Context, attrs: AttributeSet? = null, @@ -85,7 +93,11 @@ abstract class FrostContentView<out T> @JvmOverloads constructor( override val refreshObservable: PublishSubject<Boolean> = PublishSubject.create() override val titleObservable: BehaviorSubject<String> = BehaviorSubject.create() - private val compositeDisposable = CompositeDisposable() + override val refreshChannel: BroadcastChannel<Boolean> = BroadcastChannel(Channel.UNLIMITED) + override val progressChannel: BroadcastChannel<Int> = BroadcastChannel(Channel.UNLIMITED) + override val titleChannel: BroadcastChannel<String> = BroadcastChannel(Channel.UNLIMITED) + + override lateinit var scope: CoroutineScope override lateinit var baseUrl: String override var baseEnum: FbItem? = null @@ -107,24 +119,6 @@ abstract class FrostContentView<out T> @JvmOverloads constructor( protected fun init() { inflate(context, layoutRes, this) coreView.parent = this - - // bind observables - progressObservable.observeOn(AndroidSchedulers.mainThread()).subscribe { - progress.invisibleIf(it == 100) - if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) - progress.setProgress(it, true) - else - progress.progress = it - }.addTo(compositeDisposable) - - refreshObservable - .observeOn(AndroidSchedulers.mainThread()) - .subscribe { - refresh.isRefreshing = it - refresh.isEnabled = true - }.addTo(compositeDisposable) - refresh.setOnRefreshListener { coreView.reload(true) } - reloadThemeSelf() } @@ -132,7 +126,34 @@ abstract class FrostContentView<out T> @JvmOverloads constructor( baseUrl = container.baseUrl baseEnum = container.baseEnum init() + scope = container core.bind(container) + refresh.setOnRefreshListener { + with(coreView) { + reload(true) + } + } + scope.launch(Dispatchers.Default) { + launch { + for (r in refreshChannel.openSubscription()) { + withContext(Dispatchers.Main) { + refresh.isRefreshing = r + refresh.isEnabled = true + } + } + } + launch { + for (p in progressChannel.openSubscription()) { + withContext(Dispatchers.Main) { + progress.invisibleIf(p == 100) + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) + progress.setProgress(p, true) + else + progress.progress = p + } + } + } + } } override fun reloadTheme() { @@ -155,11 +176,10 @@ abstract class FrostContentView<out T> @JvmOverloads constructor( } override fun destroy() { - titleObservable.onComplete() - progressObservable.onComplete() - refreshObservable.onComplete() + titleChannel.close() + progressChannel.close() + refreshChannel.close() core.destroy() - compositeDisposable.dispose() } private var dispose: Disposable? = null diff --git a/app/src/main/kotlin/com/pitchedapps/frost/views/FrostRecyclerView.kt b/app/src/main/kotlin/com/pitchedapps/frost/views/FrostRecyclerView.kt index 2b9e8f9c..fb20c3ba 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/views/FrostRecyclerView.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/views/FrostRecyclerView.kt @@ -23,11 +23,14 @@ import androidx.recyclerview.widget.LinearLayoutManager import androidx.recyclerview.widget.RecyclerView import ca.allanwang.kau.utils.circularReveal import ca.allanwang.kau.utils.fadeOut +import com.pitchedapps.frost.R.string.reload import com.pitchedapps.frost.contracts.FrostContentContainer import com.pitchedapps.frost.contracts.FrostContentCore import com.pitchedapps.frost.contracts.FrostContentParent import com.pitchedapps.frost.fragments.RecyclerContentContract import com.pitchedapps.frost.utils.Prefs +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.launch /** * Created by Allan Wang on 2017-05-29. @@ -69,11 +72,12 @@ class FrostRecyclerView @JvmOverloads constructor( override fun reloadBase(animate: Boolean) { if (Prefs.animate) fadeOut(onFinish = onReloadClear) - parent.refreshObservable.onNext(true) - recyclerContract.reload({ parent.progressObservable.onNext(it) }) { - parent.progressObservable.onNext(100) - parent.refreshObservable.onNext(false) - if (Prefs.animate) post { circularReveal() } + scope.launch { + parent.refreshChannel.send(true) + val loaded = recyclerContract.reload { parent.progressChannel.offer(it) } + parent.progressChannel.send(100) + parent.refreshChannel.send(false) + if (Prefs.animate) circularReveal() } } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/web/FrostChromeClients.kt b/app/src/main/kotlin/com/pitchedapps/frost/web/FrostChromeClients.kt index 12df8000..1f8118da 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/web/FrostChromeClients.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/web/FrostChromeClients.kt @@ -43,8 +43,9 @@ import io.reactivex.subjects.Subject */ class FrostChromeClient(web: FrostWebView) : WebChromeClient() { - private val progress: Subject<Int> = web.parent.progressObservable - private val title: BehaviorSubject<String> = web.parent.titleObservable + private val progress = web.parent.progressChannel + private val title = web.parent.titleChannel + private var prevTitle: String? = null private val activity = (web.context as? ActivityContract) private val context = web.context!! @@ -55,13 +56,14 @@ class FrostChromeClient(web: FrostWebView) : WebChromeClient() { override fun onReceivedTitle(view: WebView, title: String) { super.onReceivedTitle(view, title) - if (title.startsWith("http") || this.title.value == title) return - this.title.onNext(title) + if (title.startsWith("http") || prevTitle == title) return + prevTitle = title + this.title.offer(title) } override fun onProgressChanged(view: WebView, newProgress: Int) { super.onProgressChanged(view, newProgress) - progress.onNext(newProgress) + progress.offer(newProgress) } override fun onShowFileChooser( diff --git a/app/src/test/kotlin/com/pitchedapps/frost/MiscTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/MiscTest.kt index ce125298..2676e37d 100644 --- a/app/src/test/kotlin/com/pitchedapps/frost/MiscTest.kt +++ b/app/src/test/kotlin/com/pitchedapps/frost/MiscTest.kt @@ -16,10 +16,16 @@ */ package com.pitchedapps.frost -import com.pitchedapps.frost.facebook.requests.call import com.pitchedapps.frost.facebook.requests.zip -import okhttp3.Request +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.channels.BroadcastChannel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import org.junit.Test +import java.util.concurrent.Executors import kotlin.test.assertTrue /** @@ -48,14 +54,30 @@ class MiscTest { ) } - @Test - fun a() { - val s = Request.Builder() - .url("https://www.allanwang.ca/ecse429/magenta.png") - .get() - .call().execute().body()!!.string() - "�PNG\n\u001A\nIDA�c����?\u0000\u0006�\u0002��p�\u0000\u0000\u0000\u0000IEND�B`�" - println("Hello") - println(s) +@Test +@UseExperimental(ExperimentalCoroutinesApi::class) +fun channel() { + val c = BroadcastChannel<Int>(100) + runBlocking { + launch(Dispatchers.IO) { + println("1 start ${Thread.currentThread()}") + for (i in c.openSubscription()) { + println("1 $i") + } + println("1 end ${Thread.currentThread()}") + } + launch(Dispatchers.IO) { + println("2 start ${Thread.currentThread()}") + for (i in c.openSubscription()) { + println("2 $i") + } + println("2 end ${Thread.currentThread()}") + } + c.send(1) + c.send(2) + c.send(3) + delay(1000) + c.close() } } +} diff --git a/app/src/test/kotlin/com/pitchedapps/frost/views/FrostContentViewAsyncTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/views/FrostContentViewAsyncTest.kt new file mode 100644 index 00000000..a179fb98 --- /dev/null +++ b/app/src/test/kotlin/com/pitchedapps/frost/views/FrostContentViewAsyncTest.kt @@ -0,0 +1,110 @@ +package com.pitchedapps.frost.views + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExecutorCoroutineDispatcher +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.BroadcastChannel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import java.util.concurrent.Executors +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals + +/** + * Collection of tests around the view thread logic + */ +@UseExperimental(ExperimentalCoroutinesApi::class) +class FrostContentViewAsyncTest { + + /** + * Single threaded dispatcher with thread name "main" + * Mimics the usage of Android's main dispatcher + */ + private lateinit var mainDispatcher: ExecutorCoroutineDispatcher + + @BeforeTest + fun before() { + mainDispatcher = Executors.newSingleThreadExecutor { r -> + Thread(r, "main") + }.asCoroutineDispatcher() + } + + @AfterTest + fun after() { + mainDispatcher.close() + } + + /** + * Hooks onto the refresh channel for one true -> false cycle. + * Returns the list of event ids that were emitted + */ + private suspend fun transition(channel: ReceiveChannel<Pair<Boolean, Int>>): List<Pair<Boolean, Int>> { + var refreshed = false + return listen(channel) { (refreshing, _) -> + if (refreshed && !refreshing) + return@listen true + if (refreshing) + refreshed = true + return@listen false + } + } + + private suspend fun <T> listen(channel: ReceiveChannel<T>, shouldEnd: (T) -> Boolean = { false }): List<T> = + withContext(Dispatchers.IO) { + val data = mutableListOf<T>() + for (c in channel) { + data.add(c) + if (shouldEnd(c)) break + } + channel.cancel() + return@withContext data + } + + /** + * When refreshing, we have a temporary subscriber that hooks onto a single cycle. + * The refresh channel only contains booleans, but for the sake of identification, + * each boolean will have a unique integer attached. + * + * Things to note: + * Subscription should be opened outside of async, since we don't want to miss any events. + */ + @Test + fun refreshSubscriptions() { + val refreshChannel = BroadcastChannel<Pair<Boolean, Int>>(100) + runBlocking { + // Listen to all events + val fullReceiver = refreshChannel.openSubscription() + val fullDeferred = async { listen(fullReceiver) } + + refreshChannel.send(true to 1) + refreshChannel.send(false to 2) + refreshChannel.send(true to 3) + + val partialReceiver = refreshChannel.openSubscription() + val partialDeferred = async { transition(partialReceiver) } + refreshChannel.send(false to 4) + refreshChannel.send(true to 5) + refreshChannel.send(false to 6) + refreshChannel.send(true to 7) + refreshChannel.close() + val fullStream = fullDeferred.await() + val partialStream = partialDeferred.await() + + assertEquals( + 7, + fullStream.size, + "Full stream should contain all events" + ) + assertEquals( + listOf(false to 4, true to 5, false to 6), + partialStream, + "Partial stream should include up until first true false pair" + ) + } + } +}
\ No newline at end of file |