diff options
author | Allan Wang <me@allanwang.ca> | 2018-12-26 18:10:04 -0500 |
---|---|---|
committer | Allan Wang <me@allanwang.ca> | 2018-12-26 18:10:04 -0500 |
commit | 3a3096be58bacd9408c10ef5d8add6c32204d4e9 (patch) | |
tree | b51fac240d8dd7017f290f3d62391b51c5acfa39 /app/src/main/kotlin/com | |
parent | c9769223cb014f588d93c1a73da157010e68a1c8 (diff) | |
download | frost-3a3096be58bacd9408c10ef5d8add6c32204d4e9.tar.gz frost-3a3096be58bacd9408c10ef5d8add6c32204d4e9.tar.bz2 frost-3a3096be58bacd9408c10ef5d8add6c32204d4e9.zip |
Add new flyweight
Diffstat (limited to 'app/src/main/kotlin/com')
-rw-r--r-- | app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt | 8 | ||||
-rw-r--r-- | app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt | 138 |
2 files changed, 146 insertions, 0 deletions
diff --git a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt index 584107cc..1aa2a1b6 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt @@ -28,12 +28,18 @@ import com.pitchedapps.frost.rx.RxFlyweight import com.pitchedapps.frost.utils.L import io.reactivex.Single import io.reactivex.schedulers.Schedulers +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.selects.select import okhttp3.Call import okhttp3.FormBody import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.logging.HttpLoggingInterceptor import org.apache.commons.text.StringEscapeUtils +import kotlin.coroutines.Continuation +import kotlin.coroutines.suspendCoroutine /** * Created by Allan Wang on 21/12/17. @@ -66,6 +72,8 @@ fun String?.fbRequest(fail: () -> Unit = {}, action: RequestAuth.() -> Unit) { } } +data class FbRequest(val cookie: String, val request: suspend (RequestAuth) -> Unit) + /** * Underlying container for all fb requests */ diff --git a/app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt b/app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt new file mode 100644 index 00000000..a9aedb6d --- /dev/null +++ b/app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt @@ -0,0 +1,138 @@ +package com.pitchedapps.frost.rx + +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.selects.select +import java.util.concurrent.ConcurrentHashMap +import kotlin.coroutines.Continuation +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +/** + * Flyweight to keep track of values so long as they are valid. + * Values that have been fetched within [maxAge] from the time of use will be reused. + * If multiple requests are sent with the same key, then the value should only be fetched once. + * Otherwise, they will be fetched using [fetcher]. + * All requests will stem from the supplied [scope]. + */ +class Flyweight<K, V>( + val scope: CoroutineScope, + capacity: Int, + val maxAge: Long, + private val fetcher: suspend (K) -> V +) { + + // Receives a key and a pending request + private val actionChannel = Channel<Pair<K, Continuation<V>>>(capacity) + // Receives a key to invalidate the associated value + private val invalidatorChannel = Channel<K>(capacity) + // Receives a key to fetch the value + private val requesterChannel = Channel<K>(capacity) + // Receives a key and the resulting value + private val receiverChannel = Channel<Pair<K, Result<V>>>(capacity) + + // Keeps track of keys and associated update times + private val conditionMap: MutableMap<K, Long> = mutableMapOf() + // Keeps track of keys and associated values + private val resultMap: MutableMap<K, Result<V>> = mutableMapOf() + // Keeps track of unfulfilled actions + // Note that the explicit type is very important here. See https://youtrack.jetbrains.net/issue/KT-18053 + private val pendingMap: MutableMap<K, MutableList<Continuation<V>>> = ConcurrentHashMap() + + private val job: Job + + init { + job = scope.launch(Dispatchers.IO) { + launch { + while (isActive) { + select<Unit> { + /* + * New request received. Continuation should be fulfilled eventually + */ + actionChannel.onReceive { (key, continuation) -> + val lastUpdate = conditionMap[key] + val lastResult = resultMap[key] + // Valid value, retrieved within acceptable time + if (lastResult != null && lastUpdate != null && System.currentTimeMillis() - lastUpdate < maxAge) { + continuation.resumeWith(lastResult) + } else { + val valueRequestPending = key in pendingMap + pendingMap.getOrPut(key) { mutableListOf() }.add(continuation) + if (!valueRequestPending) + requesterChannel.send(key) + } + } + /* + * Invalidator received. Existing result associated with key should not be used. + * Note that any unfulfilled request and future requests should still operate, but with a new value. + */ + invalidatorChannel.onReceive { key -> + if (key !in resultMap) { + // Nothing to invalidate. + // If pending requests exist, they are already in the process of being updated. + return@onReceive + } + conditionMap.remove(key) + resultMap.remove(key) + if (pendingMap[key]?.isNotEmpty() == true) + // Refetch value for pending requests + requesterChannel.send(key) + } + /* + * Value request fulfilled. Should now fulfill pending requests + */ + receiverChannel.onReceive { (key, result) -> + conditionMap[key] = System.currentTimeMillis() + resultMap[key] = result + pendingMap.remove(key)?.forEach { + it.resumeWith(result) + } + } + } + } + } + launch { + /* + * Value request received. Should fetch new value using supplied fetcher + */ + for (key in requesterChannel) { + val result = runCatching { + fetcher(key) + } + receiverChannel.send(key to result) + } + } + } + } + + suspend fun fetch(key: K): V = suspendCoroutine { + if (!job.isActive) it.resumeWithException(IllegalStateException("Flyweight is not active")) + else scope.launch { + actionChannel.send(key to it) + } + } + + suspend fun invalidate(key: K) { + invalidatorChannel.send(key) + } + + fun cancel() { + job.cancel() + if (pendingMap.isNotEmpty()) { + val error = CancellationException("Flyweight cancelled") + pendingMap.values.flatten().forEach { it.resumeWithException(error) } + pendingMap.clear() + } + actionChannel.close() + invalidatorChannel.close() + requesterChannel.close() + receiverChannel.close() + conditionMap.clear() + resultMap.clear() + } +}
\ No newline at end of file |