@@ -2,37 +2,47 @@ import datadog.trace.api.Trace
22import datadog.trace.instrumentation.kotlin.coroutines.CoreKotlinCoroutineTests
33import kotlinx.coroutines.CoroutineDispatcher
44import kotlinx.coroutines.Dispatchers
5+ import kotlinx.coroutines.flow.Flow
6+ import kotlinx.coroutines.flow.FlowCollector
7+ import kotlinx.coroutines.flow.channelFlow
58import kotlinx.coroutines.flow.flow
69import kotlinx.coroutines.flow.flowOn
710import kotlinx.coroutines.flow.single
811import kotlinx.coroutines.launch
912import kotlinx.coroutines.withTimeout
1013
14+ // Workaround for Groovy 4 breaking Kotlin SAM conversion for FlowCollector
15+ private suspend inline fun <T > Flow<T>.forEach (crossinline action : suspend (T ) -> Unit ) = collect(object : FlowCollector <T > {
16+ override suspend fun emit (value : T ) = action(value)
17+ })
18+
1119class KotlinCoroutineTests (dispatcher : CoroutineDispatcher ) : CoreKotlinCoroutineTests(dispatcher) {
1220
13- // @Trace
14- // fun tracedAcrossFlows(withModifiedContext: Boolean): Int = runTest {
15- // val producer = flow {
16- // repeat(3) {
17- // tracedChild("produce_$it")
18- // if (withModifiedContext) {
19- // withTimeout(100) {
20- // emit(it)
21- // }
22- // } else {
23- // emit(it)
24- // }
25- // }
26- // }.flowOn(jobName("producer"))
27- //
28- // launch(jobName("consumer")) {
29- // producer.collect {
30- // tracedChild("consume_$it")
31- // }
32- // }
33- //
34- // 7
35- // }
21+ @Trace
22+ fun tracedAcrossFlows (withModifiedContext : Boolean ): Int = runTest {
23+ // Use channelFlow when emitting from modified context (withTimeout) as regular flow doesn't allow it
24+ val producer: Flow <Int > = if (withModifiedContext) {
25+ channelFlow {
26+ repeat(3 ) {
27+ tracedChild(" produce_$it " )
28+ withTimeout(100 ) { send(it) }
29+ }
30+ }
31+ } else {
32+ flow {
33+ repeat(3 ) {
34+ tracedChild(" produce_$it " )
35+ emit(it)
36+ }
37+ }
38+ }.flowOn(jobName(" producer" ))
39+
40+ launch(jobName(" consumer" )) {
41+ producer.forEach { tracedChild(" consume_$it " ) }
42+ }
43+
44+ 7
45+ }
3646
3747 @Trace
3848 fun traceAfterFlow (): Int = runTest {
0 commit comments