Skip to content

Commit 5c0d155

Browse files
AlexVanGogenknisht
authored andcommitted
IDEA-352355 Asynchronous stack traces for flows in the IDEA debugger
Patched during cherry-pick to 1.10.1. File with conflicts: BufferedChannel.kt Squashed commits as of version 1.8.0-intellij-11 Don't make unnecessary NULL unboxings (cherry picked from commit 0267812) Don't wrap suspend function (cherry picked from commit 8684cad) Get rid of changes in the public API -- bring back CoroutineChannel support (cherry picked from commit 9836a6b) Get rid of changes in the public API (cherry picked from commit f852554) Support SelectImplementation This brings support for select-based flow operators, such as `timeout`. (cherry picked from commit db906d8) Support BufferedChannel Before, `buffer` operations and such were only supported partially, and in some cases async stack traces were working only in 50% collects. For example, when several flows are merged with `flattenMerge` and emit values simultaneously. This change seems large, changing a lot of lines in BufferedChannel.kt, but most of them are effectively refactoring (propagation of a wrapped value). (cherry picked from commit 8be4def) Discard strict double-wrapping check We decided not to go with it, as it may dump a lot of error messages to a clueless user's console. (cherry picked from commit 0efa558) Enhance support for async stack traces in flows * simplify instrumentation by making a single insertion point source instead of having one in every class * handle a double-wrapping case which leads to errors; allow agent to choose how to handle it * support more commonly used operators (such as `scan`, `buffer`, `debounce` with dynamic timeout) Unfortunately, this change doesn't cover all possible scenarios of using flows, as many of them interoperate with `Channel`s, and it should be addressed separately. (cherry picked from commit 00cb4e5) Prepare shared flows for the debugger agent to support async stack traces The agent needs three entities to establish a proper asynchronous stack traces connection: - a capture point -- method that indicates the stack trace that precedes the current stack trace; - an insertion point -- method within the current stack trace; - a key -- an object that is present in both points and is unique enough to bridge two points properly. This change tweaks the code a bit to introduce the three entities in MutableSharedFlow and MutableStateFlow. The key for MutableSharedFlow is the element itself. For MutableSharedFlow, the element is wrapped into a unique object to prevent bridging mistakes when two equal elements are emitted from different places. (cherry picked from commit 75107bc) # Conflicts: # kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt # Conflicts: # IntelliJ-patches.md
1 parent 730ba1e commit 5c0d155

File tree

10 files changed

+225
-52
lines changed

10 files changed

+225
-52
lines changed

