From e5e83736d3feff8ac9cc4ae38fad0fa827a6b21d Mon Sep 17 00:00:00 2001 From: Allan Wang Date: Fri, 28 Dec 2018 20:19:18 -0500 Subject: Switch remaining primary observables --- .../com/pitchedapps/frost/utils/CoroutineTest.kt | 194 +++++++++++++++++++++ .../frost/views/FrostContentViewAsyncTest.kt | 130 -------------- 2 files changed, 194 insertions(+), 130 deletions(-) create mode 100644 app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt delete mode 100644 app/src/test/kotlin/com/pitchedapps/frost/views/FrostContentViewAsyncTest.kt (limited to 'app/src/test') diff --git a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt new file mode 100644 index 00000000..f930e529 --- /dev/null +++ b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt @@ -0,0 +1,194 @@ +package com.pitchedapps.frost.utils + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.BroadcastChannel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.count +import kotlinx.coroutines.delay +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import java.util.concurrent.Executors +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +/** + * Collection of tests around coroutines + */ +@UseExperimental(ExperimentalCoroutinesApi::class) +class CoroutineTest { + + /** + * Hooks onto the refresh channel for one true -> false cycle. + * Returns the list of event ids that were emitted + */ + private suspend fun transition(channel: ReceiveChannel>): List> { + var refreshed = false + return listen(channel) { (refreshing, _) -> + if (refreshed && !refreshing) + return@listen true + if (refreshing) + refreshed = true + return@listen false + } + } + + private suspend fun listen(channel: ReceiveChannel, shouldEnd: suspend (T) -> Boolean = { false }): List = + withContext(Dispatchers.IO) { + val data = mutableListOf() + for (c in channel) { + data.add(c) + if (shouldEnd(c)) break + } + channel.cancel() + return@withContext data + } + + /** + * When refreshing, we have a temporary subscriber that hooks onto a single cycle. + * The refresh channel only contains booleans, but for the sake of identification, + * each boolean will have a unique integer attached. + * + * Things to note: + * Subscription should be opened outside of async, since we don't want to miss any events. + */ + @Test + fun refreshSubscriptions() { + val refreshChannel = BroadcastChannel>(100) + runBlocking { + // Listen to all events + val fullReceiver = refreshChannel.openSubscription() + val fullDeferred = async { listen(fullReceiver) } + + refreshChannel.send(true to 1) + refreshChannel.send(false to 2) + refreshChannel.send(true to 3) + + val partialReceiver = refreshChannel.openSubscription() + val partialDeferred = async { transition(partialReceiver) } + refreshChannel.send(false to 4) + refreshChannel.send(true to 5) + refreshChannel.send(false to 6) + refreshChannel.send(true to 7) + refreshChannel.close() + val fullStream = fullDeferred.await() + val partialStream = partialDeferred.await() + + assertEquals( + 7, + fullStream.size, + "Full stream should contain all events" + ) + assertEquals( + listOf(false to 4, true to 5, false to 6), + partialStream, + "Partial stream should include up until first true false pair" + ) + } + } + + /** + * Sanity check to ensure that contexts are being honoured + */ + @Test + fun contextSwitching() { + val mainTag = "main-test" + val mainDispatcher = Executors.newSingleThreadExecutor { r -> + Thread(r, mainTag) + }.asCoroutineDispatcher() + + val channel = BroadcastChannel(100) + + runBlocking(Dispatchers.IO) { + val receiver1 = channel.openSubscription() + val receiver2 = channel.openSubscription() + launch(mainDispatcher) { + for (thread in receiver1) { + assertTrue( + Thread.currentThread().name.startsWith(mainTag), + "Channel should be received in main thread" + ) + assertFalse( + thread.startsWith(mainTag), + "Channel execution should not be in main thread" + ) + } + } + listOf(EmptyCoroutineContext, Dispatchers.IO, Dispatchers.Default, Dispatchers.IO).map { + async(it) { channel.send(Thread.currentThread().name) } + }.joinAll() + channel.close() + assertEquals(4, receiver2.count(), "Not all events received") + } + } + + /** + * Not a true throttle, but for things like fetching header badges, we want to avoid simultaneous fetches. + * As a result, I want to test that the usage of offer along with a rendezvous channel will work as I expect. + * Events should be consumed when there is no pending consumer on previous elements. + */ + @Test + fun throttledChannel() { + val channel = Channel(Channel.RENDEZVOUS) + runBlocking { + val deferred = async { + listen(channel) { + // Throttle consumer + delay(10) + return@listen false + } + } + (0..100).forEach { + channel.offer(it) + delay(1) + } + channel.close() + val received = deferred.await() + assertTrue( + received.size < 20, + "Received data should be throttled; expected that around 1/10th of all events are consumed" + ) + println(received) + } + } + + @Test + fun uniqueOnly() { + val channel = BroadcastChannel(100) + runBlocking { + val fullReceiver = channel.openSubscription() + val uniqueReceiver = channel.openSubscription().uniqueOnly(this) + + val fullDeferred = async { listen(fullReceiver) } + val uniqueDeferred = async { listen(uniqueReceiver) } + + listOf(0, 1, 2, 3, 3, 3, 4, 3, 5, 5, 1).forEach { + channel.offer(it) + } + channel.close() + + val fullData = fullDeferred.await() + val uniqueData = uniqueDeferred.await() + + assertEquals( + listOf(0, 1, 2, 3, 3, 3, 4, 3, 5, 5, 1), + fullData, + "Full receiver should get all channel events" + ) + assertEquals( + listOf(0, 1, 2, 3, 4, 3, 5, 1), + uniqueData, + "Unique receiver should not have two consecutive events that are equal" + ) + + } + } +} \ No newline at end of file diff --git a/app/src/test/kotlin/com/pitchedapps/frost/views/FrostContentViewAsyncTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/views/FrostContentViewAsyncTest.kt deleted file mode 100644 index 13c40aa3..00000000 --- a/app/src/test/kotlin/com/pitchedapps/frost/views/FrostContentViewAsyncTest.kt +++ /dev/null @@ -1,130 +0,0 @@ -package com.pitchedapps.frost.views - -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.async -import kotlinx.coroutines.channels.BroadcastChannel -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.channels.count -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext -import java.util.concurrent.Executors -import kotlin.coroutines.EmptyCoroutineContext -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertFalse -import kotlin.test.assertTrue - -/** - * Collection of tests around the view thread logic - */ -@UseExperimental(ExperimentalCoroutinesApi::class) -class FrostContentViewAsyncTest { - - /** - * Hooks onto the refresh channel for one true -> false cycle. - * Returns the list of event ids that were emitted - */ - private suspend fun transition(channel: ReceiveChannel>): List> { - var refreshed = false - return listen(channel) { (refreshing, _) -> - if (refreshed && !refreshing) - return@listen true - if (refreshing) - refreshed = true - return@listen false - } - } - - private suspend fun listen(channel: ReceiveChannel, shouldEnd: (T) -> Boolean = { false }): List = - withContext(Dispatchers.IO) { - val data = mutableListOf() - for (c in channel) { - data.add(c) - if (shouldEnd(c)) break - } - channel.cancel() - return@withContext data - } - - /** - * When refreshing, we have a temporary subscriber that hooks onto a single cycle. - * The refresh channel only contains booleans, but for the sake of identification, - * each boolean will have a unique integer attached. - * - * Things to note: - * Subscription should be opened outside of async, since we don't want to miss any events. - */ - @Test - fun refreshSubscriptions() { - val refreshChannel = BroadcastChannel>(100) - runBlocking { - // Listen to all events - val fullReceiver = refreshChannel.openSubscription() - val fullDeferred = async { listen(fullReceiver) } - - refreshChannel.send(true to 1) - refreshChannel.send(false to 2) - refreshChannel.send(true to 3) - - val partialReceiver = refreshChannel.openSubscription() - val partialDeferred = async { transition(partialReceiver) } - refreshChannel.send(false to 4) - refreshChannel.send(true to 5) - refreshChannel.send(false to 6) - refreshChannel.send(true to 7) - refreshChannel.close() - val fullStream = fullDeferred.await() - val partialStream = partialDeferred.await() - - assertEquals( - 7, - fullStream.size, - "Full stream should contain all events" - ) - assertEquals( - listOf(false to 4, true to 5, false to 6), - partialStream, - "Partial stream should include up until first true false pair" - ) - } - } - - /** - * Sanity check to ensure that contexts are being honoured - */ - @Test - fun contextSwitching() { - val mainTag = "main-test" - val mainDispatcher = Executors.newSingleThreadExecutor { r -> - Thread(r, mainTag) - }.asCoroutineDispatcher() - - val channel = BroadcastChannel(100) - - runBlocking(Dispatchers.IO) { - val receiver1 = channel.openSubscription() - val receiver2 = channel.openSubscription() - launch(mainDispatcher) { - for (thread in receiver1) { - assertTrue( - Thread.currentThread().name.startsWith(mainTag), - "Channel should be received in main thread" - ) - assertFalse( - thread.startsWith(mainTag), - "Channel execution should not be in main thread" - ) - } - } - listOf(EmptyCoroutineContext, Dispatchers.IO, Dispatchers.Default, Dispatchers.IO).map { - async(it) { channel.send(Thread.currentThread().name) } - }.joinAll() - channel.close() - assertEquals(4, receiver2.count(), "Not all events received") - } - } -} \ No newline at end of file -- cgit v1.2.3