aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAllan Wang <me@allanwang.ca>2021-11-23 12:02:11 -0800
committerAllan Wang <me@allanwang.ca>2021-11-23 12:02:11 -0800
commit602290c33ac25f9308f8adfc459033cd50e13497 (patch)
tree423516151e716c035f729f49d03026a65af407c4
parente8d9dca1ede1295f67e27faf731a5caa1bd2810a (diff)
downloadfrost-602290c33ac25f9308f8adfc459033cd50e13497.tar.gz
frost-602290c33ac25f9308f8adfc459033cd50e13497.tar.bz2
frost-602290c33ac25f9308f8adfc459033cd50e13497.zip
Remove broadcast channel and flyweight
-rw-r--r--app/src/main/kotlin/com/pitchedapps/frost/kotlin/CoroutineUtils.kt39
-rw-r--r--app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt174
-rw-r--r--app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt120
-rw-r--r--app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt89
4 files changed, 0 insertions, 422 deletions
diff --git a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/CoroutineUtils.kt b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/CoroutineUtils.kt
deleted file mode 100644
index 6f8a60a9..00000000
--- a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/CoroutineUtils.kt
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2019 Allan Wang
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-package com.pitchedapps.frost.kotlin
-
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.channels.BroadcastChannel
-import kotlinx.coroutines.launch
-import kotlin.coroutines.CoroutineContext
-
-@UseExperimental(ExperimentalCoroutinesApi::class)
-fun <T> BroadcastChannel<T>.subscribeDuringJob(
- scope: CoroutineScope,
- context: CoroutineContext,
- onReceive: suspend (T) -> Unit
-) {
- val receiver = openSubscription()
- scope.launch(context) {
- for (r in receiver) {
- onReceive(r)
- }
- }
- scope.coroutineContext[Job]!!.invokeOnCompletion { receiver.cancel() }
-}
diff --git a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt
deleted file mode 100644
index 74765b58..00000000
--- a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Copyright 2018 Allan Wang
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-package com.pitchedapps.frost.kotlin
-
-import com.pitchedapps.frost.utils.L
-import kotlinx.coroutines.CancellationException
-import kotlinx.coroutines.CompletableDeferred
-import kotlinx.coroutines.CoroutineExceptionHandler
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.SupervisorJob
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.isActive
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.selects.select
-import java.util.concurrent.ConcurrentHashMap
-
-/**
- * Flyweight to keep track of values so long as they are valid.
- * Values that have been fetched within [maxAge] from the time of use will be reused.
- * If multiple requests are sent with the same key, then the value should only be fetched once.
- * Otherwise, they will be fetched using [fetcher].
- * All requests will stem from the supplied [scope].
- */
-class Flyweight<K, V>(
- val scope: CoroutineScope,
- val maxAge: Long,
- private val fetcher: suspend (K) -> V
-) {
-
- // Receives a key and a pending request
- private val actionChannel = Channel<Pair<K, CompletableDeferred<V>>>(Channel.UNLIMITED)
-
- // Receives a key to invalidate the associated value
- private val invalidatorChannel = Channel<K>(Channel.UNLIMITED)
-
- // Receives a key and the resulting value
- private val receiverChannel = Channel<Pair<K, Result<V>>>(Channel.UNLIMITED)
-
- // Keeps track of keys and associated update times
- private val conditionMap: MutableMap<K, Long> = mutableMapOf()
-
- // Keeps track of keys and associated values
- private val resultMap: MutableMap<K, Result<V>> = mutableMapOf()
-
- // Keeps track of unfulfilled actions
- // Note that the explicit type is very important here. See https://youtrack.jetbrains.net/issue/KT-18053
- private val pendingMap: MutableMap<K, MutableList<CompletableDeferred<V>>> = ConcurrentHashMap()
-
- private val job: Job
-
- private fun CompletableDeferred<V>.completeWith(result: Result<V>) {
- if (result.isSuccess)
- complete(result.getOrNull()!!)
- else
- completeExceptionally(result.exceptionOrNull()!!)
- }
-
- private val errHandler =
- CoroutineExceptionHandler { _, throwable -> L.d { "FbAuth failed ${throwable.message}" } }
-
- init {
- job =
- scope.launch(Dispatchers.IO + SupervisorJob() + errHandler) {
- launch {
- while (isActive) {
- select<Unit> {
- /*
- * New request received. Continuation should be fulfilled eventually
- */
- actionChannel.onReceive { (key, completable) ->
- val lastUpdate = conditionMap[key]
- val lastResult = resultMap[key]
- // Valid value, retrieved within acceptable time
- if (lastResult != null && lastUpdate != null && System.currentTimeMillis() - lastUpdate < maxAge) {
- completable.completeWith(lastResult)
- } else {
- val valueRequestPending = key in pendingMap
- pendingMap.getOrPut(key) { mutableListOf() }.add(completable)
- if (!valueRequestPending)
- fulfill(key)
- }
- }
- /*
- * Invalidator received. Existing result associated with key should not be used.
- * Note that any unfulfilled request and future requests should still operate, but with a new value.
- */
- invalidatorChannel.onReceive { key ->
- if (key !in resultMap) {
- // Nothing to invalidate.
- // If pending requests exist, they are already in the process of being updated.
- return@onReceive
- }
- conditionMap.remove(key)
- resultMap.remove(key)
- if (pendingMap[key]?.isNotEmpty() == true)
- // Refetch value for pending requests
- fulfill(key)
- }
- /*
- * Value request fulfilled. Should now fulfill pending requests
- */
- receiverChannel.onReceive { (key, result) ->
- conditionMap[key] = System.currentTimeMillis()
- resultMap[key] = result
- pendingMap.remove(key)?.forEach {
- it.completeWith(result)
- }
- }
- }
- }
- }
- }
- }
-
- /*
- * Value request received. Should fetch new value using supplied fetcher
- */
- private fun fulfill(key: K) {
- scope.launch {
- val result = runCatching {
- fetcher(key)
- }
- receiverChannel.send(key to result)
- }
- }
-
- /**
- * Queues the request, and returns a completable once it is sent to a channel.
- * The fetcher will only be suspended if the channels are full.
- *
- * Note that if the job is already inactive, a cancellation exception will be thrown.
- * The message may default to the message for all completables under a cancelled job
- */
- fun fetch(key: K): CompletableDeferred<V> {
- val completable = CompletableDeferred<V>(job)
- if (!job.isActive) completable.completeExceptionally(CancellationException("Flyweight is not active"))
- else actionChannel.offer(key to completable)
- return completable
- }
-
- suspend fun invalidate(key: K) {
- invalidatorChannel.send(key)
- }
-
- fun cancel() {
- job.cancel()
- if (pendingMap.isNotEmpty()) {
- val error = CancellationException("Flyweight cancelled")
- pendingMap.values.flatten().forEach { it.completeExceptionally(error) }
- pendingMap.clear()
- }
- actionChannel.close()
- invalidatorChannel.close()
- receiverChannel.close()
- conditionMap.clear()
- resultMap.clear()
- }
-}
diff --git a/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt
deleted file mode 100644
index 89289322..00000000
--- a/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright 2018 Allan Wang
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-package com.pitchedapps.frost.kotlin
-
-import kotlinx.coroutines.CancellationException
-import kotlinx.coroutines.GlobalScope
-import kotlinx.coroutines.runBlocking
-import org.junit.Rule
-import org.junit.rules.Timeout
-import java.util.concurrent.atomic.AtomicInteger
-import kotlin.test.BeforeTest
-import kotlin.test.Test
-import kotlin.test.assertEquals
-import kotlin.test.assertFalse
-import kotlin.test.assertTrue
-import kotlin.test.fail
-
-class FlyweightTest {
-
- @get:Rule
- val globalTimeout: Timeout = Timeout.seconds(5)
-
- lateinit var flyweight: Flyweight<Int, Int>
-
- lateinit var callCount: AtomicInteger
-
- private val LONG_RUNNING_KEY = -78
-
- @BeforeTest
- fun before() {
- callCount = AtomicInteger(0)
- flyweight = Flyweight(GlobalScope, 200L) {
- callCount.incrementAndGet()
- when (it) {
- LONG_RUNNING_KEY -> Thread.sleep(100000)
- else -> Thread.sleep(100)
- }
- it * 2
- }
- }
-
- @Test
- fun basic() {
- assertEquals(2, runBlocking { flyweight.fetch(1).await() }, "Invalid result")
- assertEquals(1, callCount.get(), "1 call expected")
- }
-
- @Test
- fun multipleWithOneKey() {
- val results: List<Int> = runBlocking {
- (0..1000).map {
- flyweight.fetch(1)
- }.map { it.await() }
- }
- assertEquals(1, callCount.get(), "1 call expected")
- assertEquals(1001, results.size, "Incorrect number of results returned")
- assertTrue(results.all { it == 2 }, "Result should all be 2")
- }
-
- @Test
- fun consecutiveReuse() {
- runBlocking {
- flyweight.fetch(1).await()
- assertEquals(1, callCount.get(), "1 call expected")
- flyweight.fetch(1).await()
- assertEquals(1, callCount.get(), "Reuse expected")
- Thread.sleep(300)
- flyweight.fetch(1).await()
- assertEquals(2, callCount.get(), "Refetch expected")
- }
- }
-
- @Test
- fun invalidate() {
- runBlocking {
- flyweight.fetch(1).await()
- assertEquals(1, callCount.get(), "1 call expected")
- flyweight.invalidate(1)
- flyweight.fetch(1).await()
- assertEquals(2, callCount.get(), "New call expected")
- }
- }
-
- @Test
- fun destroy() {
- runBlocking {
- val longRunningResult = flyweight.fetch(LONG_RUNNING_KEY)
- flyweight.fetch(1).await()
- flyweight.cancel()
- try {
- flyweight.fetch(1).await()
- fail("Flyweight should not be fulfilled after it is destroyed")
- } catch (ignore: CancellationException) {
- }
- try {
- assertFalse(
- longRunningResult.isActive,
- "Long running result should no longer be active"
- )
- longRunningResult.await()
- fail("Flyweight should have cancelled previously running requests")
- } catch (ignore: CancellationException) {
- }
- }
- }
-}
diff --git a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt
index 43bd1563..551d0b7b 100644
--- a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt
+++ b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt
@@ -16,15 +16,10 @@
*/
package com.pitchedapps.frost.utils
-import com.pitchedapps.frost.kotlin.Flyweight
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.GlobalScope
-import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
-import kotlinx.coroutines.channels.BroadcastChannel
-import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
@@ -81,49 +76,6 @@ class CoroutineTest {
return@withContext data
}
- /**
- * When refreshing, we have a temporary subscriber that hooks onto a single cycle.
- * The refresh channel only contains booleans, but for the sake of identification,
- * each boolean will have a unique integer attached.
- *
- * Things to note:
- * Subscription should be opened outside of async, since we don't want to miss any events.
- */
- @Test
- fun refreshSubscriptions() {
- val refreshChannel = BroadcastChannel<Pair<Boolean, Int>>(100)
- runBlocking {
- // Listen to all events
- val fullReceiver = refreshChannel.openSubscription()
- val fullDeferred = async { listen(fullReceiver) }
-
- refreshChannel.send(true to 1)
- refreshChannel.send(false to 2)
- refreshChannel.send(true to 3)
-
- val partialReceiver = refreshChannel.openSubscription()
- val partialDeferred = async { transition(partialReceiver) }
- refreshChannel.send(false to 4)
- refreshChannel.send(true to 5)
- refreshChannel.send(false to 6)
- refreshChannel.send(true to 7)
- refreshChannel.close()
- val fullStream = fullDeferred.await()
- val partialStream = partialDeferred.await()
-
- assertEquals(
- 7,
- fullStream.size,
- "Full stream should contain all events"
- )
- assertEquals(
- listOf(false to 4, true to 5, false to 6),
- partialStream,
- "Partial stream should include up until first true false pair"
- )
- }
- }
-
private fun <T : Any> SharedFlow<T?>.takeUntilNull(): Flow<T> =
takeWhile { it != null }.filterNotNull()
@@ -159,45 +111,4 @@ class CoroutineTest {
assertEquals(4, count, "Not all events received")
}
}
-
- class TestException(msg: String) : RuntimeException(msg)
-
- @Test
- fun exceptionChecks() {
- val mainTag = "main-test"
- val mainDispatcher = Executors.newSingleThreadExecutor { r ->
- Thread(r, mainTag)
- }.asCoroutineDispatcher()
- val channel = Channel<Int>()
-
- val job = SupervisorJob()
-
- val flyweight = Flyweight<Int, Int>(GlobalScope, 200L) {
- throw TestException("Flyweight exception")
- }
-
- suspend fun crash(): Boolean = withContext(Dispatchers.IO) {
- try {
- withContext(Dispatchers.Default) {
- flyweight.fetch(0).await()
- }
- true
- } catch (e: TestException) {
- false
- }
- }
-
- runBlocking(mainDispatcher + job) {
- launch {
- val i = channel.receive()
- println("Received $i")
- }
- launch {
- println("A")
- println(crash())
- println("B")
- channel.offer(1)
- }
- }
- }
}