From 149c6be1bfd4bd84381757940fece1be7b9801aa Mon Sep 17 00:00:00 2001 From: Allan Wang Date: Mon, 31 Dec 2018 18:57:28 -0500 Subject: Enhancement/coroutines (#1273) * Convert rest of fbcookie to suspended methods * Replace active checks with yield * Apply spotless * Switch cookie domain to exact url * Optimize imports and enable travis tests again * Update proguard rules * Remove unnecessary yield * Remove unused flyweight * Remove unused disposable and method * Use contexthelper instead of dispatcher main * Convert login activity to coroutines * Use kau helper methods for coroutines * Enhancement/offline site (#1288) * Begin conversion of offline site logic * Fix offline tests and add validation tests * Ignore cookie in jsoup if it is blank * Force load and zip to be in io * Use different zip files to fix tests * Log all test output * Do not log stdout * Allow test skip for fb offline --- .../com/pitchedapps/frost/kotlin/Flyweight.kt | 154 +++++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt (limited to 'app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt') diff --git a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt new file mode 100644 index 00000000..7ac80147 --- /dev/null +++ b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt @@ -0,0 +1,154 @@ +/* + * Copyright 2018 Allan Wang + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package com.pitchedapps.frost.kotlin + +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.selects.select +import java.util.concurrent.ConcurrentHashMap +import kotlin.coroutines.Continuation +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +/** + * Flyweight to keep track of values so long as they are valid. + * Values that have been fetched within [maxAge] from the time of use will be reused. + * If multiple requests are sent with the same key, then the value should only be fetched once. + * Otherwise, they will be fetched using [fetcher]. + * All requests will stem from the supplied [scope]. + */ +class Flyweight( + val scope: CoroutineScope, + capacity: Int, + val maxAge: Long, + private val fetcher: suspend (K) -> V +) { + + // Receives a key and a pending request + private val actionChannel = Channel>>(capacity) + // Receives a key to invalidate the associated value + private val invalidatorChannel = Channel(capacity) + // Receives a key to fetch the value + private val requesterChannel = Channel(capacity) + // Receives a key and the resulting value + private val receiverChannel = Channel>>(capacity) + + // Keeps track of keys and associated update times + private val conditionMap: MutableMap = mutableMapOf() + // Keeps track of keys and associated values + private val resultMap: MutableMap> = mutableMapOf() + // Keeps track of unfulfilled actions + // Note that the explicit type is very important here. See https://youtrack.jetbrains.net/issue/KT-18053 + private val pendingMap: MutableMap>> = ConcurrentHashMap() + + private val job: Job + + init { + job = scope.launch(Dispatchers.IO) { + launch { + while (isActive) { + select { + /* + * New request received. Continuation should be fulfilled eventually + */ + actionChannel.onReceive { (key, continuation) -> + val lastUpdate = conditionMap[key] + val lastResult = resultMap[key] + // Valid value, retrieved within acceptable time + if (lastResult != null && lastUpdate != null && System.currentTimeMillis() - lastUpdate < maxAge) { + continuation.resumeWith(lastResult) + } else { + val valueRequestPending = key in pendingMap + pendingMap.getOrPut(key) { mutableListOf() }.add(continuation) + if (!valueRequestPending) + requesterChannel.send(key) + } + } + /* + * Invalidator received. Existing result associated with key should not be used. + * Note that any unfulfilled request and future requests should still operate, but with a new value. + */ + invalidatorChannel.onReceive { key -> + if (key !in resultMap) { + // Nothing to invalidate. + // If pending requests exist, they are already in the process of being updated. + return@onReceive + } + conditionMap.remove(key) + resultMap.remove(key) + if (pendingMap[key]?.isNotEmpty() == true) + // Refetch value for pending requests + requesterChannel.send(key) + } + /* + * Value request fulfilled. Should now fulfill pending requests + */ + receiverChannel.onReceive { (key, result) -> + conditionMap[key] = System.currentTimeMillis() + resultMap[key] = result + pendingMap.remove(key)?.forEach { + it.resumeWith(result) + } + } + } + } + } + launch { + /* + * Value request received. Should fetch new value using supplied fetcher + */ + for (key in requesterChannel) { + val result = runCatching { + fetcher(key) + } + receiverChannel.send(key to result) + } + } + } + } + + suspend fun fetch(key: K): V = suspendCoroutine { + if (!job.isActive) it.resumeWithException(IllegalStateException("Flyweight is not active")) + else scope.launch { + actionChannel.send(key to it) + } + } + + suspend fun invalidate(key: K) { + invalidatorChannel.send(key) + } + + fun cancel() { + job.cancel() + if (pendingMap.isNotEmpty()) { + val error = CancellationException("Flyweight cancelled") + pendingMap.values.flatten().forEach { it.resumeWithException(error) } + pendingMap.clear() + } + actionChannel.close() + invalidatorChannel.close() + requesterChannel.close() + receiverChannel.close() + conditionMap.clear() + resultMap.clear() + } +} -- cgit v1.2.3