Skip to content

Commit 3a8e3d8

Browse files
authored
Merge pull request #5 from AlexVanGogen/release/1.8.0-intellij-11
Enhance support for async stack traces in flows
2 parents 75107bc + 0267812 commit 3a8e3d8

File tree

10 files changed

+136
-66
lines changed

10 files changed

+136
-66
lines changed

IntelliJ-patches.md

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,32 @@ The agent needs three entities to establish a proper asynchronous stack traces c
6161

6262
The key for MutableStateFlow is the element itself. For MutableSharedFlow, the element is wrapped into a unique object to prevent bridging mistakes when two equal elements are emitted from different places.
6363

64-
Also, operators `debounce` and `sample` are supported. As they use an intermediary flow inside, the transferred values are wrapped and unwrapped the same way as in MutableSharedFlow.
64+
Most of the operators applicable to flows (such as `map`, `scan`, `debounce`, `timeout`, `buffer`) are supported. As some of them use an intermediary flow inside, the transferred values are wrapped and unwrapped the same way as in MutableSharedFlow.
6565
It means there may be all-library async stack traces between a stack trace containing `emit` and a stack trace containing `collect`.
6666

6767
### API
6868

69-
No new public methods are introduced; some logic was extracted to separate methods so that the debugger agent could instrument it properly:
69+
Some logic related to instrumentation was extracted to separate methods so that the debugger agent could instrument it properly:
7070

