Skip to content

Commit 00cb4e5

Browse files
committed
Enhance support for async stack traces in flows
* simplify instrumentation by making a single insertion point source instead of having one in every class * handle a double-wrapping case which leads to errors; allow agent to choose how to handle it * support more commonly used operators (such as `scan`, `buffer`, `debounce` with dynamic timeout) Unfortunately, this change doesn't cover all possible scenarios of using flows, as many of them interoperate with `Channel`s, and it should be addressed separately.
1 parent 75107bc commit 00cb4e5

File tree

8 files changed

+95
-35
lines changed

8 files changed

+95
-35
lines changed

IntelliJ-patches.md

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,29 @@ The agent needs three entities to establish a proper asynchronous stack traces c
6161

6262
The key for MutableStateFlow is the element itself. For MutableSharedFlow, the element is wrapped into a unique object to prevent bridging mistakes when two equal elements are emitted from different places.
6363

64-
Also, operators `debounce` and `sample` are supported. As they use an intermediary flow inside, the transferred values are wrapped and unwrapped the same way as in MutableSharedFlow.
64+
Most of the operators applicable to flows (such as `map`, `scan`, `debounce`, `buffer`) are supported. As some of them use an intermediary flow inside, the transferred values are wrapped and unwrapped the same way as in MutableSharedFlow.
6565
It means there may be all-library async stack traces between a stack trace containing `emit` and a stack trace containing `collect`.
6666

67+
There is no support yet for many operators that heavily use `Channel`s inside (such as `timeout`), as well as for functions that convert flows to channels and vice versa (such as `produceIn`).
68+
6769
### API
6870

69-
No new public methods are introduced; some logic was extracted to separate methods so that the debugger agent could instrument it properly:
71+
Some logic related to instrumentation was extracted to separate methods so that the debugger agent could instrument it properly:
7072

