aboutsummaryrefslogtreecommitdiff
path: root/app/src
diff options
context:
space:
mode:
authorAllan Wang <me@allanwang.ca>2018-12-26 18:10:04 -0500
committerAllan Wang <me@allanwang.ca>2018-12-26 18:10:04 -0500
commit3a3096be58bacd9408c10ef5d8add6c32204d4e9 (patch)
treeb51fac240d8dd7017f290f3d62391b51c5acfa39 /app/src
parentc9769223cb014f588d93c1a73da157010e68a1c8 (diff)
downloadfrost-3a3096be58bacd9408c10ef5d8add6c32204d4e9.tar.gz
frost-3a3096be58bacd9408c10ef5d8add6c32204d4e9.tar.bz2
frost-3a3096be58bacd9408c10ef5d8add6c32204d4e9.zip
Add new flyweight
Diffstat (limited to 'app/src')
-rw-r--r--app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt8
-rw-r--r--app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt138
-rw-r--r--app/src/test/kotlin/com/pitchedapps/frost/rx/FlyweightTest.kt108
3 files changed, 254 insertions, 0 deletions
diff --git a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt
index 584107cc..1aa2a1b6 100644
--- a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt
+++ b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt
@@ -28,12 +28,18 @@ import com.pitchedapps.frost.rx.RxFlyweight
import com.pitchedapps.frost.utils.L
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.selects.select
import okhttp3.Call
import okhttp3.FormBody
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.logging.HttpLoggingInterceptor
import org.apache.commons.text.StringEscapeUtils
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.suspendCoroutine
/**
* Created by Allan Wang on 21/12/17.
@@ -66,6 +72,8 @@ fun String?.fbRequest(fail: () -> Unit = {}, action: RequestAuth.() -> Unit) {
}
}
+data class FbRequest(val cookie: String, val request: suspend (RequestAuth) -> Unit)
+
/**
* Underlying container for all fb requests
*/
diff --git a/app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt b/app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt
new file mode 100644
index 00000000..a9aedb6d
--- /dev/null
+++ b/app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt
@@ -0,0 +1,138 @@
+package com.pitchedapps.frost.rx
+
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.isActive
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.selects.select
+import java.util.concurrent.ConcurrentHashMap
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resumeWithException
+import kotlin.coroutines.suspendCoroutine
+
+/**
+ * Flyweight to keep track of values so long as they are valid.
+ * Values that have been fetched within [maxAge] from the time of use will be reused.
+ * If multiple requests are sent with the same key, then the value should only be fetched once.
+ * Otherwise, they will be fetched using [fetcher].
+ * All requests will stem from the supplied [scope].
+ */
+class Flyweight<K, V>(
+ val scope: CoroutineScope,
+ capacity: Int,
+ val maxAge: Long,
+ private val fetcher: suspend (K) -> V
+) {
+
+ // Receives a key and a pending request
+ private val actionChannel = Channel<Pair<K, Continuation<V>>>(capacity)
+ // Receives a key to invalidate the associated value
+ private val invalidatorChannel = Channel<K>(capacity)
+ // Receives a key to fetch the value
+ private val requesterChannel = Channel<K>(capacity)
+ // Receives a key and the resulting value
+ private val receiverChannel = Channel<Pair<K, Result<V>>>(capacity)
+
+ // Keeps track of keys and associated update times
+ private val conditionMap: MutableMap<K, Long> = mutableMapOf()
+ // Keeps track of keys and associated values
+ private val resultMap: MutableMap<K, Result<V>> = mutableMapOf()
+ // Keeps track of unfulfilled actions
+ // Note that the explicit type is very important here. See https://youtrack.jetbrains.net/issue/KT-18053
+ private val pendingMap: MutableMap<K, MutableList<Continuation<V>>> = ConcurrentHashMap()
+
+ private val job: Job
+
+ init {
+ job = scope.launch(Dispatchers.IO) {
+ launch {
+ while (isActive) {
+ select<Unit> {
+ /*
+ * New request received. Continuation should be fulfilled eventually
+ */
+ actionChannel.onReceive { (key, continuation) ->
+ val lastUpdate = conditionMap[key]
+ val lastResult = resultMap[key]
+ // Valid value, retrieved within acceptable time
+ if (lastResult != null && lastUpdate != null && System.currentTimeMillis() - lastUpdate < maxAge) {
+ continuation.resumeWith(lastResult)
+ } else {
+ val valueRequestPending = key in pendingMap
+ pendingMap.getOrPut(key) { mutableListOf() }.add(continuation)
+ if (!valueRequestPending)
+ requesterChannel.send(key)
+ }
+ }
+ /*
+ * Invalidator received. Existing result associated with key should not be used.
+ * Note that any unfulfilled request and future requests should still operate, but with a new value.
+ */
+ invalidatorChannel.onReceive { key ->
+ if (key !in resultMap) {
+ // Nothing to invalidate.
+ // If pending requests exist, they are already in the process of being updated.
+ return@onReceive
+ }
+ conditionMap.remove(key)
+ resultMap.remove(key)
+ if (pendingMap[key]?.isNotEmpty() == true)
+ // Refetch value for pending requests
+ requesterChannel.send(key)
+ }
+ /*
+ * Value request fulfilled. Should now fulfill pending requests
+ */
+ receiverChannel.onReceive { (key, result) ->
+ conditionMap[key] = System.currentTimeMillis()
+ resultMap[key] = result
+ pendingMap.remove(key)?.forEach {
+ it.resumeWith(result)
+ }
+ }
+ }
+ }
+ }
+ launch {
+ /*
+ * Value request received. Should fetch new value using supplied fetcher
+ */
+ for (key in requesterChannel) {
+ val result = runCatching {
+ fetcher(key)
+ }
+ receiverChannel.send(key to result)
+ }
+ }
+ }
+ }
+
+ suspend fun fetch(key: K): V = suspendCoroutine {
+ if (!job.isActive) it.resumeWithException(IllegalStateException("Flyweight is not active"))
+ else scope.launch {
+ actionChannel.send(key to it)
+ }
+ }
+
+ suspend fun invalidate(key: K) {
+ invalidatorChannel.send(key)
+ }
+
+ fun cancel() {
+ job.cancel()
+ if (pendingMap.isNotEmpty()) {
+ val error = CancellationException("Flyweight cancelled")
+ pendingMap.values.flatten().forEach { it.resumeWithException(error) }
+ pendingMap.clear()
+ }
+ actionChannel.close()
+ invalidatorChannel.close()
+ requesterChannel.close()
+ receiverChannel.close()
+ conditionMap.clear()
+ resultMap.clear()
+ }
+} \ No newline at end of file
diff --git a/app/src/test/kotlin/com/pitchedapps/frost/rx/FlyweightTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/rx/FlyweightTest.kt
new file mode 100644
index 00000000..834163bd
--- /dev/null
+++ b/app/src/test/kotlin/com/pitchedapps/frost/rx/FlyweightTest.kt
@@ -0,0 +1,108 @@
+package com.pitchedapps.frost.rx
+
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.async
+import kotlinx.coroutines.runBlocking
+import org.junit.Rule
+import org.junit.rules.Timeout
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.test.BeforeTest
+import kotlin.test.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertTrue
+import kotlin.test.fail
+
+class FlyweightTest {
+
+ @get:Rule
+ val globalTimeout: Timeout = Timeout.seconds(5)
+
+ lateinit var flyweight: Flyweight<Int, Int>
+
+ lateinit var callCount: AtomicInteger
+
+ private val LONG_RUNNING_KEY = -78
+
+ @BeforeTest
+ fun before() {
+ callCount = AtomicInteger(0)
+ flyweight = Flyweight(GlobalScope, 100, 200L) {
+ callCount.incrementAndGet()
+ when (it) {
+ LONG_RUNNING_KEY -> Thread.sleep(100000)
+ else -> Thread.sleep(100)
+ }
+ it * 2
+ }
+ }
+
+ @Test
+ fun basic() {
+ assertEquals(2, runBlocking { flyweight.fetch(1) }, "Invalid result")
+ assertEquals(1, callCount.get(), "1 call expected")
+ }
+
+ @Test
+ fun multipleWithOneKey() {
+ val results: List<Int> = runBlocking {
+ (0..1000).map {
+ flyweight.scope.async {
+ flyweight.fetch(1)
+ }
+ }.map { it.await() }
+ }
+ assertEquals(1, callCount.get(), "1 call expected")
+ assertEquals(1001, results.size, "Incorrect number of results returned")
+ assertTrue(results.all { it == 2 }, "Result should all be 2")
+ }
+
+ @Test
+ fun consecutiveReuse() {
+ runBlocking {
+ flyweight.fetch(1)
+ assertEquals(1, callCount.get(), "1 call expected")
+ flyweight.fetch(1)
+ assertEquals(1, callCount.get(), "Reuse expected")
+ Thread.sleep(300)
+ flyweight.fetch(1)
+ assertEquals(2, callCount.get(), "Refetch expected")
+ }
+ }
+
+ @Test
+ fun invalidate() {
+ runBlocking {
+ flyweight.fetch(1)
+ assertEquals(1, callCount.get(), "1 call expected")
+ flyweight.invalidate(1)
+ flyweight.fetch(1)
+ assertEquals(2, callCount.get(), "New call expected")
+ }
+ }
+
+ @Test
+ fun destroy() {
+ runBlocking {
+ val longRunningResult = async { flyweight.fetch(LONG_RUNNING_KEY) }
+ flyweight.fetch(1)
+ flyweight.cancel()
+ try {
+ flyweight.fetch(1)
+ fail("Flyweight should not be fulfilled after it is destroyed")
+ } catch (e: Exception) {
+ assertEquals("Flyweight is not active", e.message, "Incorrect error found on fetch after destruction")
+ }
+ try {
+ longRunningResult.await()
+ fail("Flyweight should have cancelled previously running requests")
+ } catch (e: Exception) {
+ assertEquals(
+ "Flyweight cancelled",
+ e.message,
+ "Incorrect error found on fetch cancelled by destruction"
+ )
+ }
+ println("Done")
+ }
+ }
+} \ No newline at end of file