From 339ce9db98c1e6dfb1c8d69f81806680b1efa666 Mon Sep 17 00:00:00 2001 From: Allan Wang Date: Fri, 4 Jan 2019 01:56:04 -0500 Subject: Convert global continuations to completable deferred --- .../frost/facebook/requests/FbRequest.kt | 16 ---------- .../pitchedapps/frost/facebook/requests/Images.kt | 2 +- .../frost/fragments/RecyclerFragments.kt | 2 +- .../com/pitchedapps/frost/kotlin/Flyweight.kt | 37 ++++++++++++++-------- .../frost/services/FrostRequestService.kt | 2 +- .../com/pitchedapps/frost/web/LoginWebView.kt | 30 +++++++----------- .../com/pitchedapps/frost/kotlin/FlyweightTest.kt | 20 ++++++------ 7 files changed, 48 insertions(+), 61 deletions(-) diff --git a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt index 53ea6e67..3663f908 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt @@ -44,22 +44,6 @@ val fbAuth = Flyweight(GlobalScope, 100, 3600000 /* an hour it.getAuth() } -/** - * Synchronously fetch [RequestAuth] from cookie - * [action] will only be called if a valid auth is found. - * Otherwise, [fail] will be called - */ -fun String?.fbRequest(fail: () -> Unit = {}, action: RequestAuth.() -> Unit) { - if (this == null) return fail() - try { - val auth = runBlocking { fbAuth.fetch(this@fbRequest) } - auth.action() - } catch (e: Exception) { - L.e { "Failed auth for ${hashCode()}: ${e.message}" } - fail() - } -} - /** * Underlying container for all fb requests */ diff --git a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/Images.kt b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/Images.kt index ee18e15e..36dff6ff 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/Images.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/Images.kt @@ -133,7 +133,7 @@ class HdImageFetcher(private val model: HdImageMaybe) : DataFetcher val result: Result = runCatching { runBlocking { withTimeout(20000L) { - val auth = fbAuth.fetch(model.cookie) + val auth = fbAuth.fetch(model.cookie).await() if (cancelled) throw RuntimeException("Cancelled") val url = auth.getFullSizedImage(model.id).invoke() ?: throw RuntimeException("Null url") if (cancelled) throw RuntimeException("Cancelled") diff --git a/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragments.kt b/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragments.kt index f7ed9937..d9d518b1 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragments.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragments.kt @@ -74,7 +74,7 @@ class MenuFragment : GenericRecyclerFragment>() { override suspend fun reloadImpl(progress: (Int) -> Unit): List? = withContext(Dispatchers.IO) { val cookie = FbCookie.webCookie ?: return@withContext null progress(10) - val auth = fbAuth.fetch(cookie) + val auth = fbAuth.fetch(cookie).await() progress(30) val data = auth.getMenuData().invoke() ?: return@withContext null if (data.data.isEmpty()) return@withContext null diff --git a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt index 7ac80147..a22022de 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/kotlin/Flyweight.kt @@ -17,6 +17,7 @@ package com.pitchedapps.frost.kotlin import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -25,7 +26,6 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.selects.select import java.util.concurrent.ConcurrentHashMap -import kotlin.coroutines.Continuation import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine @@ -44,7 +44,7 @@ class Flyweight( ) { // Receives a key and a pending request - private val actionChannel = Channel>>(capacity) + private val actionChannel = Channel>>(capacity) // Receives a key to invalidate the associated value private val invalidatorChannel = Channel(capacity) // Receives a key to fetch the value @@ -58,10 +58,17 @@ class Flyweight( private val resultMap: MutableMap> = mutableMapOf() // Keeps track of unfulfilled actions // Note that the explicit type is very important here. See https://youtrack.jetbrains.net/issue/KT-18053 - private val pendingMap: MutableMap>> = ConcurrentHashMap() + private val pendingMap: MutableMap>> = ConcurrentHashMap() private val job: Job + private fun CompletableDeferred.completeWith(result: Result) { + if (result.isSuccess) + complete(result.getOrNull()!!) + else + completeExceptionally(result.exceptionOrNull()!!) + } + init { job = scope.launch(Dispatchers.IO) { launch { @@ -70,15 +77,15 @@ class Flyweight( /* * New request received. Continuation should be fulfilled eventually */ - actionChannel.onReceive { (key, continuation) -> + actionChannel.onReceive { (key, completable) -> val lastUpdate = conditionMap[key] val lastResult = resultMap[key] // Valid value, retrieved within acceptable time if (lastResult != null && lastUpdate != null && System.currentTimeMillis() - lastUpdate < maxAge) { - continuation.resumeWith(lastResult) + completable.completeWith(lastResult) } else { val valueRequestPending = key in pendingMap - pendingMap.getOrPut(key) { mutableListOf() }.add(continuation) + pendingMap.getOrPut(key) { mutableListOf() }.add(completable) if (!valueRequestPending) requesterChannel.send(key) } @@ -106,7 +113,7 @@ class Flyweight( conditionMap[key] = System.currentTimeMillis() resultMap[key] = result pendingMap.remove(key)?.forEach { - it.resumeWith(result) + it.completeWith(result) } } } @@ -126,11 +133,15 @@ class Flyweight( } } - suspend fun fetch(key: K): V = suspendCoroutine { - if (!job.isActive) it.resumeWithException(IllegalStateException("Flyweight is not active")) - else scope.launch { - actionChannel.send(key to it) - } + /** + * Queues the request, and returns a completable once it is sent to a channel. + * The fetcher will only be suspended if the channels are full + */ + suspend fun fetch(key: K): CompletableDeferred { + val completable = CompletableDeferred(job) + if (!job.isActive) completable.completeExceptionally(IllegalStateException("Flyweight is not active")) + else actionChannel.send(key to completable) + return completable } suspend fun invalidate(key: K) { @@ -141,7 +152,7 @@ class Flyweight( job.cancel() if (pendingMap.isNotEmpty()) { val error = CancellationException("Flyweight cancelled") - pendingMap.values.flatten().forEach { it.resumeWithException(error) } + pendingMap.values.flatten().forEach { it.completeExceptionally(error) } pendingMap.clear() } actionChannel.close() diff --git a/app/src/main/kotlin/com/pitchedapps/frost/services/FrostRequestService.kt b/app/src/main/kotlin/com/pitchedapps/frost/services/FrostRequestService.kt index c88f3946..a8ecb27d 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/services/FrostRequestService.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/services/FrostRequestService.kt @@ -179,7 +179,7 @@ class FrostRequestService : BaseJobService() { } launch(Dispatchers.IO) { try { - val auth = fbAuth.fetch(cookie) + val auth = fbAuth.fetch(cookie).await() command.invoke(auth, bundle) L.d { "Finished frost service for ${command.name} in ${System.currentTimeMillis() - startTime} ms" diff --git a/app/src/main/kotlin/com/pitchedapps/frost/web/LoginWebView.kt b/app/src/main/kotlin/com/pitchedapps/frost/web/LoginWebView.kt index 8132382a..c21ce93b 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/web/LoginWebView.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/web/LoginWebView.kt @@ -28,7 +28,7 @@ import android.webkit.WebResourceRequest import android.webkit.WebView import ca.allanwang.kau.utils.fadeIn import ca.allanwang.kau.utils.isVisible -import ca.allanwang.kau.utils.withMainContext +import ca.allanwang.kau.utils.launchMain import com.pitchedapps.frost.dbflow.CookieModel import com.pitchedapps.frost.facebook.FB_LOGIN_URL import com.pitchedapps.frost.facebook.FB_USER_MATCHER @@ -40,10 +40,8 @@ import com.pitchedapps.frost.injectors.jsInject import com.pitchedapps.frost.utils.L import com.pitchedapps.frost.utils.Prefs import com.pitchedapps.frost.utils.isFacebookUrl +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlin.coroutines.resume /** * Created by Allan Wang on 2017-05-29. @@ -54,7 +52,7 @@ class LoginWebView @JvmOverloads constructor( defStyleAttr: Int = 0 ) : WebView(context, attrs, defStyleAttr) { - private lateinit var loginCallback: (CookieModel) -> Unit + private val completable: CompletableDeferred = CompletableDeferred() private lateinit var progressCallback: (Int) -> Unit @SuppressLint("SetJavaScriptEnabled") @@ -65,19 +63,15 @@ class LoginWebView @JvmOverloads constructor( webChromeClient = LoginChromeClient() } - suspend fun loadLogin(progressCallback: (Int) -> Unit): CookieModel = withMainContext { - coroutineScope { - suspendCancellableCoroutine { cont -> - this@LoginWebView.progressCallback = progressCallback - this@LoginWebView.loginCallback = { cont.resume(it) } - L.d { "Begin loading login" } - launch { - FbCookie.reset() - setupWebview() - loadUrl(FB_LOGIN_URL) - } - } + suspend fun loadLogin(progressCallback: (Int) -> Unit): CompletableDeferred = coroutineScope { + this@LoginWebView.progressCallback = progressCallback + L.d { "Begin loading login" } + launchMain { + FbCookie.reset() + setupWebview() + loadUrl(FB_LOGIN_URL) } + completable } private inner class LoginClient : BaseWebViewClient() { @@ -86,7 +80,7 @@ class LoginWebView @JvmOverloads constructor( super.onPageFinished(view, url) val cookieModel = checkForLogin(url) if (cookieModel != null) - loginCallback(cookieModel) + completable.complete(cookieModel) if (!view.isVisible) view.fadeIn() } diff --git a/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt index 0eee530e..79f81002 100644 --- a/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt +++ b/app/src/test/kotlin/com/pitchedapps/frost/kotlin/FlyweightTest.kt @@ -54,7 +54,7 @@ class FlyweightTest { @Test fun basic() { - assertEquals(2, runBlocking { flyweight.fetch(1) }, "Invalid result") + assertEquals(2, runBlocking { flyweight.fetch(1).await() }, "Invalid result") assertEquals(1, callCount.get(), "1 call expected") } @@ -62,9 +62,7 @@ class FlyweightTest { fun multipleWithOneKey() { val results: List = runBlocking { (0..1000).map { - flyweight.scope.async { - flyweight.fetch(1) - } + flyweight.fetch(1) }.map { it.await() } } assertEquals(1, callCount.get(), "1 call expected") @@ -75,12 +73,12 @@ class FlyweightTest { @Test fun consecutiveReuse() { runBlocking { - flyweight.fetch(1) + flyweight.fetch(1).await() assertEquals(1, callCount.get(), "1 call expected") - flyweight.fetch(1) + flyweight.fetch(1).await() assertEquals(1, callCount.get(), "Reuse expected") Thread.sleep(300) - flyweight.fetch(1) + flyweight.fetch(1).await() assertEquals(2, callCount.get(), "Refetch expected") } } @@ -88,10 +86,10 @@ class FlyweightTest { @Test fun invalidate() { runBlocking { - flyweight.fetch(1) + flyweight.fetch(1).await() assertEquals(1, callCount.get(), "1 call expected") flyweight.invalidate(1) - flyweight.fetch(1) + flyweight.fetch(1).await() assertEquals(2, callCount.get(), "New call expected") } } @@ -100,10 +98,10 @@ class FlyweightTest { fun destroy() { runBlocking { val longRunningResult = async { flyweight.fetch(LONG_RUNNING_KEY) } - flyweight.fetch(1) + flyweight.fetch(1).await() flyweight.cancel() try { - flyweight.fetch(1) + flyweight.fetch(1).await() fail("Flyweight should not be fulfilled after it is destroyed") } catch (e: Exception) { assertEquals("Flyweight is not active", e.message, "Incorrect error found on fetch after destruction") -- cgit v1.2.3