From e5e83736d3feff8ac9cc4ae38fad0fa827a6b21d Mon Sep 17 00:00:00 2001 From: Allan Wang Date: Fri, 28 Dec 2018 20:19:18 -0500 Subject: Switch remaining primary observables --- .../com/pitchedapps/frost/utils/CoroutineTest.kt | 194 +++++++++++++++++++++ 1 file changed, 194 insertions(+) create mode 100644 app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt (limited to 'app/src/test/kotlin/com/pitchedapps/frost/utils') diff --git a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt new file mode 100644 index 00000000..f930e529 --- /dev/null +++ b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt @@ -0,0 +1,194 @@ +package com.pitchedapps.frost.utils + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.BroadcastChannel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.count +import kotlinx.coroutines.delay +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import java.util.concurrent.Executors +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +/** + * Collection of tests around coroutines + */ +@UseExperimental(ExperimentalCoroutinesApi::class) +class CoroutineTest { + + /** + * Hooks onto the refresh channel for one true -> false cycle. + * Returns the list of event ids that were emitted + */ + private suspend fun transition(channel: ReceiveChannel>): List> { + var refreshed = false + return listen(channel) { (refreshing, _) -> + if (refreshed && !refreshing) + return@listen true + if (refreshing) + refreshed = true + return@listen false + } + } + + private suspend fun listen(channel: ReceiveChannel, shouldEnd: suspend (T) -> Boolean = { false }): List = + withContext(Dispatchers.IO) { + val data = mutableListOf() + for (c in channel) { + data.add(c) + if (shouldEnd(c)) break + } + channel.cancel() + return@withContext data + } + + /** + * When refreshing, we have a temporary subscriber that hooks onto a single cycle. + * The refresh channel only contains booleans, but for the sake of identification, + * each boolean will have a unique integer attached. + * + * Things to note: + * Subscription should be opened outside of async, since we don't want to miss any events. + */ + @Test + fun refreshSubscriptions() { + val refreshChannel = BroadcastChannel>(100) + runBlocking { + // Listen to all events + val fullReceiver = refreshChannel.openSubscription() + val fullDeferred = async { listen(fullReceiver) } + + refreshChannel.send(true to 1) + refreshChannel.send(false to 2) + refreshChannel.send(true to 3) + + val partialReceiver = refreshChannel.openSubscription() + val partialDeferred = async { transition(partialReceiver) } + refreshChannel.send(false to 4) + refreshChannel.send(true to 5) + refreshChannel.send(false to 6) + refreshChannel.send(true to 7) + refreshChannel.close() + val fullStream = fullDeferred.await() + val partialStream = partialDeferred.await() + + assertEquals( + 7, + fullStream.size, + "Full stream should contain all events" + ) + assertEquals( + listOf(false to 4, true to 5, false to 6), + partialStream, + "Partial stream should include up until first true false pair" + ) + } + } + + /** + * Sanity check to ensure that contexts are being honoured + */ + @Test + fun contextSwitching() { + val mainTag = "main-test" + val mainDispatcher = Executors.newSingleThreadExecutor { r -> + Thread(r, mainTag) + }.asCoroutineDispatcher() + + val channel = BroadcastChannel(100) + + runBlocking(Dispatchers.IO) { + val receiver1 = channel.openSubscription() + val receiver2 = channel.openSubscription() + launch(mainDispatcher) { + for (thread in receiver1) { + assertTrue( + Thread.currentThread().name.startsWith(mainTag), + "Channel should be received in main thread" + ) + assertFalse( + thread.startsWith(mainTag), + "Channel execution should not be in main thread" + ) + } + } + listOf(EmptyCoroutineContext, Dispatchers.IO, Dispatchers.Default, Dispatchers.IO).map { + async(it) { channel.send(Thread.currentThread().name) } + }.joinAll() + channel.close() + assertEquals(4, receiver2.count(), "Not all events received") + } + } + + /** + * Not a true throttle, but for things like fetching header badges, we want to avoid simultaneous fetches. + * As a result, I want to test that the usage of offer along with a rendezvous channel will work as I expect. + * Events should be consumed when there is no pending consumer on previous elements. + */ + @Test + fun throttledChannel() { + val channel = Channel(Channel.RENDEZVOUS) + runBlocking { + val deferred = async { + listen(channel) { + // Throttle consumer + delay(10) + return@listen false + } + } + (0..100).forEach { + channel.offer(it) + delay(1) + } + channel.close() + val received = deferred.await() + assertTrue( + received.size < 20, + "Received data should be throttled; expected that around 1/10th of all events are consumed" + ) + println(received) + } + } + + @Test + fun uniqueOnly() { + val channel = BroadcastChannel(100) + runBlocking { + val fullReceiver = channel.openSubscription() + val uniqueReceiver = channel.openSubscription().uniqueOnly(this) + + val fullDeferred = async { listen(fullReceiver) } + val uniqueDeferred = async { listen(uniqueReceiver) } + + listOf(0, 1, 2, 3, 3, 3, 4, 3, 5, 5, 1).forEach { + channel.offer(it) + } + channel.close() + + val fullData = fullDeferred.await() + val uniqueData = uniqueDeferred.await() + + assertEquals( + listOf(0, 1, 2, 3, 3, 3, 4, 3, 5, 5, 1), + fullData, + "Full receiver should get all channel events" + ) + assertEquals( + listOf(0, 1, 2, 3, 4, 3, 5, 1), + uniqueData, + "Unique receiver should not have two consecutive events that are equal" + ) + + } + } +} \ No newline at end of file -- cgit v1.2.3