Skip to content

Commit 8be4def

Browse files
committed
Support BufferedChannel
Before, `buffer` operations and such were only supported partially, and in some cases async stack traces were working only in 50% collects. For example, when several flows are merged with `flattenMerge` and emit values simultaneously. This change seems large, changing a lot of lines in BufferedChannel.kt, but most of them are effectively refactoring (propagation of a wrapped value).
1 parent 0efa558 commit 8be4def

File tree

3 files changed

+29
-27
lines changed

3 files changed

+29
-27
lines changed

IntelliJ-patches.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ The key for MutableStateFlow is the element itself. For MutableSharedFlow, the e
6464
Most of the operators applicable to flows (such as `map`, `scan`, `debounce`, `buffer`) are supported. As some of them use an intermediary flow inside, the transferred values are wrapped and unwrapped the same way as in MutableSharedFlow.
6565
It means there may be all-library async stack traces between a stack trace containing `emit` and a stack trace containing `collect`.
6666

67-
There is no support yet for many operators that heavily use `Channel`s inside (such as `timeout`), as well as for functions that convert flows to channels and vice versa (such as `produceIn`).
67+
There is no support yet for many operators that use `Select` inside (such as `timeout`).
6868

6969
### API
7070

@@ -75,6 +75,7 @@ Some logic related to instrumentation was extracted to separate methods so that
7575
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.wrapInternalDebuggerCapture` -- wraps passed arguments into a `FlowValueWrapperInternal`; only used after transformation.
7676
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapInternal` -- returns passed argument by default; the agent instruments it to call `unwrapInternalDebuggerCapture` instead
7777
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapInternalDebuggerCapture` -- unwraps passed argument so it returns the original value; only used after transformation
78+
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.unwrapTyped` -- utility function served to ease casting to a real underlying type
7879
- `kotlinx.coroutines.flow.internal.FlowValueWrapperInternalKt.emitInternal(FlowCollector, value)` -- common insertion point for a debugger agent; simplifies instrumentation; the value is always being unwrapped inside
7980

8081
One internal method was added to `BufferedChannelIterator`: `nextInternal` -- same as `next` but may return a wrapped value. It should only be used with a function that is capable of unwrapping the value (see `BufferedChannel.emitAll` and `BufferedChannelIterator.next`), so there's a guarantee a wrapped value will always unwrap before emitting.
@@ -83,3 +84,5 @@ Why not just let `next` return a maybe wrapped value? That's because it is heavi
8384

8485
One public method was added to support `buffer` and operators that use it inside:
8586
- `ReceiveChannel.emitAll`. It encapsulates emitting values in `FlowCollector.emitAllImpl` and has a special implementation in `BufferedChannel`.
87+
88+
Changes were made to lambda parameter `onElementRetrieved` in `BufferedChannel<E>` methods: now they accept `Any?` instead of `E` because now they may be given a wrapped value.

kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ internal open class BufferedChannel<E>(
665665
protected open fun onReceiveDequeued() {}
666666

667667
override suspend fun receive(): E =
668-
receiveImpl( // <-- this is an inline function
668+
receiveImpl<E>( // <-- this is an inline function
669669
// Do not create a continuation until it is required;
670670
// it is created later via [onNoWaiterSuspend], if needed.
671671
waiter = null,
@@ -674,7 +674,7 @@ internal open class BufferedChannel<E>(
674674
// Also, inform `BufferedChannel` extensions that
675675
// synchronization of this receive operation is completed.
676676
onElementRetrieved = { element ->
677-
return element
677+
return unwrapTyped<E>(element)
678678
},
679679
// As no waiter is provided, suspension is impossible.
680680
onSuspend = { _, _, _ -> error("unexpected") },
@@ -683,7 +683,7 @@ internal open class BufferedChannel<E>(
683683
// If `receive()` decides to suspend, the corresponding
684684
// `suspend` function that creates a continuation is called.
685685
// The tail-call optimization is applied here.
686-
onNoWaiterSuspend = { segm, i, r -> receiveOnNoWaiterSuspend(segm, i, r) }
686+
onNoWaiterSuspend = { segm, i, r -> unwrapTyped(receiveOnNoWaiterSuspend(segm, i, r)) }
687687
)
688688

689689
private suspend fun receiveOnNoWaiterSuspend(
@@ -706,7 +706,7 @@ internal open class BufferedChannel<E>(
706706
// not dispatched yet. In case `onUndeliveredElement` is
707707
// specified, we need to invoke it in the latter case.
708708
onElementRetrieved = { element ->
709-
val onCancellation = onUndeliveredElement?.bindCancellationFun(element, cont.context)
709+
val onCancellation = onUndeliveredElement?.bindCancellationFun(unwrapTyped(element), cont.context)
710710
cont.resume(element, onCancellation)
711711
},
712712
onClosed = { onClosedReceiveOnNoWaiterSuspend(cont) },
@@ -733,11 +733,11 @@ internal open class BufferedChannel<E>(
733733
receiveImpl( // <-- this is an inline function
734734
waiter = null,
735735
onElementRetrieved = { element ->
736-
success(element)
736+
success(unwrapTyped(element))
737737
},
738738
onSuspend = { _, _, _ -> error("unexpected") },
739739
onClosed = { closed(closeCause) },
740-
onNoWaiterSuspend = { segm, i, r -> receiveCatchingOnNoWaiterSuspend(segm, i, r) }
740+
onNoWaiterSuspend = { segm, i, r -> unwrapTyped(receiveCatchingOnNoWaiterSuspend(segm, i, r)) }
741741
)
742742

743743
private suspend fun receiveCatchingOnNoWaiterSuspend(
@@ -750,7 +750,8 @@ internal open class BufferedChannel<E>(
750750
segment, index, r,
751751
waiter = waiter,
752752
onElementRetrieved = { element ->
753-
cont.resume(success(element), onUndeliveredElement?.bindCancellationFun(element, cont.context))
753+
val unwrapped = unwrapTyped<E>(element)
754+
cont.resume(success(unwrapped), onUndeliveredElement?.bindCancellationFun(unwrapped, cont.context))
754755
},
755756
onClosed = { onClosedReceiveCatchingOnNoWaiterSuspend(cont) }
756757
)
@@ -783,7 +784,7 @@ internal open class BufferedChannel<E>(
783784
// Store an already interrupted receiver in case of suspension.
784785
waiter = INTERRUPTED_RCV,
785786
// Finish when an element is successfully retrieved.
786-
onElementRetrieved = { element -> success(element) },
787+
onElementRetrieved = { element -> success(unwrapTyped(element)) },
787788
// On suspension, the `INTERRUPTED_RCV` token has been
788789
// installed, and this `tryReceive()` must fail.
789790
onSuspend = { segm, _, globalIndex ->
@@ -847,7 +848,7 @@ internal open class BufferedChannel<E>(
847848
// Clean the reference to the previous segment.
848849
segment.cleanPrev()
849850
@Suppress("UNCHECKED_CAST")
850-
onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it }
851+
onUndeliveredElement?.callUndeliveredElementCatchingException(unwrapTyped(updCellResult))?.let { throw it }
851852
}
852853
}
853854
}
@@ -865,7 +866,7 @@ internal open class BufferedChannel<E>(
865866
/* This lambda is invoked when an element has been
866867
successfully retrieved, either from the buffer or
867868
by making a rendezvous with a suspended sender. */
868-
onElementRetrieved: (element: E) -> R,
869+
onElementRetrieved: (element: Any?) -> R,
869870
/* This lambda is called when the operation suspends in the cell
870871
specified by the segment and its global and in-segment indices. */
871872
onSuspend: (segm: ChannelSegment<E>, i: Int, r: Long) -> R,
@@ -935,7 +936,7 @@ internal open class BufferedChannel<E>(
935936
// Clean the reference to the previous segment before finishing.
936937
segment.cleanPrev()
937938
@Suppress("UNCHECKED_CAST")
938-
onElementRetrieved(updCellResult as E)
939+
onElementRetrieved(updCellResult)
939940
}
940941
}
941942
}
@@ -953,7 +954,7 @@ internal open class BufferedChannel<E>(
953954
/* This lambda is invoked when an element has been
954955
successfully retrieved, either from the buffer or
955956
by making a rendezvous with a suspended sender. */
956-
onElementRetrieved: (element: E) -> Unit,
957+
onElementRetrieved: (element: Any?) -> Unit,
957958
/* This lambda is called when the channel is observed
958959
in the closed state and no waiting senders is found,
959960
which means that it is closed for receiving. */
@@ -1512,7 +1513,7 @@ internal open class BufferedChannel<E>(
15121513
private fun registerSelectForReceive(select: SelectInstance<*>, ignoredParam: Any?) =
15131514
receiveImpl( // <-- this is an inline function
15141515
waiter = select,
1515-
onElementRetrieved = { elem -> select.selectInRegistrationPhase(elem) },
1516+
onElementRetrieved = { elem -> select.selectInRegistrationPhase(unwrapTyped(elem)) },
15161517
onSuspend = { _, _, _ -> },
15171518
onClosed = { onClosedSelectOnReceive(select) }
15181519
)
@@ -1609,7 +1610,7 @@ internal open class BufferedChannel<E>(
16091610
// Also, inform the `BufferedChannel` extensions that
16101611
// the synchronization of this receive operation is completed.
16111612
onElementRetrieved = { element ->
1612-
saveReceiveResult(element)
1613+
this.receiveResult = element
16131614
true
16141615
},
16151616
// As no waiter is provided, suspension is impossible.
@@ -1647,9 +1648,9 @@ internal open class BufferedChannel<E>(
16471648
// In case `onUndeliveredElement` is present, we must
16481649
// invoke it in the latter case.
16491650
onElementRetrieved = { element ->
1650-
saveReceiveResult(element)
1651+
this.receiveResult = element
16511652
this.continuation = null
1652-
cont.resume(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
1653+
cont.resume(true, onUndeliveredElement?.bindCancellationFun(unwrapTyped(element), cont.context))
16531654
},
16541655
onClosed = { onClosedHasNextNoWaiterSuspend() }
16551656
)
@@ -1704,17 +1705,13 @@ internal open class BufferedChannel<E>(
17041705
val cont = this.continuation!!
17051706
this.continuation = null
17061707
// Store the retrieved element in `receiveResult`.
1707-
saveReceiveResult(element)
1708+
this.receiveResult = wrapInternal(element)
17081709
// Try to resume this `hasNext()`. Importantly, the receiver coroutine
17091710
// may be cancelled after it is successfully resumed but not dispatched yet.
17101711
// In case `onUndeliveredElement` is specified, we need to invoke it in the latter case.
17111712
return cont.tryResume0(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
17121713
}
17131714

1714-
private fun saveReceiveResult(element: E) {
1715-
this.receiveResult = wrapInternal(element)
1716-
}
1717-
17181715
fun tryResumeHasNextOnClosedChannel() {
17191716
/*
17201717
* Read the current continuation of the suspended `hasNext()` call and clean the corresponding field to avoid memory leaks.
@@ -2783,16 +2780,17 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: Bu
27832780
}
27842781

27852782
@Suppress("UNCHECKED_CAST")
2786-
internal fun getElement(index: Int) = data[index * 2].value as E
2783+
internal fun getElement(index: Int) = unwrapInternal(data[index * 2].value) as E
27872784

2788-
internal fun retrieveElement(index: Int): E = getElement(index).also { cleanElement(index) }
2785+
@Suppress("UNCHECKED_CAST")
2786+
internal fun retrieveElement(index: Int): E = (data[index * 2].value as E).also { cleanElement(index) }
27892787

27902788
internal fun cleanElement(index: Int) {
2791-
setElementLazy(index, null)
2789+
data[index * 2].lazySet(null)
27922790
}
27932791

27942792
private fun setElementLazy(index: Int, value: Any?) {
2795-
data[index * 2].lazySet(value)
2793+
data[index * 2].lazySet(wrapInternal(value))
27962794
}
27972795

27982796
// ######################################

kotlinx-coroutines-core/common/src/flow/internal/FlowValueWrapperInternal.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ internal class FlowValueWrapperInternal<T>(val value: T)
1414

1515
internal fun <T> wrapInternal(value: T): T = value
1616
internal fun <T> unwrapInternal(value: T): T = value
17+
internal fun <T> unwrapTyped(value: Any?): T = NULL.unbox(unwrapInternal(value))
1718

1819
// debugger agent transforms wrapInternal so it returns wrapInternalDebuggerCapture(value) instead of just value.
1920
private fun wrapInternalDebuggerCapture(value: Any?): Any {
@@ -35,5 +36,5 @@ private fun unwrapInternalDebuggerCapture(value: Any?): Any? {
3536

3637
// Shouldn't be inlined, the method is instrumented by the IDEA debugger agent
3738
internal suspend fun <T> FlowCollector<T>.emitInternal(value: Any?) {
38-
emit(NULL.unbox(unwrapInternal(value)))
39+
emit(unwrapTyped<T>(value))
3940
}

0 commit comments

Comments
 (0)