Skip to content

Commit db906d8

Browse files
committed
Support SelectImplementation
This brings support for select-based flow operators, such as `timeout`.
1 parent 8be4def commit db906d8

File tree

5 files changed

+32
-22
lines changed

5 files changed

+32
-22
lines changed

IntelliJ-patches.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,9 @@ The agent needs three entities to establish a proper asynchronous stack traces c
6161

6262
The key for MutableStateFlow is the element itself. For MutableSharedFlow, the element is wrapped into a unique object to prevent bridging mistakes when two equal elements are emitted from different places.
6363

64-
Most of the operators applicable to flows (such as `map`, `scan`, `debounce`, `buffer`) are supported. As some of them use an intermediary flow inside, the transferred values are wrapped and unwrapped the same way as in MutableSharedFlow.
64+
Most of the operators applicable to flows (such as `map`, `scan`, `debounce`, `timeout`, `buffer`) are supported. As some of them use an intermediary flow inside, the transferred values are wrapped and unwrapped the same way as in MutableSharedFlow.
6565
It means there may be all-library async stack traces between a stack trace containing `emit` and a stack trace containing `collect`.
6666

67-
There is no support yet for many operators that use `Select` inside (such as `timeout`).
68-
6967
### API
7068

7169
Some logic related to instrumentation was extracted to separate methods so that the debugger agent could instrument it properly:
@@ -76,7 +74,8 @@ Some logic related to instrumentation was extracted to separate methods so that
7674
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapInternal` -- returns passed argument by default; the agent instruments it to call `unwrapInternalDebuggerCapture` instead
7775
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapInternalDebuggerCapture` -- unwraps passed argument so it returns the original value; only used after transformation
7876
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapTyped` -- utility function served to ease casting to a real underlying type
79-
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.emitInternal(FlowCollector, value)` -- common insertion point for a debugger agent; simplifies instrumentation; the value is always being unwrapped inside
77+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.emitInternal(FlowCollector, value)` -- alternative of a regular `FlowCollector.emit` that supports insertion points; if there is a `FlowCollector`, its `emit` call can be replaced with `emitInternal` so this case would also be supported for constructing async stack traces
78+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.debuggerCapture` -- common insertion point for a debugger agent; simplifies instrumentation; the value is always being unwrapped inside. `emitInternal` uses this method.
8079

8180
One internal method was added to `BufferedChannelIterator`: `nextInternal` -- same as `next` but may return a wrapped value. It should only be used with a function that is capable of unwrapping the value (see `BufferedChannel.emitAll` and `BufferedChannelIterator.next`), so there's a guarantee a wrapped value will always unwrap before emitting.
8281

@@ -86,3 +85,5 @@ One public method was added to support `buffer` and operators that use it inside
8685
- `ReceiveChannel.emitAll`. It encapsulates emitting values in `FlowCollector.emitAllImpl` and has a special implementation in `BufferedChannel`.
8786

8887
Changes were made to lambda parameter `onElementRetrieved` in `BufferedChannel<E>` methods: now they accept `Any?` instead of `E` because now they may be given a wrapped value.
88+
89+
`SelectImplementation.complete` now uses `debuggerCapture` to properly propagate value that might come from flows.

kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1513,7 +1513,7 @@ internal open class BufferedChannel<E>(
15131513
private fun registerSelectForReceive(select: SelectInstance<*>, ignoredParam: Any?) =
15141514
receiveImpl( // <-- this is an inline function
15151515
waiter = select,
1516-
onElementRetrieved = { elem -> select.selectInRegistrationPhase(unwrapTyped(elem)) },
1516+
onElementRetrieved = { elem -> select.selectInRegistrationPhase(elem) },
15171517
onSuspend = { _, _, _ -> },
15181518
onClosed = { onClosedSelectOnReceive(select) }
15191519
)

kotlinx-coroutines-core/common/src/flow/internal/FlowValueWrapperInternal.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,10 @@ private fun unwrapInternalDebuggerCapture(value: Any?): Any? {
3636

3737
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
3838
internal suspend fun <T> FlowCollector<T>.emitInternal(value: Any?) {
39-
emit(unwrapTyped<T>(value))
39+
debuggerCapture<T, Unit>(value) { emit(it) }
40+
}
41+
42+
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
43+
internal suspend fun <Unwrapped, R> debuggerCapture(value: Any?, block: suspend (Unwrapped) -> R): R {
44+
return block(unwrapTyped<Unwrapped>(value))
4045
}

kotlinx-coroutines-core/common/src/flow/operators/Delay.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl
229229
}
230230
values.onReceiveCatching { value ->
231231
value
232-
.onSuccess { lastValue = it }
232+
.onSuccess { lastValue = wrapInternal(it) }
233233
.onFailure {
234234
it?.let { throw it }
235235
// If closed normally, emit the latest value
@@ -278,7 +278,7 @@ public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
278278
select<Unit> {
279279
values.onReceiveCatching { result ->
280280
result
281-
.onSuccess { lastValue = it }
281+
.onSuccess { lastValue = wrapInternal(it) }
282282
.onFailure {
283283
it?.let { throw it }
284284
ticker.cancel(ChildCancelledException())

kotlinx-coroutines-core/common/src/selects/Select.kt

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package kotlinx.coroutines.selects
33
import kotlinx.atomicfu.*
44
import kotlinx.coroutines.*
55
import kotlinx.coroutines.channels.*
6+
import kotlinx.coroutines.flow.internal.debuggerCapture
7+
import kotlinx.coroutines.flow.internal.wrapInternal
68
import kotlinx.coroutines.internal.*
79
import kotlinx.coroutines.selects.TrySelectDetailedResult.*
810
import kotlin.contracts.*
@@ -626,7 +628,7 @@ internal open class SelectImplementation<R>(
626628
val cont = curState as CancellableContinuation<Unit>
627629
// Success! Store the resumption value and
628630
// try to resume the continuation.
629-
this.internalResult = internalResult
631+
this.internalResult = wrapInternal(internalResult)
630632
if (cont.tryResume(onCancellation)) return TRY_SELECT_SUCCESSFUL
631633
// If the resumption failed, we need to clean the [result] field to avoid memory leaks.
632634
this.internalResult = NO_RESULT
@@ -690,19 +692,21 @@ internal open class SelectImplementation<R>(
690692
// of memory leaks. Collect the internal result before that.
691693
val internalResult = this.internalResult
692694
cleanup(selectedClause)
693-
// Process the internal result and invoke the user's block.
694-
return if (!RECOVER_STACK_TRACES) {
695-
// TAIL-CALL OPTIMIZATION: the `suspend` block
696-
// is invoked at the very end.
697-
val blockArgument = selectedClause.processResult(internalResult)
698-
selectedClause.invokeBlock(blockArgument)
699-
} else {
700-
// TAIL-CALL OPTIMIZATION: the `suspend`
701-
// function is invoked at the very end.
702-
// However, internally this `suspend` function
703-
// constructs a state machine to recover a
704-
// possible stack-trace.
705-
processResultAndInvokeBlockRecoveringException(selectedClause, internalResult)
695+
return debuggerCapture<Any?, R>(internalResult) { result ->
696+
// Process the internal result and invoke the user's block.
697+
if (!RECOVER_STACK_TRACES) {
698+
// TAIL-CALL OPTIMIZATION: the `suspend` block
699+
// is invoked at the very end.
700+
val blockArgument = selectedClause.processResult(result)
701+
selectedClause.invokeBlock(blockArgument)
702+
} else {
703+
// TAIL-CALL OPTIMIZATION: the `suspend`
704+
// function is invoked at the very end.
705+
// However, internally this `suspend` function
706+
// constructs a state machine to recover a
707+
// possible stack-trace.
708+
processResultAndInvokeBlockRecoveringException(selectedClause, result)
709+
}
706710
}
707711
}
708712

0 commit comments

Comments
 (0)