7171
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternal` -- wrapper class used to create a unique object for the debugger agent
7272
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.wrapInternal` -- returns passed argument by default; the agent instruments it to call `wrapInternalDebuggerCapture` instead
73-
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.wrapInternalDebuggerCapture` -- wraps passed arguments into a `FlowValueWrapperInternal`; only used after transformation
73+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.wrapInternalDebuggerCapture` -- wraps passed arguments into a `FlowValueWrapperInternal`; only used after transformation.
7474
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapInternal` -- returns passed argument by default; the agent instruments it to call `unwrapInternalDebuggerCapture` instead
7575
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapInternalDebuggerCapture` -- unwraps passed argument so it returns the original value; only used after transformation
76+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapTyped` -- utility function served to ease casting to a real underlying type
77+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.emitInternal(FlowCollector, value)` -- alternative of a regular `FlowCollector.emit` that supports insertion points; if there is a `FlowCollector`, its `emit` call can be replaced with `emitInternal` so this case would also be supported for constructing async stack traces
78+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.debuggerCapture` -- common insertion point for a debugger agent; simplifies instrumentation; the value is always being unwrapped inside.
79+
80+
One internal method was added to `BufferedChannel`: `emitAllInternal`. This method ensures the value will be unwrapped in an insertion point.
81+
82+
One internal method was added to `flow/Channels.kt`: `emitAllInternal`. It emits all values, like usual, but also considers wrapping/unwrapping supported in `BufferedChannel`.
83+
84+
One internal method was added to `ChannelCoroutine`: `emitAllInternal` serves to bridge its delegate and the method above.
85+
86+
One internal method was added to `BufferedChannelIterator`: `nextInternal` -- same as `next` but may return a wrapped value. It should only be used with a function that is capable of unwrapping the value (see `BufferedChannel.emitAll` and `BufferedChannelIterator.next`), so there's a guarantee a wrapped value will always unwrap before emitting.
87+
88+
Why not just let `next` return a maybe wrapped value? That's because it is heavily used outside a currently supported scope. For example, one may just indirectly call it from a for-loop. In this case, unwrapping will never happen, and a user will get a handful of `ClassCastException`s.
89+
90+
Changes were made to lambda parameter `onElementRetrieved` in `BufferedChannel<E>` methods: now they accept `Any?` instead of `E` because now they may be given a wrapped value.
91+
92+
`SelectImplementation.complete` now uses `debuggerCapture` to properly propagate value that might come from flows.

kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import kotlinx.coroutines.*
77
import kotlinx.coroutines.channels.ChannelResult.Companion.closed
88
import kotlinx.coroutines.channels.ChannelResult.Companion.failure
99
import kotlinx.coroutines.channels.ChannelResult.Companion.success
10+
import kotlinx.coroutines.flow.FlowCollector
1011
import kotlinx.coroutines.flow.internal.*
1112
import kotlinx.coroutines.internal.*
1213
import kotlinx.coroutines.selects.*
@@ -664,7 +665,7 @@ internal open class BufferedChannel<E>(
664665
protected open fun onReceiveDequeued() {}
665666

666667
override suspend fun receive(): E =
667-
receiveImpl( // <-- this is an inline function
668+
receiveImpl<E>( // <-- this is an inline function
668669
// Do not create a continuation until it is required;
669670
// it is created later via [onNoWaiterSuspend], if needed.
670671
waiter = null,
@@ -673,7 +674,7 @@ internal open class BufferedChannel<E>(
673674
// Also, inform `BufferedChannel` extensions that
674675
// synchronization of this receive operation is completed.
675676
onElementRetrieved = { element ->
676-
return element
677+
return unwrapTyped<E>(element)
677678
},
678679
// As no waiter is provided, suspension is impossible.
679680
onSuspend = { _, _, _ -> error("unexpected") },
@@ -705,8 +706,9 @@ internal open class BufferedChannel<E>(
705706
// not dispatched yet. In case `onUndeliveredElement` is
706707
// specified, we need to invoke it in the latter case.
707708
onElementRetrieved = { element ->
708-
val onCancellation = onUndeliveredElement?.bindCancellationFun(element, cont.context)
709-
cont.resume(element, onCancellation)
709+
val unwrapped: E = unwrapTyped(element)
710+
val onCancellation = onUndeliveredElement?.bindCancellationFun(unwrapped, cont.context)
711+
cont.resume(unwrapped, onCancellation)
710712
},
711713
onClosed = { onClosedReceiveOnNoWaiterSuspend(cont) },
712714
)
@@ -732,7 +734,7 @@ internal open class BufferedChannel<E>(
732734
receiveImpl( // <-- this is an inline function
733735
waiter = null,
734736
onElementRetrieved = { element ->
735-
success(element)
737+
success(unwrapTyped(element))
736738
},
737739
onSuspend = { _, _, _ -> error("unexpected") },
738740
onClosed = { closed(closeCause) },
@@ -749,7 +751,8 @@ internal open class BufferedChannel<E>(
749751
segment, index, r,
750752
waiter = waiter,
751753
onElementRetrieved = { element ->
752-
cont.resume(success(element), onUndeliveredElement?.bindCancellationFun(element, cont.context))
754+
val unwrapped = unwrapTyped<E>(element)
755+
cont.resume(success(unwrapped), onUndeliveredElement?.bindCancellationFun(unwrapped, cont.context))
753756
},
754757
onClosed = { onClosedReceiveCatchingOnNoWaiterSuspend(cont) }
755758
)
@@ -782,7 +785,7 @@ internal open class BufferedChannel<E>(
782785
// Store an already interrupted receiver in case of suspension.
783786
waiter = INTERRUPTED_RCV,
784787
// Finish when an element is successfully retrieved.
785-
onElementRetrieved = { element -> success(element) },
788+
onElementRetrieved = { element -> success(unwrapTyped(element)) },
786789
// On suspension, the `INTERRUPTED_RCV` token has been
787790
// installed, and this `tryReceive()` must fail.
788791
onSuspend = { segm, _, globalIndex ->
@@ -846,7 +849,7 @@ internal open class BufferedChannel<E>(
846849
// Clean the reference to the previous segment.
847850
segment.cleanPrev()
848851
@Suppress("UNCHECKED_CAST")
849-
onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it }
852+
onUndeliveredElement?.callUndeliveredElementCatchingException(unwrapTyped(updCellResult))?.let { throw it }
850853
}
851854
}
852855
}
@@ -864,7 +867,7 @@ internal open class BufferedChannel<E>(
864867
/* This lambda is invoked when an element has been
865868
successfully retrieved, either from the buffer or
866869
by making a rendezvous with a suspended sender. */
867-
onElementRetrieved: (element: E) -> R,
870+
onElementRetrieved: (element: Any?) -> R,
868871
/* This lambda is called when the operation suspends in the cell
869872
specified by the segment and its global and in-segment indices. */
870873
onSuspend: (segm: ChannelSegment<E>, i: Int, r: Long) -> R,
@@ -934,7 +937,7 @@ internal open class BufferedChannel<E>(
934937
// Clean the reference to the previous segment before finishing.
935938
segment.cleanPrev()
936939
@Suppress("UNCHECKED_CAST")
937-
onElementRetrieved(updCellResult as E)
940+
onElementRetrieved(updCellResult)
938941
}
939942
}
940943
}
@@ -952,7 +955,7 @@ internal open class BufferedChannel<E>(
952955
/* This lambda is invoked when an element has been
953956
successfully retrieved, either from the buffer or
954957
by making a rendezvous with a suspended sender. */
955-
onElementRetrieved: (element: E) -> Unit,
958+
onElementRetrieved: (element: Any?) -> Unit,
956959
/* This lambda is called when the channel is observed
957960
in the closed state and no waiting senders is found,
958961
which means that it is closed for receiving. */
@@ -1540,7 +1543,7 @@ internal open class BufferedChannel<E>(
15401543
@Suppress("UNCHECKED_CAST")
15411544
private val onUndeliveredElementReceiveCancellationConstructor: OnCancellationConstructor? = onUndeliveredElement?.let {
15421545
{ select: SelectInstance<*>, _: Any?, element: Any? ->
1543-
{ if (element !== CHANNEL_CLOSED) onUndeliveredElement.callUndeliveredElement(element as E, select.context) }
1546+
{ if (element !== CHANNEL_CLOSED) onUndeliveredElement.callUndeliveredElement(unwrapTyped(element), select.context) }
15441547
}
15451548
}
15461549

@@ -1550,6 +1553,13 @@ internal open class BufferedChannel<E>(
15501553

15511554
override fun iterator(): ChannelIterator<E> = BufferedChannelIterator()
15521555

1556+
internal suspend fun emitAllInternal(collector: FlowCollector<E>) {
1557+
val iterator = iterator() as BufferedChannel.BufferedChannelIterator
1558+
while (iterator.hasNext()) {
1559+
collector.emitInternal(iterator.nextInternal())
1560+
}
1561+
}
1562+
15531563
/**
15541564
* The key idea is that an iterator is a special receiver type,
15551565
* which should be resumed differently to [receive] and [onReceive]
@@ -1641,7 +1651,7 @@ internal open class BufferedChannel<E>(
16411651
onElementRetrieved = { element ->
16421652
this.receiveResult = element
16431653
this.continuation = null
1644-
cont.resume(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
1654+
cont.resume(true, onUndeliveredElement?.bindCancellationFun(unwrapTyped(element), cont.context))
16451655
},
16461656
onClosed = { onClosedHasNextNoWaiterSuspend() }
16471657
)
@@ -1669,8 +1679,17 @@ internal open class BufferedChannel<E>(
16691679
}
16701680
}
16711681

1672-
@Suppress("UNCHECKED_CAST")
16731682
override fun next(): E {
1683+
return unwrapInternal(nextInternal())
1684+
}
1685+
1686+
/**
1687+
* Result may be wrapped by debugger agent; use this method only with [unwrapInternal] or [emitInternal]!
1688+
*
1689+
* @see [next], [emitAll]
1690+
*/
1691+
@Suppress("UNCHECKED_CAST")
1692+
internal fun nextInternal(): E {
16741693
// Read the already received result, or [NO_RECEIVE_RESULT] if [hasNext] has not been invoked yet.
16751694
val result = receiveResult
16761695
check(result !== NO_RECEIVE_RESULT) { "`hasNext()` has not been invoked" }
@@ -1687,7 +1706,7 @@ internal open class BufferedChannel<E>(
16871706
val cont = this.continuation!!
16881707
this.continuation = null
16891708
// Store the retrieved element in `receiveResult`.
1690-
this.receiveResult = element
1709+
this.receiveResult = wrapInternal(element)
16911710
// Try to resume this `hasNext()`. Importantly, the receiver coroutine
16921711
// may be cancelled after it is successfully resumed but not dispatched yet.
16931712
// In case `onUndeliveredElement` is specified, we need to invoke it in the latter case.
@@ -2762,16 +2781,17 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: Bu
27622781
}
27632782

27642783
@Suppress("UNCHECKED_CAST")
2765-
internal fun getElement(index: Int) = data[index * 2].value as E
2784+
internal fun getElement(index: Int) = unwrapInternal(data[index * 2].value) as E
27662785

2767-
internal fun retrieveElement(index: Int): E = getElement(index).also { cleanElement(index) }
2786+
@Suppress("UNCHECKED_CAST")
2787+
internal fun retrieveElement(index: Int): E = (data[index * 2].value as E).also { cleanElement(index) }
27682788

27692789
internal fun cleanElement(index: Int) {
2770-
setElementLazy(index, null)
2790+
data[index * 2].lazySet(null)
27712791
}
27722792

27732793
private fun setElementLazy(index: Int, value: Any?) {
2774-
data[index * 2].lazySet(value)
2794+
data[index * 2].lazySet(wrapInternal(value))
27752795
}
27762796

27772797
// ######################################

kotlinx-coroutines-core/common/src/channels/Channel.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
88
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
99
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
1010
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
11+
import kotlinx.coroutines.flow.FlowCollector
1112
import kotlinx.coroutines.internal.*
1213
import kotlinx.coroutines.selects.*
1314
import kotlin.contracts.*

kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package kotlinx.coroutines.channels
22

33
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.flow.FlowCollector
5+
import kotlinx.coroutines.flow.emitAllInternal
46
import kotlin.coroutines.*
57

68
internal open class ChannelCoroutine<E>(
@@ -35,4 +37,8 @@ internal open class ChannelCoroutine<E>(
3537
_channel.cancel(exception) // cancel the channel
3638
cancelCoroutine(exception) // cancel the job
3739
}
40+
41+
internal suspend fun emitAllInternal(flowCollector: FlowCollector<E>) {
42+
emitAllInternal(_channel, flowCollector)
43+
}
3844
}

kotlinx-coroutines-core/common/src/flow/Channels.kt

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>,
2929
ensureActive()
3030
var cause: Throwable? = null
3131
try {
32-
for (element in channel) {
33-
emit(element)
34-
}
32+
emitAllInternal(channel, this)
3533
} catch (e: Throwable) {
3634
cause = e
3735
throw e
@@ -40,6 +38,22 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>,
4038
}
4139
}
4240

41+
internal suspend fun <T> emitAllInternal(channel: ReceiveChannel<T>, collector: FlowCollector<T>) {
42+
when (channel) {
43+
is BufferedChannel<*> -> {
44+
(channel as BufferedChannel<T>).emitAllInternal(collector)
45+
}
46+
is ChannelCoroutine<*> -> {
47+
(channel as ChannelCoroutine<T>).emitAllInternal(collector)
48+
}
49+
else -> {
50+
for (element in channel) {
51+
collector.emit(element)
52+
}
53+
}
54+
}
55+
}
56+
4357
/**
4458
* Represents the given receive channel as a hot flow and [receives][ReceiveChannel.receive] from the channel
4559
* in fan-out fashion every time this flow is collected. One element will be emitted to one collector only.

kotlinx-coroutines-core/common/src/flow/SharedFlow.kt

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -389,18 +389,13 @@ internal open class SharedFlowImpl<T>(
389389
awaitValue(slot) // await signal that the new value is available
390390
}
391391
collectorJob?.ensureActive()
392-
emitInner(collector, newValue as T)
392+
collector.emitInternal(newValue as T)
393393
}
394394
} finally {
395395
freeSlot(slot)
396396
}
397397
}
398398

399-
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
400-
private suspend fun emitInner(collector: FlowCollector<T>, value: T) {
401-
collector.emit(unwrapInternal(value))
402-
}
403-
404399
override fun tryEmit(value: T): Boolean {
405400
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
406401
val emitted = synchronized(this) {

kotlinx-coroutines-core/common/src/flow/StateFlow.kt

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ private class StateFlowImpl<T>(
394394
collectorJob?.ensureActive()
395395
// Conflate value emissions using equality
396396
if (oldState == null || oldState != newState) {
397-
emitInner(collector, newState)
397+
collector.emitInternal(newState)
398398
oldState = newState
399399
}
400400
// Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
@@ -407,11 +407,6 @@ private class StateFlowImpl<T>(
407407
}
408408
}
409409

410-
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
411-
private suspend fun emitInner(collector: FlowCollector<T>, newState: Any) {
412-
collector.emit(NULL.unbox(newState))
413-
}
414-
415410
override fun createSlot() = StateFlowSlot()
416411
override fun createSlotArray(size: Int): Array<StateFlowSlot?> = arrayOfNulls(size)
417412

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
@file:Suppress("UNCHECKED_CAST")
2+
13
package kotlinx.coroutines.flow.internal
24

5+
import kotlinx.coroutines.flow.FlowCollector
6+
37
/**
48
* Used by IDEA debugger agent to support asynchronous stack traces in flows.
59
* The agent requires a unique object present in both current and async stack traces,
@@ -10,13 +14,32 @@ internal class FlowValueWrapperInternal<T>(val value: T)
1014

1115
internal fun <T> wrapInternal(value: T): T = value
1216
internal fun <T> unwrapInternal(value: T): T = value
17+
internal fun <T> unwrapTyped(value: Any?): T = NULL.unbox(unwrapInternal(value))
1318

14-
// debugger agent transforms wrapInternal so it returns wrapInternalDebuggerCapture(value) instead of just value
15-
private fun wrapInternalDebuggerCapture(value: Any?): Any = FlowValueWrapperInternal(value)
19+
// debugger agent transforms wrapInternal so it returns wrapInternalDebuggerCapture(value) instead of just value.
20+
private fun wrapInternalDebuggerCapture(value: Any?): Any {
21+
if (value is FlowValueWrapperInternal<*>) {
22+
return value
23+
}
24+
return FlowValueWrapperInternal(value)
25+
}
1626

1727
// debugger agent transforms unwrapInternal so it returns unwrapInternalDebuggerCapture(value) instead of just value
1828
//
1929
// normally, value is always FlowValueWrapperInternal, but potentially instrumentation may start
2030
// in the middle of the execution (for example, when the debugger was attached to a running application),
2131
// and the emitted value hadn't been wrapped
22-
private fun unwrapInternalDebuggerCapture(value: Any?): Any? = (value as? FlowValueWrapperInternal<*>)?.value ?: value
32+
private fun unwrapInternalDebuggerCapture(value: Any?): Any? {
33+
val wrapper = value as? FlowValueWrapperInternal<*> ?: return value
34+
return wrapper.value
35+
}
36+
37+
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
38+
internal suspend fun <T> FlowCollector<T>.emitInternal(value: Any?) {
39+
emit(unwrapTyped<T>(value))
40+
}
41+
42+
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
43+
internal suspend fun <Unwrapped, R> debuggerCapture(value: Any?, block: suspend (Unwrapped) -> R): R {
44+
return block(unwrapInternal(value) as Unwrapped)
45+
}

0 commit comments

Comments
 (0)