diff options
author | Allan Wang <me@allanwang.ca> | 2019-01-04 01:56:04 -0500 |
---|---|---|
committer | Allan Wang <me@allanwang.ca> | 2019-01-04 01:56:04 -0500 |
commit | 339ce9db98c1e6dfb1c8d69f81806680b1efa666 (patch) | |
tree | 75b3366004ddf08359490d4d4b83d63366c0a68d /app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt | |
parent | 8c77e02e89dfec7bff04a397dfc82613ccd1242a (diff) | |
download | frost-339ce9db98c1e6dfb1c8d69f81806680b1efa666.tar.gz frost-339ce9db98c1e6dfb1c8d69f81806680b1efa666.tar.bz2 frost-339ce9db98c1e6dfb1c8d69f81806680b1efa666.zip |
Convert global continuations to completable deferred
Diffstat (limited to 'app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt')
-rw-r--r-- | app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt | 37 |
1 files changed, 24 insertions, 13 deletions
diff --git a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt index 7ac80147..a22022de 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt @@ -17,6 +17,7 @@ package com.pitchedapps.frost.kotlin import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -25,7 +26,6 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.selects.select import java.util.concurrent.ConcurrentHashMap -import kotlin.coroutines.Continuation import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine @@ -44,7 +44,7 @@ class Flyweight<K, V>( ) { // Receives a key and a pending request - private val actionChannel = Channel<Pair<K, Continuation<V>>>(capacity) + private val actionChannel = Channel<Pair<K, CompletableDeferred<V>>>(capacity) // Receives a key to invalidate the associated value private val invalidatorChannel = Channel<K>(capacity) // Receives a key to fetch the value @@ -58,10 +58,17 @@ class Flyweight<K, V>( private val resultMap: MutableMap<K, Result<V>> = mutableMapOf() // Keeps track of unfulfilled actions // Note that the explicit type is very important here. See https://youtrack.jetbrains.net/issue/KT-18053 - private val pendingMap: MutableMap<K, MutableList<Continuation<V>>> = ConcurrentHashMap() + private val pendingMap: MutableMap<K, MutableList<CompletableDeferred<V>>> = ConcurrentHashMap() private val job: Job + private fun CompletableDeferred<V>.completeWith(result: Result<V>) { + if (result.isSuccess) + complete(result.getOrNull()!!) + else + completeExceptionally(result.exceptionOrNull()!!) + } + init { job = scope.launch(Dispatchers.IO) { launch { @@ -70,15 +77,15 @@ class Flyweight<K, V>( /* * New request received. Continuation should be fulfilled eventually */ - actionChannel.onReceive { (key, continuation) -> + actionChannel.onReceive { (key, completable) -> val lastUpdate = conditionMap[key] val lastResult = resultMap[key] // Valid value, retrieved within acceptable time if (lastResult != null && lastUpdate != null && System.currentTimeMillis() - lastUpdate < maxAge) { - continuation.resumeWith(lastResult) + completable.completeWith(lastResult) } else { val valueRequestPending = key in pendingMap - pendingMap.getOrPut(key) { mutableListOf() }.add(continuation) + pendingMap.getOrPut(key) { mutableListOf() }.add(completable) if (!valueRequestPending) requesterChannel.send(key) } @@ -106,7 +113,7 @@ class Flyweight<K, V>( conditionMap[key] = System.currentTimeMillis() resultMap[key] = result pendingMap.remove(key)?.forEach { - it.resumeWith(result) + it.completeWith(result) } } } @@ -126,11 +133,15 @@ class Flyweight<K, V>( } } - suspend fun fetch(key: K): V = suspendCoroutine { - if (!job.isActive) it.resumeWithException(IllegalStateException("Flyweight is not active")) - else scope.launch { - actionChannel.send(key to it) - } + /** + * Queues the request, and returns a completable once it is sent to a channel. + * The fetcher will only be suspended if the channels are full + */ + suspend fun fetch(key: K): CompletableDeferred<V> { + val completable = CompletableDeferred<V>(job) + if (!job.isActive) completable.completeExceptionally(IllegalStateException("Flyweight is not active")) + else actionChannel.send(key to completable) + return completable } suspend fun invalidate(key: K) { @@ -141,7 +152,7 @@ class Flyweight<K, V>( job.cancel() if (pendingMap.isNotEmpty()) { val error = CancellationException("Flyweight cancelled") - pendingMap.values.flatten().forEach { it.resumeWithException(error) } + pendingMap.values.flatten().forEach { it.completeExceptionally(error) } pendingMap.clear() } actionChannel.close() |