diff options
Diffstat (limited to 'app/src')
3 files changed, 31 insertions, 35 deletions
diff --git a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt index 16ddb7e1..67a03ad4 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt @@ -40,7 +40,7 @@ import java.lang.Exception /** * Created by Allan Wang on 21/12/17. */ -val fbAuth = Flyweight<String, RequestAuth>(GlobalScope, 100, 3600000 /* an hour */) { +val fbAuth = Flyweight<String, RequestAuth>(GlobalScope, 3600000 /* an hour */) { it.getAuth() } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt index e5edce24..914ce151 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt @@ -36,19 +36,16 @@ import java.util.concurrent.ConcurrentHashMap */ class Flyweight<K, V>( val scope: CoroutineScope, - capacity: Int, val maxAge: Long, private val fetcher: suspend (K) -> V ) { // Receives a key and a pending request - private val actionChannel = Channel<Pair<K, CompletableDeferred<V>>>(capacity) + private val actionChannel = Channel<Pair<K, CompletableDeferred<V>>>(Channel.UNLIMITED) // Receives a key to invalidate the associated value - private val invalidatorChannel = Channel<K>(capacity) - // Receives a key to fetch the value - private val requesterChannel = Channel<K>(capacity) + private val invalidatorChannel = Channel<K>(Channel.UNLIMITED) // Receives a key and the resulting value - private val receiverChannel = Channel<Pair<K, Result<V>>>(capacity) + private val receiverChannel = Channel<Pair<K, Result<V>>>(Channel.UNLIMITED) // Keeps track of keys and associated update times private val conditionMap: MutableMap<K, Long> = mutableMapOf() @@ -85,7 +82,7 @@ class Flyweight<K, V>( val valueRequestPending = key in pendingMap pendingMap.getOrPut(key) { mutableListOf() }.add(completable) if (!valueRequestPending) - requesterChannel.send(key) + fulfill(key) } } /* @@ -102,7 +99,7 @@ class Flyweight<K, V>( resultMap.remove(key) if (pendingMap[key]?.isNotEmpty() == true) // Refetch value for pending requests - requesterChannel.send(key) + fulfill(key) } /* * Value request fulfilled. Should now fulfill pending requests @@ -117,28 +114,32 @@ class Flyweight<K, V>( } } } - launch { - /* - * Value request received. Should fetch new value using supplied fetcher - */ - for (key in requesterChannel) { - val result = runCatching { - fetcher(key) - } - receiverChannel.send(key to result) - } + } + } + + /* + * Value request received. Should fetch new value using supplied fetcher + */ + private fun fulfill(key: K) { + scope.launch { + val result = runCatching { + fetcher(key) } + receiverChannel.send(key to result) } } /** * Queues the request, and returns a completable once it is sent to a channel. - * The fetcher will only be suspended if the channels are full + * The fetcher will only be suspended if the channels are full. + * + * Note that if the job is already inactive, a cancellation exception will be thrown. + * The message may default to the message for all completables under a cancelled job */ - suspend fun fetch(key: K): CompletableDeferred<V> { + fun fetch(key: K): CompletableDeferred<V> { val completable = CompletableDeferred<V>(job) - if (!job.isActive) completable.completeExceptionally(IllegalStateException("Flyweight is not active")) - else actionChannel.send(key to completable) + if (!job.isActive) completable.completeExceptionally(CancellationException("Flyweight is not active")) + else actionChannel.offer(key to completable) return completable } @@ -155,7 +156,6 @@ class Flyweight<K, V>( } actionChannel.close() invalidatorChannel.close() - requesterChannel.close() receiverChannel.close() conditionMap.clear() resultMap.clear() diff --git a/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt index 79f81002..d1d976b6 100644 --- a/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt +++ b/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt @@ -16,8 +16,8 @@ */ package com.pitchedapps.frost.kotlin +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.async import kotlinx.coroutines.runBlocking import org.junit.Rule import org.junit.rules.Timeout @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertFalse import kotlin.test.assertTrue import kotlin.test.fail @@ -42,7 +43,7 @@ class FlyweightTest { @BeforeTest fun before() { callCount = AtomicInteger(0) - flyweight = Flyweight(GlobalScope, 100, 200L) { + flyweight = Flyweight(GlobalScope, 200L) { callCount.incrementAndGet() when (it) { LONG_RUNNING_KEY -> Thread.sleep(100000) @@ -97,24 +98,19 @@ class FlyweightTest { @Test fun destroy() { runBlocking { - val longRunningResult = async { flyweight.fetch(LONG_RUNNING_KEY) } + val longRunningResult = flyweight.fetch(LONG_RUNNING_KEY) flyweight.fetch(1).await() flyweight.cancel() try { flyweight.fetch(1).await() fail("Flyweight should not be fulfilled after it is destroyed") - } catch (e: Exception) { - assertEquals("Flyweight is not active", e.message, "Incorrect error found on fetch after destruction") + } catch (ignore: CancellationException) { } try { + assertFalse(longRunningResult.isActive, "Long running result should no longer be active") longRunningResult.await() fail("Flyweight should have cancelled previously running requests") - } catch (e: Exception) { - assertEquals( - "Flyweight cancelled", - e.message, - "Incorrect error found on fetch cancelled by destruction" - ) + } catch (ignore: CancellationException) { } } } |