package com.pitchedapps.frost.rx import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.selects.select import java.util.concurrent.ConcurrentHashMap import kotlin.coroutines.Continuation import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine /** * Flyweight to keep track of values so long as they are valid. * Values that have been fetched within [maxAge] from the time of use will be reused. * If multiple requests are sent with the same key, then the value should only be fetched once. * Otherwise, they will be fetched using [fetcher]. * All requests will stem from the supplied [scope]. */ class Flyweight( val scope: CoroutineScope, capacity: Int, val maxAge: Long, private val fetcher: suspend (K) -> V ) { // Receives a key and a pending request private val actionChannel = Channel>>(capacity) // Receives a key to invalidate the associated value private val invalidatorChannel = Channel(capacity) // Receives a key to fetch the value private val requesterChannel = Channel(capacity) // Receives a key and the resulting value private val receiverChannel = Channel>>(capacity) // Keeps track of keys and associated update times private val conditionMap: MutableMap = mutableMapOf() // Keeps track of keys and associated values private val resultMap: MutableMap> = mutableMapOf() // Keeps track of unfulfilled actions // Note that the explicit type is very important here. See https://youtrack.jetbrains.net/issue/KT-18053 private val pendingMap: MutableMap>> = ConcurrentHashMap() private val job: Job init { job = scope.launch(Dispatchers.IO) { launch { while (isActive) { select { /* * New request received. Continuation should be fulfilled eventually */ actionChannel.onReceive { (key, continuation) -> val lastUpdate = conditionMap[key] val lastResult = resultMap[key] // Valid value, retrieved within acceptable time if (lastResult != null && lastUpdate != null && System.currentTimeMillis() - lastUpdate < maxAge) { continuation.resumeWith(lastResult) } else { val valueRequestPending = key in pendingMap pendingMap.getOrPut(key) { mutableListOf() }.add(continuation) if (!valueRequestPending) requesterChannel.send(key) } } /* * Invalidator received. Existing result associated with key should not be used. * Note that any unfulfilled request and future requests should still operate, but with a new value. */ invalidatorChannel.onReceive { key -> if (key !in resultMap) { // Nothing to invalidate. // If pending requests exist, they are already in the process of being updated. return@onReceive } conditionMap.remove(key) resultMap.remove(key) if (pendingMap[key]?.isNotEmpty() == true) // Refetch value for pending requests requesterChannel.send(key) } /* * Value request fulfilled. Should now fulfill pending requests */ receiverChannel.onReceive { (key, result) -> conditionMap[key] = System.currentTimeMillis() resultMap[key] = result pendingMap.remove(key)?.forEach { it.resumeWith(result) } } } } } launch { /* * Value request received. Should fetch new value using supplied fetcher */ for (key in requesterChannel) { val result = runCatching { fetcher(key) } receiverChannel.send(key to result) } } } } suspend fun fetch(key: K): V = suspendCoroutine { if (!job.isActive) it.resumeWithException(IllegalStateException("Flyweight is not active")) else scope.launch { actionChannel.send(key to it) } } suspend fun invalidate(key: K) { invalidatorChannel.send(key) } fun cancel() { job.cancel() if (pendingMap.isNotEmpty()) { val error = CancellationException("Flyweight cancelled") pendingMap.values.flatten().forEach { it.resumeWithException(error) } pendingMap.clear() } actionChannel.close() invalidatorChannel.close() requesterChannel.close() receiverChannel.close() conditionMap.clear() resultMap.clear() } }