Skip to content

Commit 3bea96a

Browse files
committed
[stdlib]: Propagate AsyncStream termination to all consumers
As of the changes in #41713 to enable Sendability for AsyncStream, it has been possible to create multiple stream consumers operating concurrently. This change fixes behavior in the case that the underlying stream is terminated while multiple pending continuations are outstanding. Previously such consumers would have been leaked (never resumed). Now, they are notified of the stream's termination and resumed appropriately. Resolves #66541 & #71412
1 parent ffba6d1 commit 3bea96a

File tree

2 files changed

+47
-17
lines changed

2 files changed

+47
-17
lines changed

stdlib/public/Concurrency/AsyncStreamBuffer.swift

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -212,25 +212,21 @@ extension AsyncStream {
212212
state.onTermination = nil
213213
state.terminal = true
214214

215-
if let continuation = state.continuations.first {
216-
if state.pending.count > 0 {
217-
state.continuations.removeFirst()
218-
let toSend = state.pending.removeFirst()
219-
unlock()
220-
handler?(.finished)
221-
continuation.resume(returning: toSend)
222-
} else if state.terminal {
223-
state.continuations.removeFirst()
224-
unlock()
225-
handler?(.finished)
226-
continuation.resume(returning: nil)
227-
} else {
228-
unlock()
229-
handler?(.finished)
230-
}
231-
} else {
215+
guard !state.continuations.isEmpty else {
232216
unlock()
233217
handler?(.finished)
218+
return
219+
}
220+
221+
// Hold on to the continuations to resume outside the lock.
222+
let continuations = state.continuations
223+
state.continuations.removeAll()
224+
225+
unlock()
226+
handler?(.finished)
227+
228+
for continuation in continuations {
229+
continuation.resume(returning: nil)
234230
}
235231
}
236232

test/Concurrency/Runtime/async_stream.swift

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,40 @@ class NotSendable {}
435435
expectTrue(expectation.fulfilled)
436436
}
437437

438+
// MARK: - Multiple consumers
439+
440+
tests.test("finish behavior with multiple consumers") {
441+
let (stream, continuation) = AsyncStream<Int>.makeStream()
442+
let (controlStream, controlContinuation) = AsyncStream<Int>.makeStream()
443+
var controlIterator = controlStream.makeAsyncIterator()
444+
445+
func makeConsumingTaskWithIndex(_ index: Int) -> Task<Void, Never> {
446+
Task { @MainActor in
447+
controlContinuation.yield(index)
448+
for await i in stream {
449+
controlContinuation.yield(i)
450+
}
451+
}
452+
}
453+
454+
// Set up multiple consumers
455+
let consumer1 = makeConsumingTaskWithIndex(1)
456+
expectEqual(await controlIterator.next(isolation: #isolation), 1)
457+
458+
let consumer2 = makeConsumingTaskWithIndex(2)
459+
expectEqual(await controlIterator.next(isolation: #isolation), 2)
460+
461+
// Ensure the iterators are suspended
462+
await MainActor.run {}
463+
464+
// Terminate the stream
465+
continuation.finish()
466+
467+
// Ensure the consuming Tasks both complete
468+
_ = await consumer1.value
469+
_ = await consumer2.value
470+
}
471+
438472
await runAllTestsAsync()
439473
}
440474
}

0 commit comments

Comments
 (0)