Skip to content

Commit e32e9ef

Browse files
committed
[WIP] Draft pitch for share
1 parent 3997ce3 commit e32e9ef

File tree

1 file changed

+178
-0
lines changed

1 file changed

+178
-0
lines changed

Evolution/NNNN-share.md

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
# Share
2+
3+
## Introduction
4+
5+
Many of the AsyncSequence adopting types only permit a one singular consumption. However there are many times that the same produced values are useful in more than one place. Out of that mechanism there are a few approaches to share, distribute, and broadcast those values. This proposal will focus on one concept; sharing. Sharing is where each consumption independently can make forward progress and get the same values but do not replay from the beginning of time.
6+
7+
## Motivation
8+
9+
There are many potential usages for the sharing concept of AsyncSequences.
10+
11+
One such example is the case where a source of data as an asynchronous sequence needs to be consumed by updating UI, logging, and additionally a network connection. This particular case does not matter on which uses but instead that those uses are independent of each other. It would not be expected for networking to block or delay the updates to UI, nor should logging. This example case also illustrates that the isolation of each side might be different and that some of the sides may not tolerate coalescing or dropping values.
12+
13+
There are many other use cases that have been requested for this family of algorithms. Since the release of AsyncAlgorithms it has perhaps been the most popularly requested set of behaviors as additions to the package.
14+
15+
## Proposed solution
16+
17+
AsyncAlgorithms will introduce a new extension function on AsyncSequence that will provide a shareable asynchronous sequence that will produce the same values upon iteration from multiple instances of it's AsyncIterator. Those iterations can take place in multiple isolations.
18+
19+
When values from a differing isolation cannot be coalesced, the two options available are either awaiting (an exertion of back-pressure across the sequences) or buffering (an internal back-pressure to a buffer). Replaying the values from the beginning of the creation of the sequence is a distinctly different behavior that should be considered a different use case. This then leaves the behavioral characteristic of this particular operation of share as; sharing a buffer of values started from the initialization of a new iteration of the sequence. Control over that buffer should then have options to determine the behavior, similar to how AsyncStream allows that control. It should have options to be unbounded, buffering the oldest count of elements, or buffering the newest count of elements.
20+
21+
It is critical to identify that this is one algorithm in the family of algorithms for sharing values. It should not attempt to solve all behavioral requirements but instead serve a common set of them that make cohesive sense together. This proposal is not mutually exclusive to the other algorithms in the sharing family.
22+
23+
## Detailed design
24+
25+
It is not just likely but perhaps a certainty that other algorithms will end up needing the same concept of a buffering policy beyond just AsyncStream and the new sharing mechanism. A new type in AsyncAlgorithms will be introduced to handle this. [^BufferingPolicy]
26+
27+
```swift
28+
/// A strategy that handles exhaustion of a buffer’s capacity.
29+
public enum BufferingPolicy: Sendable {
30+
/// Continue to add to the buffer, without imposing a limit on the number
31+
/// of buffered elements.
32+
case unbounded
33+
34+
/// When the buffer is full, discard the newly received element.
35+
///
36+
/// This strategy enforces keeping at most the specified number of oldest
37+
/// values.
38+
case bufferingOldest(Int)
39+
40+
/// When the buffer is full, discard the oldest element in the buffer.
41+
///
42+
/// This strategy enforces keeping at most the specified number of newest
43+
/// values.
44+
case bufferingNewest(Int)
45+
}
46+
```
47+
48+
A new extension will be added to return a concrete type representing the share algorithm. This extension will take a buffering policy to identify how the buffer will be handled when iterations do not consume at the same rate.
49+
50+
A new AsyncSequence type will be introduced that is explicitly marked as `Sendable`. This annotation identifies to the developer that this sequence can be shared and stored. Because the type is intended to be stored it cannot be returned by the extension as a `some AsyncSequence<Element, Failure> & Sendable` since that cannot be assigned to a stored property. Additionally the type of `AsyncShareSequence`, since indented to be stored, will act as a quasi erasing-barrier to the type information of previous sequences in the chain of algorithms in that it will only hold the generic information of the `Element` and `Failure` as part of it's public interface and not the "Base" asynchronous sequence it was created from.
51+
52+
```swift
53+
extension AsyncSequence where Element: Sendable {
54+
public func share(
55+
bufferingPolicy: BufferingPolicy = .unbounded
56+
) -> AsyncShareSequence<Element, Failure>
57+
}
58+
59+
public struct AsyncShareSequence<Element: Sendable, Failure: Error>: AsyncSequence, Sendable {
60+
public struct Iterator: AsyncIteratorProtocol {
61+
public mutating func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element?
62+
}
63+
64+
public func makeAsyncIterator() -> Iterator
65+
}
66+
67+
@available(*, unavailable)
68+
extension AsyncShareSequence.Iterator: Sendable { }
69+
```
70+
71+
The buffer internally to the share algorithm will only extend back to the furthest element available but there will only be a singular buffer shared across all iterators. This ensures that with the application of the buffering policy the storage size is as minimal as possible while still allowing all iterations to avoid dropping values and keeping the memory usage in check.
72+
73+
## Runtime Behavior
74+
75+
The construction of the `AsyncShareSequence` will initially construct a shared iteration reference. This means that all instances of the structure of the `AsyncShareSequence` will reference to the same iteration.
76+
77+
Upon creation of the `Iterator` via `makeAsyncIterator` a new "side" will be constructed to identify the specific iterator interacting with the shared iteration. Then when next is invoked is where the first actual action takes place.
78+
79+
The next method will first checkout from a critical region the underlying AsyncIterator from the base. If that is successful (i.e. no other iteration sides have already checked it out) then it will invoke the next method of that iterator (forwarding in the actor isolation). If an element is produced then it enqueues the element to the shared buffer, checks in the iterator, adjusts the index in the buffer, and finds all pending continuations all in a shared critical region by a mutex. Then those continuations will be resumed with the given element.
80+
81+
If no element is returned by the base iterator (signifying the terminal state);then the process is similar except it will instead mark the sequence as finished and resume with nil to any active continuations. Similarly with failures that will set the state as terminal but also store the error for further iteration points that need eventual termination.
82+
83+
Then all sides are "drained" such that continuations are placed into the shared state and resumed when an element is available for that position.
84+
85+
Practically this all means that a given iteration may be "behind" another and can eventually catch up (provided it is within the buffer limit).
86+
87+
```swift
88+
let exampleSource = [0, 1, 2, 3, 4].async.share()
89+
90+
let t1 = Task {
91+
for await element in exampleSource {
92+
if element == 0 {
93+
try? await Task.sleep(for: .seconds(1))
94+
}
95+
print("Task 1", element)
96+
}
97+
}
98+
99+
let t2 = Task {
100+
for await element in exampleSource {
101+
if element == 3 {
102+
try? await Task.sleep(for: .seconds(1))
103+
}
104+
print("Task 2", element)
105+
}
106+
}
107+
108+
await t1.value
109+
await t2.value
110+
111+
```
112+
113+
This example will print a possible ordering of the following:
114+
115+
```
116+
Task 2 0
117+
Task 2 1
118+
Task 2 2
119+
Task 1 0
120+
Task 2 3
121+
Task 2 4
122+
Task 1 1
123+
Task 1 2
124+
Task 1 3
125+
Task 1 4
126+
```
127+
128+
The order of the interleaving of the prints are not guaranteed; however the order of the elements per iteration is. Likewise in this buffering case it is guaranteed that all values are represented in the output.
129+
130+
If the creation were altered to the following:
131+
132+
```swift
133+
let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bufferingNewest(2))
134+
```
135+
136+
The output would print the possible ordering of:
137+
138+
```
139+
Task 2 0
140+
Task 2 1
141+
Task 2 2
142+
Task 1 0
143+
Task 2 4
144+
Task 1 3
145+
Task 1 4
146+
```
147+
148+
Some values are dropped due to the buffering policy, but eventually they reach consistency. Which similarly works for the following:
149+
150+
```
151+
let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bufferingOldest(2))
152+
```
153+
154+
```
155+
Task 2 0
156+
Task 2 1
157+
Task 2 2
158+
Task 1 0
159+
Task 2 4
160+
Task 1 1
161+
Task 1 2
162+
```
163+
164+
However in this particular case the newest values are the dropped elements.
165+
166+
## Usage
167+
168+
It is expected that this operator will be unlike other
169+
170+
## Effect on API resilience
171+
172+
This is an additive API and no existing systems are changed, however it will introduce a few new types that will need to be maintained as ABI interfaces. Since the intent of this is to provide a mechanism to store AsyncSequences to a shared context the type must be exposed as ABI (for type sizing).
173+
174+
## Alternatives considered
175+
176+
[^BufferingPolicy] It has been considered that this particular policy would be nested inside the `AsyncShareSequence` type. However since this seems to be something that will be useful for other types it makes sense to expose it as a top level type. However if it is determined that a general form of a buffering policy would require additional behaviors this might be a debatable placement to move back to an interior type similar to AsyncStream.
177+
178+

0 commit comments

Comments
 (0)