aboutsummaryrefslogtreecommitdiff
path: root/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt')
-rw-r--r--app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt30
1 files changed, 20 insertions, 10 deletions
diff --git a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt
index 055d70ee..2744d0d8 100644
--- a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt
+++ b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt
@@ -26,8 +26,15 @@ import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
-import kotlinx.coroutines.channels.count
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.MutableSharedFlow
+import kotlinx.coroutines.flow.SharedFlow
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.count
+import kotlinx.coroutines.flow.filterNotNull
+import kotlinx.coroutines.flow.receiveAsFlow
+import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
@@ -67,6 +74,7 @@ class CoroutineTest {
): List<T> =
withContext(Dispatchers.IO) {
val data = mutableListOf<T>()
+ channel.receiveAsFlow()
for (c in channel) {
data.add(c)
if (shouldEnd(c)) break
@@ -118,6 +126,9 @@ class CoroutineTest {
}
}
+ private fun <T : Any> SharedFlow<T?>.takeUntilNull(): Flow<T> =
+ takeWhile { it != null }.filterNotNull()
+
/**
* Sanity check to ensure that contexts are being honoured
*/
@@ -128,13 +139,10 @@ class CoroutineTest {
Thread(r, mainTag)
}.asCoroutineDispatcher()
- val channel = BroadcastChannel<String>(100)
-
+ val flow = MutableSharedFlow<String?>(100)
runBlocking(Dispatchers.IO) {
- val receiver1 = channel.openSubscription()
- val receiver2 = channel.openSubscription()
launch(mainDispatcher) {
- for (thread in receiver1) {
+ flow.takeUntilNull().collect { thread ->
assertTrue(
Thread.currentThread().name.startsWith(mainTag),
"Channel should be received in main thread"
@@ -146,10 +154,11 @@ class CoroutineTest {
}
}
listOf(EmptyCoroutineContext, Dispatchers.IO, Dispatchers.Default, Dispatchers.IO).map {
- async(it) { channel.send(Thread.currentThread().name) }
+ async(it) { flow.emit(Thread.currentThread().name) }
}.joinAll()
- channel.close()
- assertEquals(4, receiver2.count(), "Not all events received")
+ flow.emit(null)
+ val count = flow.takeUntilNull().count()
+ assertEquals(4, count, "Not all events received")
}
}
@@ -159,6 +168,7 @@ class CoroutineTest {
* Events should be consumed when there is no pending consumer on previous elements.
*/
@Test
+ @Ignore("Move to flow")
fun throttledChannel() {
val channel = Channel<Int>(Channel.CONFLATED)
runBlocking {
@@ -177,7 +187,7 @@ class CoroutineTest {
val received = deferred.await()
assertTrue(
received.size < 20,
- "Received data should be throttled; expected that around 1/10th of all events are consumed"
+ "Received data should be throttled; expected that around 1/10th of all events are consumed, but received ${received.size}"
)
println(received)
}