
Commit 8447eef

Update the proposal with some initial feedback and add a first draft implementation of share
1 parent e32e9ef commit 8447eef

File tree

2 files changed: +451 -50 lines changed

Evolution/NNNN-share.md

Lines changed: 14 additions & 50 deletions
@@ -22,65 +22,33 @@ It is critical to identify that this is one algorithm in the family of algorithm
## Detailed design

A new extension will be added to return a `Sendable` `AsyncSequence`. This extension will take a buffering policy to identify how the buffer is handled when iterations do not consume at the same rate. The `Sendable` annotation identifies to the developer that this sequence can be shared and stored in an existential `any`.

```swift
extension AsyncSequence where Element: Sendable {
  public func share(
    bufferingPolicy: AsyncBufferSequencePolicy = .unbounded
  ) -> some AsyncSequence<Element, Failure> & Sendable
}
```
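Because the returned sequence is `Sendable`, it can be stored for later iteration. The following sketch illustrates the storage pattern using `AsyncStream` as a stand-in for the proposed `share()` result (the `Readings` type is invented for illustration, and a Swift 6 toolchain is assumed for the parameterized `any AsyncSequence` existential):

```swift
// Sketch: a Sendable AsyncSequence can be stored in an existential property
// and iterated later. AsyncStream stands in for the proposed share() result.
struct Readings: Sendable {
  let values: any AsyncSequence<Int, Never> & Sendable
}

let stream = AsyncStream<Int> { continuation in
  for i in 0..<3 { continuation.yield(i) }
  continuation.finish()
}

// The opaque `some AsyncSequence<Element, Failure> & Sendable` returned by
// share() could be stored in the same way.
let readings = Readings(values: stream)
```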

The buffer internal to the share algorithm will only extend back to the furthest element not yet consumed, and there will only be a singular buffer shared across all iterators. This ensures that, with the application of the buffering policy, the storage size is as minimal as possible while still allowing all iterations to avoid dropping values and keeping the memory usage in check. The signature reuses the existing `AsyncBufferSequencePolicy` type to specify the behavior around buffering: either how it should limit emitting into the buffer, or what should happen when the buffer is exceeded.

## Runtime Behavior

The runtime behaviors fall into a few categories: ordering, iteration isolation, cancellation, and lifetimes. To understand these behaviors, a few terms are useful to define. Each creation of the AsyncIterator of the sequence, together with its invocations of next, is referred to as a side of the share iteration. The back pressure on the system to fetch a new element or termination is referred to as demand. The limit is the pending gate that awaits until the buffer has been serviced, used for the `AsyncBufferSequencePolicy.bounded(_:)` policy. The last special definition is the extent, which in this case is specifically the lifetime of the asynchronous sequence itself.
7642

77-
Upon creation of the `Iterator` via `makeAsyncIterator` a new "side" will be constructed to identify the specific iterator interacting with the shared iteration. Then when next is invoked is where the first actual action takes place.
43+
When the underlying type backing the share algorithm is constructed a new extent is created; this is used for tracking the reference lifetime under the hood and is used to both house the iteration but also to identify the point at which no more sides can be constructed. When no more sides can be constructed and no sides are left to iterate then the backing iteration is canceled. This prevents any un-referenced task backing the iteration to not be leaked by the algorith itself.
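A minimal sketch of that lifetime rule follows. This is illustrative only and not the pending implementation: `IterationExtent` and `startIfNeeded` are invented names, and the real state would be mutex-guarded rather than `@unchecked`:

```swift
// Sketch: the extent is a reference type whose deinit marks the point at
// which no more sides can be constructed, so the backing task is canceled
// there rather than being leaked.
final class IterationExtent: @unchecked Sendable {
  // Synchronization elided for brevity.
  private var task: Task<Void, Never>?

  // Started lazily, on the first demand.
  func startIfNeeded(_ work: @escaping @Sendable () async -> Void) {
    if task == nil { task = Task { await work() } }
  }

  deinit {
    // The last reference is gone and no sides remain: cancel the iteration.
    task?.cancel()
  }
}
```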
7844

79-
The next method will first checkout from a critical region the underlying AsyncIterator from the base. If that is successful (i.e. no other iteration sides have already checked it out) then it will invoke the next method of that iterator (forwarding in the actor isolation). If an element is produced then it enqueues the element to the shared buffer, checks in the iterator, adjusts the index in the buffer, and finds all pending continuations all in a shared critical region by a mutex. Then those continuations will be resumed with the given element.
45+
That construction then creates an initial shared state and buffer. No task is started initially; it is only upon the first demand that the task backing the iteration is started; this means on the first call to next a task is spun up servicing all potential sides. The order of which the sides are serviced is not specified and cannot be relied upon, however the order of delivery within a side is always guarenteed to be ordered. The singular task servicing the iteration will be the only place holding any sort of iterator from the base `AsyncSequence`; so that iterator is isolated and not sent from one isolation to another. That iteration first awaits any limit availability and then awaits for a demand given by a side. After-which it then awaits an element or terminal event from the iterator and enqueues the elements to the buffer.
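The servicing loop can be sketched as follows. This is a simplified stand-in, not the actual implementation: `SharedState` is an invented type, and the demand and limit gating are elided. The point it shows is that exactly one task holds the only iterator of the base sequence:

```swift
// Invented stand-in for the shared storage; the real state machine also
// tracks sides, demand, and the bounded-policy limit.
actor SharedState<Element: Sendable> {
  private var buffer: [Element] = []
  private(set) var finished = false

  func enqueue(_ element: Element) { buffer.append(element) }
  func finish() { finished = true }
  var snapshot: [Element] { buffer }
}

// The singular servicing task: the only place any iterator of the base
// sequence exists, so it is never sent across isolations.
func runIteration<Base: AsyncSequence & Sendable>(
  _ base: Base, into state: SharedState<Base.Element>
) async where Base.Element: Sendable, Base.Failure == Never {
  for await element in base {
    await state.enqueue(element)  // limit/demand gating elided for brevity
  }
  await state.finish()
}
```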
8046

81-
If no element is returned by the base iterator (signifying the terminal state);then the process is similar except it will instead mark the sequence as finished and resume with nil to any active continuations. Similarly with failures that will set the state as terminal but also store the error for further iteration points that need eventual termination.
47+
The buffer itself is only held in one location, each side however has a cursor index into that buffer and when values are consumed it adjusts the indexes accordingly; leaving the buffer usage only as big as the largest deficit. This means that new sides that are started post initial start up will not have a "replay" effect; that is a similar but distinct algorithm and is not addressed by this proposal. Any buffer size sensitive systems that wish to adjust behavior should be aware that specifying a policy is a suggested step. However in common usage similar to other such systems servicing desktop and mobile applications the default and common behavior is to be unbounded. This allows for a progressive disclosure from common usage that just works out of the box with no configuration, to more advanced cases that need finer grained control. Furthermore there are scenarios where one might want ways of identifing dropped value events within the iteration of a side, this is something that will be addressed later in an upcoming proposal.
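The cursor bookkeeping can be sketched deterministically. This is an illustrative model only (`SharedBuffer`, `addSide`, and the other names are invented): a single buffer trimmed to the furthest-behind side, with new sides starting at the current head rather than replaying:

```swift
// Sketch: one buffer, one cursor per side, trimmed to the largest deficit.
struct SharedBuffer<Element> {
  private var storage: [Element] = []
  private var trimmed = 0                // elements already dropped from the front
  private var cursors: [Int: Int] = [:]  // side id -> absolute next index

  // New sides begin at the current head: no replay of earlier elements.
  mutating func addSide(_ id: Int) { cursors[id] = trimmed + storage.count }

  mutating func enqueue(_ element: Element) { storage.append(element) }

  mutating func next(for id: Int) -> Element? {
    guard let cursor = cursors[id], cursor - trimmed < storage.count else { return nil }
    let element = storage[cursor - trimmed]
    cursors[id] = cursor + 1
    trim()
    return element
  }

  // Cancellation of a side re-trims for the remaining sides.
  mutating func removeSide(_ id: Int) {
    cursors[id] = nil
    trim()
  }

  // Drop elements already consumed by every side.
  private mutating func trim() {
    guard let minCursor = cursors.values.min() else {
      storage.removeAll()
      return
    }
    let dropCount = minCursor - trimmed
    if dropCount > 0 {
      storage.removeFirst(dropCount)
      trimmed = minCursor
    }
  }

  var count: Int { storage.count }
}
```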
8248

83-
Then all sides are "drained" such that continuations are placed into the shared state and resumed when an element is available for that position.
49+
As previously stated, the isolation of the iteration of the upstream/base AsyncSequence is to a detached task, this ensures that individual sides can have independent cancellation. Those cancellations will have the effect of remvoing that side from the shared iteration and cleaning up accordingly (including adjusting the trimming of the internal buffer).
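In the style of the earlier examples, independent cancellation might look like the following sketch. This assumes the proposed `share()` API from the pending implementation and is not runnable until that lands:

```swift
let exampleSource = [0, 1, 2, 3, 4].async.share()

let task1 = Task {
  for await element in exampleSource {
    print("Task 1", element)
  }
}
let task2 = Task {
  for await element in exampleSource {
    print("Task 2", element)
  }
}

// Cancelling task1 only removes that side from the shared iteration; the
// internal buffer is re-trimmed for the remaining sides and task2 keeps
// receiving its elements in order.
task1.cancel()
await task2.value
```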
It is difficult to express all potential examples of concurrent access, but a few cases are included with this proposal to illustrate some of the behaviors. If a more comprehensive behavioral analysis is needed, it is strongly suggested to try out the pending pull request to identify how specific behaviors work. Please keep in mind that the ordering between tasks is not specified; only the order within one side of the iteration is.

Practically this all means that a given iteration may be "behind" another and can eventually catch up (provided it is within the buffer limit).

@@ -127,7 +95,7 @@ Task 1 4

The order of the interleaving of the prints is not guaranteed; however, the order of the elements per iteration is. Likewise, in this buffering case it is guaranteed that all values are represented in the output.

If the creation were instead altered to the following:

```swift
let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bufferingNewest(2))
```
@@ -163,16 +131,12 @@ Task 1 2

However in this particular case the newest values are the dropped elements.

## Effect on API resilience

This is an additive API and no existing systems are changed; however, it will introduce a few new types that will need to be maintained as ABI interfaces. Since the intent of this is to provide a mechanism to store AsyncSequences in a shared context, the type must be exposed as ABI (for type sizing).

## Alternatives considered

It has been considered that the buffering policy would be nested inside the `AsyncShareSequence` type. However, since this seems to be something that will be useful for other types, it makes sense to reuse an existing top-level type. If it is determined that a general form of a buffering policy requires additional behaviors, this placement might be debated and moved back to an interior type, similar to AsyncStream.