Skip to content

Commit 75107bc

Browse files
AlexVanGogenknisht
authored andcommitted
Prepare shared flows for the debugger agent to support async stack traces
The agent needs three entities to establish a proper asynchronous stack traces connection: - a capture point -- method that indicates the stack trace that precedes the current stack trace; - an insertion point -- method within the current stack trace; - a key -- an object that is present in both points and is unique enough to bridge two points properly. This change tweaks the code a bit to introduce the three entities in MutableSharedFlow and MutableStateFlow. The key for MutableSharedFlow is the element itself. For MutableSharedFlow, the element is wrapped into a unique object to prevent bridging mistakes when two equal elements are emitted from different places.
1 parent d1fb045 commit 75107bc

File tree

5 files changed

+86
-14
lines changed

5 files changed

+86
-14
lines changed

IntelliJ-patches.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,25 @@ exceeding the parallelism limit would eliminate this (likely expected) side effe
5151
associated coroutine dispatcher when it decides to park the thread
5252
- `CoroutineDispatcher.softLimitedParallelism` – an analogue of `.limitedParallelism` which supports
5353
parallelism compensation
54+
55+
## Asynchronous stack traces for flows in the IDEA debugger
56+
57+
The agent needs three entities to establish a proper asynchronous stack traces connection:
58+
- a capture point — method that indicates the stack trace that precedes the current stack trace;
59+
- an insertion point — method within the current stack trace;
60+
- a key — an object that is present in both points and is unique enough to bridge two stack traces properly.
61+
62+
The key for MutableStateFlow is the element itself. For MutableSharedFlow, the element is wrapped into a unique object to prevent bridging mistakes when two equal elements are emitted from different places.
63+
64+
Also, operators `debounce` and `sample` are supported. As they use an intermediary flow inside, the transferred values are wrapped and unwrapped the same way as in MutableSharedFlow.
65+
It means there may be all-library async stack traces between a stack trace containing `emit` and a stack trace containing `collect`.
66+
67+
### API
68+
69+
No new public methods are introduced; some logic was extracted to separate methods so that the debugger agent could instrument it properly:
70+
71+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternal` -- wrapper class used to create a unique object for the debugger agent
72+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.wrapInternal` -- returns passed argument by default; the agent instruments it to call `wrapInternalDebuggerCapture` instead
73+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.wrapInternalDebuggerCapture` -- wraps passed arguments into a `FlowValueWrapperInternal`; only used after transformation
74+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapInternal` -- returns passed argument by default; the agent instruments it to call `unwrapInternalDebuggerCapture` instead
75+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapInternalDebuggerCapture` -- unwraps passed argument so it returns the original value; only used after transformation

kotlinx-coroutines-core/common/src/flow/SharedFlow.kt

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ internal open class SharedFlowImpl<T>(
364364
val result = ArrayList<T>(replaySize)
365365
val buffer = buffer!! // must be allocated, because replaySize > 0
366366
@Suppress("UNCHECKED_CAST")
367-
for (i in 0 until replaySize) result += buffer.getBufferAt(replayIndex + i) as T
367+
for (i in 0 until replaySize) result += unwrapInternal(buffer.getBufferAt(replayIndex + i)) as T
368368
result
369369
}
370370

@@ -373,7 +373,7 @@ internal open class SharedFlowImpl<T>(
373373
*/
374374
@Suppress("UNCHECKED_CAST")
375375
protected val lastReplayedLocked: T
376-
get() = buffer!!.getBufferAt(replayIndex + replaySize - 1) as T
376+
get() = unwrapInternal(buffer!!.getBufferAt(replayIndex + replaySize - 1)) as T
377377

378378
@Suppress("UNCHECKED_CAST")
379379
override suspend fun collect(collector: FlowCollector<T>): Nothing {
@@ -389,13 +389,18 @@ internal open class SharedFlowImpl<T>(
389389
awaitValue(slot) // await signal that the new value is available
390390
}
391391
collectorJob?.ensureActive()
392-
collector.emit(newValue as T)
392+
emitInner(collector, newValue as T)
393393
}
394394
} finally {
395395
freeSlot(slot)
396396
}
397397
}
398398

399+
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
400+
private suspend fun emitInner(collector: FlowCollector<T>, value: T) {
401+
collector.emit(unwrapInternal(value))
402+
}
403+
399404
override fun tryEmit(value: T): Boolean {
400405
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
401406
val emitted = synchronized(this) {
@@ -469,8 +474,16 @@ internal open class SharedFlowImpl<T>(
469474
minCollectorIndex = newHead
470475
}
471476

472-
// enqueues item to buffer array, caller shall increment either bufferSize or queueSize
473477
private fun enqueueLocked(item: Any?) {
478+
enqueueLockedInner(wrapInternal(item))
479+
}
480+
481+
private fun enqueueEmitterLocked(emitter: Emitter) {
482+
enqueueLockedInner(emitter)
483+
}
484+
485+
// enqueues item to buffer array, caller shall increment either bufferSize or queueSize
486+
private fun enqueueLockedInner(item: Any?) {
474487
val curSize = totalSize
475488
val buffer = when (val curBuffer = buffer) {
476489
null -> growBuffer(null, 0, 2)
@@ -500,8 +513,8 @@ internal open class SharedFlowImpl<T>(
500513
return@lock null
501514
}
502515
// add suspended emitter to the buffer
503-
Emitter(this, head + totalSize, value, cont).also {
504-
enqueueLocked(it)
516+
Emitter(this, head + totalSize, wrapInternal(value), cont).also {
517+
enqueueEmitterLocked(it)
505518
queueSize++ // added to queue of waiting emitters
506519
// synchronous shared flow might rendezvous with waiting emitter
507520
if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)

kotlinx-coroutines-core/common/src/flow/StateFlow.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ private class StateFlowImpl<T>(
321321
val oldState = _state.value
322322
if (expectedState != null && oldState != expectedState) return false // CAS support
323323
if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
324-
_state.value = newState
324+
updateInner(newState)
325325
curSequence = sequence
326326
if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
327327
curSequence++ // make it odd
@@ -357,6 +357,11 @@ private class StateFlowImpl<T>(
357357
}
358358
}
359359

360+
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
361+
private fun updateInner(newState: Any) {
362+
_state.value = newState
363+
}
364+
360365
override val replayCache: List<T>
361366
get() = listOf(value)
362367

@@ -389,7 +394,7 @@ private class StateFlowImpl<T>(
389394
collectorJob?.ensureActive()
390395
// Conflate value emissions using equality
391396
if (oldState == null || oldState != newState) {
392-
collector.emit(NULL.unbox(newState))
397+
emitInner(collector, newState)
393398
oldState = newState
394399
}
395400
// Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
@@ -402,6 +407,11 @@ private class StateFlowImpl<T>(
402407
}
403408
}
404409

410+
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
411+
private suspend fun emitInner(collector: FlowCollector<T>, newState: Any) {
412+
collector.emit(NULL.unbox(newState))
413+
}
414+
405415
override fun createSlot() = StateFlowSlot()
406416
override fun createSlotArray(size: Int): Array<StateFlowSlot?> = arrayOfNulls(size)
407417

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package kotlinx.coroutines.flow.internal
2+
3+
/**
4+
* Used by IDEA debugger agent to support asynchronous stack traces in flows.
5+
* The agent requires a unique object present in both current and async stack traces,
6+
* so, without a wrapper, if two flows, `f1` and `f2`, emit equal values,
7+
* the agent could suggest `f1` as emitter for `f2.collect` and `f2` as emitter for `f1.collect`.
8+
*/
9+
internal class FlowValueWrapperInternal<T>(val value: T)
10+
11+
internal fun <T> wrapInternal(value: T): T = value
12+
internal fun <T> unwrapInternal(value: T): T = value
13+
14+
// debugger agent transforms wrapInternal so it returns wrapInternalDebuggerCapture(value) instead of just value
15+
private fun wrapInternalDebuggerCapture(value: Any?): Any = FlowValueWrapperInternal(value)
16+
17+
// debugger agent transforms unwrapInternal so it returns unwrapInternalDebuggerCapture(value) instead of just value
18+
//
19+
// normally, value is always FlowValueWrapperInternal, but potentially instrumentation may start
20+
// in the middle of the execution (for example, when the debugger was attached to a running application),
21+
// and the emitted value hadn't been wrapped
22+
private fun unwrapInternalDebuggerCapture(value: Any?): Any? = (value as? FlowValueWrapperInternal<*>)?.value ?: value

kotlinx-coroutines-core/common/src/flow/operators/Delay.kt

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl
201201
scopedFlow { downstream ->
202202
// Produce the values using the default (rendezvous) channel
203203
val values = produce {
204-
collect { value -> send(value ?: NULL) }
204+
collect { value -> send(value?.let(::wrapInternal) ?: NULL) }
205205
}
206206
// Now consume the values
207207
var lastValue: Any? = null
@@ -212,7 +212,7 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl
212212
timeoutMillis = timeoutMillisSelector(NULL.unbox(lastValue))
213213
require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
214214
if (timeoutMillis == 0L) {
215-
downstream.emit(NULL.unbox(lastValue))
215+
emitInner(downstream, lastValue)
216216
lastValue = null // Consume the value
217217
}
218218
}
@@ -223,7 +223,7 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl
223223
// Set timeout when lastValue exists and is not consumed yet
224224
if (lastValue != null) {
225225
onTimeout(timeoutMillis) {
226-
downstream.emit(NULL.unbox(lastValue))
226+
emitInner<T>(downstream, lastValue)
227227
lastValue = null // Consume the value
228228
}
229229
}
@@ -233,14 +233,19 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl
233233
.onFailure {
234234
it?.let { throw it }
235235
// If closed normally, emit the latest value
236-
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
236+
if (lastValue != null) emitInner<T>(downstream, lastValue)
237237
lastValue = DONE
238238
}
239239
}
240240
}
241241
}
242242
}
243243

244+
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
245+
private suspend fun <T> emitInner(downstream: FlowCollector<T>, value: Any?) {
246+
downstream.emit(NULL.unbox(unwrapInternal(value)))
247+
}
248+
244249
/**
245250
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
246251
*
@@ -270,7 +275,7 @@ public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
270275
require(periodMillis > 0) { "Sample period should be positive" }
271276
return scopedFlow { downstream ->
272277
val values = produce(capacity = Channel.CONFLATED) {
273-
collect { value -> send(value ?: NULL) }
278+
collect { value -> send(value?.let(::wrapInternal) ?: NULL) }
274279
}
275280
var lastValue: Any? = null
276281
val ticker = fixedPeriodTicker(periodMillis)
@@ -290,7 +295,7 @@ public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
290295
ticker.onReceive {
291296
val value = lastValue ?: return@onReceive
292297
lastValue = null // Consume the value
293-
downstream.emit(NULL.unbox(value))
298+
emitInner(downstream, value)
294299
}
295300
}
296301
}

0 commit comments

Comments
 (0)