Skip to content

Commit 7ab3c29

Browse files
Added AsyncReadUpToCountSequence
1 parent e8e9f26 commit 7ab3c29

File tree

2 files changed

+498
-0
lines changed

2 files changed

+498
-0
lines changed
Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
//
2+
// AsyncReadUpToCountSequence.swift
3+
// AsyncSequenceReader
4+
//
5+
// Created by Dimitri Bouniol on 2021-11-17.
6+
// Copyright © 2021 Mochi Development, Inc. All rights reserved.
7+
//
8+
9+
#if compiler(>=5.5) && canImport(_Concurrency)
10+
11+
extension AsyncIteratorProtocol {
12+
/// Asynchronously advances by the specified number of elements, or ends the sequence if there is no next element.
13+
///
14+
/// If a complete array could not be collected, an error is thrown and the sequence should be considered finished.
15+
/// - Parameter count: The number of bytes to collect.
16+
/// - Returns: A collection with exactly `count` elements, or `nil` if the sequence is finished.
17+
/// - Throws: `AsyncSequenceReaderError.insufficientElements` if a complete byte sequence could not be returned by the time the sequence ended.
18+
public mutating func collect(_ count: Int) async throws -> [Element]? {
19+
assert(count >= 0, "count must be larger than 0")
20+
return try await collect(min: count, max: count)
21+
}
22+
23+
/// Asynchronously advances by the specified minimum number of elements, continuing until the specified maximum number of elements, or ends the sequence if there is no next element.
24+
///
25+
/// If a complete array larger than `minCount` could not be constructed, an error is thrown and the sequence should be considered finished.
26+
/// - Parameter minCount: The minimum number of elements to collect.
27+
/// - Parameter maxCount: The maximum number of elements to collect.
28+
/// - Returns: A collection with at least `minCount` and at most `maxCount` elements, or `nil` if the sequence is finished.
29+
/// - Throws: `AsyncSequenceReaderError.insufficientElements` if a complete byte sequence could not be returned by the time the sequence ended.
30+
public mutating func collect(min minCount: Int = 0, max maxCount: Int) async throws -> [Element]? {
31+
precondition(minCount <= maxCount, "maxCount must be larger than or equal to minCount")
32+
precondition(minCount >= 0, "minCount must be larger than 0")
33+
var result = [Element]()
34+
result.reserveCapacity(minCount)
35+
36+
while let next = try await next() {
37+
result.append(next)
38+
39+
if result.count == maxCount {
40+
return result
41+
}
42+
}
43+
44+
guard !result.isEmpty else { return nil }
45+
46+
guard result.count >= minCount else {
47+
throw AsyncSequenceReaderError.insufficientElements(minimum: minCount, actual: result.count)
48+
}
49+
50+
return result
51+
}
52+
53+
/// Collect the specified number of elements into a sequence, and transform it using the provided closure.
54+
///
55+
/// In this example, an asynchronous sequence of Strings encodes sentences by prefixing each word sequence with a number.
56+
/// The number indicates how many words will be read and concatenated into a complete sentence.
57+
///
58+
/// The closure provided to the `iteratorMap(_:)` first reads the first available string, interpreting it as a number.
59+
/// Then, it will collect the next `count` elements into a new sequence, transforming it into a sentence as those elements become available.
60+
///
61+
/// let dataStream = ... // "2", "Hello,", "World!", "4", "My", "name", "is", "Dimitri.", "0", "1", "Bye!"
62+
///
63+
/// let sentenceStream = dataStream.iteratorMap { iterator -> String? in
64+
/// guard var count = Int(try await iterator.next() ?? "") else {
65+
/// throw SentenceParsing.invalidWordCount
66+
/// }
67+
///
68+
/// return try await iterator.collect(count) { sequence -> String in
69+
/// try await sequence.reduce(into: "") { $0 += ($0.isEmpty ? "" : " ") + $1 }
70+
/// }
71+
/// }
72+
///
73+
/// for await sentence in sentenceStream {
74+
/// print("\"\(sentence)\"", terminator: ", ")
75+
/// }
76+
/// // Prints: "Hello, World!", "My name is Dimitri.", "", "Bye!"
77+
///
78+
/// - Parameter count: The number of elements the `sequenceTransform` closure will have access to.
79+
/// - Parameter sequenceTransform: A transformation that accepts a sequence of the specified size that can be read from, or stopped prematurely by returning early. The receiving iterator will have moved forward by the same amount of items consumed within `sequenceTransform`.
80+
/// - Returns: A transformed value as returned by `sequenceTransform`, or `nil` if the sequence was already finished.
81+
/// - Throws: `AsyncSequenceReaderError.insufficientElements` if a complete byte sequence could not be returned by the time the sequence ended.
82+
public mutating func collect<Transformed>(
83+
_ count: Int,
84+
sequenceTransform: (AsyncReadUpToCountSequence<Self>) async throws -> Transformed
85+
) async rethrows -> Transformed? {
86+
assert(count >= 0, "count must be larger than 0")
87+
return try await collect(min: count, max: count, sequenceTransform: sequenceTransform)
88+
}
89+
90+
/// Collect the minimum number of elements, continuing until the specified maximum number of elements, into a sequence, and transform it using the provided closure.
91+
///
92+
/// In this example, an asynchronous sequence of Strings encodes sentences by prefixing each word sequence with a number.
93+
/// The number indicates how many words will be read and concatenated into a complete sentence.
94+
///
95+
/// The closure provided to the `iteratorMap(_:)` first reads the first available string, interpreting it as a number.
96+
/// Then, it will collect the next `count` elements into a new sequence, transforming it into a sentence as those elements become available.
97+
///
98+
/// let dataStream = ... // "2", "Hello,", "World!", "4", "My", "name", "is", "Dimitri.", "0", "2", "Bye?"
99+
///
100+
/// let sentenceStream = dataStream.iteratorMap { iterator -> String? in
101+
/// guard var count = Int(try await iterator.next() ?? "") else {
102+
/// throw SentenceParsing.invalidWordCount
103+
/// }
104+
///
105+
/// return try await iterator.collect(min: 0, max: count) { sequence -> String in
106+
/// try await sequence.reduce(into: "") { $0 += ($0.isEmpty ? "" : " ") + $1 }
107+
/// }
108+
/// }
109+
///
110+
/// for await sentence in sentenceStream {
111+
/// print("\"\(sentence)\"", terminator: ", ")
112+
/// }
113+
/// // Prints: "Hello, World!", "My name is Dimitri.", "", "Bye?"
114+
///
115+
/// - Parameter minCount: The minimum number of elements the `sequenceTransform` closure will attempt have access to. If this number cannot be guaranteed, an error will be thrown.
116+
/// - Parameter maxCount: The maximum number of elements the `sequenceTransform` closure will have access to.
117+
/// - Parameter sequenceTransform: A transformation that accepts a sequence of the specified size that can be read from, or stopped prematurely by returning early. The receiving iterator will have moved forward by the same amount of items consumed within `sequenceTransform`.
118+
/// - Returns: A transformed value as returned by `sequenceTransform`, or `nil` if the sequence was already finished.
119+
/// - Throws: `AsyncSequenceReaderError.insufficientElements` if a complete byte sequence could not be returned by the time the sequence ended.
120+
public mutating func collect<Transformed>(
121+
min minCount: Int = 0,
122+
max maxCount: Int,
123+
sequenceTransform: (AsyncReadUpToCountSequence<Self>) async throws -> Transformed
124+
) async rethrows -> Transformed? {
125+
try await transform(with: sequenceTransform) { .init($0, minCount: minCount, maxCount: maxCount) }
126+
}
127+
}
128+
129+
extension AsyncBufferedIterator {
130+
/// Collect the specified number of elements into a sequence, and transform it using the provided closure.
131+
///
132+
/// In this example, an asynchronous sequence of Strings encodes sentences by prefixing each word sequence with a number.
133+
/// The number indicates how many words will be read and concatenated into a complete sentence.
134+
///
135+
/// The closure provided to the `iteratorMap(_:)` first reads the first available string, interpreting it as a number.
136+
/// Then, it will collect the next `count` elements into a new sequence, transforming it into a sentence as those elements become available.
137+
///
138+
/// let dataStream = ... // "2", "Hello,", "World!", "4", "My", "name", "is", "Dimitri.", "0", "1", "Bye!"
139+
///
140+
/// let sentenceStream = dataStream.iteratorMap { iterator -> String? in
141+
/// guard var count = Int(try await iterator.next() ?? "") else {
142+
/// throw SentenceParsing.invalidWordCount
143+
/// }
144+
///
145+
/// return try await iterator.collect(count) { sequence -> String in
146+
/// try await sequence.reduce(into: "") { $0 += ($0.isEmpty ? "" : " ") + $1 }
147+
/// }
148+
/// }
149+
///
150+
/// for await sentence in sentenceStream {
151+
/// print("\"\(sentence)\"", terminator: ", ")
152+
/// }
153+
/// // Prints: "Hello, World!", "My name is Dimitri.", "", "Bye!"
154+
///
155+
/// - Parameter count: The number of elements the `sequenceTransform` closure will have access to.
156+
/// - Parameter sequenceTransform: A transformation that accepts a sequence of the specified size that can be read from, or stopped prematurely by returning early. The receiving iterator will have moved forward by the same amount of items consumed within `sequenceTransform`.
157+
/// - Returns: A transformed value as returned by `sequenceTransform`, or `nil` if the sequence was already finished.
158+
/// - Throws: `AsyncSequenceReaderError.insufficientElements` if a complete byte sequence could not be returned by the time the sequence ended.
159+
public mutating func collect<Transformed>(
160+
_ count: Int,
161+
sequenceTransform: (AsyncReadUpToCountSequence<BaseIterator>) async throws -> Transformed
162+
) async rethrows -> Transformed? {
163+
assert(count >= 0, "count must be larger than 0")
164+
return try await collect(min: count, max: count, sequenceTransform: sequenceTransform)
165+
}
166+
167+
/// Collect the minimum number of elements, continuing until the specified maximum number of elements, into a sequence, and transform it using the provided closure.
168+
///
169+
/// In this example, an asynchronous sequence of Strings encodes sentences by prefixing each word sequence with a number.
170+
/// The number indicates how many words will be read and concatenated into a complete sentence.
171+
///
172+
/// The closure provided to the `iteratorMap(_:)` first reads the first available string, interpreting it as a number.
173+
/// Then, it will collect the next `count` elements into a new sequence, transforming it into a sentence as those elements become available.
174+
///
175+
/// let dataStream = ... // "2", "Hello,", "World!", "4", "My", "name", "is", "Dimitri.", "0", "2", "Bye?"
176+
///
177+
/// let sentenceStream = dataStream.iteratorMap { iterator -> String? in
178+
/// guard var count = Int(try await iterator.next() ?? "") else {
179+
/// throw SentenceParsing.invalidWordCount
180+
/// }
181+
///
182+
/// return try await iterator.collect(min: 0, max: count) { sequence -> String in
183+
/// try await sequence.reduce(into: "") { $0 += ($0.isEmpty ? "" : " ") + $1 }
184+
/// }
185+
/// }
186+
///
187+
/// for await sentence in sentenceStream {
188+
/// print("\"\(sentence)\"", terminator: ", ")
189+
/// }
190+
/// // Prints: "Hello, World!", "My name is Dimitri.", "", "Bye?"
191+
///
192+
/// - Parameter minCount: The minimum number of elements the `sequenceTransform` closure will attempt have access to. If this number cannot be guaranteed, an error will be thrown.
193+
/// - Parameter maxCount: The maximum number of elements the `sequenceTransform` closure will have access to.
194+
/// - Parameter sequenceTransform: A transformation that accepts a sequence of the specified size that can be read from, or stopped prematurely by returning early. The receiving iterator will have moved forward by the same amount of items consumed within `sequenceTransform`.
195+
/// - Returns: A transformed value as returned by `sequenceTransform`, or `nil` if the sequence was already finished.
196+
/// - Throws: `AsyncSequenceReaderError.insufficientElements` if a complete byte sequence could not be returned by the time the sequence ended.
197+
public mutating func collect<Transformed>(
198+
min minCount: Int = 0,
199+
max maxCount: Int,
200+
sequenceTransform: (AsyncReadUpToCountSequence<BaseIterator>) async throws -> Transformed
201+
) async rethrows -> Transformed? {
202+
try await transform(with: sequenceTransform) { .init($0, minCount: minCount, maxCount: maxCount) }
203+
}
204+
}
205+
206+
/// An asynchronous sequence that will read from a mutable iterator so long as the specified conditions are valid.
207+
public class AsyncReadUpToCountSequence<BaseIterator: AsyncIteratorProtocol>: AsyncReadSequence {
208+
/// The baseIterator to read from.
209+
///
210+
/// When finished with the sequence, callers should read back this value so they can continue iterating on the sequence.
211+
public var baseIterator: AsyncBufferedIterator<BaseIterator>
212+
213+
@usableFromInline
214+
let minCount: Int
215+
216+
@usableFromInline
217+
let maxCount: Int
218+
219+
@usableFromInline
220+
init(
221+
_ baseIterator: AsyncBufferedIterator<BaseIterator>,
222+
minCount: Int,
223+
maxCount: Int
224+
) {
225+
precondition(minCount <= maxCount, "maxCount must be larger than or equal to minCount")
226+
precondition(minCount >= 0, "minCount must be larger than 0")
227+
self.baseIterator = baseIterator
228+
self.minCount = minCount
229+
self.maxCount = maxCount
230+
}
231+
}
232+
233+
extension AsyncReadUpToCountSequence: AsyncSequence {
234+
/// The type of element produced by this asynchronous sequence.
235+
///
236+
/// The read sequence produces whatever type of element its base iterator produces.
237+
public typealias Element = BaseIterator.Element
238+
239+
/// The iterator that produces elements of the read sequence.
240+
public struct AsyncIterator: AsyncIteratorProtocol {
241+
@usableFromInline
242+
var readSequence: AsyncReadUpToCountSequence
243+
244+
@usableFromInline
245+
var numberOfElementsRead = 0
246+
247+
@usableFromInline
248+
init(_ readSequence: AsyncReadUpToCountSequence) {
249+
self.readSequence = readSequence
250+
}
251+
252+
/// Produces the next element in the sequence.
253+
///
254+
/// This iterator checks if `numberOfElementsRead` has exceeded the max size for the sequence. If it has not, then it'll read until it does. If the next value read marks the end of the sequence, but the minimum size has not yet been reached, an error is thrown.
255+
@inlinable
256+
public mutating func next() async throws -> Element? {
257+
guard numberOfElementsRead < readSequence.maxCount else { return nil }
258+
guard let next = try await readSequence.baseIterator.next() else {
259+
260+
guard numberOfElementsRead >= readSequence.minCount else {
261+
throw AsyncSequenceReaderError.insufficientElements(minimum: readSequence.minCount, actual: numberOfElementsRead)
262+
}
263+
264+
return nil
265+
}
266+
267+
numberOfElementsRead += 1
268+
return next
269+
}
270+
}
271+
272+
@inlinable
273+
public func makeAsyncIterator() -> AsyncIterator {
274+
AsyncIterator(self)
275+
}
276+
}
277+
278+
#endif

0 commit comments

Comments
 (0)