From 535004b8a28d1b227fa0673f80f6086ca8f4be41 Mon Sep 17 00:00:00 2001 From: Allan Wang Date: Fri, 4 Jan 2019 13:32:58 -0500 Subject: Fix flyweight tests --- .../com/pitchedapps/frost/kotlin/Flyweight.kt | 46 +++++++++++----------- 1 file changed, 23 insertions(+), 23 deletions(-) (limited to 'app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt') diff --git a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt index e5edce24..914ce151 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt @@ -36,19 +36,16 @@ import java.util.concurrent.ConcurrentHashMap */ class Flyweight( val scope: CoroutineScope, - capacity: Int, val maxAge: Long, private val fetcher: suspend (K) -> V ) { // Receives a key and a pending request - private val actionChannel = Channel>>(capacity) + private val actionChannel = Channel>>(Channel.UNLIMITED) // Receives a key to invalidate the associated value - private val invalidatorChannel = Channel(capacity) - // Receives a key to fetch the value - private val requesterChannel = Channel(capacity) + private val invalidatorChannel = Channel(Channel.UNLIMITED) // Receives a key and the resulting value - private val receiverChannel = Channel>>(capacity) + private val receiverChannel = Channel>>(Channel.UNLIMITED) // Keeps track of keys and associated update times private val conditionMap: MutableMap = mutableMapOf() @@ -85,7 +82,7 @@ class Flyweight( val valueRequestPending = key in pendingMap pendingMap.getOrPut(key) { mutableListOf() }.add(completable) if (!valueRequestPending) - requesterChannel.send(key) + fulfill(key) } } /* @@ -102,7 +99,7 @@ class Flyweight( resultMap.remove(key) if (pendingMap[key]?.isNotEmpty() == true) // Refetch value for pending requests - requesterChannel.send(key) + fulfill(key) } /* * Value request fulfilled. Should now fulfill pending requests @@ -117,28 +114,32 @@ class Flyweight( } } } - launch { - /* - * Value request received. Should fetch new value using supplied fetcher - */ - for (key in requesterChannel) { - val result = runCatching { - fetcher(key) - } - receiverChannel.send(key to result) - } + } + } + + /* + * Value request received. Should fetch new value using supplied fetcher + */ + private fun fulfill(key: K) { + scope.launch { + val result = runCatching { + fetcher(key) } + receiverChannel.send(key to result) } } /** * Queues the request, and returns a completable once it is sent to a channel. - * The fetcher will only be suspended if the channels are full + * The fetcher will only be suspended if the channels are full. + * + * Note that if the job is already inactive, a cancellation exception will be thrown. + * The message may default to the message for all completables under a cancelled job */ - suspend fun fetch(key: K): CompletableDeferred { + fun fetch(key: K): CompletableDeferred { val completable = CompletableDeferred(job) - if (!job.isActive) completable.completeExceptionally(IllegalStateException("Flyweight is not active")) - else actionChannel.send(key to completable) + if (!job.isActive) completable.completeExceptionally(CancellationException("Flyweight is not active")) + else actionChannel.offer(key to completable) return completable } @@ -155,7 +156,6 @@ class Flyweight( } actionChannel.close() invalidatorChannel.close() - requesterChannel.close() receiverChannel.close() conditionMap.clear() resultMap.clear() -- cgit v1.2.3