Skip to content

Commit d1cec14

Browse files
committed
update
1 parent e338f2c commit d1cec14

File tree

1 file changed

+108
-0
lines changed
  • src/main/kotlin/com/hoc081098/kotlin_playground

1 file changed

+108
-0
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package com.hoc081098.kotlin_playground
2+
3+
import arrow.atomic.AtomicInt
4+
import com.hoc081098.flowext.FlowExtPreview
5+
import com.hoc081098.flowext.flatMapConcatEager
6+
import com.hoc081098.flowext.flowFromSuspend
7+
import com.hoc081098.flowext.mapToResult
8+
import com.hoc081098.flowext.retryWithExponentialBackoff
9+
import kotlin.system.exitProcess
10+
import kotlin.time.Duration.Companion.seconds
11+
import kotlinx.coroutines.CoroutineDispatcher
12+
import kotlinx.coroutines.CoroutineScope
13+
import kotlinx.coroutines.Dispatchers
14+
import kotlinx.coroutines.ExperimentalCoroutinesApi
15+
import kotlinx.coroutines.FlowPreview
16+
import kotlinx.coroutines.SupervisorJob
17+
import kotlinx.coroutines.cancel
18+
import kotlinx.coroutines.channels.Channel
19+
import kotlinx.coroutines.channels.trySendBlocking
20+
import kotlinx.coroutines.delay
21+
import kotlinx.coroutines.flow.consumeAsFlow
22+
import kotlinx.coroutines.flow.launchIn
23+
import kotlinx.coroutines.flow.map
24+
import kotlinx.coroutines.flow.onEach
25+
import kotlinx.coroutines.runBlocking
26+
27+
fun interface Request<T> {
28+
suspend fun execute(): T
29+
}
30+
31+
@OptIn(
32+
FlowExtPreview::class,
33+
ExperimentalCoroutinesApi::class,
34+
FlowPreview::class,
35+
)
36+
class Processor<T>(
37+
concurrency: Int = 3,
38+
dispatcher: CoroutineDispatcher = Dispatchers.IO,
39+
private val onResult: suspend (Request<T>, Result<T>) -> Unit,
40+
) {
41+
private val requests = Channel<Request<T>>(Channel.UNLIMITED)
42+
private val scope = CoroutineScope(dispatcher + SupervisorJob())
43+
44+
fun enqueue(request: Request<T>) =
45+
requests.trySendBlocking(request).getOrThrow()
46+
47+
init {
48+
requests
49+
.consumeAsFlow()
50+
.flatMapConcatEager(concurrency = concurrency) { request ->
51+
flowFromSuspend { request.execute() }
52+
.retryWithExponentialBackoff(
53+
initialDelay = 1.seconds,
54+
factor = 2.0,
55+
maxAttempt = 2,
56+
) { e ->
57+
println("[Error] $e -> retrying...")
58+
true
59+
}
60+
.mapToResult()
61+
.map { request to it }
62+
}
63+
.onEach { (request, result) -> onResult(request, result) }
64+
.launchIn(scope)
65+
}
66+
67+
fun close() =
68+
runBlocking { scope.cancel(); println("$this closed") }
69+
}
70+
71+
72+
fun main() = runBlocking {
73+
val concurrency = 3
74+
val processor = Processor<Int>(concurrency = concurrency) { request, result ->
75+
println("<<< Request $request -> $result")
76+
}
77+
78+
val count = AtomicInt()
79+
80+
repeat(100) { index ->
81+
processor.enqueue {
82+
count.incrementAndGet().let {
83+
if (it > concurrency) {
84+
exitProcess(-1)
85+
}
86+
}
87+
88+
try {
89+
println(">>> Request $index started")
90+
delay((500L..1_000L).random())
91+
if (index % 10 == 0) {
92+
throw RuntimeException("fake error $index")
93+
}
94+
println(">>> Request $index finished")
95+
index
96+
} finally {
97+
count.decrementAndGet().let {
98+
if (it < 0) {
99+
exitProcess(-1)
100+
}
101+
}
102+
}
103+
}
104+
}
105+
106+
delay(20_000)
107+
processor.close()
108+
}

0 commit comments

Comments
 (0)