File tree Expand file tree Collapse file tree 2 files changed +47
-17
lines changed
stdlib/public/Concurrency Expand file tree Collapse file tree 2 files changed +47
-17
lines changed Original file line number Diff line number Diff line change @@ -212,25 +212,21 @@ extension AsyncStream {
212212 state. onTermination = nil
213213 state. terminal = true
214214
215- if let continuation = state. continuations. first {
216- if state. pending. count > 0 {
217- state. continuations. removeFirst ( )
218- let toSend = state. pending. removeFirst ( )
219- unlock ( )
220- handler ? ( . finished)
221- continuation. resume ( returning: toSend)
222- } else if state. terminal {
223- state. continuations. removeFirst ( )
224- unlock ( )
225- handler ? ( . finished)
226- continuation. resume ( returning: nil )
227- } else {
228- unlock ( )
229- handler ? ( . finished)
230- }
231- } else {
215+ guard !state. continuations. isEmpty else {
232216 unlock ( )
233217 handler ? ( . finished)
218+ return
219+ }
220+
221+ // Hold on to the continuations to resume outside the lock.
222+ let continuations = state. continuations
223+ state. continuations. removeAll ( )
224+
225+ unlock ( )
226+ handler ? ( . finished)
227+
228+ for continuation in continuations {
229+ continuation. resume ( returning: nil )
234230 }
235231 }
236232
Original file line number Diff line number Diff line change @@ -435,6 +435,40 @@ class NotSendable {}
435435 expectTrue ( expectation. fulfilled)
436436 }
437437
438+ // MARK: - Multiple consumers
439+
440+ tests. test ( " finish behavior with multiple consumers " ) {
441+ let ( stream, continuation) = AsyncStream< Int> . makeStream( )
442+ let ( controlStream, controlContinuation) = AsyncStream< Int> . makeStream( )
443+ var controlIterator = controlStream. makeAsyncIterator ( )
444+
445+ func makeConsumingTaskWithIndex( _ index: Int ) -> Task < Void , Never > {
446+ Task { @MainActor in
447+ controlContinuation. yield ( index)
448+ for await i in stream {
449+ controlContinuation. yield ( i)
450+ }
451+ }
452+ }
453+
454+ // Set up multiple consumers
455+ let consumer1 = makeConsumingTaskWithIndex ( 1 )
456+ expectEqual ( await controlIterator. next ( isolation: #isolation) , 1 )
457+
458+ let consumer2 = makeConsumingTaskWithIndex ( 2 )
459+ expectEqual ( await controlIterator. next ( isolation: #isolation) , 2 )
460+
461+ // Ensure the iterators are suspended
462+ await MainActor . run { }
463+
464+ // Terminate the stream
465+ continuation. finish ( )
466+
467+ // Ensure the consuming Tasks both complete
468+ _ = await consumer1. value
469+ _ = await consumer2. value
470+ }
471+
438472 await runAllTestsAsync ( )
439473 }
440474 }
You can’t perform that action at this time.
0 commit comments