From e8d9dca1ede1295f67e27faf731a5caa1bd2810a Mon Sep 17 00:00:00 2001 From: Allan Wang Date: Tue, 23 Nov 2021 11:58:30 -0800 Subject: Convert login refresh channel to flow and remove some outdated tests --- .../pitchedapps/frost/activities/LoginActivity.kt | 34 ++++++-- .../com/pitchedapps/frost/utils/KotlinUtils.kt | 36 -------- .../pitchedapps/frost/views/FrostContentView.kt | 3 + .../com/pitchedapps/frost/utils/CoroutineTest.kt | 99 ---------------------- 4 files changed, 28 insertions(+), 144 deletions(-) delete mode 100644 app/src/main/kotlin/com/pitchedapps/frost/utils/KotlinUtils.kt diff --git a/app/src/main/kotlin/com/pitchedapps/frost/activities/LoginActivity.kt b/app/src/main/kotlin/com/pitchedapps/frost/activities/LoginActivity.kt index 949f1ddd..a95e931b 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/activities/LoginActivity.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/activities/LoginActivity.kt @@ -45,13 +45,20 @@ import com.pitchedapps.frost.utils.frostEvent import com.pitchedapps.frost.utils.frostJsoup import com.pitchedapps.frost.utils.launchNewTask import com.pitchedapps.frost.utils.logFrostEvent -import com.pitchedapps.frost.utils.uniqueOnly +import com.pitchedapps.frost.web.FrostEmitter import com.pitchedapps.frost.web.LoginWebView +import com.pitchedapps.frost.web.asFrostEmitter import dagger.hilt.android.AndroidEntryPoint import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async -import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext @@ -76,7 +83,15 @@ class LoginActivity : BaseActivity() { private val profile: ImageView by bindView(R.id.profile) private lateinit var profileLoader: RequestManager - private val refreshChannel = Channel(10) + + private val refreshMutableFlow = MutableSharedFlow( + extraBufferCapacity = 10, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) + + private val refreshFlow: SharedFlow = refreshMutableFlow.asSharedFlow() + + private val refreshEmit: FrostEmitter = refreshMutableFlow.asFrostEmitter() override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) @@ -87,11 +102,12 @@ class LoginActivity : BaseActivity() { toolbar(toolbar) } profileLoader = GlideApp.with(profile) - launch { - for (refreshing in refreshChannel.uniqueOnly(this)) { - swipeRefresh.isRefreshing = refreshing - } - } + + refreshFlow + .distinctUntilChanged() + .onEach { swipeRefresh.isRefreshing = it } + .launchIn(this) + launch { val cookie = web.loadLogin { refresh(it != 100) }.await() L.d { "Login found" } @@ -107,7 +123,7 @@ class LoginActivity : BaseActivity() { } private fun refresh(refreshing: Boolean) { - refreshChannel.offer(refreshing) + refreshEmit(refreshing) } private suspend fun loadInfo(cookie: CookieEntity): Unit = withMainContext { diff --git a/app/src/main/kotlin/com/pitchedapps/frost/utils/KotlinUtils.kt b/app/src/main/kotlin/com/pitchedapps/frost/utils/KotlinUtils.kt deleted file mode 100644 index f4357c9b..00000000 --- a/app/src/main/kotlin/com/pitchedapps/frost/utils/KotlinUtils.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2018 Allan Wang - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package com.pitchedapps.frost.utils - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.channels.produce -import kotlinx.coroutines.isActive - -@UseExperimental(ExperimentalCoroutinesApi::class) -fun ReceiveChannel.uniqueOnly(scope: CoroutineScope): ReceiveChannel = scope.produce { - var previous: T? = null - for (current in this@uniqueOnly) { - if (!scope.isActive) { - cancel() - } else if (previous != current) { - previous = current - send(current) - } - } -} diff --git a/app/src/main/kotlin/com/pitchedapps/frost/views/FrostContentView.kt b/app/src/main/kotlin/com/pitchedapps/frost/views/FrostContentView.kt index f9d04ad1..d2083816 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/views/FrostContentView.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/views/FrostContentView.kt @@ -132,6 +132,9 @@ abstract class FrostContentViewBase( /** * While this can be conflated, there exist situations where we wish to watch refresh cycles. * Here, we'd need to make sure we don't skip events + * + * TODO ensure there is only one flow provider is this is still separated in login + * Use case for shared flow is to avoid emitting before subscribing; buffer can probably be size 1 */ private val refreshMutableFlow = MutableSharedFlow( extraBufferCapacity = 10, diff --git a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt index 2744d0d8..43bd1563 100644 --- a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt +++ b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt @@ -26,7 +26,6 @@ import kotlinx.coroutines.async import kotlinx.coroutines.channels.BroadcastChannel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow @@ -41,7 +40,6 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import java.util.concurrent.Executors import kotlin.coroutines.EmptyCoroutineContext -import kotlin.test.Ignore import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFalse @@ -162,103 +160,6 @@ class CoroutineTest { } } - /** - * Not a true throttle, but for things like fetching header badges, we want to avoid simultaneous fetches. - * As a result, I want to test that the usage of offer along with a conflated channel will work as I expect. - * Events should be consumed when there is no pending consumer on previous elements. - */ - @Test - @Ignore("Move to flow") - fun throttledChannel() { - val channel = Channel(Channel.CONFLATED) - runBlocking { - val deferred = async { - listen(channel) { - // Throttle consumer - delay(10) - return@listen false - } - } - (0..100).forEach { - channel.offer(it) - delay(1) - } - channel.close() - val received = deferred.await() - assertTrue( - received.size < 20, - "Received data should be throttled; expected that around 1/10th of all events are consumed, but received ${received.size}" - ) - println(received) - } - } - - @Test - fun uniqueOnly() { - val channel = BroadcastChannel(100) - runBlocking { - val fullReceiver = channel.openSubscription() - val uniqueReceiver = channel.openSubscription().uniqueOnly(this) - - val fullDeferred = async { listen(fullReceiver) } - val uniqueDeferred = async { listen(uniqueReceiver) } - - listOf(0, 1, 2, 3, 3, 3, 4, 3, 5, 5, 1).forEach { - channel.offer(it) - } - channel.close() - - val fullData = fullDeferred.await() - val uniqueData = uniqueDeferred.await() - - assertEquals( - listOf(0, 1, 2, 3, 3, 3, 4, 3, 5, 5, 1), - fullData, - "Full receiver should get all channel events" - ) - assertEquals( - listOf(0, 1, 2, 3, 4, 3, 5, 1), - uniqueData, - "Unique receiver should not have two consecutive events that are equal" - ) - } - } - - /** - * When using [uniqueOnly] for channels with limited capacity, - * the duplicates should not count towards the actual capacity - */ - @Ignore("Not yet working as unique only buffered removes the capacity limitation of the channel") - @Test - fun uniqueOnlyBuffer() { - val channel = Channel(3) - runBlocking { - - val deferred = async { - listen(channel.uniqueOnly(GlobalScope)) { - // Throttle consumer - delay(50) - return@listen false - } - } - - listOf(0, 1, 1, 1, 1, 1, 2, 2, 2).forEach { - delay(10) - channel.offer(it) - } - - channel.close() - - val data = deferred.await() - - assertEquals( - listOf(0, 1, 2), - data, - "Unique receiver should not have two consecutive events that are equal" - ) - } - } - class TestException(msg: String) : RuntimeException(msg) @Test -- cgit v1.2.3