10
10
//===----------------------------------------------------------------------===//
11
11
12
12
import Synchronization
13
+ import DequeModule
13
14
14
15
@available ( AsyncAlgorithms 1 . 1 , * )
15
16
extension AsyncSequence where Element: Sendable , Self: SendableMetatype , AsyncIterator: SendableMetatype {
@@ -19,69 +20,62 @@ extension AsyncSequence where Element: Sendable, Self: SendableMetatype, AsyncIt
19
20
/// iterated by multiple concurrent tasks. This is useful when you want to broadcast elements from
20
21
/// a single source to multiple consumers without duplicating work or creating separate iterations.
21
22
///
22
- /// - Important: Each element from the source sequence is delivered to all active iterators.
23
- /// Elements are buffered according to the specified buffering policy to handle timing differences
24
- /// between consumers.
23
+ /// Each element from the source sequence is delivered to all active iterators.
24
+ /// Elements are buffered according to the specified buffering policy to handle timing differences
25
+ /// between consumers.
25
26
///
26
- /// - Parameter bufferingPolicy: The policy controlling how elements are buffered when consumers
27
- /// iterate at different rates. Defaults to `.bounded(1)`.
28
- /// - `.bounded(n)`: Limits the buffer to `n` elements, applying backpressure to the source when that limit is reached
29
- /// - `.bufferingOldest(n)`: Keeps the oldest `n` elements, discarding newer ones when full
30
- /// - `.bufferingNewest(n)`: Keeps the newest `n` elements, discarding older ones when full
31
- /// - `.unbounded`: Allows unlimited buffering (use with caution)
32
- ///
33
- /// - Returns: A sendable async sequence that can be safely shared across multiple concurrent tasks.
27
+ /// The base sequence is iterated in its own task to ensure that cancellation is not polluted from
28
+ /// one side of iteration to another.
34
29
///
35
30
/// ## Example Usage
36
31
///
37
32
/// ```swift
38
- /// let numbers = AsyncStream<Int> { continuation in
39
- /// Task {
40
- /// for i in 1...5 {
41
- /// continuation.yield(i)
42
- /// try await Task.sleep(for: .seconds(1))
43
- /// }
44
- /// continuation.finish()
45
- /// }
33
+ /// let numbers = [1, 2, 3, 4, 5].share.map {
34
+ /// try? await Task.sleep(for: .seconds(1))
35
+ /// return $0
46
36
/// }
47
37
///
48
38
/// let shared = numbers.share()
49
39
///
50
40
/// // Multiple tasks can iterate concurrently
51
- /// async let consumer1 = Task {
52
- /// for await value in shared {
53
- /// print("Consumer 1: \(value)")
54
- /// }
41
+ /// let consumer1 = Task {
42
+ /// for await value in shared {
43
+ /// print("Consumer 1: \(value)")
44
+ /// }
55
45
/// }
56
46
///
57
- /// async let consumer2 = Task {
58
- /// for await value in shared {
59
- /// print("Consumer 2: \(value)")
60
- /// }
47
+ /// let consumer2 = Task {
48
+ /// for await value in shared {
49
+ /// print("Consumer 2: \(value)")
50
+ /// }
61
51
/// }
62
52
///
63
53
/// await consumer1.value
64
54
/// await consumer2.value
65
55
/// ```
66
56
///
67
- /// ## Buffering Behavior
68
- ///
69
- /// The buffering policy determines how the shared sequence handles elements when consumers
70
- /// iterate at different speeds:
57
+ /// - Parameter bufferingPolicy: The policy controlling how elements are enqueued to the shared buffer. Defaults to `.bounded(1)`.
58
+ /// - `.bounded(n)`: Limits the buffer to `n` elements, applying backpressure to the source when that limit is reached
59
+ /// - `.bufferingOldest(n)`: Keeps the oldest `n` elements, discarding newer ones when full
60
+ /// - `.bufferingNewest(n)`: Keeps the newest `n` elements, discarding older ones when full
61
+ /// - `.unbounded`: Allows unlimited buffering (use with caution)
71
62
///
72
- /// - **Bounded**: Applies backpressure to slow down the source when the buffer is full
73
- /// - **Buffering Oldest**: Drops new elements when the buffer is full, preserving older ones
74
- /// - **Buffering Newest**: Drops old elements when the buffer is full, preserving newer ones
75
- /// - **Unbounded**: Never drops elements but may consume unbounded memory
63
+ /// - Returns: A sendable async sequence that can be safely shared across multiple concurrent tasks.
76
64
///
77
- /// - Note: The source async sequence's iterator is consumed only once, regardless of how many
78
- /// concurrent consumers are active. This makes sharing efficient for expensive-to-produce sequences.
79
65
public func share( bufferingPolicy: AsyncBufferSequencePolicy = . bounded( 1 ) ) -> some AsyncSequence < Element , Failure > & Sendable {
80
- // the iterator is transferred to the isolation of the iterating task
66
+ // The iterator is transferred to the isolation of the iterating task
81
67
// this has to be done "unsafely" since we cannot annotate the transfer
82
68
// however since iterating an AsyncSequence type twice has been defined
83
69
// as invalid and one creation of the iterator is virtually a consuming
84
70
// operation so this is safe at runtime.
71
+ // The general principle of `.share()` is to provide a mechanism for non-
72
+ // shared AsyncSequence types to be shared. The parlance for those is
73
+ // that the base AsyncSequence type is not Sendable. If the iterator
74
+ // is not marked as `nonisolated(unsafe)` the compiler will claim that
75
+ // the value is "Capture of 'iterator' with non-Sendable type 'Self.AsyncIterator' in a '@Sendable' closure;"
76
+ // Since the closure returns a disconnected non-sendable value there is no
77
+ // distinct problem here and the compiler just needs to be informed
78
+ // that the diagnostic is overly pessimistic.
85
79
nonisolated ( unsafe) let iterator = makeAsyncIterator ( )
86
80
return AsyncShareSequence < Self > ( {
87
81
iterator
@@ -98,17 +92,17 @@ extension AsyncSequence where Element: Sendable, Self: SendableMetatype, AsyncIt
98
92
//
99
93
// ## Key Features
100
94
//
101
- // - **Single Source Iteration**: The base sequence's iterator is created and consumed only once
102
- // - **Concurrent Safe**: Multiple tasks can safely iterate simultaneously
103
- // - **Configurable Buffering**: Supports various buffering strategies for different use cases
104
- // - **Automatic Cleanup**: Properly manages resources and cancellation across all consumers
95
+ // **Single Source Iteration**: The base sequence's iterator is created and consumed only once
96
+ // **Concurrent Safe**: Multiple tasks can safely iterate simultaneously
97
+ // **Configurable Buffering**: Supports various buffering strategies for different use cases
98
+ // **Automatic Cleanup**: Properly manages resources and cancellation across all consumers
105
99
//
106
100
// ## Internal Architecture
107
101
//
108
102
// The implementation uses several key components:
109
- // - `Side`: Represents a single consumer's iteration state
110
- // - `Iteration`: Coordinates all consumers and manages the shared buffer
111
- // - `Extent`: Manages the overall lifecycle and cleanup
103
+ // `Side`: Represents a single consumer's iteration state
104
+ // `Iteration`: Coordinates all consumers and manages the shared buffer
105
+ // `Extent`: Manages the overall lifecycle and cleanup
112
106
//
113
107
// This type is typically not used directly; instead, use the `share()` method on any
114
108
// async sequence that meets the sendability requirements.
@@ -123,9 +117,9 @@ struct AsyncShareSequence<Base: AsyncSequence>: Sendable where Base.Element: Sen
123
117
//
124
118
// ## Lifecycle
125
119
//
126
- // - **Creation**: Automatically registers with the iteration coordinator
127
- // - **Usage**: Tracks buffer position and manages async continuations
128
- // - **Cleanup**: Automatically unregisters and cancels pending operations on deinit
120
+ // **Creation**: Automatically registers with the iteration coordinator
121
+ // **Usage**: Tracks buffer position and manages async continuations
122
+ // **Cleanup**: Automatically unregisters and cancels pending operations on deinit
129
123
final class Side {
130
124
// Tracks the state of a single consumer's iteration.
131
125
//
@@ -167,11 +161,11 @@ struct AsyncShareSequence<Base: AsyncSequence>: Sendable where Base.Element: Sen
167
161
// The central coordinator that manages the shared iteration state.
168
162
//
169
163
// `Iteration` is responsible for:
170
- // - Managing the single background task that consumes the source sequence
171
- // - Coordinating between multiple consumer sides
172
- // - Buffering elements according to the specified policy
173
- // - Handling backpressure and flow control
174
- // - Managing cancellation and cleanup
164
+ // Managing the single background task that consumes the source sequence
165
+ // Coordinating between multiple consumer sides
166
+ // Buffering elements according to the specified policy
167
+ // Handling backpressure and flow control
168
+ // Managing cancellation and cleanup
175
169
//
176
170
// ## Thread Safety
177
171
//
@@ -181,10 +175,10 @@ struct AsyncShareSequence<Base: AsyncSequence>: Sendable where Base.Element: Sen
181
175
// Represents the state of the background task that consumes the source sequence.
182
176
//
183
177
// The iteration task goes through several states during its lifecycle:
184
- // - `pending`: Initial state, holds the factory to create the iterator
185
- // - `starting`: Transitional state while the task is being created
186
- // - `running`: Active state with a running background task
187
- // - `cancelled`: Terminal state when the iteration has been cancelled
178
+ // `pending`: Initial state, holds the factory to create the iterator
179
+ // `starting`: Transitional state while the task is being created
180
+ // `running`: Active state with a running background task
181
+ // `cancelled`: Terminal state when the iteration has been cancelled
188
182
enum IteratingTask {
189
183
case pending( @Sendable ( ) -> sending Base. AsyncIterator )
190
184
case starting
@@ -215,9 +209,9 @@ struct AsyncShareSequence<Base: AsyncSequence>: Sendable where Base.Element: Sen
215
209
struct State : Sendable {
216
210
// Defines how elements are stored and potentially discarded in the shared buffer.
217
211
//
218
- // - `unbounded`: Store all elements without limit (may cause memory growth)
219
- // - `bufferingOldest(Int)`: Keep only the oldest N elements, ignore newer ones when full
220
- // - `bufferingNewest(Int)`: Keep only the newest N elements, discard older ones when full
212
+ // `unbounded`: Store all elements without limit (may cause memory growth)
213
+ // `bufferingOldest(Int)`: Keep only the oldest N elements, ignore newer ones when full
214
+ // `bufferingNewest(Int)`: Keep only the newest N elements, discard older ones when full
221
215
enum StoragePolicy : Sendable {
222
216
case unbounded
223
217
case bufferingOldest( Int )
@@ -227,7 +221,7 @@ struct AsyncShareSequence<Base: AsyncSequence>: Sendable where Base.Element: Sen
227
221
var generation = 0
228
222
var sides = [ Int: Side . State] ( )
229
223
var iteratingTask : IteratingTask
230
- private( set) var buffer = [ Element] ( )
224
+ private( set) var buffer = Deque < Element > ( )
231
225
private( set) var finished = false
232
226
private( set) var failure : Failure ?
233
227
var cancelled = false
@@ -295,9 +289,9 @@ struct AsyncShareSequence<Base: AsyncSequence>: Sendable where Base.Element: Sen
295
289
// Adds an element to the buffer according to the configured storage policy.
296
290
//
297
291
// The behavior depends on the storage policy:
298
- // - **Unbounded**: Always appends the element
299
- // - **Buffering Oldest**: Appends only if under the limit, otherwise ignores the element
300
- // - **Buffering Newest**: Appends if under the limit, otherwise removes the oldest and appends
292
+ // **Unbounded**: Always appends the element
293
+ // **Buffering Oldest**: Appends only if under the limit, otherwise ignores the element
294
+ // **Buffering Newest**: Appends if under the limit, otherwise removes the oldest and appends
301
295
//
302
296
// - Parameter element: The element to add to the buffer
303
297
mutating func enqueue( _ element: Element ) {
@@ -521,44 +515,8 @@ struct AsyncShareSequence<Base: AsyncSequence>: Sendable where Base.Element: Sen
521
515
}
522
516
}
523
517
524
- func next( isolation actor : isolated ( any Actor ) ? , id: Int ) async throws ( Failure) -> Element ? {
525
- let ( factory, cancelled) = state. withLock { state -> ( ( @Sendable ( ) -> sending Base. AsyncIterator ) ? , Bool ) in
526
- switch state. iteratingTask {
527
- case . pending( let factory) :
528
- state. iteratingTask = . starting
529
- return ( factory, false )
530
- case . cancelled:
531
- return ( nil , true )
532
- default :
533
- return ( nil , false )
534
- }
535
- }
536
- if cancelled { return nil }
537
- if let factory {
538
- // this has to be interfaced as detached since we want the priority inference
539
- // from the creator to not have a direct effect on the iteration.
540
- // This might be improved later by passing on the creation context's task
541
- // priority.
542
- let task = Task . detached ( name: " Share Iteration " ) { [ factory, self ] in
543
- var iterator = factory ( )
544
- do {
545
- while await iterate ( ) {
546
- if let element = try await iterator. next ( ) {
547
- emit ( . success( element) )
548
- } else {
549
- emit ( . success( nil ) )
550
- }
551
- }
552
- } catch {
553
- emit ( . failure( error as! Failure ) )
554
- }
555
- }
556
- state. withLock { state in
557
- precondition ( state. iteratingTask. isStarting)
558
- state. iteratingTask = . running( task)
559
- }
560
- }
561
- let result : Result < Element ? , Failure > = await withTaskCancellationHandler {
518
+ private func nextIteration( _ id: Int ) async -> Result < AsyncShareSequence < Base > . Element ? , AsyncShareSequence < Base > . Failure > {
519
+ return await withTaskCancellationHandler {
562
520
await withUnsafeContinuation { continuation in
563
521
let ( res, limitContinuation, demandContinuation, cancelled) = state. withLock { state -> ( Result < Element ? , Failure > ? , UnsafeContinuation < Bool , Never > ? , UnsafeContinuation < Void , Never > ? , Bool ) in
564
522
guard let side = state. sides [ id] else {
@@ -595,8 +553,91 @@ struct AsyncShareSequence<Base: AsyncSequence>: Sendable where Base.Element: Sen
595
553
} onCancel: {
596
554
cancel ( id: id)
597
555
}
556
+ }
557
+
558
+ private func iterationLoop( factory: @Sendable ( ) -> sending Base. AsyncIterator ) async {
559
+ var iterator = factory ( )
560
+ do {
561
+ while await iterate ( ) {
562
+ if let element = try await iterator. next ( ) {
563
+ emit ( . success( element) )
564
+ } else {
565
+ emit ( . success( nil ) )
566
+ }
567
+ }
568
+ } catch {
569
+ emit ( . failure( error as! Failure ) )
570
+ }
571
+ }
572
+
573
+ func next( isolation actor : isolated ( any Actor ) ? , id: Int ) async throws ( Failure) -> Element ? {
574
+ let ( factory, cancelled) = state. withLock { state -> ( ( @Sendable ( ) -> sending Base. AsyncIterator ) ? , Bool ) in
575
+ switch state. iteratingTask {
576
+ case . pending( let factory) :
577
+ state. iteratingTask = . starting
578
+ return ( factory, false )
579
+ case . cancelled:
580
+ return ( nil , true )
581
+ default :
582
+ return ( nil , false )
583
+ }
584
+ }
585
+ if cancelled { return nil }
586
+ if let factory {
587
+ let task : Task < Void , Never >
588
+ // for the fancy dance of availability and canImport see the comment on the next check for details
589
+ #if canImport(_Concurrency, _version: 6.2)
590
+ if #available( macOS 26 . 0 , iOS 26 . 0 , tvOS 26 . 0 , visionOS 26 . 0 , * ) {
591
+ task = Task ( name: " Share Iteration " ) { [ factory, self ] in
592
+ await iterationLoop ( factory: factory)
593
+ }
594
+ } else {
595
+ task = Task . detached ( name: " Share Iteration " ) { [ factory, self ] in
596
+ await iterationLoop ( factory: factory)
597
+ }
598
+ }
599
+ #else
600
+ task = Task . detached ( name: " Share Iteration " ) { [ factory, self ] in
601
+ await iterationLoop ( factory: factory)
602
+ }
603
+ #endif
604
+ // Known Issue: there is a very small race where the task may not get a priority escalation during startup
605
+ // this unfortunately cannot be avoided since the task should ideally not be formed within the critical
606
+ // region of the state. Since that could lead to potential deadlocks in low-core-count systems.
607
+ // That window is relatively small and can be revisited if a suitable proof of safe behavior can be
608
+ // determined.
609
+ state. withLock { state in
610
+ precondition ( state. iteratingTask. isStarting)
611
+ state. iteratingTask = . running( task)
612
+ }
613
+ }
614
+
615
+ // withTaskPriorityEscalationHandler is only available for the '26 releases and the 6.2 version of
616
+ // the _Concurrency library. This means for Darwin based OSes we have to have a fallback at runtime,
617
+ // and for non-darwin OSes we need to verify against the ability to import that version.
618
+ // Using this priority escalation means that the base task can avoid being detached.
619
+ #if canImport(_Concurrency, _version: 6.2)
620
+ if #available( macOS 26 . 0 , iOS 26 . 0 , tvOS 26 . 0 , visionOS 26 . 0 , * ) {
621
+ return try await withTaskPriorityEscalationHandler {
622
+ return await nextIteration ( id)
623
+ } onPriorityEscalated: { old, new in
624
+ let task = state. withLock { state -> Task < Void , Never > ? in
625
+ switch state. iteratingTask {
626
+ case . running( let task) :
627
+ return task
628
+ default :
629
+ return nil
630
+ }
631
+ }
632
+ task? . escalatePriority ( to: new)
633
+ } . get ( )
634
+ } else {
635
+ return try await nextIteration ( id) . get ( )
636
+ }
637
+ #else
638
+ return try await nextIteration ( id) . get ( )
639
+ #endif
598
640
599
- return try result. get ( )
600
641
}
601
642
}
602
643
0 commit comments