aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/kotlin/ca/allanwang/kau/kotlin/Zip.kt
blob: cff520f46ba45a6193ecf5165c3098f0da2086e1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package ca.allanwang.kau.kotlin

import org.jetbrains.anko.doAsync
import java.util.concurrent.atomic.AtomicInteger

/**
 * Created by Allan Wang on 2017-08-06.
 *
 * Collection of zip methods that aim to replicate
 * <a href="http://reactivex.io/documentation/operators/zip.html">Reactive Zips</a>
 * for Unit-returning functions.
 *
 * Typically, the functions will execute asynchronously and call their given callbacks when finished.
 * Once all callbacks are called, the final onFinish callback will be executed.
 *
 * There is also a helper zipper to wrap synchronous functions with Anko's doAsync to achieve the same results
 *
 * Note that not wrapping synchronous functions will render these methods useless,
 * as you can simply define an inline callback after all functions are finished
 */

/**
 * Base for callbacks that run their payload at most once.
 *
 * [completed] starts as `false` and is flipped to `true` by the first [invoke];
 * any later [invoke] is a no-op. The flag is a plain property with no
 * synchronization, so the once-only guarantee only holds when a single
 * instance is not invoked concurrently.
 */
open class ZipCallbackBase {
    var completed: Boolean = false

    /**
     * Runs [callback] only if this instance has not been completed yet.
     *
     * The flag is set before [callback] executes, so a re-entrant call
     * made from inside [callback] is ignored as well.
     */
    inline operator fun invoke(callback: () -> Unit) {
        if (!completed) {
            completed = true
            callback()
        }
    }
}

/**
 * One-shot callback that carries a single result of type [T].
 *
 * @param onReceived handler given the result the first time this callback is invoked
 */
class ZipCallback<T>(val onReceived: (T) -> Unit) : ZipCallbackBase() {
    /**
     * Forwards [result] to [onReceived]; ignored if this callback was already completed.
     */
    operator fun invoke(result: T) {
        invoke { onReceived(result) }
    }
}

/**
 * One-shot callback that carries no result.
 *
 * @param onReceived handler run the first time this callback is invoked
 */
class ZipEmptyCallback(val onReceived: () -> Unit) : ZipCallbackBase() {
    /**
     * Runs [onReceived]; ignored if this callback was already completed.
     */
    operator fun invoke() {
        invoke(onReceived)
    }
}

/**
 * Runs every task in this collection, handing each one a fresh [ZipCallback],
 * and collects the values the tasks report into an array indexed by the
 * task's position in the collection.
 *
 * Once every task has invoked its callback, [onFinished] is called with the
 * collected results. It is called from inside the last callback invocation,
 * i.e. on whichever thread that task reported from.
 *
 * All tasks must invoke their callback for [onFinished] to execute.
 * If the collection is empty there is nothing to wait for, so [onFinished]
 * is invoked immediately with an empty array.
 *
 * @param defaultResult value used to pre-fill every slot of the result array
 * @param onFinished receiver of the result array once all tasks have reported
 */
inline fun <reified T> Collection<(ZipCallback<T>) -> Unit>.zip(
        defaultResult: T, crossinline onFinished: (results: Array<T>) -> Unit
) {
    val result = Array(size) { defaultResult }
    // With no tasks, no callback would ever fire and onFinished would be lost;
    // treat "zero tasks" as "all tasks done".
    if (isEmpty()) {
        onFinished(result)
        return
    }
    // Atomic counter of tasks that have not reported yet.
    val countDown = AtomicInteger(size)
    forEachIndexed { index, asyncFun ->
        asyncFun(ZipCallback<T> {
            result[index] = it
            // Only the callback that brings the counter to zero triggers completion.
            if (countDown.decrementAndGet() <= 0)
                onFinished(result)
        })
    }
}

/**
 * Simplified zip for tasks that report completion without a value.
 *
 * Runs every task in this collection, handing each one a fresh
 * [ZipEmptyCallback]. Once every task has invoked its callback, [onFinished]
 * is called from inside the last callback invocation.
 *
 * All tasks must invoke their callback for [onFinished] to execute.
 * If the collection is empty there is nothing to wait for, so [onFinished]
 * is invoked immediately.
 *
 * @param onFinished action to run once all tasks have reported
 */
inline fun Collection<(ZipEmptyCallback) -> Unit>.zip(crossinline onFinished: () -> Unit) {
    // With no tasks, no callback would ever fire and onFinished would be lost;
    // treat "zero tasks" as "all tasks done".
    if (isEmpty()) {
        onFinished()
        return
    }
    // Atomic counter of tasks that have not reported yet.
    val countDown = AtomicInteger(size)
    forEach { asyncFun ->
        asyncFun(ZipEmptyCallback {
            // Only the callback that brings the counter to zero triggers completion.
            if (countDown.decrementAndGet() <= 0)
                onFinished()
        })
    }
}

/**
 * Wraps each synchronous task of this collection in a `doAsync` block and
 * zips the wrapped tasks with a common [onFinished] callback.
 *
 * Each wrapper runs its task and then invokes the supplied [ZipEmptyCallback];
 * deciding when [onFinished] fires is delegated to the callback-based `zip`.
 *
 * @param onFinished action passed on to `zip` as the completion callback
 */
inline fun Collection<() -> Unit>.zipAsync(crossinline onFinished: () -> Unit) {
    val asyncTasks: List<(ZipEmptyCallback) -> Unit> = map { task ->
        { done: ZipEmptyCallback ->
            doAsync {
                task()
                done()
            }
            // Discard doAsync's return value so the wrapper is Unit-returning.
            Unit
        }
    }
    asyncTasks.zip(onFinished)
}