From e8d9dca1ede1295f67e27faf731a5caa1bd2810a Mon Sep 17 00:00:00 2001 From: Allan Wang Date: Tue, 23 Nov 2021 11:58:30 -0800 Subject: Convert login refresh channel to flow and remove some outdated tests --- .../com/pitchedapps/frost/utils/CoroutineTest.kt | 99 ---------------------- 1 file changed, 99 deletions(-) (limited to 'app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt') diff --git a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt index 2744d0d8..43bd1563 100644 --- a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt +++ b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt @@ -26,7 +26,6 @@ import kotlinx.coroutines.async import kotlinx.coroutines.channels.BroadcastChannel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow @@ -41,7 +40,6 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import java.util.concurrent.Executors import kotlin.coroutines.EmptyCoroutineContext -import kotlin.test.Ignore import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFalse @@ -162,103 +160,6 @@ class CoroutineTest { } } - /** - * Not a true throttle, but for things like fetching header badges, we want to avoid simultaneous fetches. - * As a result, I want to test that the usage of offer along with a conflated channel will work as I expect. - * Events should be consumed when there is no pending consumer on previous elements. - */ - @Test - @Ignore("Move to flow") - fun throttledChannel() { - val channel = Channel(Channel.CONFLATED) - runBlocking { - val deferred = async { - listen(channel) { - // Throttle consumer - delay(10) - return@listen false - } - } - (0..100).forEach { - channel.offer(it) - delay(1) - } - channel.close() - val received = deferred.await() - assertTrue( - received.size < 20, - "Received data should be throttled; expected that around 1/10th of all events are consumed, but received ${received.size}" - ) - println(received) - } - } - - @Test - fun uniqueOnly() { - val channel = BroadcastChannel(100) - runBlocking { - val fullReceiver = channel.openSubscription() - val uniqueReceiver = channel.openSubscription().uniqueOnly(this) - - val fullDeferred = async { listen(fullReceiver) } - val uniqueDeferred = async { listen(uniqueReceiver) } - - listOf(0, 1, 2, 3, 3, 3, 4, 3, 5, 5, 1).forEach { - channel.offer(it) - } - channel.close() - - val fullData = fullDeferred.await() - val uniqueData = uniqueDeferred.await() - - assertEquals( - listOf(0, 1, 2, 3, 3, 3, 4, 3, 5, 5, 1), - fullData, - "Full receiver should get all channel events" - ) - assertEquals( - listOf(0, 1, 2, 3, 4, 3, 5, 1), - uniqueData, - "Unique receiver should not have two consecutive events that are equal" - ) - } - } - - /** - * When using [uniqueOnly] for channels with limited capacity, - * the duplicates should not count towards the actual capacity - */ - @Ignore("Not yet working as unique only buffered removes the capacity limitation of the channel") - @Test - fun uniqueOnlyBuffer() { - val channel = Channel(3) - runBlocking { - - val deferred = async { - listen(channel.uniqueOnly(GlobalScope)) { - // Throttle consumer - delay(50) - return@listen false - } - } - - listOf(0, 1, 1, 1, 1, 1, 2, 2, 2).forEach { - delay(10) - channel.offer(it) - } - - channel.close() - - val data = deferred.await() - - assertEquals( - listOf(0, 1, 2), - data, - "Unique receiver should not have two consecutive events that are equal" - ) - } - } - class TestException(msg: String) : RuntimeException(msg) @Test -- cgit v1.2.3 From 602290c33ac25f9308f8adfc459033cd50e13497 Mon Sep 17 00:00:00 2001 From: Allan Wang Date: Tue, 23 Nov 2021 12:02:11 -0800 Subject: Remove broadcast channel and flyweight --- .../com/pitchedapps/frost/kotlin/CoroutineUtils.kt | 39 ----- .../com/pitchedapps/frost/kotlin/Flyweight.kt | 174 --------------------- .../com/pitchedapps/frost/kotlin/FlyweightTest.kt | 120 -------------- .../com/pitchedapps/frost/utils/CoroutineTest.kt | 89 ----------- 4 files changed, 422 deletions(-) delete mode 100644 app/src/main/kotlin/com/pitchedapps/frost/kotlin/CoroutineUtils.kt delete mode 100644 app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt delete mode 100644 app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt (limited to 'app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt') diff --git a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/CoroutineUtils.kt b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/CoroutineUtils.kt deleted file mode 100644 index 6f8a60a9..00000000 --- a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/CoroutineUtils.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2019 Allan Wang - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package com.pitchedapps.frost.kotlin - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.Job -import kotlinx.coroutines.channels.BroadcastChannel -import kotlinx.coroutines.launch -import kotlin.coroutines.CoroutineContext - -@UseExperimental(ExperimentalCoroutinesApi::class) -fun BroadcastChannel.subscribeDuringJob( - scope: CoroutineScope, - context: CoroutineContext, - onReceive: suspend (T) -> Unit -) { - val receiver = openSubscription() - scope.launch(context) { - for (r in receiver) { - onReceive(r) - } - } - scope.coroutineContext[Job]!!.invokeOnCompletion { receiver.cancel() } -} diff --git a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt deleted file mode 100644 index 74765b58..00000000 --- a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Copyright 2018 Allan Wang - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package com.pitchedapps.frost.kotlin - -import com.pitchedapps.frost.utils.L -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.CoroutineExceptionHandler -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import kotlinx.coroutines.selects.select -import java.util.concurrent.ConcurrentHashMap - -/** - * Flyweight to keep track of values so long as they are valid. - * Values that have been fetched within [maxAge] from the time of use will be reused. - * If multiple requests are sent with the same key, then the value should only be fetched once. - * Otherwise, they will be fetched using [fetcher]. - * All requests will stem from the supplied [scope]. - */ -class Flyweight( - val scope: CoroutineScope, - val maxAge: Long, - private val fetcher: suspend (K) -> V -) { - - // Receives a key and a pending request - private val actionChannel = Channel>>(Channel.UNLIMITED) - - // Receives a key to invalidate the associated value - private val invalidatorChannel = Channel(Channel.UNLIMITED) - - // Receives a key and the resulting value - private val receiverChannel = Channel>>(Channel.UNLIMITED) - - // Keeps track of keys and associated update times - private val conditionMap: MutableMap = mutableMapOf() - - // Keeps track of keys and associated values - private val resultMap: MutableMap> = mutableMapOf() - - // Keeps track of unfulfilled actions - // Note that the explicit type is very important here. See https://youtrack.jetbrains.net/issue/KT-18053 - private val pendingMap: MutableMap>> = ConcurrentHashMap() - - private val job: Job - - private fun CompletableDeferred.completeWith(result: Result) { - if (result.isSuccess) - complete(result.getOrNull()!!) - else - completeExceptionally(result.exceptionOrNull()!!) - } - - private val errHandler = - CoroutineExceptionHandler { _, throwable -> L.d { "FbAuth failed ${throwable.message}" } } - - init { - job = - scope.launch(Dispatchers.IO + SupervisorJob() + errHandler) { - launch { - while (isActive) { - select { - /* - * New request received. Continuation should be fulfilled eventually - */ - actionChannel.onReceive { (key, completable) -> - val lastUpdate = conditionMap[key] - val lastResult = resultMap[key] - // Valid value, retrieved within acceptable time - if (lastResult != null && lastUpdate != null && System.currentTimeMillis() - lastUpdate < maxAge) { - completable.completeWith(lastResult) - } else { - val valueRequestPending = key in pendingMap - pendingMap.getOrPut(key) { mutableListOf() }.add(completable) - if (!valueRequestPending) - fulfill(key) - } - } - /* - * Invalidator received. Existing result associated with key should not be used. - * Note that any unfulfilled request and future requests should still operate, but with a new value. - */ - invalidatorChannel.onReceive { key -> - if (key !in resultMap) { - // Nothing to invalidate. - // If pending requests exist, they are already in the process of being updated. - return@onReceive - } - conditionMap.remove(key) - resultMap.remove(key) - if (pendingMap[key]?.isNotEmpty() == true) - // Refetch value for pending requests - fulfill(key) - } - /* - * Value request fulfilled. Should now fulfill pending requests - */ - receiverChannel.onReceive { (key, result) -> - conditionMap[key] = System.currentTimeMillis() - resultMap[key] = result - pendingMap.remove(key)?.forEach { - it.completeWith(result) - } - } - } - } - } - } - } - - /* - * Value request received. Should fetch new value using supplied fetcher - */ - private fun fulfill(key: K) { - scope.launch { - val result = runCatching { - fetcher(key) - } - receiverChannel.send(key to result) - } - } - - /** - * Queues the request, and returns a completable once it is sent to a channel. - * The fetcher will only be suspended if the channels are full. - * - * Note that if the job is already inactive, a cancellation exception will be thrown. - * The message may default to the message for all completables under a cancelled job - */ - fun fetch(key: K): CompletableDeferred { - val completable = CompletableDeferred(job) - if (!job.isActive) completable.completeExceptionally(CancellationException("Flyweight is not active")) - else actionChannel.offer(key to completable) - return completable - } - - suspend fun invalidate(key: K) { - invalidatorChannel.send(key) - } - - fun cancel() { - job.cancel() - if (pendingMap.isNotEmpty()) { - val error = CancellationException("Flyweight cancelled") - pendingMap.values.flatten().forEach { it.completeExceptionally(error) } - pendingMap.clear() - } - actionChannel.close() - invalidatorChannel.close() - receiverChannel.close() - conditionMap.clear() - resultMap.clear() - } -} diff --git a/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt deleted file mode 100644 index 89289322..00000000 --- a/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright 2018 Allan Wang - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package com.pitchedapps.frost.kotlin - -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.runBlocking -import org.junit.Rule -import org.junit.rules.Timeout -import java.util.concurrent.atomic.AtomicInteger -import kotlin.test.BeforeTest -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertFalse -import kotlin.test.assertTrue -import kotlin.test.fail - -class FlyweightTest { - - @get:Rule - val globalTimeout: Timeout = Timeout.seconds(5) - - lateinit var flyweight: Flyweight - - lateinit var callCount: AtomicInteger - - private val LONG_RUNNING_KEY = -78 - - @BeforeTest - fun before() { - callCount = AtomicInteger(0) - flyweight = Flyweight(GlobalScope, 200L) { - callCount.incrementAndGet() - when (it) { - LONG_RUNNING_KEY -> Thread.sleep(100000) - else -> Thread.sleep(100) - } - it * 2 - } - } - - @Test - fun basic() { - assertEquals(2, runBlocking { flyweight.fetch(1).await() }, "Invalid result") - assertEquals(1, callCount.get(), "1 call expected") - } - - @Test - fun multipleWithOneKey() { - val results: List = runBlocking { - (0..1000).map { - flyweight.fetch(1) - }.map { it.await() } - } - assertEquals(1, callCount.get(), "1 call expected") - assertEquals(1001, results.size, "Incorrect number of results returned") - assertTrue(results.all { it == 2 }, "Result should all be 2") - } - - @Test - fun consecutiveReuse() { - runBlocking { - flyweight.fetch(1).await() - assertEquals(1, callCount.get(), "1 call expected") - flyweight.fetch(1).await() - assertEquals(1, callCount.get(), "Reuse expected") - Thread.sleep(300) - flyweight.fetch(1).await() - assertEquals(2, callCount.get(), "Refetch expected") - } - } - - @Test - fun invalidate() { - runBlocking { - flyweight.fetch(1).await() - assertEquals(1, callCount.get(), "1 call expected") - flyweight.invalidate(1) - flyweight.fetch(1).await() - assertEquals(2, callCount.get(), "New call expected") - } - } - - @Test - fun destroy() { - runBlocking { - val longRunningResult = flyweight.fetch(LONG_RUNNING_KEY) - flyweight.fetch(1).await() - flyweight.cancel() - try { - flyweight.fetch(1).await() - fail("Flyweight should not be fulfilled after it is destroyed") - } catch (ignore: CancellationException) { - } - try { - assertFalse( - longRunningResult.isActive, - "Long running result should no longer be active" - ) - longRunningResult.await() - fail("Flyweight should have cancelled previously running requests") - } catch (ignore: CancellationException) { - } - } - } -} diff --git a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt index 43bd1563..551d0b7b 100644 --- a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt +++ b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt @@ -16,15 +16,10 @@ */ package com.pitchedapps.frost.utils -import com.pitchedapps.frost.kotlin.Flyweight import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.async -import kotlinx.coroutines.channels.BroadcastChannel -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow @@ -81,49 +76,6 @@ class CoroutineTest { return@withContext data } - /** - * When refreshing, we have a temporary subscriber that hooks onto a single cycle. - * The refresh channel only contains booleans, but for the sake of identification, - * each boolean will have a unique integer attached. - * - * Things to note: - * Subscription should be opened outside of async, since we don't want to miss any events. - */ - @Test - fun refreshSubscriptions() { - val refreshChannel = BroadcastChannel>(100) - runBlocking { - // Listen to all events - val fullReceiver = refreshChannel.openSubscription() - val fullDeferred = async { listen(fullReceiver) } - - refreshChannel.send(true to 1) - refreshChannel.send(false to 2) - refreshChannel.send(true to 3) - - val partialReceiver = refreshChannel.openSubscription() - val partialDeferred = async { transition(partialReceiver) } - refreshChannel.send(false to 4) - refreshChannel.send(true to 5) - refreshChannel.send(false to 6) - refreshChannel.send(true to 7) - refreshChannel.close() - val fullStream = fullDeferred.await() - val partialStream = partialDeferred.await() - - assertEquals( - 7, - fullStream.size, - "Full stream should contain all events" - ) - assertEquals( - listOf(false to 4, true to 5, false to 6), - partialStream, - "Partial stream should include up until first true false pair" - ) - } - } - private fun SharedFlow.takeUntilNull(): Flow = takeWhile { it != null }.filterNotNull() @@ -159,45 +111,4 @@ class CoroutineTest { assertEquals(4, count, "Not all events received") } } - - class TestException(msg: String) : RuntimeException(msg) - - @Test - fun exceptionChecks() { - val mainTag = "main-test" - val mainDispatcher = Executors.newSingleThreadExecutor { r -> - Thread(r, mainTag) - }.asCoroutineDispatcher() - val channel = Channel() - - val job = SupervisorJob() - - val flyweight = Flyweight(GlobalScope, 200L) { - throw TestException("Flyweight exception") - } - - suspend fun crash(): Boolean = withContext(Dispatchers.IO) { - try { - withContext(Dispatchers.Default) { - flyweight.fetch(0).await() - } - true - } catch (e: TestException) { - false - } - } - - runBlocking(mainDispatcher + job) { - launch { - val i = channel.receive() - println("Received $i") - } - launch { - println("A") - println(crash()) - println("B") - channel.offer(1) - } - } - } } -- cgit v1.2.3 From 98b46d9e5341ac827ec776c6e5dd48ac301d4522 Mon Sep 17 00:00:00 2001 From: Allan Wang Date: Tue, 23 Nov 2021 12:06:40 -0800 Subject: Begin removing experimental coroutines annotation --- .../frost/activities/BaseMainActivity.kt | 2 -- .../pitchedapps/frost/activities/MainActivity.kt | 7 +++-- .../com/pitchedapps/frost/utils/CoroutineTest.kt | 35 ---------------------- 3 files changed, 4 insertions(+), 40 deletions(-) (limited to 'app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt') diff --git a/app/src/main/kotlin/com/pitchedapps/frost/activities/BaseMainActivity.kt b/app/src/main/kotlin/com/pitchedapps/frost/activities/BaseMainActivity.kt index 8585f68b..9d16c63a 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/activities/BaseMainActivity.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/activities/BaseMainActivity.kt @@ -130,7 +130,6 @@ import dagger.hilt.android.AndroidEntryPoint import dagger.hilt.android.components.ActivityComponent import dagger.hilt.android.qualifiers.ActivityContext import dagger.hilt.android.scopes.ActivityScoped -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch import javax.inject.Inject import kotlin.math.abs @@ -140,7 +139,6 @@ import kotlin.math.abs * * Most of the logic that is unrelated to handling fragments */ -@UseExperimental(ExperimentalCoroutinesApi::class) @AndroidEntryPoint abstract class BaseMainActivity : BaseActivity(), diff --git a/app/src/main/kotlin/com/pitchedapps/frost/activities/MainActivity.kt b/app/src/main/kotlin/com/pitchedapps/frost/activities/MainActivity.kt index 2e44e5f9..16606691 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/activities/MainActivity.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/activities/MainActivity.kt @@ -26,7 +26,6 @@ import com.pitchedapps.frost.views.BadgedIcon import com.pitchedapps.frost.web.FrostEmitter import com.pitchedapps.frost.web.asFrostEmitter import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow @@ -39,10 +38,12 @@ import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.onEach -@UseExperimental(ExperimentalCoroutinesApi::class) class MainActivity : BaseMainActivity() { - private val fragmentMutableFlow = MutableSharedFlow(extraBufferCapacity = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST) + private val fragmentMutableFlow = MutableSharedFlow( + extraBufferCapacity = 10, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) override val fragmentFlow: SharedFlow = fragmentMutableFlow.asSharedFlow() override val fragmentEmit: FrostEmitter = fragmentMutableFlow.asFrostEmitter() diff --git a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt index 551d0b7b..7acb4761 100644 --- a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt +++ b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt @@ -17,22 +17,18 @@ package com.pitchedapps.frost.utils import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.async -import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.count import kotlinx.coroutines.flow.filterNotNull -import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext import java.util.concurrent.Executors import kotlin.coroutines.EmptyCoroutineContext import kotlin.test.Test @@ -43,39 +39,8 @@ import kotlin.test.assertTrue /** * Collection of tests around coroutines */ -@UseExperimental(ExperimentalCoroutinesApi::class) class CoroutineTest { - /** - * Hooks onto the refresh channel for one true -> false cycle. - * Returns the list of event ids that were emitted - */ - private suspend fun transition(channel: ReceiveChannel>): List> { - var refreshed = false - return listen(channel) { (refreshing, _) -> - if (refreshed && !refreshing) - return@listen true - if (refreshing) - refreshed = true - return@listen false - } - } - - private suspend fun listen( - channel: ReceiveChannel, - shouldEnd: suspend (T) -> Boolean = { false } - ): List = - withContext(Dispatchers.IO) { - val data = mutableListOf() - channel.receiveAsFlow() - for (c in channel) { - data.add(c) - if (shouldEnd(c)) break - } - channel.cancel() - return@withContext data - } - private fun SharedFlow.takeUntilNull(): Flow = takeWhile { it != null }.filterNotNull() -- cgit v1.2.3