Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ import CompilerPluginSupport
let AsyncAlgorithms_v1_0 = "AvailabilityMacro=AsyncAlgorithms 1.0:macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0"
#if compiler(>=6.0) && swift(>=6.0) // 5.10 doesnt support visionOS availability
let AsyncAlgorithms_v1_1 =
"AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0"
"AvailabilityMacro=AsyncAlgorithms 1.1:macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, visionOS 1.0"
let AsyncAlgorithms_v1_2 =
"AvailabilityMacro=AsyncAlgorithms 1.2:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0"
#else
let AsyncAlgorithms_v1_1 = "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0"
let AsyncAlgorithms_v1_2 =
"AvailabilityMacro=AsyncAlgorithms 1.2:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0"
#endif

let availabilityMacros: [SwiftSetting] = [
Expand All @@ -18,6 +22,9 @@ let availabilityMacros: [SwiftSetting] = [
.enableExperimentalFeature(
AsyncAlgorithms_v1_1
),
.enableExperimentalFeature(
AsyncAlgorithms_v1_2
)
]

let package = Package(
Expand Down
29 changes: 29 additions & 0 deletions Sources/AsyncAlgorithms/AsyncFailureBackportable.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

/// A backportable protocol / hack to allow `Failure` associated type on older iOS/macOS/etc. versions.
///
/// By assigning this protocol to any value conforming to `AsyncSequence`, they will both have access to `Failure`
/// > There could be a possible issue with mangled name of the entire object as discussed
/// [here](https://forums.swift.org/t/how-to-use-asyncsequence-on-macos-14-5-in-xcode-16-beta-need-help-with-availability-check-since-failure-is-unavailb-e/72439/5).
/// However, the issue should only happen if the object conforming to this protocol follows (_Concurrency, AsyncSequence)
/// in lexicographic order. (AsyncAlgorithms, MySequence) should always be after it.
///
/// Example:
/// ```swift
/// class MySequence: AsyncSequence, AsyncFailureBackportable { ... }
///
/// ```
@available(AsyncAlgorithms 1.1, *)
public protocol AsyncFailureBackportable {
typealias BackportableFailure = Failure
associatedtype Failure: Error
}
83 changes: 42 additions & 41 deletions Sources/AsyncAlgorithms/AsyncShareSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

#if compiler(>=6.2)

import Synchronization
import DequeModule

@available(AsyncAlgorithms 1.1, *)
extension AsyncSequence
where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype {
where Element: Sendable, Self: _SendableMetatype, AsyncIterator: _SendableMetatype {
/// Creates a shared async sequence that allows multiple concurrent iterations over a single source.
///
/// The `share` method transforms an async sequence into a shareable sequence that can be safely
Expand Down Expand Up @@ -67,7 +66,7 @@ where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype
///
public func share(
bufferingPolicy: AsyncBufferSequencePolicy = .bounded(1)
) -> some AsyncSequence<Element, Failure> & Sendable {
) -> AsyncShareSequence<Self> {
// The iterator is transferred to the isolation of the iterating task
// this has to be done "unsafely" since we cannot annotate the transfer
// however since iterating an AsyncSequence types twice has been defined
Expand Down Expand Up @@ -115,8 +114,8 @@ where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype
// This type is typically not used directly; instead, use the `share()` method on any
// async sequence that meets the sendability requirements.
@available(AsyncAlgorithms 1.1, *)
struct AsyncShareSequence<Base: AsyncSequence>: Sendable
where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: SendableMetatype {
public struct AsyncShareSequence<Base: AsyncSequence>: Sendable
where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _SendableMetatype {
// Represents a single consumer's connection to the shared sequence.
//
// Each iterator of the shared sequence creates its own `Side` instance, which tracks
Expand All @@ -135,7 +134,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
// - `continuation`: The continuation waiting for the next element (nil if not waiting)
// - `position`: The consumer's current position in the shared buffer
struct State {
var continuation: UnsafeContinuation<Result<Element?, Failure>, Never>?
var continuation: UnsafeContinuation<Result<Base.Element?, Failure>, Never>?
var position = 0

// Creates a new state with the position adjusted by the given offset.
Expand All @@ -162,7 +161,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
iteration.unregisterSide(id)
}

func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? {
func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Base.Element? {
try await iteration.next(isolation: actor, id: id)
}
}
Expand Down Expand Up @@ -230,7 +229,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
var generation = 0
var sides = [Int: Side.State]()
var iteratingTask: IteratingTask
private(set) var buffer = Deque<Element>()
private(set) var buffer = Deque<Base.Element>()
private(set) var finished = false
private(set) var failure: Failure?
var cancelled = false
Expand Down Expand Up @@ -311,7 +310,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
// **Buffering Newest**: Appends if under the limit, otherwise removes the oldest and appends
//
// - Parameter element: The element to add to the buffer
mutating func enqueue(_ element: Element) {
mutating func enqueue(_ element: Base.Element) {
let count = buffer.count

switch storagePolicy {
Expand Down Expand Up @@ -341,14 +340,14 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
}
}

let state: Mutex<State>
let state: ManagedCriticalState<State>
let limit: Int?

init(
_ iteratorFactory: @escaping @Sendable () -> sending Base.AsyncIterator,
bufferingPolicy: AsyncBufferSequencePolicy
) {
state = Mutex(State(iteratorFactory, bufferingPolicy: bufferingPolicy))
state = ManagedCriticalState(State(iteratorFactory, bufferingPolicy: bufferingPolicy))
switch bufferingPolicy.policy {
case .bounded(let limit):
self.limit = limit
Expand Down Expand Up @@ -478,15 +477,15 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
}

struct Resumption {
let continuation: UnsafeContinuation<Result<Element?, Failure>, Never>
let result: Result<Element?, Failure>
let continuation: UnsafeContinuation<Result<Base.Element?, Failure>, Never>
let result: Result<Base.Element?, Failure>

func resume() {
continuation.resume(returning: result)
}
}

func emit(_ result: Result<Element?, Failure>) {
func emit(_ result: Result<Base.Element?, Failure>) {
let (resumptions, limitContinuation, demandContinuation, cancelled) = state.withLock {
state -> ([Resumption], UnsafeContinuation<Bool, Never>?, UnsafeContinuation<Void, Never>?, Bool) in
var resumptions = [Resumption]()
Expand Down Expand Up @@ -533,12 +532,12 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab

private func nextIteration(
_ id: Int
) async -> Result<AsyncShareSequence<Base>.Element?, AsyncShareSequence<Base>.Failure> {
) async -> Result<Base.Element?, Failure> {
return await withTaskCancellationHandler {
await withUnsafeContinuation { continuation in
let (res, limitContinuation, demandContinuation, cancelled) = state.withLock {
state -> (
Result<Element?, Failure>?, UnsafeContinuation<Bool, Never>?, UnsafeContinuation<Void, Never>?, Bool
Result<Base.Element?, Failure>?, UnsafeContinuation<Bool, Never>?, UnsafeContinuation<Void, Never>?, Bool
) in
guard let side = state.sides[id] else {
return state.emit(.success(nil), limit: limit)
Expand Down Expand Up @@ -591,20 +590,19 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
}
}

func next(isolation actor: isolated (any Actor)?, id: Int) async throws(Failure) -> Element? {
let (factory, cancelled) = state.withLock { state -> ((@Sendable () -> sending Base.AsyncIterator)?, Bool) in
switch state.iteratingTask {
case .pending(let factory):
state.iteratingTask = .starting
return (factory, false)
case .cancelled:
return (nil, true)
default:
return (nil, false)
func next(isolation actor: isolated (any Actor)?, id: Int) async throws(Failure) -> Base.Element? {
let iteratingTask = state.withLock { state -> IteratingTask in
defer {
if case .pending = state.iteratingTask {
state.iteratingTask = .starting
}
}
return state.iteratingTask
}
if cancelled { return nil }
if let factory {

if case .cancelled = iteratingTask { return nil }

if case .pending(let factory) = iteratingTask {
let task: Task<Void, Never>
// for the fancy dance of availability and canImport see the comment on the next check for details
#if swift(>=6.2)
Expand Down Expand Up @@ -659,7 +657,6 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
#else
return try await nextIteration(id).get()
#endif

}
}

Expand Down Expand Up @@ -698,29 +695,33 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
}

@available(AsyncAlgorithms 1.1, *)
extension AsyncShareSequence: AsyncSequence {
typealias Element = Base.Element
typealias Failure = Base.Failure

struct Iterator: AsyncIteratorProtocol {
extension AsyncShareSequence: AsyncSequence, AsyncFailureBackportable {
public typealias Element = Base.Element
public struct Iterator: AsyncIteratorProtocol, _SendableMetatype {
let side: Side

init(_ iteration: Iteration) {
side = Side(iteration)
}

mutating func next() async rethrows -> Element? {
mutating public func next() async rethrows -> Element? {
try await side.next(isolation: nil)
}

mutating func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? {
try await side.next(isolation: actor)
}
// mutating public func next(isolation actor: isolated (any Actor)?) async throws(Self.BackportableFailure) -> Element? {
// try await side.next(isolation: actor)
// }
}

func makeAsyncIterator() -> Iterator {
public func makeAsyncIterator() -> Iterator {
Iterator(extent.iteration)
}
}

@available(AsyncAlgorithms 1.2, *)
extension AsyncShareSequence.Iterator {
mutating public func next(isolation actor: isolated (any Actor)?) async throws(Base.Failure) -> Base.Element? {
try await side.next(isolation: actor)
}
}
#endif
18 changes: 18 additions & 0 deletions Sources/AsyncAlgorithms/Shims.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

import Foundation

#if compiler(>=6.2)
public typealias _SendableMetatype = SendableMetatype
#else
public typealias _SendableMetatype = Any
#endif
Loading