aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt2
-rw-r--r--app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt46
-rw-r--r--app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt18
3 files changed, 31 insertions, 35 deletions
diff --git a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt
index 16ddb7e1..67a03ad4 100644
--- a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt
+++ b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt
@@ -40,7 +40,7 @@ import java.lang.Exception
/**
* Created by Allan Wang on 21/12/17.
*/
-val fbAuth = Flyweight<String, RequestAuth>(GlobalScope, 100, 3600000 /* an hour */) {
+val fbAuth = Flyweight<String, RequestAuth>(GlobalScope, 3600000 /* an hour */) {
it.getAuth()
}
diff --git a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt
index e5edce24..914ce151 100644
--- a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt
+++ b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt
@@ -36,19 +36,16 @@ import java.util.concurrent.ConcurrentHashMap
*/
class Flyweight<K, V>(
val scope: CoroutineScope,
- capacity: Int,
val maxAge: Long,
private val fetcher: suspend (K) -> V
) {
// Receives a key and a pending request
- private val actionChannel = Channel<Pair<K, CompletableDeferred<V>>>(capacity)
+ private val actionChannel = Channel<Pair<K, CompletableDeferred<V>>>(Channel.UNLIMITED)
// Receives a key to invalidate the associated value
- private val invalidatorChannel = Channel<K>(capacity)
- // Receives a key to fetch the value
- private val requesterChannel = Channel<K>(capacity)
+ private val invalidatorChannel = Channel<K>(Channel.UNLIMITED)
// Receives a key and the resulting value
- private val receiverChannel = Channel<Pair<K, Result<V>>>(capacity)
+ private val receiverChannel = Channel<Pair<K, Result<V>>>(Channel.UNLIMITED)
// Keeps track of keys and associated update times
private val conditionMap: MutableMap<K, Long> = mutableMapOf()
@@ -85,7 +82,7 @@ class Flyweight<K, V>(
val valueRequestPending = key in pendingMap
pendingMap.getOrPut(key) { mutableListOf() }.add(completable)
if (!valueRequestPending)
- requesterChannel.send(key)
+ fulfill(key)
}
}
/*
@@ -102,7 +99,7 @@ class Flyweight<K, V>(
resultMap.remove(key)
if (pendingMap[key]?.isNotEmpty() == true)
// Refetch value for pending requests
- requesterChannel.send(key)
+ fulfill(key)
}
/*
* Value request fulfilled. Should now fulfill pending requests
@@ -117,28 +114,32 @@ class Flyweight<K, V>(
}
}
}
- launch {
- /*
- * Value request received. Should fetch new value using supplied fetcher
- */
- for (key in requesterChannel) {
- val result = runCatching {
- fetcher(key)
- }
- receiverChannel.send(key to result)
- }
+ }
+ }
+
+ /*
+ * Value request received. Should fetch new value using supplied fetcher
+ */
+ private fun fulfill(key: K) {
+ scope.launch {
+ val result = runCatching {
+ fetcher(key)
}
+ receiverChannel.send(key to result)
}
}
/**
* Queues the request, and returns a completable once it is sent to a channel.
- * The fetcher will only be suspended if the channels are full
+ * The fetcher will only be suspended if the channels are full.
+ *
+ * Note that if the job is already inactive, a cancellation exception will be thrown.
+ * The message may default to the message for all completables under a cancelled job
*/
- suspend fun fetch(key: K): CompletableDeferred<V> {
+ fun fetch(key: K): CompletableDeferred<V> {
val completable = CompletableDeferred<V>(job)
- if (!job.isActive) completable.completeExceptionally(IllegalStateException("Flyweight is not active"))
- else actionChannel.send(key to completable)
+ if (!job.isActive) completable.completeExceptionally(CancellationException("Flyweight is not active"))
+ else actionChannel.offer(key to completable)
return completable
}
@@ -155,7 +156,6 @@ class Flyweight<K, V>(
}
actionChannel.close()
invalidatorChannel.close()
- requesterChannel.close()
receiverChannel.close()
conditionMap.clear()
resultMap.clear()
diff --git a/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt
index 79f81002..d1d976b6 100644
--- a/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt
+++ b/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt
@@ -16,8 +16,8 @@
*/
package com.pitchedapps.frost.kotlin
+import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.GlobalScope
-import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import org.junit.Rule
import org.junit.rules.Timeout
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
+import kotlin.test.assertFalse
import kotlin.test.assertTrue
import kotlin.test.fail
@@ -42,7 +43,7 @@ class FlyweightTest {
@BeforeTest
fun before() {
callCount = AtomicInteger(0)
- flyweight = Flyweight(GlobalScope, 100, 200L) {
+ flyweight = Flyweight(GlobalScope, 200L) {
callCount.incrementAndGet()
when (it) {
LONG_RUNNING_KEY -> Thread.sleep(100000)
@@ -97,24 +98,19 @@ class FlyweightTest {
@Test
fun destroy() {
runBlocking {
- val longRunningResult = async { flyweight.fetch(LONG_RUNNING_KEY) }
+ val longRunningResult = flyweight.fetch(LONG_RUNNING_KEY)
flyweight.fetch(1).await()
flyweight.cancel()
try {
flyweight.fetch(1).await()
fail("Flyweight should not be fulfilled after it is destroyed")
- } catch (e: Exception) {
- assertEquals("Flyweight is not active", e.message, "Incorrect error found on fetch after destruction")
+ } catch (ignore: CancellationException) {
}
try {
+ assertFalse(longRunningResult.isActive, "Long running result should no longer be active")
longRunningResult.await()
fail("Flyweight should have cancelled previously running requests")
- } catch (e: Exception) {
- assertEquals(
- "Flyweight cancelled",
- e.message,
- "Incorrect error found on fetch cancelled by destruction"
- )
+ } catch (ignore: CancellationException) {
}
}
}