Skip to content

Commit d5f63ee

Browse files
authored
Enable Sendability for AsyncStream and AsyncThrowingStream (#41713)
* Enable Sendability for AsyncStream and AsyncThrowingStream * Move removeFirst to hit all cases where the continuations are not empty
1 parent 2af1460 commit d5f63ee

File tree

3 files changed

+28
-24
lines changed

3 files changed

+28
-24
lines changed

stdlib/public/Concurrency/AsyncStream.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,3 +428,6 @@ extension AsyncStream.Continuation {
428428
return storage.yield(())
429429
}
430430
}
431+
432+
@available(SwiftStdlib 5.1, *)
433+
extension AsyncStream: @unchecked Sendable where Element: Sendable { }

stdlib/public/Concurrency/AsyncStreamBuffer.swift

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import Swift
1717
import Darwin
1818

1919
func _lockWordCount() -> Int {
20-
let sz =
20+
let sz =
2121
MemoryLayout<os_unfair_lock>.size / MemoryLayout<UnsafeRawPointer>.size
2222
return max(sz, 1)
2323
}
@@ -57,7 +57,7 @@ extension AsyncStream {
5757
typealias TerminationHandler = @Sendable (Continuation.Termination) -> Void
5858

5959
struct State {
60-
var continuation: UnsafeContinuation<Element?, Never>?
60+
var continuations = [UnsafeContinuation<Element?, Never>]()
6161
var pending = _Deque<Element>()
6262
let limit: Continuation.BufferingPolicy
6363
var onTermination: TerminationHandler?
@@ -105,7 +105,7 @@ extension AsyncStream {
105105
}
106106
}
107107

108-
func cancel() {
108+
@Sendable func cancel() {
109109
lock()
110110
// swap out the handler before we invoke it to prevent double cancel
111111
let handler = state.onTermination
@@ -123,7 +123,9 @@ extension AsyncStream {
123123
lock()
124124
let limit = state.limit
125125
let count = state.pending.count
126-
if let continuation = state.continuation {
126+
127+
if !state.continuations.isEmpty {
128+
let continuation = state.continuations.removeFirst()
127129
if count > 0 {
128130
if !state.terminal {
129131
switch limit {
@@ -151,17 +153,14 @@ extension AsyncStream {
151153
} else {
152154
result = .terminated
153155
}
154-
state.continuation = nil
155156
let toSend = state.pending.removeFirst()
156157
unlock()
157158
continuation.resume(returning: toSend)
158159
} else if state.terminal {
159-
state.continuation = nil
160160
result = .terminated
161161
unlock()
162162
continuation.resume(returning: nil)
163163
} else {
164-
state.continuation = nil
165164
switch limit {
166165
case .unbounded:
167166
result = .enqueued(remaining: .max)
@@ -212,15 +211,15 @@ extension AsyncStream {
212211
state.onTermination = nil
213212
state.terminal = true
214213

215-
if let continuation = state.continuation {
214+
if let continuation = state.continuations.first {
216215
if state.pending.count > 0 {
217-
state.continuation = nil
216+
state.continuations.removeFirst()
218217
let toSend = state.pending.removeFirst()
219218
unlock()
220219
handler?(.finished)
221220
continuation.resume(returning: toSend)
222221
} else if state.terminal {
223-
state.continuation = nil
222+
state.continuations.removeFirst()
224223
unlock()
225224
handler?(.finished)
226225
continuation.resume(returning: nil)
@@ -236,22 +235,20 @@ extension AsyncStream {
236235

237236
func next(_ continuation: UnsafeContinuation<Element?, Never>) {
238237
lock()
239-
if state.continuation == nil {
240-
if state.pending.count > 0 {
241-
let toSend = state.pending.removeFirst()
242-
unlock()
243-
continuation.resume(returning: toSend)
244-
} else if state.terminal {
245-
unlock()
246-
continuation.resume(returning: nil)
247-
} else {
248-
state.continuation = continuation
249-
unlock()
250-
}
238+
state.continuations.append(continuation)
239+
if state.pending.count > 0 {
240+
let cont = state.continuations.removeFirst()
241+
let toSend = state.pending.removeFirst()
242+
unlock()
243+
cont.resume(returning: toSend)
244+
} else if state.terminal {
245+
let cont = state.continuations.removeFirst()
246+
unlock()
247+
cont.resume(returning: nil)
251248
} else {
252249
unlock()
253-
fatalError("attempt to await next() on more than one task")
254250
}
251+
255252
}
256253

257254
func next() async -> Element? {
@@ -341,7 +338,7 @@ extension AsyncThrowingStream {
341338
}
342339
}
343340

344-
func cancel() {
341+
@Sendable func cancel() {
345342
lock()
346343
// swap out the handler before we invoke it to prevent double cancel
347344
let handler = state.onTermination
@@ -595,3 +592,4 @@ final class _AsyncStreamCriticalStorage<Contents>: @unchecked Sendable {
595592
return storage
596593
}
597594
}
595+

stdlib/public/Concurrency/AsyncThrowingStream.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,3 +471,6 @@ extension AsyncThrowingStream.Continuation {
471471
storage.yield(())
472472
}
473473
}
474+
475+
@available(SwiftStdlib 5.1, *)
476+
extension AsyncThrowingStream: @unchecked Sendable where Element: Sendable { }

0 commit comments

Comments
 (0)