7173
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternal` -- wrapper class used to create a unique object for the debugger agent
7274
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.wrapInternal` -- returns passed argument by default; the agent instruments it to call `wrapInternalDebuggerCapture` instead
73-
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.wrapInternalDebuggerCapture` -- wraps passed arguments into a `FlowValueWrapperInternal`; only used after transformation
75+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.wrapInternalDebuggerCaptureX` -- wraps passed arguments into a `FlowValueWrapperInternal`; only used after transformation.
76+
`X` may mean `Strict` or `Lenient`. Both methods handle double-wrapping, which is always an error that is hard to investigate if it arises naturally.
77+
- `Strict` throws an exception, thus allowing fail-fast strategy
78+
- `Lenient` returns its argument without wrapping it again
79+
Debugger agent decides which version to use based on IDE settings.
7480
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapInternal` -- returns passed argument by default; the agent instruments it to call `unwrapInternalDebuggerCapture` instead
7581
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapInternalDebuggerCapture` -- unwraps passed argument so it returns the original value; only used after transformation
82+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.emitInternal(FlowCollector, value)` -- common insertion point for a debugger agent; simplifies instrumentation; the value is always being unwrapped inside
83+
84+
One internal method was added to `BufferedChannelIterator`: `nextInternal` -- same as `next` but may return a wrapped value. It should only be used with a function that is capable of unwrapping the value (see `BufferedChannel.emitAll` and `BufferedChannelIterator.next`), so there's a guarantee a wrapped value will always unwrap before emitting.
85+
86+
Why not just let `next` return a maybe wrapped value? That's because it is heavily used outside a currently supported scope. For example, one may just indirectly call it from a for-loop. In this case, unwrapping will never happen, and a user will get a handful of `ClassCastException`s.
87+
88+
One public method was added to support `buffer` and operators that use it inside:
89+
- `ReceiveChannel.emitAll`. It encapsulates emitting values in `FlowCollector.emitAllImpl` and has a special implementation in `BufferedChannel`.

kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import kotlinx.coroutines.*
77
import kotlinx.coroutines.channels.ChannelResult.Companion.closed
88
import kotlinx.coroutines.channels.ChannelResult.Companion.failure
99
import kotlinx.coroutines.channels.ChannelResult.Companion.success
10+
import kotlinx.coroutines.flow.FlowCollector
1011
import kotlinx.coroutines.flow.internal.*
1112
import kotlinx.coroutines.internal.*
1213
import kotlinx.coroutines.selects.*
@@ -1550,6 +1551,13 @@ internal open class BufferedChannel<E>(
15501551

15511552
override fun iterator(): ChannelIterator<E> = BufferedChannelIterator()
15521553

1554+
override suspend fun emitAll(collector: FlowCollector<E>) {
1555+
val iterator = iterator() as BufferedChannel.BufferedChannelIterator
1556+
while (iterator.hasNext()) {
1557+
collector.emitInternal(iterator.nextInternal())
1558+
}
1559+
}
1560+
15531561
/**
15541562
* The key idea is that an iterator is a special receiver type,
15551563
* which should be resumed differently to [receive] and [onReceive]
@@ -1601,7 +1609,7 @@ internal open class BufferedChannel<E>(
16011609
// Also, inform the `BufferedChannel` extensions that
16021610
// the synchronization of this receive operation is completed.
16031611
onElementRetrieved = { element ->
1604-
this.receiveResult = element
1612+
saveReceiveResult(element)
16051613
true
16061614
},
16071615
// As no waiter is provided, suspension is impossible.
@@ -1639,7 +1647,7 @@ internal open class BufferedChannel<E>(
16391647
// In case `onUndeliveredElement` is present, we must
16401648
// invoke it in the latter case.
16411649
onElementRetrieved = { element ->
1642-
this.receiveResult = element
1650+
saveReceiveResult(element)
16431651
this.continuation = null
16441652
cont.resume(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
16451653
},
@@ -1669,8 +1677,17 @@ internal open class BufferedChannel<E>(
16691677
}
16701678
}
16711679

1672-
@Suppress("UNCHECKED_CAST")
16731680
override fun next(): E {
1681+
return unwrapInternal(nextInternal())
1682+
}
1683+
1684+
/**
1685+
* Result may be wrapped by debugger agent; use this method only with [unwrapInternal] or [emitInternal]!
1686+
*
1687+
* @see [next], [emitAll]
1688+
*/
1689+
@Suppress("UNCHECKED_CAST")
1690+
internal fun nextInternal(): E {
16741691
// Read the already received result, or [NO_RECEIVE_RESULT] if [hasNext] has not been invoked yet.
16751692
val result = receiveResult
16761693
check(result !== NO_RECEIVE_RESULT) { "`hasNext()` has not been invoked" }
@@ -1687,13 +1704,17 @@ internal open class BufferedChannel<E>(
16871704
val cont = this.continuation!!
16881705
this.continuation = null
16891706
// Store the retrieved element in `receiveResult`.
1690-
this.receiveResult = element
1707+
saveReceiveResult(element)
16911708
// Try to resume this `hasNext()`. Importantly, the receiver coroutine
16921709
// may be cancelled after it is successfully resumed but not dispatched yet.
16931710
// In case `onUndeliveredElement` is specified, we need to invoke it in the latter case.
16941711
return cont.tryResume0(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
16951712
}
16961713

1714+
private fun saveReceiveResult(element: E) {
1715+
this.receiveResult = wrapInternal(element)
1716+
}
1717+
16971718
fun tryResumeHasNextOnClosedChannel() {
16981719
/*
16991720
* Read the current continuation of the suspended `hasNext()` call and clean the corresponding field to avoid memory leaks.

kotlinx-coroutines-core/common/src/channels/Channel.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
88
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
99
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
1010
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
11+
import kotlinx.coroutines.flow.FlowCollector
1112
import kotlinx.coroutines.internal.*
1213
import kotlinx.coroutines.selects.*
1314
import kotlin.contracts.*
@@ -288,6 +289,15 @@ public interface ReceiveChannel<out E> {
288289
*/
289290
public operator fun iterator(): ChannelIterator<E>
290291

292+
/**
293+
* Emits all elements of this channel using [collector].
294+
*/
295+
public suspend fun emitAll(collector: FlowCollector<E>) {
296+
for (element in this) {
297+
collector.emit(element)
298+
}
299+
}
300+
291301
/**
292302
* Cancels reception of remaining elements from this channel with an optional [cause].
293303
* This function closes the channel and removes all buffered sent elements from it.

kotlinx-coroutines-core/common/src/flow/Channels.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>,
2929
ensureActive()
3030
var cause: Throwable? = null
3131
try {
32-
for (element in channel) {
33-
emit(element)
34-
}
32+
channel.emitAll(this)
3533
} catch (e: Throwable) {
3634
cause = e
3735
throw e

kotlinx-coroutines-core/common/src/flow/SharedFlow.kt

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -389,18 +389,13 @@ internal open class SharedFlowImpl<T>(
389389
awaitValue(slot) // await signal that the new value is available
390390
}
391391
collectorJob?.ensureActive()
392-
emitInner(collector, newValue as T)
392+
collector.emitInternal(newValue as T)
393393
}
394394
} finally {
395395
freeSlot(slot)
396396
}
397397
}
398398

399-
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
400-
private suspend fun emitInner(collector: FlowCollector<T>, value: T) {
401-
collector.emit(unwrapInternal(value))
402-
}
403-
404399
override fun tryEmit(value: T): Boolean {
405400
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
406401
val emitted = synchronized(this) {

kotlinx-coroutines-core/common/src/flow/StateFlow.kt

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ private class StateFlowImpl<T>(
394394
collectorJob?.ensureActive()
395395
// Conflate value emissions using equality
396396
if (oldState == null || oldState != newState) {
397-
emitInner(collector, newState)
397+
collector.emitInternal(newState)
398398
oldState = newState
399399
}
400400
// Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
@@ -407,11 +407,6 @@ private class StateFlowImpl<T>(
407407
}
408408
}
409409

410-
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
411-
private suspend fun emitInner(collector: FlowCollector<T>, newState: Any) {
412-
collector.emit(NULL.unbox(newState))
413-
}
414-
415410
override fun createSlot() = StateFlowSlot()
416411
override fun createSlotArray(size: Int): Array<StateFlowSlot?> = arrayOfNulls(size)
417412

kotlinx-coroutines-core/common/src/flow/internal/FlowValueWrapperInternal.kt

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
@file:Suppress("UNCHECKED_CAST")
2+
13
package kotlinx.coroutines.flow.internal
24

5+
import kotlinx.coroutines.flow.FlowCollector
6+
37
/**
48
* Used by IDEA debugger agent to support asynchronous stack traces in flows.
59
* The agent requires a unique object present in both current and async stack traces,
@@ -11,12 +15,40 @@ internal class FlowValueWrapperInternal<T>(val value: T)
1115
internal fun <T> wrapInternal(value: T): T = value
1216
internal fun <T> unwrapInternal(value: T): T = value
1317

14-
// debugger agent transforms wrapInternal so it returns wrapInternalDebuggerCapture(value) instead of just value
15-
private fun wrapInternalDebuggerCapture(value: Any?): Any = FlowValueWrapperInternal(value)
18+
// debugger agent transforms wrapInternal so it returns wrapInternalDebuggerCaptureX(value) instead of just value.
19+
// "X" may be Strict or Lenient, debugger agent picks one depending on its arguments.
20+
// Both versions are aimed at handling double wrapping, which is always an error;
21+
// a lenient version swallows it, allowing the application to proceed normally
22+
// at the cost of not working async stack traces in the place;
23+
// a strict version throws an exception so that the issue could be traced at its earliest point
24+
25+
private fun wrapInternalDebuggerCaptureStrict(value: Any?): Any {
26+
if (value is FlowValueWrapperInternal<*>) {
27+
throw DoubleWrappingException("Double-wrapping detected; failing fast. This should never happen!")
28+
}
29+
return FlowValueWrapperInternal(value)
30+
}
31+
32+
private fun wrapInternalDebuggerCaptureLenient(value: Any?): Any {
33+
if (value is FlowValueWrapperInternal<*>) {
34+
return value
35+
}
36+
return FlowValueWrapperInternal(value)
37+
}
1638

1739
// debugger agent transforms unwrapInternal so it returns unwrapInternalDebuggerCapture(value) instead of just value
1840
//
1941
// normally, value is always FlowValueWrapperInternal, but potentially instrumentation may start
2042
// in the middle of the execution (for example, when the debugger was attached to a running application),
2143
// and the emitted value hadn't been wrapped
22-
private fun unwrapInternalDebuggerCapture(value: Any?): Any? = (value as? FlowValueWrapperInternal<*>)?.value ?: value
44+
private fun unwrapInternalDebuggerCapture(value: Any?): Any? {
45+
val wrapper = value as? FlowValueWrapperInternal<*> ?: return value
46+
return wrapper.value
47+
}
48+
49+
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
50+
internal suspend fun <T> FlowCollector<T>.emitInternal(value: Any?) {
51+
emit(NULL.unbox(unwrapInternal(value)))
52+
}
53+
54+
private class DoubleWrappingException(message: String) : RuntimeException(message)

kotlinx-coroutines-core/common/src/flow/operators/Delay.kt

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -209,10 +209,10 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl
209209
var timeoutMillis = 0L // will be always computed when lastValue != null
210210
// Compute timeout for this value
211211
if (lastValue != null) {
212-
timeoutMillis = timeoutMillisSelector(NULL.unbox(lastValue))
212+
timeoutMillis = timeoutMillisSelector(NULL.unbox(unwrapInternal(lastValue)))
213213
require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
214214
if (timeoutMillis == 0L) {
215-
emitInner(downstream, lastValue)
215+
downstream.emitInternal(lastValue)
216216
lastValue = null // Consume the value
217217
}
218218
}
@@ -223,7 +223,7 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl
223223
// Set timeout when lastValue exists and is not consumed yet
224224
if (lastValue != null) {
225225
onTimeout(timeoutMillis) {
226-
emitInner<T>(downstream, lastValue)
226+
downstream.emitInternal<T>(lastValue)
227227
lastValue = null // Consume the value
228228
}
229229
}
@@ -233,19 +233,14 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl
233233
.onFailure {
234234
it?.let { throw it }
235235
// If closed normally, emit the latest value
236-
if (lastValue != null) emitInner<T>(downstream, lastValue)
236+
if (lastValue != null) downstream.emitInternal<T>(lastValue)
237237
lastValue = DONE
238238
}
239239
}
240240
}
241241
}
242242
}
243243

244-
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
245-
private suspend fun <T> emitInner(downstream: FlowCollector<T>, value: Any?) {
246-
downstream.emit(NULL.unbox(unwrapInternal(value)))
247-
}
248-
249244
/**
250245
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
251246
*
@@ -295,7 +290,7 @@ public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
295290
ticker.onReceive {
296291
val value = lastValue ?: return@onReceive
297292
lastValue = null // Consume the value
298-
emitInner(downstream, value)
293+
downstream.emitInternal(value)
299294
}
300295
}
301296
}

0 commit comments

Comments
 (0)