IntelliJ-patches.md

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,73 @@ We provide a single method `kotlinx.coroutines.internal.intellij.IntellijCorouti
1919
The invariant is that the result of this method is always equal to `coroutineContext` in suspending environment,
2020
and it does not change during the non-suspending execution within the same thread.
2121

22+
## Parallelism compensation for `CoroutineDispatcher`s
23+
24+
If `runBlocking` happens to be invoked on a thread from `CoroutineDispatcher`, it may cause a thread starvation problem
25+
(Kotlin#3983). This happens because `runBlocking` does not release an associated computational permit while it parks the
26+
thread. To fix this, a parallelism compensation mechanism is introduced. Some `CoroutineDispatcher`s (such as
27+
`Dispatchers.Default`, `Dispatchers.IO` and others) support `ParallelismCompensation`, meaning that these dispatchers
28+
can be notified that they should increase parallelism and parallelism limit, or they should decrease it. It is important that these
29+
are only requests and dispatchers are in full control on how and when they need to adjust the effective parallelism.
30+
It also means that the instantaneous parallelism may exceed the current allowed parallelism limit for the given dispatcher.
31+
32+
`runBlockingWithParallelismCompensation` (further abbreviated as `rBWPC`) is introduced as a counterpart of `runBlocking`
33+
with the following behavioral change. When `rBWPC` decides to park a `CoroutineDispatcher` thread, it first increases the allowed parallelism
34+
limit of the `CoroutineDispatcher`. After the thread unparks, `rBWPC` notifies the dispatcher that the parallelism limit should be lowered back.
35+
A separate function is introduced because parallelism compensation is not always a desirable behavior.
36+
37+
It is easy to see that this behavior cannot be general for `CoroutineDispatcher`s, at least because it breaks the contract
38+
of `LimitedDispatcher` (one that can be acquired via `.limitedParallelism`). It means that parallelism compensation
39+
cannot work for `LimitedDispatcher`, so `runBlockingWithParallelismCompensation` can still cause starvation issues there, but it seems rather
40+
expected.
41+
42+
Parallelism compensation support is internal and is implemented for `Dispatchers.Default` and `Dispatchers.IO`.
43+
To acquire an analogue of `limitedParallelism` dispatcher which supports parallelism compensation, use
44+
`IntellijCoroutines.softLimitedParallelism`. Be advised that not every `.limitedParallelism` call can be substituted
45+
with `.softLimitedParallelism`, e.g., `.limitedParallelism(1)` may be used as a synchronization manager and in this case
46+
exceeding the parallelism limit would eliminate this (likely expected) side effect.
47+
48+
### API
49+
- `runBlockingWithParallelismCompensation` - an analogue of `runBlocking` which also compensates parallelism of the
50+
associated coroutine dispatcher when it decides to park the thread
51+
- `CoroutineDispatcher.softLimitedParallelism` – an analogue of `.limitedParallelism` which supports
52+
parallelism compensation
53+
54+
## Asynchronous stack traces for flows in the IDEA debugger
55+
56+
The agent needs three entities to establish a proper asynchronous stack traces connection:
57+
- a capture point — method that indicates the stack trace that precedes the current stack trace;
58+
- an insertion point — method within the current stack trace;
59+
- a key — an object that is present in both points and is unique enough to bridge two stack traces properly.
60+
61+
The key for MutableStateFlow is the element itself. For MutableSharedFlow, the element is wrapped into a unique object to prevent bridging mistakes when two equal elements are emitted from different places.
62+
63+
Most of the operators applicable to flows (such as `map`, `scan`, `debounce`, `timeout`, `buffer`) are supported. As some of them use an intermediary flow inside, the transferred values are wrapped and unwrapped the same way as in MutableSharedFlow.
64+
It means there may be all-library async stack traces between a stack trace containing `emit` and a stack trace containing `collect`.
65+
66+
### API
67+
68+
Some logic related to instrumentation was extracted to separate methods so that the debugger agent could instrument it properly:
69+
70+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternal` -- wrapper class used to create a unique object for the debugger agent
71+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.wrapInternal` -- returns passed argument by default; the agent instruments it to call `wrapInternalDebuggerCapture` instead
72+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.wrapInternalDebuggerCapture` -- wraps passed arguments into a `FlowValueWrapperInternal`; only used after transformation.
73+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapInternal` -- returns passed argument by default; the agent instruments it to call `unwrapInternalDebuggerCapture` instead
74+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapInternalDebuggerCapture` -- unwraps passed argument so it returns the original value; only used after transformation
75+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapTyped` -- utility function served to ease casting to a real underlying type
76+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.emitInternal(FlowCollector, value)` -- alternative of a regular `FlowCollector.emit` that supports insertion points; if there is a `FlowCollector`, its `emit` call can be replaced with `emitInternal` so this case would also be supported for constructing async stack traces
77+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.debuggerCapture` -- common insertion point for a debugger agent; simplifies instrumentation; the value is always being unwrapped inside.
78+
79+
One internal method was added to `BufferedChannel`: `emitAllInternal`. This method ensures the value will be unwrapped in an insertion point.
80+
81+
One internal method was added to `flow/Channels.kt`: `emitAllInternal`. It emits all values, like usual, but also considers wrapping/unwrapping supported in `BufferedChannel`.
82+
83+
One internal method was added to `ChannelCoroutine`: `emitAllInternal` serves to bridge its delegate and the method above.
84+
85+
One internal method was added to `BufferedChannelIterator`: `nextInternal` -- same as `next` but may return a wrapped value. It should only be used with a function that is capable of unwrapping the value (see `BufferedChannel.emitAll` and `BufferedChannelIterator.next`), so there's a guarantee a wrapped value will always unwrap before emitting.
86+
87+
Why not just let `next` return a maybe wrapped value? That's because it is heavily used outside a currently supported scope. For example, one may just indirectly call it from a for-loop. In this case, unwrapping will never happen, and a user will get a handful of `ClassCastException`s.
88+
89+
Changes were made to lambda parameter `onElementRetrieved` in `BufferedChannel<E>` methods: now they accept `Any?` instead of `E` because now they may be given a wrapped value.
90+
91+
`SelectImplementation.complete` now uses `debuggerCapture` to properly propagate value that might come from flows.

kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import kotlinx.coroutines.*
77
import kotlinx.coroutines.channels.ChannelResult.Companion.closed
88
import kotlinx.coroutines.channels.ChannelResult.Companion.failure
99
import kotlinx.coroutines.channels.ChannelResult.Companion.success
10+
import kotlinx.coroutines.flow.FlowCollector
11+
import kotlinx.coroutines.flow.internal.*
1012
import kotlinx.coroutines.internal.*
1113
import kotlinx.coroutines.selects.*
1214
import kotlinx.coroutines.selects.TrySelectDetailedResult.*
@@ -684,7 +686,7 @@ internal open class BufferedChannel<E>(
684686
protected open fun onReceiveDequeued() {}
685687

686688
override suspend fun receive(): E =
687-
receiveImpl( // <-- this is an inline function
689+
receiveImpl<E>( // <-- this is an inline function
688690
// Do not create a continuation until it is required;
689691
// it is created later via [onNoWaiterSuspend], if needed.
690692
waiter = null,
@@ -693,7 +695,7 @@ internal open class BufferedChannel<E>(
693695
// Also, inform `BufferedChannel` extensions that
694696
// synchronization of this receive operation is completed.
695697
onElementRetrieved = { element ->
696-
return element
698+
return unwrapTyped<E>(element)
697699
},
698700
// As no waiter is provided, suspension is impossible.
699701
onSuspend = { _, _, _ -> error("unexpected") },
@@ -726,7 +728,7 @@ internal open class BufferedChannel<E>(
726728
// specified, we need to invoke it in the latter case.
727729
onElementRetrieved = { element ->
728730
val onCancellation = onUndeliveredElement?.bindCancellationFun()
729-
cont.resume(element, onCancellation)
731+
cont.resume(unwrapTyped(element), onCancellation)
730732
},
731733
onClosed = { onClosedReceiveOnNoWaiterSuspend(cont) },
732734
)
@@ -752,7 +754,7 @@ internal open class BufferedChannel<E>(
752754
receiveImpl( // <-- this is an inline function
753755
waiter = null,
754756
onElementRetrieved = { element ->
755-
success(element)
757+
success(unwrapTyped(element))
756758
},
757759
onSuspend = { _, _, _ -> error("unexpected") },
758760
onClosed = { closed(closeCause) },
@@ -769,7 +771,8 @@ internal open class BufferedChannel<E>(
769771
segment, index, r,
770772
waiter = waiter,
771773
onElementRetrieved = { element ->
772-
cont.resume(success(element), onUndeliveredElement?.bindCancellationFunResult())
774+
val unwrapped = unwrapTyped<E>(element)
775+
cont.resume(success(unwrapped), onUndeliveredElement?.bindCancellationFunResult())
773776
},
774777
onClosed = { onClosedReceiveCatchingOnNoWaiterSuspend(cont) }
775778
)
@@ -802,7 +805,7 @@ internal open class BufferedChannel<E>(
802805
// Store an already interrupted receiver in case of suspension.
803806
waiter = INTERRUPTED_RCV,
804807
// Finish when an element is successfully retrieved.
805-
onElementRetrieved = { element -> success(element) },
808+
onElementRetrieved = { element -> success(unwrapTyped(element)) },
806809
// On suspension, the `INTERRUPTED_RCV` token has been
807810
// installed, and this `tryReceive()` must fail.
808811
onSuspend = { segm, _, globalIndex ->
@@ -866,7 +869,7 @@ internal open class BufferedChannel<E>(
866869
// Clean the reference to the previous segment.
867870
segment.cleanPrev()
868871
@Suppress("UNCHECKED_CAST")
869-
onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it }
872+
onUndeliveredElement?.callUndeliveredElementCatchingException(unwrapTyped(updCellResult))?.let { throw it }
870873
}
871874
}
872875
}
@@ -884,7 +887,7 @@ internal open class BufferedChannel<E>(
884887
/* This lambda is invoked when an element has been
885888
successfully retrieved, either from the buffer or
886889
by making a rendezvous with a suspended sender. */
887-
onElementRetrieved: (element: E) -> R,
890+
onElementRetrieved: (element: Any?) -> R,
888891
/* This lambda is called when the operation suspends in the cell
889892
specified by the segment and its global and in-segment indices. */
890893
onSuspend: (segm: ChannelSegment<E>, i: Int, r: Long) -> R,
@@ -954,7 +957,7 @@ internal open class BufferedChannel<E>(
954957
// Clean the reference to the previous segment before finishing.
955958
segment.cleanPrev()
956959
@Suppress("UNCHECKED_CAST")
957-
onElementRetrieved(updCellResult as E)
960+
onElementRetrieved(updCellResult)
958961
}
959962
}
960963
}
@@ -972,7 +975,7 @@ internal open class BufferedChannel<E>(
972975
/* This lambda is invoked when an element has been
973976
successfully retrieved, either from the buffer or
974977
by making a rendezvous with a suspended sender. */
975-
onElementRetrieved: (element: E) -> Unit,
978+
onElementRetrieved: (element: Any?) -> Unit,
976979
/* This lambda is called when the channel is observed
977980
in the closed state and no waiting senders is found,
978981
which means that it is closed for receiving. */
@@ -1561,7 +1564,7 @@ internal open class BufferedChannel<E>(
15611564
private val onUndeliveredElementReceiveCancellationConstructor: OnCancellationConstructor? = onUndeliveredElement?.let {
15621565
{ select: SelectInstance<*>, _: Any?, element: Any? ->
15631566
{ _, _, _ ->
1564-
if (element !== CHANNEL_CLOSED) onUndeliveredElement.callUndeliveredElement(element as E, select.context)
1567+
if (element !== CHANNEL_CLOSED) onUndeliveredElement.callUndeliveredElement(unwrapTyped(element), select.context)
15651568
}
15661569
}
15671570
}
@@ -1572,6 +1575,13 @@ internal open class BufferedChannel<E>(
15721575

15731576
override fun iterator(): ChannelIterator<E> = BufferedChannelIterator()
15741577

1578+
internal suspend fun emitAllInternal(collector: FlowCollector<E>) {
1579+
val iterator = iterator() as BufferedChannel.BufferedChannelIterator
1580+
while (iterator.hasNext()) {
1581+
collector.emitInternal(iterator.nextInternal())
1582+
}
1583+
}
1584+
15751585
/**
15761586
* The key idea is that an iterator is a special receiver type,
15771587
* which should be resumed differently to [receive] and [onReceive]
@@ -1666,7 +1676,7 @@ internal open class BufferedChannel<E>(
16661676
onElementRetrieved = { element ->
16671677
this.receiveResult = element
16681678
this.continuation = null
1669-
cont.resume(true, onUndeliveredElement?.bindCancellationFun(element))
1679+
cont.resume(true, onUndeliveredElement?.bindCancellationFun(unwrapTyped(element)))
16701680
},
16711681
onClosed = { onClosedHasNextNoWaiterSuspend() }
16721682
)
@@ -1694,8 +1704,17 @@ internal open class BufferedChannel<E>(
16941704
}
16951705
}
16961706

1697-
@Suppress("UNCHECKED_CAST")
16981707
override fun next(): E {
1708+
return unwrapInternal(nextInternal())
1709+
}
1710+
1711+
/**
1712+
* Result may be wrapped by debugger agent; use this method only with [unwrapInternal] or [emitInternal]!
1713+
*
1714+
* @see [next], [emitAll]
1715+
*/
1716+
@Suppress("UNCHECKED_CAST")
1717+
internal fun nextInternal(): E {
16991718
// Read the already received result, or [NO_RECEIVE_RESULT] if [hasNext] has not been invoked yet.
17001719
val result = receiveResult
17011720
check(result !== NO_RECEIVE_RESULT) { "`hasNext()` has not been invoked" }
@@ -1712,7 +1731,7 @@ internal open class BufferedChannel<E>(
17121731
val cont = this.continuation!!
17131732
this.continuation = null
17141733
// Store the retrieved element in `receiveResult`.
1715-
this.receiveResult = element
1734+
this.receiveResult = wrapInternal(element)
17161735
// Try to resume this `hasNext()`. Importantly, the receiver coroutine
17171736
// may be cancelled after it is successfully resumed but not dispatched yet.
17181737
// In case `onUndeliveredElement` is specified, we need to invoke it in the latter case.
@@ -2815,16 +2834,17 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: Bu
28152834
}
28162835

28172836
@Suppress("UNCHECKED_CAST")
2818-
internal fun getElement(index: Int) = data[index * 2].value as E
2837+
internal fun getElement(index: Int) = unwrapInternal(data[index * 2].value) as E
28192838

2820-
internal fun retrieveElement(index: Int): E = getElement(index).also { cleanElement(index) }
2839+
@Suppress("UNCHECKED_CAST")
2840+
internal fun retrieveElement(index: Int): E = (data[index * 2].value as E).also { cleanElement(index) }
28212841

28222842
internal fun cleanElement(index: Int) {
2823-
setElementLazy(index, null)
2843+
data[index * 2].lazySet(null)
28242844
}
28252845

28262846
private fun setElementLazy(index: Int, value: Any?) {
2827-
data[index * 2].lazySet(value)
2847+
data[index * 2].lazySet(wrapInternal(value))
28282848
}
28292849

28302850
// ######################################

kotlinx-coroutines-core/common/src/channels/Channel.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
88
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
99
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
1010
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
11+
import kotlinx.coroutines.flow.FlowCollector
1112
import kotlinx.coroutines.internal.*
1213
import kotlinx.coroutines.selects.*
1314
import kotlin.contracts.*

kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package kotlinx.coroutines.channels
22

33
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.flow.FlowCollector
5+
import kotlinx.coroutines.flow.emitAllInternal
46
import kotlin.coroutines.*
57

68
internal open class ChannelCoroutine<E>(
@@ -35,4 +37,8 @@ internal open class ChannelCoroutine<E>(
3537
_channel.cancel(exception) // cancel the channel
3638
cancelCoroutine(exception) // cancel the job
3739
}
40+
41+
internal suspend fun emitAllInternal(flowCollector: FlowCollector<E>) {
42+
emitAllInternal(_channel, flowCollector)
43+
}
3844
}

kotlinx-coroutines-core/common/src/flow/Channels.kt

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>,
2929
ensureActive()
3030
var cause: Throwable? = null
3131
try {
32-
for (element in channel) {
33-
emit(element)
34-
}
32+
emitAllInternal(channel, this)
3533
} catch (e: Throwable) {
3634
cause = e
3735
throw e
@@ -40,6 +38,22 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>,
4038
}
4139
}
4240

41+
internal suspend fun <T> emitAllInternal(channel: ReceiveChannel<T>, collector: FlowCollector<T>) {
42+
when (channel) {
43+
is BufferedChannel<*> -> {
44+
(channel as BufferedChannel<T>).emitAllInternal(collector)
45+
}
46+
is ChannelCoroutine<*> -> {
47+
(channel as ChannelCoroutine<T>).emitAllInternal(collector)
48+
}
49+
else -> {
50+
for (element in channel) {
51+
collector.emit(element)
52+
}
53+
}
54+
}
55+
}
56+
4357
/**
4458
* Represents the given receive channel as a hot flow and [receives][ReceiveChannel.receive] from the channel
4559
* in fan-out fashion every time this flow is collected. One element will be emitted to one collector only.

0 commit comments

Comments
 (0)