Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 64 additions & 68 deletions Sources/AsyncAlgorithms/AsyncShareSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@
//
//===----------------------------------------------------------------------===//

#if compiler(>=6.2)

import Synchronization
import DequeModule

@available(AsyncAlgorithms 1.1, *)
@available(AsyncAlgorithms 1.0, *)
extension AsyncSequence
where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype {
where Element: Sendable, Self: _SendableMetatype, AsyncIterator: _SendableMetatype {
/// Creates a shared async sequence that allows multiple concurrent iterations over a single source.
///
/// The `share` method transforms an async sequence into a shareable sequence that can be safely
Expand Down Expand Up @@ -67,7 +65,7 @@ where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype
///
public func share(
bufferingPolicy: AsyncBufferSequencePolicy = .bounded(1)
) -> some AsyncSequence<Element, Failure> & Sendable {
) -> AsyncShareSequence<Self> {
// The iterator is transferred to the isolation of the iterating task
// this has to be done "unsafely" since we cannot annotate the transfer
// however since iterating an AsyncSequence types twice has been defined
Expand Down Expand Up @@ -114,9 +112,9 @@ where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype
//
// This type is typically not used directly; instead, use the `share()` method on any
// async sequence that meets the sendability requirements.
@available(AsyncAlgorithms 1.1, *)
struct AsyncShareSequence<Base: AsyncSequence>: Sendable
where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: SendableMetatype {
@available(AsyncAlgorithms 1.0, *)
public struct AsyncShareSequence<Base: AsyncSequence>: Sendable
where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _SendableMetatype {
// Represents a single consumer's connection to the shared sequence.
//
// Each iterator of the shared sequence creates its own `Side` instance, which tracks
Expand All @@ -135,7 +133,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
// - `continuation`: The continuation waiting for the next element (nil if not waiting)
// - `position`: The consumer's current position in the shared buffer
struct State {
var continuation: UnsafeContinuation<Result<Element?, Failure>, Never>?
var continuation: UnsafeContinuation<Result<Base.Element?, Error>, Never>?
var position = 0

// Creates a new state with the position adjusted by the given offset.
Expand All @@ -162,7 +160,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
iteration.unregisterSide(id)
}

func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? {
func next(isolation actor: isolated (any Actor)?) async throws -> Base.Element? {
try await iteration.next(isolation: actor, id: id)
}
}
Expand Down Expand Up @@ -230,9 +228,9 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
var generation = 0
var sides = [Int: Side.State]()
var iteratingTask: IteratingTask
private(set) var buffer = Deque<Element>()
private(set) var buffer = Deque<Base.Element>()
private(set) var finished = false
private(set) var failure: Failure?
private(set) var failure: Error?
var cancelled = false
var limit: UnsafeContinuation<Bool, Never>?
var demand: UnsafeContinuation<Void, Never>?
Expand Down Expand Up @@ -311,7 +309,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
// **Buffering Newest**: Appends if under the limit, otherwise removes the oldest and appends
//
// - Parameter element: The element to add to the buffer
mutating func enqueue(_ element: Element) {
mutating func enqueue(_ element: Base.Element) {
let count = buffer.count

switch storagePolicy {
Expand All @@ -335,20 +333,20 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
finished = true
}

mutating func fail(_ error: Failure) {
mutating func fail(_ error: Error) {
finished = true
failure = error
}
}

let state: Mutex<State>
let state: ManagedCriticalState<State>
let limit: Int?

init(
_ iteratorFactory: @escaping @Sendable () -> sending Base.AsyncIterator,
bufferingPolicy: AsyncBufferSequencePolicy
) {
state = Mutex(State(iteratorFactory, bufferingPolicy: bufferingPolicy))
state = ManagedCriticalState(State(iteratorFactory, bufferingPolicy: bufferingPolicy))
switch bufferingPolicy.policy {
case .bounded(let limit):
self.limit = limit
Expand Down Expand Up @@ -478,15 +476,15 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
}

struct Resumption {
let continuation: UnsafeContinuation<Result<Element?, Failure>, Never>
let result: Result<Element?, Failure>
let continuation: UnsafeContinuation<Result<Base.Element?, Error>, Never>
let result: Result<Base.Element?, Error>

func resume() {
continuation.resume(returning: result)
}
}

func emit(_ result: Result<Element?, Failure>) {
func emit(_ result: Result<Base.Element?, Error>) {
let (resumptions, limitContinuation, demandContinuation, cancelled) = state.withLock {
state -> ([Resumption], UnsafeContinuation<Bool, Never>?, UnsafeContinuation<Void, Never>?, Bool) in
var resumptions = [Resumption]()
Expand Down Expand Up @@ -533,12 +531,12 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab

private func nextIteration(
_ id: Int
) async -> Result<AsyncShareSequence<Base>.Element?, AsyncShareSequence<Base>.Failure> {
) async -> Result<Base.Element?, Error> {
return await withTaskCancellationHandler {
await withUnsafeContinuation { continuation in
let (res, limitContinuation, demandContinuation, cancelled) = state.withLock {
state -> (
Result<Element?, Failure>?, UnsafeContinuation<Bool, Never>?, UnsafeContinuation<Void, Never>?, Bool
Result<Base.Element?, Error>?, UnsafeContinuation<Bool, Never>?, UnsafeContinuation<Void, Never>?, Bool
) in
guard let side = state.sides[id] else {
return state.emit(.success(nil), limit: limit)
Expand Down Expand Up @@ -587,50 +585,49 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
}
}
} catch {
emit(.failure(error as! Failure))
emit(.failure(error))
}
}

func next(isolation actor: isolated (any Actor)?, id: Int) async throws(Failure) -> Element? {
let (factory, cancelled) = state.withLock { state -> ((@Sendable () -> sending Base.AsyncIterator)?, Bool) in
switch state.iteratingTask {
case .pending(let factory):
state.iteratingTask = .starting
return (factory, false)
case .cancelled:
return (nil, true)
default:
return (nil, false)
}
}
if cancelled { return nil }
if let factory {
let task: Task<Void, Never>
// for the fancy dance of availability and canImport see the comment on the next check for details
#if swift(>=6.2)
if #available(macOS 26.0, iOS 26.0, tvOS 26.0, visionOS 26.0, *) {
task = Task(name: "Share Iteration") { [factory, self] in
await iterationLoop(factory: factory)
}
} else {
task = Task.detached(name: "Share Iteration") { [factory, self] in
await iterationLoop(factory: factory)
func next(isolation actor: isolated (any Actor)?, id: Int) async throws -> Base.Element? {
let iteratingTask = state.withLock { state -> IteratingTask in
defer {
if case .pending = state.iteratingTask {
state.iteratingTask = .starting
}
}
return state.iteratingTask
}
#else
task = Task.detached { [factory, self] in
await iterationLoop(factory: factory)
}
#endif
// Known Issue: there is a very small race where the task may not get a priority escalation during startup
// this unfortuantely cannot be avoided since the task should ideally not be formed within the critical
// region of the state. Since that could lead to potential deadlocks in low-core-count systems.
// That window is relatively small and can be revisited if a suitable proof of safe behavior can be
// determined.
state.withLock { state in
precondition(state.iteratingTask.isStarting)
state.iteratingTask = .running(task)
}

if case .cancelled = iteratingTask { return nil }

if case .pending(let factory) = iteratingTask {
let task: Task<Void, Never>
// for the fancy dance of availability and canImport see the comment on the next check for details
#if swift(>=6.2)
if #available(macOS 26.0, iOS 26.0, tvOS 26.0, visionOS 26.0, *) {
task = Task(name: "Share Iteration") { [factory, self] in
await iterationLoop(factory: factory)
}
} else {
task = Task.detached(name: "Share Iteration") { [factory, self] in
await iterationLoop(factory: factory)
}
}
#else
task = Task.detached { [factory, self] in
await iterationLoop(factory: factory)
}
#endif
// Known Issue: there is a very small race where the task may not get a priority escalation during startup
// this unfortuantely cannot be avoided since the task should ideally not be formed within the critical
// region of the state. Since that could lead to potential deadlocks in low-core-count systems.
// That window is relatively small and can be revisited if a suitable proof of safe behavior can be
// determined.
state.withLock { state in
precondition(state.iteratingTask.isStarting)
state.iteratingTask = .running(task)
}
}

// withTaskPriorityEscalationHandler is only available for the '26 releases and the 6.2 version of
Expand Down Expand Up @@ -697,30 +694,29 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
}
}

@available(AsyncAlgorithms 1.1, *)
@available(AsyncAlgorithms 1.0, *)
extension AsyncShareSequence: AsyncSequence {
typealias Element = Base.Element
typealias Failure = Base.Failure
public typealias Element = Base.Element
public typealias Failure = Swift.Error

struct Iterator: AsyncIteratorProtocol {
public struct Iterator: AsyncIteratorProtocol {
let side: Side

init(_ iteration: Iteration) {
side = Side(iteration)
}

mutating func next() async rethrows -> Element? {
mutating public func next() async rethrows -> Element? {
try await side.next(isolation: nil)
}

mutating func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? {
mutating public func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? {
try await side.next(isolation: actor)
}
}

func makeAsyncIterator() -> Iterator {
public func makeAsyncIterator() -> Iterator {
Iterator(extent.iteration)
}
}

#endif
18 changes: 18 additions & 0 deletions Sources/AsyncAlgorithms/Shims.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

import Foundation

#if compiler(>=6.2)
public typealias _SendableMetatype = SendableMetatype
#else
public typealias _SendableMetatype = Any
#endif
Loading