
Commit e342eb0

Update the implementation of share to handle sendability requirements, address some edge cases, and update to the latest version of the discussion around buffering behaviors, as well as adding some documentation and commentary
1 parent 4322be7

File tree

2 files changed: +307 −64 lines

Evolution/NNNN-share.md

Lines changed: 48 additions & 3 deletions
@@ -29,7 +29,7 @@ The `Sendable` annotation identifies to the developer that this sequence can be
```swift
extension AsyncSequence where Element: Sendable {
  public func share(
-    bufferingPolicy: AsyncBufferSequencePolicy = .unbounded
+    bufferingPolicy: AsyncBufferSequencePolicy = .bounded(1)
  ) -> some AsyncSequence<Element, Failure> & Sendable
}
```
@@ -44,7 +44,7 @@ When the underlying type backing the share algorithm is constructed a new extent
That construction then creates an initial shared state and buffer. No task is started initially; it is only upon the first demand that the task backing the iteration is started, meaning that on the first call to `next` a task is spun up to service all potential sides. The order in which the sides are serviced is not specified and cannot be relied upon; however, the order of delivery within a side is always guaranteed. The singular task servicing the iteration is the only place holding any sort of iterator from the base `AsyncSequence`, so that iterator is isolated and never sent from one isolation to another. That iteration first awaits any limit availability, then awaits a demand given by a side, after which it awaits an element or terminal event from the iterator and enqueues the elements to the buffer.

-The buffer itself is held in only one location; each side, however, has a cursor index into that buffer, and when values are consumed the indexes are adjusted accordingly, leaving the buffer only as large as the largest deficit. This means that new sides started after the initial start-up will not have a "replay" effect; that is a similar but distinct algorithm and is not addressed by this proposal. Any buffer-size-sensitive systems that wish to adjust behavior should be aware that specifying a policy is a suggested step. However, in common usage similar to other such systems servicing desktop and mobile applications, the default and common behavior is to be unbounded. This allows for a progressive disclosure from common usage that just works out of the box with no configuration, to more advanced cases that need finer-grained control. Furthermore, there are scenarios where one might want ways of identifying dropped value events within the iteration of a side; this will be addressed in an upcoming proposal.
+The buffer itself is held in only one location; each side, however, has a cursor index into that buffer, and when values are consumed the indexes are adjusted accordingly, leaving the buffer only as large as the largest deficit. This means that new sides started after the initial start-up will not have a "replay" effect; that is a similar but distinct algorithm and is not addressed by this proposal. Any buffer-size-sensitive systems that wish to adjust behavior should be aware that specifying a policy is a suggested step. In common usage, systems servicing desktop and mobile applications often behave as unbounded; alternatively, those applications will often want `.bounded(1)`, since that lets the slowest consumer pace forward progress with at most 1 buffered element. All of the use cases, mobile, desktop, and server side, have a reasonable default of `.bounded(1)`. Leaving this as the default parameter keeps the progressive disclosure of the behaviors, such that the easiest thing to write is correct for all uses, while more advanced control can be gained by passing in a specific policy. This default argument diverges slightly from AsyncStream, but follows a behavior similar to that of Combine's `share`.

As previously stated, the iteration of the upstream/base AsyncSequence is isolated to a detached task; this ensures that individual sides can have independent cancellation. Those cancellations have the effect of removing that side from the shared iteration and cleaning up accordingly (including adjusting the trimming of the internal buffer).
@@ -53,7 +53,7 @@ Representing concurrent access is difficult to express all potential examples bu
Practically this all means that a given iteration may be "behind" another and can eventually catch up (provided it is within the buffer limit).

```swift
-let exampleSource = [0, 1, 2, 3, 4].async.share()
+let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .unbounded)

let t1 = Task {
  for await element in exampleSource {
@@ -131,6 +131,51 @@ Task 1 2
However in this particular case the newest values are the dropped elements.

+The `.bounded(N)` policy enforces consumption such that no side can be more than the given amount ahead of the other sides' consumption.
+
+```swift
+let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bounded(1))
+
+let t1 = Task {
+  for await element in exampleSource {
+    if element == 0 {
+      try? await Task.sleep(for: .seconds(1))
+    }
+    print("Task 1", element)
+  }
+}
+
+let t2 = Task {
+  for await element in exampleSource {
+    if element == 3 {
+      try? await Task.sleep(for: .seconds(1))
+    }
+    print("Task 2", element)
+  }
+}
+
+await t1.value
+await t2.value
+```
+
+Will have a potential ordering output of:
+
+```
+Task 2 0
+Task 2 1
+Task 1 0
+Task 1 1
+Task 2 2
+Task 1 2
+Task 1 3
+Task 1 4
+Task 2 3
+Task 2 4
+```
+
+In that example output, Task 2 can get elements 0 and 1 but must wait until Task 1 has caught up to within the specified buffering. This limit means that no additional iteration is made (and no values are then dropped) until the buffer count is below the specified value.
+
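The limit gate just described can be sketched as a simple predicate. This is an explanatory model only, not the proposal's implementation; the function name `upstreamMayProceed` is invented here:

```swift
// Explanatory model (not the implementation): the shared iteration only
// pulls another element from upstream while the count of buffered,
// not-yet-consumed elements is below the policy's bound.
func upstreamMayProceed(bufferedCount: Int, bound: Int) -> Bool {
    bufferedCount < bound
}

// With .bounded(1): one element may be buffered, after which the
// upstream iteration suspends until the slowest side consumes it.
let canPull = upstreamMayProceed(bufferedCount: 1, bound: 1) // false
```

This is why, in the trace above, no elements are dropped: the upstream is simply not iterated until the slowest side frees buffer capacity.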
## Effect on API resilience

This is an additive API and no existing systems are changed; however, it will introduce a few new types that will need to be maintained as ABI interfaces. Since the intent of this is to provide a mechanism to store AsyncSequences in a shared context, the type must be exposed as ABI (for type sizing).
