9
9
//
10
10
//===----------------------------------------------------------------------===//
11
11
12
- #if compiler(>=6.2)
13
-
14
12
import Synchronization
15
13
import DequeModule
16
14
17
- @available ( AsyncAlgorithms 1 . 1 , * )
15
+ @available ( AsyncAlgorithms 1 . 0 , * )
18
16
extension AsyncSequence
19
- where Element: Sendable , Self: SendableMetatype , AsyncIterator: SendableMetatype {
17
+ where Element: Sendable , Self: _SendableMetatype , AsyncIterator: _SendableMetatype {
20
18
/// Creates a shared async sequence that allows multiple concurrent iterations over a single source.
21
19
///
22
20
/// The `share` method transforms an async sequence into a shareable sequence that can be safely
@@ -67,7 +65,7 @@ where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype
67
65
///
68
66
public func share(
69
67
bufferingPolicy: AsyncBufferSequencePolicy = . bounded( 1 )
70
- ) -> some AsyncSequence < Element , Failure > & Sendable {
68
+ ) -> AsyncShareSequence < Self > {
71
69
// The iterator is transferred to the isolation of the iterating task
72
70
// this has to be done "unsafely" since we cannot annotate the transfer
73
71
// however since iterating an AsyncSequence types twice has been defined
@@ -114,9 +112,9 @@ where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype
114
112
//
115
113
// This type is typically not used directly; instead, use the `share()` method on any
116
114
// async sequence that meets the sendability requirements.
117
- @available ( AsyncAlgorithms 1 . 1 , * )
118
- struct AsyncShareSequence < Base: AsyncSequence > : Sendable
119
- where Base. Element: Sendable , Base: SendableMetatype , Base. AsyncIterator: SendableMetatype {
115
+ @available ( AsyncAlgorithms 1 . 0 , * )
116
+ public struct AsyncShareSequence < Base: AsyncSequence > : Sendable
117
+ where Base. Element: Sendable , Base: _SendableMetatype , Base. AsyncIterator: _SendableMetatype {
120
118
// Represents a single consumer's connection to the shared sequence.
121
119
//
122
120
// Each iterator of the shared sequence creates its own `Side` instance, which tracks
@@ -135,7 +133,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
135
133
// - `continuation`: The continuation waiting for the next element (nil if not waiting)
136
134
// - `position`: The consumer's current position in the shared buffer
137
135
struct State {
138
- var continuation : UnsafeContinuation < Result < Element ? , Failure > , Never > ?
136
+ var continuation : UnsafeContinuation < Result < Base . Element ? , Error > , Never > ?
139
137
var position = 0
140
138
141
139
// Creates a new state with the position adjusted by the given offset.
@@ -162,7 +160,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
162
160
iteration. unregisterSide ( id)
163
161
}
164
162
165
- func next( isolation actor : isolated ( any Actor ) ? ) async throws ( Failure ) -> Element ? {
163
+ func next( isolation actor : isolated ( any Actor ) ? ) async throws -> Base . Element ? {
166
164
try await iteration. next ( isolation: actor , id: id)
167
165
}
168
166
}
@@ -230,9 +228,9 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
230
228
var generation = 0
231
229
var sides = [ Int: Side . State] ( )
232
230
var iteratingTask : IteratingTask
233
- private( set) var buffer = Deque < Element > ( )
231
+ private( set) var buffer = Deque < Base . Element > ( )
234
232
private( set) var finished = false
235
- private( set) var failure : Failure ?
233
+ private( set) var failure : Error ?
236
234
var cancelled = false
237
235
var limit : UnsafeContinuation < Bool , Never > ?
238
236
var demand : UnsafeContinuation < Void , Never > ?
@@ -311,7 +309,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
311
309
// **Buffering Newest**: Appends if under the limit, otherwise removes the oldest and appends
312
310
//
313
311
// - Parameter element: The element to add to the buffer
314
- mutating func enqueue( _ element: Element ) {
312
+ mutating func enqueue( _ element: Base . Element ) {
315
313
let count = buffer. count
316
314
317
315
switch storagePolicy {
@@ -335,20 +333,20 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
335
333
finished = true
336
334
}
337
335
338
- mutating func fail( _ error: Failure ) {
336
+ mutating func fail( _ error: Error ) {
339
337
finished = true
340
338
failure = error
341
339
}
342
340
}
343
341
344
- let state : Mutex < State >
342
+ let state : ManagedCriticalState < State >
345
343
let limit : Int ?
346
344
347
345
init (
348
346
_ iteratorFactory: @escaping @Sendable ( ) -> sending Base. AsyncIterator ,
349
347
bufferingPolicy: AsyncBufferSequencePolicy
350
348
) {
351
- state = Mutex ( State ( iteratorFactory, bufferingPolicy: bufferingPolicy) )
349
+ state = ManagedCriticalState ( State ( iteratorFactory, bufferingPolicy: bufferingPolicy) )
352
350
switch bufferingPolicy. policy {
353
351
case . bounded( let limit) :
354
352
self . limit = limit
@@ -478,15 +476,15 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
478
476
}
479
477
480
478
struct Resumption {
481
- let continuation : UnsafeContinuation < Result < Element ? , Failure > , Never >
482
- let result : Result < Element ? , Failure >
479
+ let continuation : UnsafeContinuation < Result < Base . Element ? , Error > , Never >
480
+ let result : Result < Base . Element ? , Error >
483
481
484
482
func resume( ) {
485
483
continuation. resume ( returning: result)
486
484
}
487
485
}
488
486
489
- func emit( _ result: Result < Element ? , Failure > ) {
487
+ func emit( _ result: Result < Base . Element ? , Error > ) {
490
488
let ( resumptions, limitContinuation, demandContinuation, cancelled) = state. withLock {
491
489
state -> ( [ Resumption ] , UnsafeContinuation < Bool , Never > ? , UnsafeContinuation < Void , Never > ? , Bool ) in
492
490
var resumptions = [ Resumption] ( )
@@ -533,12 +531,12 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
533
531
534
532
private func nextIteration(
535
533
_ id: Int
536
- ) async -> Result < AsyncShareSequence < Base > . Element ? , AsyncShareSequence < Base > . Failure > {
534
+ ) async -> Result < Base . Element ? , Error > {
537
535
return await withTaskCancellationHandler {
538
536
await withUnsafeContinuation { continuation in
539
537
let ( res, limitContinuation, demandContinuation, cancelled) = state. withLock {
540
538
state -> (
541
- Result < Element ? , Failure > ? , UnsafeContinuation < Bool , Never > ? , UnsafeContinuation < Void , Never > ? , Bool
539
+ Result < Base . Element ? , Error > ? , UnsafeContinuation < Bool , Never > ? , UnsafeContinuation < Void , Never > ? , Bool
542
540
) in
543
541
guard let side = state. sides [ id] else {
544
542
return state. emit ( . success( nil ) , limit: limit)
@@ -587,11 +585,11 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
587
585
}
588
586
}
589
587
} catch {
590
- emit ( . failure( error as! Failure ) )
588
+ emit ( . failure( error) )
591
589
}
592
590
}
593
591
594
- func next( isolation actor : isolated ( any Actor ) ? , id: Int ) async throws ( Failure ) -> Element ? {
592
+ func next( isolation actor : isolated ( any Actor ) ? , id: Int ) async throws -> Base . Element ? {
595
593
let ( factory, cancelled) = state. withLock { state -> ( ( @Sendable ( ) -> sending Base. AsyncIterator ) ? , Bool ) in
596
594
switch state. iteratingTask {
597
595
case . pending( let factory) :
@@ -697,30 +695,29 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
697
695
}
698
696
}
699
697
700
- @available ( AsyncAlgorithms 1 . 1 , * )
698
+ @available ( AsyncAlgorithms 1 . 0 , * )
701
699
extension AsyncShareSequence : AsyncSequence {
702
- typealias Element = Base . Element
703
- typealias Failure = Base . Failure
700
+ public typealias Element = Base . Element
701
+ public typealias Failure = Swift . Error
704
702
705
- struct Iterator : AsyncIteratorProtocol {
703
+ public struct Iterator : AsyncIteratorProtocol {
706
704
let side : Side
707
705
708
706
init ( _ iteration: Iteration ) {
709
707
side = Side ( iteration)
710
708
}
711
709
712
- mutating func next( ) async rethrows -> Element ? {
710
+ mutating public func next( ) async rethrows -> Element ? {
713
711
try await side. next ( isolation: nil )
714
712
}
715
713
716
- mutating func next( isolation actor : isolated ( any Actor ) ? ) async throws ( Failure) -> Element ? {
714
+ mutating public func next( isolation actor : isolated ( any Actor ) ? ) async throws ( Failure) -> Element ? {
717
715
try await side. next ( isolation: actor )
718
716
}
719
717
}
720
718
721
- func makeAsyncIterator( ) -> Iterator {
719
+ public func makeAsyncIterator( ) -> Iterator {
722
720
Iterator ( extent. iteration)
723
721
}
724
722
}
725
723
726
- #endif
0 commit comments