From 3a3096be58bacd9408c10ef5d8add6c32204d4e9 Mon Sep 17 00:00:00 2001 From: Allan Wang Date: Wed, 26 Dec 2018 18:10:04 -0500 Subject: Add new flyweight --- .../frost/facebook/requests/FbRequest.kt | 8 ++ .../kotlin/com/pitchedapps/frost/rx/Flyweight.kt | 138 +++++++++++++++++++++ .../com/pitchedapps/frost/rx/FlyweightTest.kt | 108 ++++++++++++++++ 3 files changed, 254 insertions(+) create mode 100644 app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt create mode 100644 app/src/test/kotlin/com/pitchedapps/frost/rx/FlyweightTest.kt diff --git a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt index 584107cc..1aa2a1b6 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt @@ -28,12 +28,18 @@ import com.pitchedapps.frost.rx.RxFlyweight import com.pitchedapps.frost.utils.L import io.reactivex.Single import io.reactivex.schedulers.Schedulers +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.selects.select import okhttp3.Call import okhttp3.FormBody import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.logging.HttpLoggingInterceptor import org.apache.commons.text.StringEscapeUtils +import kotlin.coroutines.Continuation +import kotlin.coroutines.suspendCoroutine /** * Created by Allan Wang on 21/12/17. @@ -66,6 +72,8 @@ fun String?.fbRequest(fail: () -> Unit = {}, action: RequestAuth.() -> Unit) { } } +data class FbRequest(val cookie: String, val request: suspend (RequestAuth) -> Unit) + /** * Underlying container for all fb requests */ diff --git a/app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt b/app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt new file mode 100644 index 00000000..a9aedb6d --- /dev/null +++ b/app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt @@ -0,0 +1,138 @@ +package com.pitchedapps.frost.rx + +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.selects.select +import java.util.concurrent.ConcurrentHashMap +import kotlin.coroutines.Continuation +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +/** + * Flyweight to keep track of values so long as they are valid. + * Values that have been fetched within [maxAge] from the time of use will be reused. + * If multiple requests are sent with the same key, then the value should only be fetched once. + * Otherwise, they will be fetched using [fetcher]. + * All requests will stem from the supplied [scope]. + */ +class Flyweight( + val scope: CoroutineScope, + capacity: Int, + val maxAge: Long, + private val fetcher: suspend (K) -> V +) { + + // Receives a key and a pending request + private val actionChannel = Channel>>(capacity) + // Receives a key to invalidate the associated value + private val invalidatorChannel = Channel(capacity) + // Receives a key to fetch the value + private val requesterChannel = Channel(capacity) + // Receives a key and the resulting value + private val receiverChannel = Channel>>(capacity) + + // Keeps track of keys and associated update times + private val conditionMap: MutableMap = mutableMapOf() + // Keeps track of keys and associated values + private val resultMap: MutableMap> = mutableMapOf() + // Keeps track of unfulfilled actions + // Note that the explicit type is very important here. See https://youtrack.jetbrains.net/issue/KT-18053 + private val pendingMap: MutableMap>> = ConcurrentHashMap() + + private val job: Job + + init { + job = scope.launch(Dispatchers.IO) { + launch { + while (isActive) { + select { + /* + * New request received. Continuation should be fulfilled eventually + */ + actionChannel.onReceive { (key, continuation) -> + val lastUpdate = conditionMap[key] + val lastResult = resultMap[key] + // Valid value, retrieved within acceptable time + if (lastResult != null && lastUpdate != null && System.currentTimeMillis() - lastUpdate < maxAge) { + continuation.resumeWith(lastResult) + } else { + val valueRequestPending = key in pendingMap + pendingMap.getOrPut(key) { mutableListOf() }.add(continuation) + if (!valueRequestPending) + requesterChannel.send(key) + } + } + /* + * Invalidator received. Existing result associated with key should not be used. + * Note that any unfulfilled request and future requests should still operate, but with a new value. + */ + invalidatorChannel.onReceive { key -> + if (key !in resultMap) { + // Nothing to invalidate. + // If pending requests exist, they are already in the process of being updated. + return@onReceive + } + conditionMap.remove(key) + resultMap.remove(key) + if (pendingMap[key]?.isNotEmpty() == true) + // Refetch value for pending requests + requesterChannel.send(key) + } + /* + * Value request fulfilled. Should now fulfill pending requests + */ + receiverChannel.onReceive { (key, result) -> + conditionMap[key] = System.currentTimeMillis() + resultMap[key] = result + pendingMap.remove(key)?.forEach { + it.resumeWith(result) + } + } + } + } + } + launch { + /* + * Value request received. Should fetch new value using supplied fetcher + */ + for (key in requesterChannel) { + val result = runCatching { + fetcher(key) + } + receiverChannel.send(key to result) + } + } + } + } + + suspend fun fetch(key: K): V = suspendCoroutine { + if (!job.isActive) it.resumeWithException(IllegalStateException("Flyweight is not active")) + else scope.launch { + actionChannel.send(key to it) + } + } + + suspend fun invalidate(key: K) { + invalidatorChannel.send(key) + } + + fun cancel() { + job.cancel() + if (pendingMap.isNotEmpty()) { + val error = CancellationException("Flyweight cancelled") + pendingMap.values.flatten().forEach { it.resumeWithException(error) } + pendingMap.clear() + } + actionChannel.close() + invalidatorChannel.close() + requesterChannel.close() + receiverChannel.close() + conditionMap.clear() + resultMap.clear() + } +} \ No newline at end of file diff --git a/app/src/test/kotlin/com/pitchedapps/frost/rx/FlyweightTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/rx/FlyweightTest.kt new file mode 100644 index 00000000..834163bd --- /dev/null +++ b/app/src/test/kotlin/com/pitchedapps/frost/rx/FlyweightTest.kt @@ -0,0 +1,108 @@ +package com.pitchedapps.frost.rx + +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.async +import kotlinx.coroutines.runBlocking +import org.junit.Rule +import org.junit.rules.Timeout +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue +import kotlin.test.fail + +class FlyweightTest { + + @get:Rule + val globalTimeout: Timeout = Timeout.seconds(5) + + lateinit var flyweight: Flyweight + + lateinit var callCount: AtomicInteger + + private val LONG_RUNNING_KEY = -78 + + @BeforeTest + fun before() { + callCount = AtomicInteger(0) + flyweight = Flyweight(GlobalScope, 100, 200L) { + callCount.incrementAndGet() + when (it) { + LONG_RUNNING_KEY -> Thread.sleep(100000) + else -> Thread.sleep(100) + } + it * 2 + } + } + + @Test + fun basic() { + assertEquals(2, runBlocking { flyweight.fetch(1) }, "Invalid result") + assertEquals(1, callCount.get(), "1 call expected") + } + + @Test + fun multipleWithOneKey() { + val results: List = runBlocking { + (0..1000).map { + flyweight.scope.async { + flyweight.fetch(1) + } + }.map { it.await() } + } + assertEquals(1, callCount.get(), "1 call expected") + assertEquals(1001, results.size, "Incorrect number of results returned") + assertTrue(results.all { it == 2 }, "Result should all be 2") + } + + @Test + fun consecutiveReuse() { + runBlocking { + flyweight.fetch(1) + assertEquals(1, callCount.get(), "1 call expected") + flyweight.fetch(1) + assertEquals(1, callCount.get(), "Reuse expected") + Thread.sleep(300) + flyweight.fetch(1) + assertEquals(2, callCount.get(), "Refetch expected") + } + } + + @Test + fun invalidate() { + runBlocking { + flyweight.fetch(1) + assertEquals(1, callCount.get(), "1 call expected") + flyweight.invalidate(1) + flyweight.fetch(1) + assertEquals(2, callCount.get(), "New call expected") + } + } + + @Test + fun destroy() { + runBlocking { + val longRunningResult = async { flyweight.fetch(LONG_RUNNING_KEY) } + flyweight.fetch(1) + flyweight.cancel() + try { + flyweight.fetch(1) + fail("Flyweight should not be fulfilled after it is destroyed") + } catch (e: Exception) { + assertEquals("Flyweight is not active", e.message, "Incorrect error found on fetch after destruction") + } + try { + longRunningResult.await() + fail("Flyweight should have cancelled previously running requests") + } catch (e: Exception) { + assertEquals( + "Flyweight cancelled", + e.message, + "Incorrect error found on fetch cancelled by destruction" + ) + } + println("Done") + } + } +} \ No newline at end of file -- cgit v1.2.3