aboutsummaryrefslogtreecommitdiff
path: root/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt
diff options
context:
space:
mode:
authorAllan Wang <me@allanwang.ca>2019-01-04 01:56:04 -0500
committerAllan Wang <me@allanwang.ca>2019-01-04 01:56:04 -0500
commit339ce9db98c1e6dfb1c8d69f81806680b1efa666 (patch)
tree75b3366004ddf08359490d4d4b83d63366c0a68d /app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt
parent8c77e02e89dfec7bff04a397dfc82613ccd1242a (diff)
downloadfrost-339ce9db98c1e6dfb1c8d69f81806680b1efa666.tar.gz
frost-339ce9db98c1e6dfb1c8d69f81806680b1efa666.tar.bz2
frost-339ce9db98c1e6dfb1c8d69f81806680b1efa666.zip
Convert global continuations to completable deferred
Diffstat (limited to 'app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt')
-rw-r--r--app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt37
1 files changed, 24 insertions, 13 deletions
diff --git a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt
index 7ac80147..a22022de 100644
--- a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt
+++ b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt
@@ -17,6 +17,7 @@
package com.pitchedapps.frost.kotlin
import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
@@ -25,7 +26,6 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select
import java.util.concurrent.ConcurrentHashMap
-import kotlin.coroutines.Continuation
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
@@ -44,7 +44,7 @@ class Flyweight<K, V>(
) {
// Receives a key and a pending request
- private val actionChannel = Channel<Pair<K, Continuation<V>>>(capacity)
+ private val actionChannel = Channel<Pair<K, CompletableDeferred<V>>>(capacity)
// Receives a key to invalidate the associated value
private val invalidatorChannel = Channel<K>(capacity)
// Receives a key to fetch the value
@@ -58,10 +58,17 @@ class Flyweight<K, V>(
private val resultMap: MutableMap<K, Result<V>> = mutableMapOf()
// Keeps track of unfulfilled actions
// Note that the explicit type is very important here. See https://youtrack.jetbrains.net/issue/KT-18053
- private val pendingMap: MutableMap<K, MutableList<Continuation<V>>> = ConcurrentHashMap()
+ private val pendingMap: MutableMap<K, MutableList<CompletableDeferred<V>>> = ConcurrentHashMap()
private val job: Job
+ private fun CompletableDeferred<V>.completeWith(result: Result<V>) {
+ if (result.isSuccess)
+ complete(result.getOrNull()!!)
+ else
+ completeExceptionally(result.exceptionOrNull()!!)
+ }
+
init {
job = scope.launch(Dispatchers.IO) {
launch {
@@ -70,15 +77,15 @@ class Flyweight<K, V>(
/*
* New request received. Continuation should be fulfilled eventually
*/
- actionChannel.onReceive { (key, continuation) ->
+ actionChannel.onReceive { (key, completable) ->
val lastUpdate = conditionMap[key]
val lastResult = resultMap[key]
// Valid value, retrieved within acceptable time
if (lastResult != null && lastUpdate != null && System.currentTimeMillis() - lastUpdate < maxAge) {
- continuation.resumeWith(lastResult)
+ completable.completeWith(lastResult)
} else {
val valueRequestPending = key in pendingMap
- pendingMap.getOrPut(key) { mutableListOf() }.add(continuation)
+ pendingMap.getOrPut(key) { mutableListOf() }.add(completable)
if (!valueRequestPending)
requesterChannel.send(key)
}
@@ -106,7 +113,7 @@ class Flyweight<K, V>(
conditionMap[key] = System.currentTimeMillis()
resultMap[key] = result
pendingMap.remove(key)?.forEach {
- it.resumeWith(result)
+ it.completeWith(result)
}
}
}
@@ -126,11 +133,15 @@ class Flyweight<K, V>(
}
}
- suspend fun fetch(key: K): V = suspendCoroutine {
- if (!job.isActive) it.resumeWithException(IllegalStateException("Flyweight is not active"))
- else scope.launch {
- actionChannel.send(key to it)
- }
+ /**
+ * Queues the request, and returns a completable once it is sent to a channel.
+ * The fetcher will only be suspended if the channels are full
+ */
+ suspend fun fetch(key: K): CompletableDeferred<V> {
+ val completable = CompletableDeferred<V>(job)
+ if (!job.isActive) completable.completeExceptionally(IllegalStateException("Flyweight is not active"))
+ else actionChannel.send(key to completable)
+ return completable
}
suspend fun invalidate(key: K) {
@@ -141,7 +152,7 @@ class Flyweight<K, V>(
job.cancel()
if (pendingMap.isNotEmpty()) {
val error = CancellationException("Flyweight cancelled")
- pendingMap.values.flatten().forEach { it.resumeWithException(error) }
+ pendingMap.values.flatten().forEach { it.completeExceptionally(error) }
pendingMap.clear()
}
actionChannel.close()