diff options
author | Allan Wang <me@allanwang.ca> | 2018-12-28 21:45:46 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-12-28 21:45:46 -0500 |
commit | 9c3d7c8b6cca17dc10fc310d41e547d1fe1725ea (patch) | |
tree | 8e6202efb768d954145038cb8642453c62650c5e | |
parent | c9769223cb014f588d93c1a73da157010e68a1c8 (diff) | |
parent | 8c4db7d79d4f9557d0eef2ef707663c5e8a7aac6 (diff) | |
download | frost-9c3d7c8b6cca17dc10fc310d41e547d1fe1725ea.tar.gz frost-9c3d7c8b6cca17dc10fc310d41e547d1fe1725ea.tar.bz2 frost-9c3d7c8b6cca17dc10fc310d41e547d1fe1725ea.zip |
Merge pull request #1269 from AllanWang/enhancement/coroutine-auth
Enhancement/coroutine
27 files changed, 809 insertions, 360 deletions
diff --git a/app/build.gradle b/app/build.gradle index c7d78537..84d2d694 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -100,6 +100,8 @@ android { resValue "string", "frost_name", "Frost Debug" resValue "string", "frost_web", "Frost Web Debug" ext.enableBugsnag = false + + kotlinOptions.freeCompilerArgs += ["-Xuse-experimental=kotlin.Experimental", "-XXLanguage:+InlineClasses"] } releaseTest { minifyEnabled true @@ -141,6 +143,7 @@ android { includeAndroidResources = true } } + } repositories { diff --git a/app/src/main/kotlin/com/pitchedapps/frost/activities/AboutActivity.kt b/app/src/main/kotlin/com/pitchedapps/frost/activities/AboutActivity.kt index a110071c..283477d7 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/activities/AboutActivity.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/activities/AboutActivity.kt @@ -176,7 +176,8 @@ class AboutActivity : AboutActivityBase(null, { } val set = ConstraintSet() set.clone(container) - set.createHorizontalChain(ConstraintSet.PARENT_ID, + set.createHorizontalChain( + ConstraintSet.PARENT_ID, ConstraintSet.LEFT, ConstraintSet.PARENT_ID, ConstraintSet.RIGHT, diff --git a/app/src/main/kotlin/com/pitchedapps/frost/activities/BaseMainActivity.kt b/app/src/main/kotlin/com/pitchedapps/frost/activities/BaseMainActivity.kt index 20b5727f..7f69cc27 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/activities/BaseMainActivity.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/activities/BaseMainActivity.kt @@ -31,7 +31,6 @@ import android.webkit.WebChromeClient import android.webkit.WebView import android.widget.FrameLayout import androidx.annotation.StringRes -import androidx.appcompat.widget.Toolbar import androidx.coordinatorlayout.widget.CoordinatorLayout import androidx.fragment.app.Fragment import androidx.fragment.app.FragmentPagerAdapter @@ -58,7 +57,6 @@ import co.zsmb.materialdrawerkt.draweritems.divider import co.zsmb.materialdrawerkt.draweritems.profile.profile import co.zsmb.materialdrawerkt.draweritems.profile.profileSetting import com.google.android.material.appbar.AppBarLayout -import com.google.android.material.floatingactionbutton.FloatingActionButton import com.google.android.material.tabs.TabLayout import com.mikepenz.google_material_typeface_library.GoogleMaterial import com.mikepenz.iconics.IconicsDrawable @@ -105,24 +103,33 @@ import com.pitchedapps.frost.utils.setFrostColors import com.pitchedapps.frost.views.BadgedIcon import com.pitchedapps.frost.views.FrostVideoViewer import com.pitchedapps.frost.views.FrostViewPager +import kotlinx.android.synthetic.main.activity_frame_wrapper.* +import kotlinx.android.synthetic.main.view_main_fab.* +import kotlinx.android.synthetic.main.view_main_toolbar.* +import kotlinx.android.synthetic.main.view_main_viewpager.* +import kotlinx.coroutines.ExperimentalCoroutinesApi /** * Created by Allan Wang on 20/12/17. * * Most of the logic that is unrelated to handling fragments */ +@UseExperimental(ExperimentalCoroutinesApi::class) abstract class BaseMainActivity : BaseActivity(), MainActivityContract, FileChooserContract by FileChooserDelegate(), VideoViewHolder, SearchViewHolder { protected lateinit var adapter: SectionsPagerAdapter - override val frameWrapper: FrameLayout by bindView(R.id.frame_wrapper) - val toolbar: Toolbar by bindView(R.id.toolbar) - val viewPager: FrostViewPager by bindView(R.id.container) - val fab: FloatingActionButton by bindView(R.id.fab) + override val frameWrapper: FrameLayout get() = frame_wrapper + val viewPager: FrostViewPager get() = container + + /* + * Components with the same id in multiple layout files + */ val tabs: TabLayout by bindView(R.id.tabs) val appBar: AppBarLayout by bindView(R.id.appbar) val coordinator: CoordinatorLayout by bindView(R.id.main_content) + override var videoViewer: FrostVideoViewer? = null private lateinit var drawer: Drawer private lateinit var drawerHeader: AccountHeader @@ -341,7 +348,7 @@ abstract class BaseMainActivity : BaseActivity(), MainActivityContract, private fun refreshAll() { L.d { "Refresh all" } - fragmentSubject.onNext(REQUEST_REFRESH) + fragmentChannel.offer(REQUEST_REFRESH) } override fun onCreateOptionsMenu(menu: Menu): Boolean { @@ -424,9 +431,9 @@ abstract class BaseMainActivity : BaseActivity(), MainActivityContract, /* * These results can be stacked */ - if (resultCode and REQUEST_REFRESH > 0) fragmentSubject.onNext(REQUEST_REFRESH) + if (resultCode and REQUEST_REFRESH > 0) fragmentChannel.offer(REQUEST_REFRESH) if (resultCode and REQUEST_NAV > 0) frostNavigationBar() - if (resultCode and REQUEST_TEXT_ZOOM > 0) fragmentSubject.onNext(REQUEST_TEXT_ZOOM) + if (resultCode and REQUEST_TEXT_ZOOM > 0) fragmentChannel.offer(REQUEST_TEXT_ZOOM) if (resultCode and REQUEST_SEARCH > 0) invalidateOptionsMenu() } } @@ -465,6 +472,8 @@ abstract class BaseMainActivity : BaseActivity(), MainActivityContract, } override fun onDestroy() { + fragmentChannel.close() + headerBadgeChannel.close() controlWebview?.destroy() super.onDestroy() } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/activities/MainActivity.kt b/app/src/main/kotlin/com/pitchedapps/frost/activities/MainActivity.kt index d03c6496..e5eb1907 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/activities/MainActivity.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/activities/MainActivity.kt @@ -20,18 +20,23 @@ import android.os.Bundle import androidx.viewpager.widget.ViewPager import com.google.android.material.tabs.TabLayout import com.pitchedapps.frost.facebook.FbItem +import com.pitchedapps.frost.utils.L import com.pitchedapps.frost.views.BadgedIcon -import io.reactivex.android.schedulers.AndroidSchedulers -import io.reactivex.schedulers.Schedulers import io.reactivex.subjects.PublishSubject +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.BroadcastChannel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import org.jsoup.Jsoup -import java.util.concurrent.TimeUnit +@UseExperimental(ExperimentalCoroutinesApi::class) class MainActivity : BaseMainActivity() { - override val fragmentSubject = PublishSubject.create<Int>() + override val fragmentChannel = BroadcastChannel<Int>(10) + override val headerBadgeChannel = Channel<String>(Channel.RENDEZVOUS) var lastPosition = -1 - val headerBadgeObservable = PublishSubject.create<String>() override fun onNestedCreate(savedInstanceState: Bundle?) { setupTabs() @@ -43,8 +48,8 @@ class MainActivity : BaseMainActivity() { override fun onPageSelected(position: Int) { super.onPageSelected(position) if (lastPosition == position) return - if (lastPosition != -1) fragmentSubject.onNext(-(lastPosition + 1)) - fragmentSubject.onNext(position) + if (lastPosition != -1) fragmentChannel.offer(-(lastPosition + 1)) + fragmentChannel.offer(position) lastPosition = position } @@ -62,7 +67,7 @@ class MainActivity : BaseMainActivity() { } } }) - viewPager.post { fragmentSubject.onNext(0); lastPosition = 0 } //trigger hook so title is set + viewPager.post { fragmentChannel.offer(0); lastPosition = 0 } //trigger hook so title is set } private fun setupTabs() { @@ -78,31 +83,41 @@ class MainActivity : BaseMainActivity() { (tab.customView as BadgedIcon).badgeText = null } }) - headerBadgeObservable.throttleFirst(15, TimeUnit.SECONDS) - .subscribeOn(Schedulers.newThread()) - .map { Jsoup.parse(it) } - .filter { it.select("[data-sigil=count]").size >= 0 } //ensure headers exist - .map { - val feed = it.select("[data-sigil*=feed] [data-sigil=count]") - val requests = it.select("[data-sigil*=requests] [data-sigil=count]") - val messages = it.select("[data-sigil*=messages] [data-sigil=count]") - val notifications = it.select("[data-sigil*=notifications] [data-sigil=count]") - return@map arrayOf(feed, requests, messages, notifications).map { e -> e?.getOrNull(0)?.ownText() } - } - .observeOn(AndroidSchedulers.mainThread()) - .subscribe { (feed, requests, messages, notifications) -> - tabsForEachView { _, view -> - when (view.iicon) { - FbItem.FEED.icon -> view.badgeText = feed - FbItem.FRIENDS.icon -> view.badgeText = requests - FbItem.MESSAGES.icon -> view.badgeText = messages - FbItem.NOTIFICATIONS.icon -> view.badgeText = notifications + launch(Dispatchers.IO) { + for (html in headerBadgeChannel) { + try { + val doc = Jsoup.parse(html) + if (doc.select("[data-sigil=count]").isEmpty()) + continue // Header doesn't exist + val (feed, requests, messages, notifications) = listOf( + "feed", + "requests", + "messages", + "notifications" + ) + .map { "[data-sigil*=$it] [data-sigil=count]" } + .map { doc.select(it) } + .map { e -> e?.getOrNull(0)?.ownText() } + L._d { "Badges $feed $requests $messages $notifications" } + withContext(Dispatchers.Main) { + tabsForEachView { _, view -> + when (view.iicon) { + FbItem.FEED.icon -> view.badgeText = feed + FbItem.FRIENDS.icon -> view.badgeText = requests + FbItem.MESSAGES.icon -> view.badgeText = messages + FbItem.NOTIFICATIONS.icon -> view.badgeText = notifications + } + } } + } catch (e: Exception) { + L.e(e) { "Header badge error" } } - }.disposeOnDestroy() + } + } adapter.pages.forEach { - tabs.addTab(tabs.newTab() - .setCustomView(BadgedIcon(this).apply { iicon = it.icon }) + tabs.addTab( + tabs.newTab() + .setCustomView(BadgedIcon(this).apply { iicon = it.icon }) ) } } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/activities/WebOverlayActivity.kt b/app/src/main/kotlin/com/pitchedapps/frost/activities/WebOverlayActivity.kt index b706d467..19a1109f 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/activities/WebOverlayActivity.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/activities/WebOverlayActivity.kt @@ -67,11 +67,14 @@ import com.pitchedapps.frost.utils.Showcase import com.pitchedapps.frost.utils.frostSnackbar import com.pitchedapps.frost.utils.materialDialogThemed import com.pitchedapps.frost.utils.setFrostColors +import com.pitchedapps.frost.utils.uniqueOnly import com.pitchedapps.frost.views.FrostContentWeb import com.pitchedapps.frost.views.FrostVideoViewer import com.pitchedapps.frost.views.FrostWebView -import io.reactivex.android.schedulers.AndroidSchedulers -import io.reactivex.disposables.Disposable +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import okhttp3.HttpUrl /** @@ -87,6 +90,7 @@ import okhttp3.HttpUrl * Used by notifications. Unlike the other overlays, this runs as a singleInstance * Going back will bring you back to the previous app */ +@UseExperimental(ExperimentalCoroutinesApi::class) class FrostWebActivity : WebOverlayActivityBase(false) { override fun onCreate(savedInstanceState: Bundle?) { @@ -98,12 +102,15 @@ class FrostWebActivity : WebOverlayActivityBase(false) { * We will subscribe to the load cycle once, * and pop a dialog giving the user the option to copy the shared text */ - var disposable: Disposable? = null - disposable = content.refreshObservable.subscribe { - disposable?.dispose() - materialDialogThemed { - title(R.string.invalid_share_url) - content(R.string.invalid_share_url_desc) + val refreshReceiver = content.refreshChannel.openSubscription() + content.scope.launch(Dispatchers.IO) { + refreshReceiver.receive() + refreshReceiver.cancel() + withContext(Dispatchers.Main) { + materialDialogThemed { + title(R.string.invalid_share_url) + content(R.string.invalid_share_url_desc) + } } } } @@ -144,6 +151,7 @@ class WebOverlayBasicActivity : WebOverlayActivityBase(true) class WebOverlayActivity : WebOverlayActivityBase(false) @SuppressLint("Registered") +@UseExperimental(ExperimentalCoroutinesApi::class) open class WebOverlayActivityBase(private val forceBasicAgent: Boolean) : BaseActivity(), ActivityContract, FrostContentContainer, VideoViewHolder, FileChooserContract by FileChooserDelegate() { @@ -181,7 +189,6 @@ open class WebOverlayActivityBase(private val forceBasicAgent: Boolean) : BaseAc finish() return } - setFrameContentView(R.layout.activity_web_overlay) setSupportActionBar(toolbar) supportActionBar?.setDisplayShowHomeEnabled(true) @@ -197,10 +204,13 @@ open class WebOverlayActivityBase(private val forceBasicAgent: Boolean) : BaseAc content.bind(this) - content.titleObservable - .observeOn(AndroidSchedulers.mainThread()) - .subscribe { toolbar.title = it } - .disposeOnDestroy() + val titleReceiver = content.titleChannel.openSubscription().uniqueOnly(this) + + launch { + for (t in titleReceiver) { + toolbar.title = t + } + } with(web) { if (forceBasicAgent) //todo check; the webview already adds it dynamically diff --git a/app/src/main/kotlin/com/pitchedapps/frost/contracts/ActivityContract.kt b/app/src/main/kotlin/com/pitchedapps/frost/contracts/ActivityContract.kt index 2ce83871..483e49c5 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/contracts/ActivityContract.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/contracts/ActivityContract.kt @@ -19,7 +19,8 @@ package com.pitchedapps.frost.contracts import com.mikepenz.iconics.typeface.IIcon import com.pitchedapps.frost.activities.MainActivity import com.pitchedapps.frost.fragments.BaseFragment -import io.reactivex.subjects.PublishSubject +import kotlinx.coroutines.channels.BroadcastChannel +import kotlinx.coroutines.channels.Channel /** * All the contracts for [MainActivity] @@ -27,7 +28,8 @@ import io.reactivex.subjects.PublishSubject interface ActivityContract : FileChooserActivityContract interface MainActivityContract : ActivityContract, MainFabContract { - val fragmentSubject: PublishSubject<Int> + val fragmentChannel: BroadcastChannel<Int> + val headerBadgeChannel : Channel<String> fun setTitle(res: Int) fun setTitle(text: CharSequence) /** diff --git a/app/src/main/kotlin/com/pitchedapps/frost/contracts/FrostContentContract.kt b/app/src/main/kotlin/com/pitchedapps/frost/contracts/FrostContentContract.kt index 50c2fe77..8a6e57af 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/contracts/FrostContentContract.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/contracts/FrostContentContract.kt @@ -18,8 +18,9 @@ package com.pitchedapps.frost.contracts import android.view.View import com.pitchedapps.frost.facebook.FbItem -import io.reactivex.subjects.BehaviorSubject -import io.reactivex.subjects.PublishSubject +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.BroadcastChannel /** * Created by Allan Wang on 20/12/17. @@ -29,7 +30,7 @@ import io.reactivex.subjects.PublishSubject * Contract for the underlying parent, * binds to activities & fragments */ -interface FrostContentContainer { +interface FrostContentContainer : CoroutineScope { val baseUrl: String @@ -45,24 +46,28 @@ interface FrostContentContainer { * Contract for components shared among * all content providers */ +@UseExperimental(ExperimentalCoroutinesApi::class) interface FrostContentParent : DynamicUiContract { + val scope: CoroutineScope + val core: FrostContentCore /** * Observable to get data on whether view is refreshing or not */ - val refreshObservable: PublishSubject<Boolean> + val refreshChannel: BroadcastChannel<Boolean> /** * Observable to get data on refresh progress, with range [0, 100] */ - val progressObservable: PublishSubject<Int> + val progressChannel: BroadcastChannel<Int> /** * Observable to get new title data (unique values only) */ - val titleObservable: BehaviorSubject<String> + // todo note that this should be like a behavior subject vs publish subject + val titleChannel: BroadcastChannel<String> var baseUrl: String @@ -106,6 +111,9 @@ interface FrostContentParent : DynamicUiContract { */ interface FrostContentCore : DynamicUiContract { + val scope: CoroutineScope + get() = parent.scope + /** * Reference to parent * Bound through calling [FrostContentParent.bind] diff --git a/app/src/main/kotlin/com/pitchedapps/frost/contracts/FrostObservables.kt b/app/src/main/kotlin/com/pitchedapps/frost/contracts/FrostObservables.kt deleted file mode 100644 index b3b93b66..00000000 --- a/app/src/main/kotlin/com/pitchedapps/frost/contracts/FrostObservables.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2018 Allan Wang - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ -package com.pitchedapps.frost.contracts - -import io.reactivex.subjects.BehaviorSubject -import io.reactivex.subjects.PublishSubject - -/** - * Created by Allan Wang on 2017-11-07. - */ -interface FrostObservables { - /** - * Observable to get data on whether view is refreshing or not - */ - var refreshObservable: PublishSubject<Boolean> - - /** - * Observable to get data on refresh progress, with range [0, 100] - */ - var progressObservable: PublishSubject<Int> - - /** - * Observable to get new title data (unique values only) - */ - var titleObservable: BehaviorSubject<String> - - fun passObservablesTo(other: FrostObservables) { - other.refreshObservable = refreshObservable - other.progressObservable = progressObservable - other.titleObservable = titleObservable - } -} diff --git a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt index 584107cc..50da367d 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/FbRequest.kt @@ -24,10 +24,12 @@ import com.pitchedapps.frost.facebook.FB_URL_BASE import com.pitchedapps.frost.facebook.FB_USER_MATCHER import com.pitchedapps.frost.facebook.USER_AGENT_BASIC import com.pitchedapps.frost.facebook.get -import com.pitchedapps.frost.rx.RxFlyweight +import com.pitchedapps.frost.rx.Flyweight import com.pitchedapps.frost.utils.L import io.reactivex.Single import io.reactivex.schedulers.Schedulers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.runBlocking import okhttp3.Call import okhttp3.FormBody import okhttp3.OkHttpClient @@ -38,18 +40,10 @@ import org.apache.commons.text.StringEscapeUtils /** * Created by Allan Wang on 21/12/17. */ -private class RxAuth : RxFlyweight<String, Long, RequestAuth>() { - - override fun call(input: String) = input.getAuth() - - override fun validate(input: String, cond: Long) = - System.currentTimeMillis() - cond < 3600000 // valid for an hour - - override fun cache(input: String) = System.currentTimeMillis() +val fbAuth = Flyweight<String, RequestAuth>(GlobalScope, 100, 3600000 /* an hour */) { + it.getAuth() } -private val auth = RxAuth() - /** * Synchronously fetch [RequestAuth] from cookie * [action] will only be called if a valid auth is found. @@ -58,7 +52,7 @@ private val auth = RxAuth() fun String?.fbRequest(fail: () -> Unit = {}, action: RequestAuth.() -> Unit) { if (this == null) return fail() try { - val auth = auth(this).blockingGet() + val auth = runBlocking { fbAuth.fetch(this@fbRequest) } auth.action() } catch (e: Exception) { L.e { "Failed auth for ${hashCode()}: ${e.message}" } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/Images.kt b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/Images.kt index e0ccea81..4afd8e8a 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/Images.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/facebook/requests/Images.kt @@ -33,8 +33,9 @@ import com.pitchedapps.frost.facebook.FB_URL_BASE import com.pitchedapps.frost.facebook.formattedFbUrl import com.pitchedapps.frost.facebook.get import io.reactivex.Maybe +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeout import okhttp3.Call -import okhttp3.Request import java.io.IOException import java.io.InputStream @@ -123,21 +124,22 @@ class HdImageFetcher(private val model: HdImageMaybe) : DataFetcher<InputStream> override fun loadData(priority: Priority, callback: DataFetcher.DataCallback<in InputStream>) { if (!model.isValid) return callback.fail("Model is invalid") - model.cookie.fbRequest(fail = { callback.fail("Invalid auth") }) { - if (cancelled) return@fbRequest callback.fail("Cancelled") - val url = getFullSizedImage(model.id).invoke() - ?: return@fbRequest callback.fail("Null url") - if (cancelled) return@fbRequest callback.fail("Cancelled") - if (!url.contains("png") && !url.contains("jpg")) return@fbRequest callback.fail("Invalid format") - urlCall = Request.Builder().url(url).get().call() - - inputStream = try { - urlCall?.execute()?.body()?.byteStream() - } catch (e: IOException) { - null + val result: Result<InputStream?> = runCatching { + runBlocking { + withTimeout(20000L) { + val auth = fbAuth.fetch(model.cookie) + if (cancelled) throw RuntimeException("Cancelled") + val url = auth.getFullSizedImage(model.id).invoke() ?: throw RuntimeException("Null url") + if (cancelled) throw RuntimeException("Cancelled") + if (!url.contains("png") && !url.contains("jpg")) throw RuntimeException("Invalid format") + urlCall?.execute()?.body()?.byteStream() + } } - callback.onDataReady(inputStream) } + if (result.isSuccess) + callback.onDataReady(result.getOrNull()) + else + callback.onLoadFailed(result.exceptionOrNull() as? Exception ?: RuntimeException("Failed")) } override fun cleanup() { diff --git a/app/src/main/kotlin/com/pitchedapps/frost/fragments/FragmentBase.kt b/app/src/main/kotlin/com/pitchedapps/frost/fragments/FragmentBase.kt index 2c46edbc..72150ddd 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/fragments/FragmentBase.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/fragments/FragmentBase.kt @@ -16,7 +16,6 @@ */ package com.pitchedapps.frost.fragments -import android.content.Context import android.os.Bundle import android.view.LayoutInflater import android.view.View @@ -39,12 +38,14 @@ import com.pitchedapps.frost.utils.Prefs import com.pitchedapps.frost.utils.REQUEST_REFRESH import com.pitchedapps.frost.utils.REQUEST_TEXT_ZOOM import com.pitchedapps.frost.utils.frostEvent -import io.reactivex.android.schedulers.AndroidSchedulers -import io.reactivex.disposables.Disposable import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch import kotlin.coroutines.CoroutineContext /** @@ -53,6 +54,7 @@ import kotlin.coroutines.CoroutineContext * All fragments pertaining to the main view * Must be attached to activities implementing [MainActivityContract] */ +@UseExperimental(ExperimentalCoroutinesApi::class) abstract class BaseFragment : Fragment(), CoroutineScope, FragmentContract, DynamicUiContract { companion object { @@ -87,9 +89,8 @@ abstract class BaseFragment : Fragment(), CoroutineScope, FragmentContract, Dyna override var valid: Boolean get() = arguments!!.getBoolean(ARG_VALID, true) set(value) { - if (value || this is WebFragment) return + if (!isActive || value || this is WebFragment) return arguments!!.putBoolean(ARG_VALID, value) - L.e { "Invalidating position $position" } frostEvent( "Native Fallback", "Item" to baseEnum.name @@ -98,7 +99,7 @@ abstract class BaseFragment : Fragment(), CoroutineScope, FragmentContract, Dyna } override var firstLoad: Boolean = true - private var activityDisposable: Disposable? = null + private var activityReceiver: ReceiveChannel<Int>? = null private var onCreateRunnable: ((FragmentContract) -> Unit)? = null override var content: FrostContentParent? = null @@ -131,6 +132,10 @@ abstract class BaseFragment : Fragment(), CoroutineScope, FragmentContract, Dyna onCreateRunnable?.invoke(this) onCreateRunnable = null firstLoadRequest() + detachMainObservable() + (context as? MainActivityContract)?.let { + activityReceiver = attachMainObservable(it) + } } override fun setUserVisibleHint(isVisibleToUser: Boolean) { @@ -154,29 +159,34 @@ abstract class BaseFragment : Fragment(), CoroutineScope, FragmentContract, Dyna (context as? MainActivityContract)?.setTitle(title) } - override fun attachMainObservable(contract: MainActivityContract): Disposable = - contract.fragmentSubject.observeOn(AndroidSchedulers.mainThread()).subscribe { - when (it) { - REQUEST_REFRESH -> { - core?.apply { - clearHistory() - firstLoad = true - firstLoadRequest() + override fun attachMainObservable(contract: MainActivityContract): ReceiveChannel<Int> { + val receiver = contract.fragmentChannel.openSubscription() + launch { + for (flag in receiver) { + when (flag) { + REQUEST_REFRESH -> { + core?.apply { + clearHistory() + firstLoad = true + firstLoadRequest() + } + } + position -> { + contract.setTitle(baseEnum.titleId) + updateFab(contract) + core?.active = true + } + -(position + 1) -> { + core?.active = false + } + REQUEST_TEXT_ZOOM -> { + reloadTextSize() } - } - position -> { - contract.setTitle(baseEnum.titleId) - updateFab(contract) - core?.active = true - } - -(position + 1) -> { - core?.active = false - } - REQUEST_TEXT_ZOOM -> { - reloadTextSize() } } } + return receiver + } override fun updateFab(contract: MainFabContract) { contract.hideFab() // default @@ -195,25 +205,14 @@ abstract class BaseFragment : Fragment(), CoroutineScope, FragmentContract, Dyna } override fun detachMainObservable() { - activityDisposable?.dispose() - } - - override fun onAttach(context: Context) { - super.onAttach(context) - detachMainObservable() - if (context is MainActivityContract) - activityDisposable = attachMainObservable(context) - } - - override fun onDetach() { - detachMainObservable() - super.onDetach() + activityReceiver?.cancel() } override fun onDestroyView() { L.i { "Fragment on destroy $position ${hashCode()}" } content?.destroy() content = null + detachMainObservable() super.onDestroyView() } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/fragments/FragmentContract.kt b/app/src/main/kotlin/com/pitchedapps/frost/fragments/FragmentContract.kt index e24e8308..10c612c5 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/fragments/FragmentContract.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/fragments/FragmentContract.kt @@ -22,7 +22,7 @@ import com.pitchedapps.frost.contracts.FrostContentParent import com.pitchedapps.frost.contracts.MainActivityContract import com.pitchedapps.frost.contracts.MainFabContract import com.pitchedapps.frost.views.FrostRecyclerView -import io.reactivex.disposables.Disposable +import kotlinx.coroutines.channels.ReceiveChannel /** * Created by Allan Wang on 2017-11-07. @@ -34,8 +34,9 @@ interface FragmentContract : FrostContentContainer { /** * Defines whether the fragment is valid in the viewpager - * Or if it needs to be recreated - * May be called from any thread to toggle status + * or if it needs to be recreated + * May be called from any thread to toggle status. + * Note that calls beyond the fragment lifecycle will be ignored */ var valid: Boolean @@ -75,9 +76,10 @@ interface FragmentContract : FrostContentContainer { /** * Call whenever a fragment is attached so that it may listen - * to activity emissions + * to activity emissions. + * Returns a means of closing the listener, which can be called from [detachMainObservable] */ - fun attachMainObservable(contract: MainActivityContract): Disposable + fun attachMainObservable(contract: MainActivityContract): ReceiveChannel<Int> /** * Call when fragment is detached so that any existing @@ -101,9 +103,10 @@ interface RecyclerContentContract { fun bind(recyclerView: FrostRecyclerView) /** - * Completely handle data reloading - * Optional progress emission update - * Callback returns [true] for success, [false] otherwise + * Completely handle data reloading, within a non-ui thread + * The progress function allows optional emission of progress values (between 0 and 100) + * and can be called from any thread. + * Returns [true] for success, [false] otherwise */ - fun reload(progress: (Int) -> Unit, callback: (Boolean) -> Unit) + suspend fun reload(progress: (Int) -> Unit): Boolean } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragmentBase.kt b/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragmentBase.kt index f77f83ea..7a8309ff 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragmentBase.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragmentBase.kt @@ -21,7 +21,6 @@ import com.mikepenz.fastadapter.FastAdapter import com.mikepenz.fastadapter.IItem import com.mikepenz.fastadapter.adapters.ItemAdapter import com.mikepenz.fastadapter.adapters.ModelAdapter -import com.mikepenz.fastadapter_extensions.items.ProgressItem import com.pitchedapps.frost.R import com.pitchedapps.frost.facebook.FbCookie import com.pitchedapps.frost.facebook.parsers.FrostParser @@ -29,16 +28,19 @@ import com.pitchedapps.frost.facebook.parsers.ParseResponse import com.pitchedapps.frost.utils.L import com.pitchedapps.frost.utils.frostJsoup import com.pitchedapps.frost.views.FrostRecyclerView -import org.jetbrains.anko.doAsync -import org.jetbrains.anko.uiThread +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.isActive +import kotlinx.coroutines.withContext /** * Created by Allan Wang on 27/12/17. */ -abstract class RecyclerFragment : BaseFragment(), RecyclerContentContract { +abstract class RecyclerFragment<T, Item : IItem<*, *>> : BaseFragment(), RecyclerContentContract { override val layoutRes: Int = R.layout.view_content_recycler + abstract val adapter: ModelAdapter<T, Item> + override fun firstLoadRequest() { val core = core ?: return if (firstLoad) { @@ -47,23 +49,34 @@ abstract class RecyclerFragment : BaseFragment(), RecyclerContentContract { } } - final override fun reload(progress: (Int) -> Unit, callback: (Boolean) -> Unit) { - reloadImpl(progress) { - if (it) - callback(it) - else + final override suspend fun reload(progress: (Int) -> Unit): Boolean { + val data = try { + reloadImpl(progress) + } catch (e: Exception) { + L.e(e) { "Recycler reload fail" } + null + } + if (!isActive) + return false + return withContext(Dispatchers.Main) { + if (data == null) { valid = false + return@withContext false + } else { + adapter.setNewList(data) + return@withContext true + } } } - protected abstract fun reloadImpl(progress: (Int) -> Unit, callback: (Boolean) -> Unit) + protected abstract suspend fun reloadImpl(progress: (Int) -> Unit): List<T>? } -abstract class GenericRecyclerFragment<T, Item : IItem<*, *>> : RecyclerFragment() { +abstract class GenericRecyclerFragment<T, Item : IItem<*, *>> : RecyclerFragment<T, Item>() { abstract fun mapper(data: T): Item - val adapter: ModelAdapter<T, Item> = ModelAdapter { this.mapper(it) } + override val adapter: ModelAdapter<T, Item> = ModelAdapter { this.mapper(it) } final override fun bind(recyclerView: FrostRecyclerView) { recyclerView.adapter = getAdapter() @@ -83,7 +96,7 @@ abstract class GenericRecyclerFragment<T, Item : IItem<*, *>> : RecyclerFragment open fun getAdapter(): FastAdapter<IItem<*, *>> = fastAdapter(this.adapter) } -abstract class FrostParserFragment<T : Any, Item : IItem<*, *>> : RecyclerFragment() { +abstract class FrostParserFragment<T : Any, Item : IItem<*, *>> : RecyclerFragment<Item, Item>() { /** * The parser to make this all happen @@ -94,7 +107,7 @@ abstract class FrostParserFragment<T : Any, Item : IItem<*, *>> : RecyclerFragme abstract fun toItems(response: ParseResponse<T>): List<Item> - val adapter: ItemAdapter<Item> = ItemAdapter() + override val adapter: ItemAdapter<Item> = ItemAdapter() final override fun bind(recyclerView: FrostRecyclerView) { recyclerView.adapter = getAdapter() @@ -113,50 +126,19 @@ abstract class FrostParserFragment<T : Any, Item : IItem<*, *>> : RecyclerFragme */ open fun getAdapter(): FastAdapter<IItem<*, *>> = fastAdapter(this.adapter) - override fun reloadImpl(progress: (Int) -> Unit, callback: (Boolean) -> Unit) { - doAsync { - progress(10) - val cookie = FbCookie.webCookie - val doc = getDoc(cookie) - progress(60) - val response = parser.parse(cookie, doc) - if (response == null) { - L.i { "RecyclerFragment failed for ${baseEnum.name}" } - return@doAsync callback(false) - } - progress(80) - val items = toItems(response) - progress(97) - uiThread { adapter.setNewList(items) } - callback(true) + override suspend fun reloadImpl(progress: (Int) -> Unit): List<Item>? = withContext(Dispatchers.IO) { + progress(10) + val cookie = FbCookie.webCookie + val doc = getDoc(cookie) + progress(60) + val response = parser.parse(cookie, doc) + if (response == null) { + L.i { "RecyclerFragment failed for ${baseEnum.name}" } + return@withContext null } + progress(80) + val items = toItems(response) + progress(97) + return@withContext items } } - -//abstract class PagedRecyclerFragment<T : Any, Item : IItem<*, *>> : RecyclerFragment<T, Item>() { -// -// var allowPagedLoading = true -// -// val footerAdapter = ItemAdapter<FrostProgress>() -// -// val footerScrollListener = object : EndlessRecyclerOnScrollListener(footerAdapter) { -// override fun onLoadMore(currentPage: Int) { -// TODO("not implemented") -// -// } -// -// } -// -// override fun getAdapter() = fastAdapter(adapter, footerAdapter) -// -// override fun bindImpl(recyclerView: FrostRecyclerView) { -// recyclerView.addOnScrollListener(footerScrollListener) -// } -// -// override fun reload(progress: (Int) -> Unit, callback: (Boolean) -> Unit) { -// footerScrollListener. -// super.reload(progress, callback) -// } -//} - -class FrostProgress : ProgressItem() diff --git a/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragments.kt b/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragments.kt index ff37b66d..f7ed9937 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragments.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/fragments/RecyclerFragments.kt @@ -26,7 +26,7 @@ import com.pitchedapps.frost.facebook.requests.MenuFooterItem import com.pitchedapps.frost.facebook.requests.MenuHeader import com.pitchedapps.frost.facebook.requests.MenuItem import com.pitchedapps.frost.facebook.requests.MenuItemData -import com.pitchedapps.frost.facebook.requests.fbRequest +import com.pitchedapps.frost.facebook.requests.fbAuth import com.pitchedapps.frost.facebook.requests.getMenuData import com.pitchedapps.frost.iitems.ClickableIItemContract import com.pitchedapps.frost.iitems.MenuContentIItem @@ -36,8 +36,8 @@ import com.pitchedapps.frost.iitems.MenuHeaderIItem import com.pitchedapps.frost.iitems.NotificationIItem import com.pitchedapps.frost.utils.frostJsoup import com.pitchedapps.frost.views.FrostRecyclerView -import org.jetbrains.anko.doAsync -import org.jetbrains.anko.uiThread +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext /** * Created by Allan Wang on 27/12/17. @@ -71,20 +71,16 @@ class MenuFragment : GenericRecyclerFragment<MenuItemData, IItem<*, *>>() { ClickableIItemContract.bindEvents(adapter) } - override fun reloadImpl(progress: (Int) -> Unit, callback: (Boolean) -> Unit) { - doAsync { - val cookie = FbCookie.webCookie - progress(10) - cookie.fbRequest({ callback(false) }) { - progress(30) - val data = getMenuData().invoke() ?: return@fbRequest callback(false) - if (data.data.isEmpty()) return@fbRequest callback(false) - progress(70) - val items = data.flatMapValid() - progress(90) - uiThread { adapter.add(items) } - callback(true) - } - } + override suspend fun reloadImpl(progress: (Int) -> Unit): List<MenuItemData>? = withContext(Dispatchers.IO) { + val cookie = FbCookie.webCookie ?: return@withContext null + progress(10) + val auth = fbAuth.fetch(cookie) + progress(30) + val data = auth.getMenuData().invoke() ?: return@withContext null + if (data.data.isEmpty()) return@withContext null + progress(70) + val items = data.flatMapValid() + progress(90) + return@withContext items } } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt b/app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt new file mode 100644 index 00000000..a9aedb6d --- /dev/null +++ b/app/src/main/kotlin/com/pitchedapps/frost/rx/Flyweight.kt @@ -0,0 +1,138 @@ +package com.pitchedapps.frost.rx + +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.selects.select +import java.util.concurrent.ConcurrentHashMap +import kotlin.coroutines.Continuation +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +/** + * Flyweight to keep track of values so long as they are valid. + * Values that have been fetched within [maxAge] from the time of use will be reused. + * If multiple requests are sent with the same key, then the value should only be fetched once. + * Otherwise, they will be fetched using [fetcher]. + * All requests will stem from the supplied [scope]. + */ +class Flyweight<K, V>( + val scope: CoroutineScope, + capacity: Int, + val maxAge: Long, + private val fetcher: suspend (K) -> V +) { + + // Receives a key and a pending request + private val actionChannel = Channel<Pair<K, Continuation<V>>>(capacity) + // Receives a key to invalidate the associated value + private val invalidatorChannel = Channel<K>(capacity) + // Receives a key to fetch the value + private val requesterChannel = Channel<K>(capacity) + // Receives a key and the resulting value + private val receiverChannel = Channel<Pair<K, Result<V>>>(capacity) + + // Keeps track of keys and associated update times + private val conditionMap: MutableMap<K, Long> = mutableMapOf() + // Keeps track of keys and associated values + private val resultMap: MutableMap<K, Result<V>> = mutableMapOf() + // Keeps track of unfulfilled actions + // Note that the explicit type is very important here. See https://youtrack.jetbrains.net/issue/KT-18053 + private val pendingMap: MutableMap<K, MutableList<Continuation<V>>> = ConcurrentHashMap() + + private val job: Job + + init { + job = scope.launch(Dispatchers.IO) { + launch { + while (isActive) { + select<Unit> { + /* + * New request received. Continuation should be fulfilled eventually + */ + actionChannel.onReceive { (key, continuation) -> + val lastUpdate = conditionMap[key] + val lastResult = resultMap[key] + // Valid value, retrieved within acceptable time + if (lastResult != null && lastUpdate != null && System.currentTimeMillis() - lastUpdate < maxAge) { + continuation.resumeWith(lastResult) + } else { + val valueRequestPending = key in pendingMap + pendingMap.getOrPut(key) { mutableListOf() }.add(continuation) + if (!valueRequestPending) + requesterChannel.send(key) + } + } + /* + * Invalidator received. Existing result associated with key should not be used. + * Note that any unfulfilled request and future requests should still operate, but with a new value. + */ + invalidatorChannel.onReceive { key -> + if (key !in resultMap) { + // Nothing to invalidate. + // If pending requests exist, they are already in the process of being updated. + return@onReceive + } + conditionMap.remove(key) + resultMap.remove(key) + if (pendingMap[key]?.isNotEmpty() == true) + // Refetch value for pending requests + requesterChannel.send(key) + } + /* + * Value request fulfilled. Should now fulfill pending requests + */ + receiverChannel.onReceive { (key, result) -> + conditionMap[key] = System.currentTimeMillis() + resultMap[key] = result + pendingMap.remove(key)?.forEach { + it.resumeWith(result) + } + } + } + } + } + launch { + /* + * Value request received. Should fetch new value using supplied fetcher + */ + for (key in requesterChannel) { + val result = runCatching { + fetcher(key) + } + receiverChannel.send(key to result) + } + } + } + } + + suspend fun fetch(key: K): V = suspendCoroutine { + if (!job.isActive) it.resumeWithException(IllegalStateException("Flyweight is not active")) + else scope.launch { + actionChannel.send(key to it) + } + } + + suspend fun invalidate(key: K) { + invalidatorChannel.send(key) + } + + fun cancel() { + job.cancel() + if (pendingMap.isNotEmpty()) { + val error = CancellationException("Flyweight cancelled") + pendingMap.values.flatten().forEach { it.resumeWithException(error) } + pendingMap.clear() + } + actionChannel.close() + invalidatorChannel.close() + requesterChannel.close() + receiverChannel.close() + conditionMap.clear() + resultMap.clear() + } +}
\ No newline at end of file diff --git a/app/src/main/kotlin/com/pitchedapps/frost/services/FrostRequestService.kt b/app/src/main/kotlin/com/pitchedapps/frost/services/FrostRequestService.kt index 989f1b24..c88f3946 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/services/FrostRequestService.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/services/FrostRequestService.kt @@ -25,7 +25,7 @@ import android.content.Intent import android.os.BaseBundle import android.os.PersistableBundle import com.pitchedapps.frost.facebook.requests.RequestAuth -import com.pitchedapps.frost.facebook.requests.fbRequest +import com.pitchedapps.frost.facebook.requests.fbAuth import com.pitchedapps.frost.facebook.requests.markNotificationRead import com.pitchedapps.frost.utils.EnumBundle import com.pitchedapps.frost.utils.EnumBundleCompanion @@ -179,15 +179,13 @@ class FrostRequestService : BaseJobService() { } launch(Dispatchers.IO) { try { - var failed = true - cookie.fbRequest { - L.d { "Requesting frost service for ${command.name}" } - command.invoke(this, bundle) - failed = false - } + val auth = fbAuth.fetch(cookie) + command.invoke(auth, bundle) L.d { - "${if (failed) "Failed" else "Finished"} frost service for ${command.name} in ${System.currentTimeMillis() - startTime} ms" + "Finished frost service for ${command.name} in ${System.currentTimeMillis() - startTime} ms" } + } catch (e: Exception) { + L.e(e) { "Failed frost service for ${command.name} in ${System.currentTimeMillis() - startTime} ms" } } finally { jobFinished(params, false) } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/utils/Const.kt b/app/src/main/kotlin/com/pitchedapps/frost/utils/Const.kt index 3c76759c..3d69b0ae 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/utils/Const.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/utils/Const.kt @@ -22,13 +22,14 @@ package com.pitchedapps.frost.utils const val ACTIVITY_SETTINGS = 97 /* * Possible responses from the SettingsActivity - * after the configurations have changed + * after the configurations have changed. + * Note that the first few bits are restricted to position related requests */ -const val REQUEST_RESTART_APPLICATION = 1 shl 11 -const val REQUEST_RESTART = 1 shl 12 -const val REQUEST_REFRESH = 1 shl 13 -const val REQUEST_TEXT_ZOOM = 1 shl 14 -const val REQUEST_NAV = 1 shl 15 -const val REQUEST_SEARCH = 1 shl 16 +const val REQUEST_RESTART_APPLICATION = 1 shl 5 +const val REQUEST_RESTART = 1 shl 6 +const val REQUEST_REFRESH = 1 shl 7 +const val REQUEST_TEXT_ZOOM = 1 shl 8 +const val REQUEST_NAV = 1 shl 9 +const val REQUEST_SEARCH = 1 shl 10 -const val MAIN_TIMEOUT_DURATION = 30 * 60 * 1000 // 30 min +const val MAIN_TIMEOUT_DURATION = 30 * 60 * 1000 // 30 min
\ No newline at end of file diff --git a/app/src/main/kotlin/com/pitchedapps/frost/utils/KotlinUtils.kt b/app/src/main/kotlin/com/pitchedapps/frost/utils/KotlinUtils.kt new file mode 100644 index 00000000..320aeb69 --- /dev/null +++ b/app/src/main/kotlin/com/pitchedapps/frost/utils/KotlinUtils.kt @@ -0,0 +1,20 @@ +package com.pitchedapps.frost.utils + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.produce +import kotlinx.coroutines.isActive + +@UseExperimental(ExperimentalCoroutinesApi::class) +fun <T> ReceiveChannel<T>.uniqueOnly(scope: CoroutineScope): ReceiveChannel<T> = scope.produce { + var previous: T? = null + for (current in this@uniqueOnly) { + if (!scope.isActive) { + cancel() + } else if (previous != current) { + previous = current + send(current) + } + } +}
\ No newline at end of file diff --git a/app/src/main/kotlin/com/pitchedapps/frost/utils/Utils.kt b/app/src/main/kotlin/com/pitchedapps/frost/utils/Utils.kt index 56c1d6d9..8e4e410d 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/utils/Utils.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/utils/Utils.kt @@ -114,8 +114,10 @@ fun Activity.cookies(): ArrayList<CookieModel> { private inline fun <reified T : WebOverlayActivityBase> Context.launchWebOverlayImpl(url: String) { val argUrl = url.formattedFbUrl L.v { "Launch received: $url\nLaunch web overlay: $argUrl" } - if (argUrl.isFacebookUrl && argUrl.contains("/logout.php")) + if (argUrl.isFacebookUrl && argUrl.contains("/logout.php")) { + L.d { "Logout php found" } FbCookie.logout(this) + } else if (!(Prefs.linksInDefaultApp && resolveActivityForUri(Uri.parse(argUrl)))) startActivity<T>(false, intentBuilder = { putExtra(ARG_URL, argUrl) @@ -371,7 +373,12 @@ fun EmailBuilder.addFrostDetails() { fun frostJsoup(url: String) = frostJsoup(FbCookie.webCookie, url) fun frostJsoup(cookie: String?, url: String) = - Jsoup.connect(url).cookie(FACEBOOK_COM, cookie).userAgent(USER_AGENT_BASIC).get()!! + Jsoup.connect(url).run { + if (cookie != null) cookie( + FACEBOOK_COM, + cookie + ) else this + }.userAgent(USER_AGENT_BASIC).get()!! fun Element.first(vararg select: String): Element? { select.forEach { diff --git a/app/src/main/kotlin/com/pitchedapps/frost/views/FrostContentView.kt b/app/src/main/kotlin/com/pitchedapps/frost/views/FrostContentView.kt index d17a424c..9619eecc 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/views/FrostContentView.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/views/FrostContentView.kt @@ -39,12 +39,14 @@ import com.pitchedapps.frost.facebook.FbItem import com.pitchedapps.frost.facebook.WEB_LOAD_DELAY import com.pitchedapps.frost.utils.L import com.pitchedapps.frost.utils.Prefs -import io.reactivex.android.schedulers.AndroidSchedulers -import io.reactivex.disposables.CompositeDisposable import io.reactivex.disposables.Disposable -import io.reactivex.rxkotlin.addTo -import io.reactivex.subjects.BehaviorSubject -import io.reactivex.subjects.PublishSubject +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.BroadcastChannel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext class FrostContentWeb @JvmOverloads constructor( context: Context, @@ -66,6 +68,7 @@ class FrostContentRecycler @JvmOverloads constructor( override val layoutRes: Int = R.layout.view_content_base_recycler } +@UseExperimental(ExperimentalCoroutinesApi::class) abstract class FrostContentView<out T> @JvmOverloads constructor( context: Context, attrs: AttributeSet? = null, @@ -81,11 +84,11 @@ abstract class FrostContentView<out T> @JvmOverloads constructor( override val core: FrostContentCore get() = coreView - override val progressObservable: PublishSubject<Int> = PublishSubject.create() - override val refreshObservable: PublishSubject<Boolean> = PublishSubject.create() - override val titleObservable: BehaviorSubject<String> = BehaviorSubject.create() + override val refreshChannel: BroadcastChannel<Boolean> = BroadcastChannel(100) + override val progressChannel: BroadcastChannel<Int> = BroadcastChannel(100) + override val titleChannel: BroadcastChannel<String> = BroadcastChannel(100) - private val compositeDisposable = CompositeDisposable() + override lateinit var scope: CoroutineScope override lateinit var baseUrl: String override var baseEnum: FbItem? = null @@ -107,24 +110,6 @@ abstract class FrostContentView<out T> @JvmOverloads constructor( protected fun init() { inflate(context, layoutRes, this) coreView.parent = this - - // bind observables - progressObservable.observeOn(AndroidSchedulers.mainThread()).subscribe { - progress.invisibleIf(it == 100) - if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) - progress.setProgress(it, true) - else - progress.progress = it - }.addTo(compositeDisposable) - - refreshObservable - .observeOn(AndroidSchedulers.mainThread()) - .subscribe { - refresh.isRefreshing = it - refresh.isEnabled = true - }.addTo(compositeDisposable) - refresh.setOnRefreshListener { coreView.reload(true) } - reloadThemeSelf() } @@ -132,7 +117,38 @@ abstract class FrostContentView<out T> @JvmOverloads constructor( baseUrl = container.baseUrl baseEnum = container.baseEnum init() + scope = container core.bind(container) + refresh.setOnRefreshListener { + with(coreView) { + reload(true) + } + } + // Begin subscription in the main thread + val refreshReceiver = refreshChannel.openSubscription() + val progressReceiver = progressChannel.openSubscription() + + scope.launch(Dispatchers.Default) { + launch { + for (r in refreshReceiver) { + withContext(Dispatchers.Main) { + refresh.isRefreshing = r + refresh.isEnabled = true + } + } + } + launch { + for (p in progressReceiver) { + withContext(Dispatchers.Main) { + progress.invisibleIf(p == 100) + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) + progress.setProgress(p, true) + else + progress.progress = p + } + } + } + } } override fun reloadTheme() { @@ -155,15 +171,15 @@ abstract class FrostContentView<out T> @JvmOverloads constructor( } override fun destroy() { - titleObservable.onComplete() - progressObservable.onComplete() - refreshObservable.onComplete() + titleChannel.close() + progressChannel.close() + refreshChannel.close() core.destroy() - compositeDisposable.dispose() } private var dispose: Disposable? = null private var transitionStart: Long = -1 + private var refreshReceiver: ReceiveChannel<Boolean>? = null /** * Hook onto the refresh observable for one cycle @@ -171,32 +187,32 @@ abstract class FrostContentView<out T> @JvmOverloads constructor( * The cycle only starts on the first load since there may have been another process when this is registered */ override fun registerTransition(urlChanged: Boolean, animate: Boolean): Boolean { - if (!urlChanged && dispose != null) { + if (!urlChanged && refreshReceiver != null) { L.v { "Consuming url load" } return false // still in progress; do not bother with load } L.v { "Registered transition" } with(coreView) { - var loading = dispose != null - dispose?.dispose() - dispose = refreshObservable - .observeOn(AndroidSchedulers.mainThread()) - .subscribe { - if (it) { - loading = true - transitionStart = System.currentTimeMillis() - clearAnimation() - if (isVisible) - fadeOut(duration = 200L) - } else if (loading) { - loading = false - if (animate && Prefs.animate) circularReveal(offset = WEB_LOAD_DELAY) - else fadeIn(duration = 200L, offset = WEB_LOAD_DELAY) - L.v { "Transition loaded in ${System.currentTimeMillis() - transitionStart} ms" } - dispose?.dispose() - dispose = null + refreshReceiver = refreshChannel.openSubscription().also { receiver -> + scope.launch(Dispatchers.Main) { + var loading = false + for (r in receiver) { + if (r) { + loading = true + transitionStart = System.currentTimeMillis() + clearAnimation() + if (isVisible) + fadeOut(duration = 200L) + } else if (loading) { + if (animate && Prefs.animate) circularReveal(offset = WEB_LOAD_DELAY) + else fadeIn(duration = 200L, offset = WEB_LOAD_DELAY) + L.v { "Transition loaded in ${System.currentTimeMillis() - transitionStart} ms" } + receiver.cancel() + refreshReceiver = null + } } } + } } return true } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/views/FrostRecyclerView.kt b/app/src/main/kotlin/com/pitchedapps/frost/views/FrostRecyclerView.kt index 2b9e8f9c..f7cb2214 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/views/FrostRecyclerView.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/views/FrostRecyclerView.kt @@ -28,6 +28,7 @@ import com.pitchedapps.frost.contracts.FrostContentCore import com.pitchedapps.frost.contracts.FrostContentParent import com.pitchedapps.frost.fragments.RecyclerContentContract import com.pitchedapps.frost.utils.Prefs +import kotlinx.coroutines.launch /** * Created by Allan Wang on 2017-05-29. @@ -69,11 +70,12 @@ class FrostRecyclerView @JvmOverloads constructor( override fun reloadBase(animate: Boolean) { if (Prefs.animate) fadeOut(onFinish = onReloadClear) - parent.refreshObservable.onNext(true) - recyclerContract.reload({ parent.progressObservable.onNext(it) }) { - parent.progressObservable.onNext(100) - parent.refreshObservable.onNext(false) - if (Prefs.animate) post { circularReveal() } + scope.launch { + parent.refreshChannel.offer(true) + val loaded = recyclerContract.reload { parent.progressChannel.offer(it) } + parent.progressChannel.offer(100) + parent.refreshChannel.offer(false) + if (Prefs.animate) circularReveal() } } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/web/FrostChromeClients.kt b/app/src/main/kotlin/com/pitchedapps/frost/web/FrostChromeClients.kt index 12df8000..da90e7e5 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/web/FrostChromeClients.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/web/FrostChromeClients.kt @@ -29,8 +29,7 @@ import com.pitchedapps.frost.contracts.ActivityContract import com.pitchedapps.frost.utils.L import com.pitchedapps.frost.utils.frostSnackbar import com.pitchedapps.frost.views.FrostWebView -import io.reactivex.subjects.BehaviorSubject -import io.reactivex.subjects.Subject +import kotlinx.coroutines.channels.SendChannel /** * Created by Allan Wang on 2017-05-31. @@ -43,8 +42,8 @@ import io.reactivex.subjects.Subject */ class FrostChromeClient(web: FrostWebView) : WebChromeClient() { - private val progress: Subject<Int> = web.parent.progressObservable - private val title: BehaviorSubject<String> = web.parent.titleObservable + private val progress: SendChannel<Int> = web.parent.progressChannel + private val title: SendChannel<String> = web.parent.titleChannel private val activity = (web.context as? ActivityContract) private val context = web.context!! @@ -55,13 +54,13 @@ class FrostChromeClient(web: FrostWebView) : WebChromeClient() { override fun onReceivedTitle(view: WebView, title: String) { super.onReceivedTitle(view, title) - if (title.startsWith("http") || this.title.value == title) return - this.title.onNext(title) + if (title.startsWith("http")) return + this.title.offer(title) } override fun onProgressChanged(view: WebView, newProgress: Int) { super.onProgressChanged(view, newProgress) - progress.onNext(newProgress) + progress.offer(newProgress) } override fun onShowFileChooser( diff --git a/app/src/main/kotlin/com/pitchedapps/frost/web/FrostJSI.kt b/app/src/main/kotlin/com/pitchedapps/frost/web/FrostJSI.kt index 2afb28c9..c8b54e7a 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/web/FrostJSI.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/web/FrostJSI.kt @@ -29,7 +29,7 @@ import com.pitchedapps.frost.utils.isIndependent import com.pitchedapps.frost.utils.launchImageActivity import com.pitchedapps.frost.utils.showWebContextMenu import com.pitchedapps.frost.views.FrostWebView -import io.reactivex.subjects.Subject +import kotlinx.coroutines.channels.SendChannel /** * Created by Allan Wang on 2017-06-01. @@ -38,8 +38,8 @@ class FrostJSI(val web: FrostWebView) { private val context = web.context private val activity = context as? MainActivity - private val header: Subject<String>? = activity?.headerBadgeObservable - private val refresh: Subject<Boolean> = web.parent.refreshObservable + private val header: SendChannel<String>? = activity?.headerBadgeChannel + private val refresh: SendChannel<Boolean> = web.parent.refreshChannel private val cookies = activity?.cookies() ?: arrayListOf() /** @@ -102,6 +102,7 @@ class FrostJSI(val web: FrostWebView) { @JavascriptInterface fun loadLogin() { + L.d { "Sign up button found; load login" } FbCookie.logout(context) } @@ -120,7 +121,7 @@ class FrostJSI(val web: FrostWebView) { @JavascriptInterface fun isReady() { - refresh.onNext(false) + refresh.offer(false) } @JavascriptInterface @@ -132,6 +133,6 @@ class FrostJSI(val web: FrostWebView) { @JavascriptInterface fun handleHeader(html: String?) { html ?: return - header?.onNext(html) + header?.offer(html) } } diff --git a/app/src/main/kotlin/com/pitchedapps/frost/web/FrostWebViewClients.kt b/app/src/main/kotlin/com/pitchedapps/frost/web/FrostWebViewClients.kt index d75f03bb..cb212b0a 100644 --- a/app/src/main/kotlin/com/pitchedapps/frost/web/FrostWebViewClients.kt +++ b/app/src/main/kotlin/com/pitchedapps/frost/web/FrostWebViewClients.kt @@ -40,7 +40,7 @@ import com.pitchedapps.frost.utils.isIndirectImageUrl import com.pitchedapps.frost.utils.launchImageActivity import com.pitchedapps.frost.utils.resolveActivityForUri import com.pitchedapps.frost.views.FrostWebView -import io.reactivex.subjects.Subject +import kotlinx.coroutines.channels.SendChannel import org.jetbrains.anko.withAlpha /** @@ -64,7 +64,7 @@ open class BaseWebViewClient : WebViewClient() { */ open class FrostWebViewClient(val web: FrostWebView) : BaseWebViewClient() { - private val refresh: Subject<Boolean> = web.parent.refreshObservable + private val refresh: SendChannel<Boolean> = web.parent.refreshChannel private val isMain = web.parent.baseEnum != null protected inline fun v(crossinline message: () -> Any?) = L.v { "web client: ${message()}" } @@ -73,7 +73,7 @@ open class FrostWebViewClient(val web: FrostWebView) : BaseWebViewClient() { super.onPageStarted(view, url, favicon) if (url == null) return v { "loading $url" } - refresh.onNext(true) + refresh.offer(true) } private fun injectBackgroundColor() { @@ -110,14 +110,14 @@ open class FrostWebViewClient(val web: FrostWebView) : BaseWebViewClient() { JsAssets.MEDIA ) else - refresh.onNext(false) + refresh.offer(false) } override fun onPageFinished(view: WebView, url: String?) { url ?: return v { "finished $url" } if (!url.isFacebookUrl) { - refresh.onNext(false) + refresh.offer(false) return } onPageFinishedActions(url) @@ -131,7 +131,7 @@ open class FrostWebViewClient(val web: FrostWebView) : BaseWebViewClient() { internal fun injectAndFinish() { v { "page finished reveal" } - refresh.onNext(false) + refresh.offer(false) injectBackgroundColor() web.jsInject( JsActions.LOGIN_CHECK, diff --git a/app/src/test/kotlin/com/pitchedapps/frost/MiscTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/MiscTest.kt index ce125298..20610b2a 100644 --- a/app/src/test/kotlin/com/pitchedapps/frost/MiscTest.kt +++ b/app/src/test/kotlin/com/pitchedapps/frost/MiscTest.kt @@ -16,9 +16,7 @@ */ package com.pitchedapps.frost -import com.pitchedapps.frost.facebook.requests.call import com.pitchedapps.frost.facebook.requests.zip -import okhttp3.Request import org.junit.Test import kotlin.test.assertTrue @@ -47,15 +45,4 @@ class MiscTest { "zip did not seem to work on different threads" ) } - - @Test - fun a() { - val s = Request.Builder() - .url("https://www.allanwang.ca/ecse429/magenta.png") - .get() - .call().execute().body()!!.string() - "�PNG\n\u001A\nIDA�c����?\u0000\u0006�\u0002��p�\u0000\u0000\u0000\u0000IEND�B`�" - println("Hello") - println(s) - } } diff --git a/app/src/test/kotlin/com/pitchedapps/frost/rx/FlyweightTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/rx/FlyweightTest.kt new file mode 100644 index 00000000..834163bd --- /dev/null +++ b/app/src/test/kotlin/com/pitchedapps/frost/rx/FlyweightTest.kt @@ -0,0 +1,108 @@ +package com.pitchedapps.frost.rx + +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.async +import kotlinx.coroutines.runBlocking +import org.junit.Rule +import org.junit.rules.Timeout +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue +import kotlin.test.fail + +class FlyweightTest { + + @get:Rule + val globalTimeout: Timeout = Timeout.seconds(5) + + lateinit var flyweight: Flyweight<Int, Int> + + lateinit var callCount: AtomicInteger + + private val LONG_RUNNING_KEY = -78 + + @BeforeTest + fun before() { + callCount = AtomicInteger(0) + flyweight = Flyweight(GlobalScope, 100, 200L) { + callCount.incrementAndGet() + when (it) { + LONG_RUNNING_KEY -> Thread.sleep(100000) + else -> Thread.sleep(100) + } + it * 2 + } + } + + @Test + fun basic() { + assertEquals(2, runBlocking { flyweight.fetch(1) }, "Invalid result") + assertEquals(1, callCount.get(), "1 call expected") + } + + @Test + fun multipleWithOneKey() { + val results: List<Int> = runBlocking { + (0..1000).map { + flyweight.scope.async { + flyweight.fetch(1) + } + }.map { it.await() } + } + assertEquals(1, callCount.get(), "1 call expected") + assertEquals(1001, results.size, "Incorrect number of results returned") + assertTrue(results.all { it == 2 }, "Result should all be 2") + } + + @Test + fun consecutiveReuse() { + runBlocking { + flyweight.fetch(1) + assertEquals(1, callCount.get(), "1 call expected") + flyweight.fetch(1) + assertEquals(1, callCount.get(), "Reuse expected") + Thread.sleep(300) + flyweight.fetch(1) + assertEquals(2, callCount.get(), "Refetch expected") + } + } + + @Test + fun invalidate() { + runBlocking { + flyweight.fetch(1) + assertEquals(1, callCount.get(), "1 call expected") + flyweight.invalidate(1) + flyweight.fetch(1) + assertEquals(2, callCount.get(), "New call expected") + } + } + + @Test + fun destroy() { + runBlocking { + val longRunningResult = async { flyweight.fetch(LONG_RUNNING_KEY) } + flyweight.fetch(1) + flyweight.cancel() + try { + flyweight.fetch(1) + fail("Flyweight should not be fulfilled after it is destroyed") + } catch (e: Exception) { + assertEquals("Flyweight is not active", e.message, "Incorrect error found on fetch after destruction") + } + try { + longRunningResult.await() + fail("Flyweight should have cancelled previously running requests") + } catch (e: Exception) { + assertEquals( + "Flyweight cancelled", + e.message, + "Incorrect error found on fetch cancelled by destruction" + ) + } + println("Done") + } + } +}
\ No newline at end of file diff --git a/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt new file mode 100644 index 00000000..f930e529 --- /dev/null +++ b/app/src/test/kotlin/com/pitchedapps/frost/utils/CoroutineTest.kt @@ -0,0 +1,194 @@ +package com.pitchedapps.frost.utils + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.BroadcastChannel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.count +import kotlinx.coroutines.delay +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import java.util.concurrent.Executors +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +/** + * Collection of tests around coroutines + */ +@UseExperimental(ExperimentalCoroutinesApi::class) +class CoroutineTest { + + /** + * Hooks onto the refresh channel for one true -> false cycle. + * Returns the list of event ids that were emitted + */ + private suspend fun transition(channel: ReceiveChannel<Pair<Boolean, Int>>): List<Pair<Boolean, Int>> { + var refreshed = false + return listen(channel) { (refreshing, _) -> + if (refreshed && !refreshing) + return@listen true + if (refreshing) + refreshed = true + return@listen false + } + } + + private suspend fun <T> listen(channel: ReceiveChannel<T>, shouldEnd: suspend (T) -> Boolean = { false }): List<T> = + withContext(Dispatchers.IO) { + val data = mutableListOf<T>() + for (c in channel) { + data.add(c) + if (shouldEnd(c)) break + } + channel.cancel() + return@withContext data + } + + /** + * When refreshing, we have a temporary subscriber that hooks onto a single cycle. + * The refresh channel only contains booleans, but for the sake of identification, + * each boolean will have a unique integer attached. + * + * Things to note: + * Subscription should be opened outside of async, since we don't want to miss any events. + */ + @Test + fun refreshSubscriptions() { + val refreshChannel = BroadcastChannel<Pair<Boolean, Int>>(100) + runBlocking { + // Listen to all events + val fullReceiver = refreshChannel.openSubscription() + val fullDeferred = async { listen(fullReceiver) } + + refreshChannel.send(true to 1) + refreshChannel.send(false to 2) + refreshChannel.send(true to 3) + + val partialReceiver = refreshChannel.openSubscription() + val partialDeferred = async { transition(partialReceiver) } + refreshChannel.send(false to 4) + refreshChannel.send(true to 5) + refreshChannel.send(false to 6) + refreshChannel.send(true to 7) + refreshChannel.close() + val fullStream = fullDeferred.await() + val partialStream = partialDeferred.await() + + assertEquals( + 7, + fullStream.size, + "Full stream should contain all events" + ) + assertEquals( + listOf(false to 4, true to 5, false to 6), + partialStream, + "Partial stream should include up until first true false pair" + ) + } + } + + /** + * Sanity check to ensure that contexts are being honoured + */ + @Test + fun contextSwitching() { + val mainTag = "main-test" + val mainDispatcher = Executors.newSingleThreadExecutor { r -> + Thread(r, mainTag) + }.asCoroutineDispatcher() + + val channel = BroadcastChannel<String>(100) + + runBlocking(Dispatchers.IO) { + val receiver1 = channel.openSubscription() + val receiver2 = channel.openSubscription() + launch(mainDispatcher) { + for (thread in receiver1) { + assertTrue( + Thread.currentThread().name.startsWith(mainTag), + "Channel should be received in main thread" + ) + assertFalse( + thread.startsWith(mainTag), + "Channel execution should not be in main thread" + ) + } + } + listOf(EmptyCoroutineContext, Dispatchers.IO, Dispatchers.Default, Dispatchers.IO).map { + async(it) { channel.send(Thread.currentThread().name) } + }.joinAll() + channel.close() + assertEquals(4, receiver2.count(), "Not all events received") + } + } + + /** + * Not a true throttle, but for things like fetching header badges, we want to avoid simultaneous fetches. + * As a result, I want to test that the usage of offer along with a rendezvous channel will work as I expect. + * Events should be consumed when there is no pending consumer on previous elements. + */ + @Test + fun throttledChannel() { + val channel = Channel<Int>(Channel.RENDEZVOUS) + runBlocking { + val deferred = async { + listen(channel) { + // Throttle consumer + delay(10) + return@listen false + } + } + (0..100).forEach { + channel.offer(it) + delay(1) + } + channel.close() + val received = deferred.await() + assertTrue( + received.size < 20, + "Received data should be throttled; expected that around 1/10th of all events are consumed" + ) + println(received) + } + } + + @Test + fun uniqueOnly() { + val channel = BroadcastChannel<Int>(100) + runBlocking { + val fullReceiver = channel.openSubscription() + val uniqueReceiver = channel.openSubscription().uniqueOnly(this) + + val fullDeferred = async { listen(fullReceiver) } + val uniqueDeferred = async { listen(uniqueReceiver) } + + listOf(0, 1, 2, 3, 3, 3, 4, 3, 5, 5, 1).forEach { + channel.offer(it) + } + channel.close() + + val fullData = fullDeferred.await() + val uniqueData = uniqueDeferred.await() + + assertEquals( + listOf(0, 1, 2, 3, 3, 3, 4, 3, 5, 5, 1), + fullData, + "Full receiver should get all channel events" + ) + assertEquals( + listOf(0, 1, 2, 3, 4, 3, 5, 1), + uniqueData, + "Unique receiver should not have two consecutive events that are equal" + ) + + } + } +}
\ No newline at end of file |