Skip to content

Commit 695dc56

Browse files
Added AsyncSequenceReader
1 parent 04465e8 commit 695dc56

File tree

2 files changed

+271
-1
lines changed

2 files changed

+271
-1
lines changed

Sources/AsyncSequenceReader/AsyncSequenceReader.swift

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,62 @@
66
// Copyright © 2021 Mochi Development, Inc. All rights reserved.
77
//
88

9+
#if compiler(>=5.5) && canImport(_Concurrency)
10+
11+
/// An asynchronous sequence that will read from a mutable iterator so long as the specified condition is valid.
12+
///
13+
/// When finished, the iterator can be read back to read other values.
14+
public class AsyncSequenceReader<BaseIterator: AsyncIteratorProtocol>: AsyncReadSequence {
15+
/// The baseIterator to read from.
16+
///
17+
/// When finished with the sequence, callers should read back this value so they can continue iterating on the sequence.
18+
public var baseIterator: AsyncBufferedIterator<BaseIterator>
19+
20+
/// The closure to call when a new value is requested.
21+
///
22+
/// Note that implementers must **never** read from the iterator if they don't expect to forward the value. It is, however, acceptable to save the last value sent, and verify that before the next read to check if the read sequence should end early (by returning nil) or not. Ending a sequence early is completely supported, allowing more reads within a different context from occuring.
23+
@usableFromInline
24+
let read: (_ iterator: inout AsyncBufferedIterator<BaseIterator>) async throws -> Element?
25+
26+
@usableFromInline
27+
init(
28+
_ baseIterator: AsyncBufferedIterator<BaseIterator>,
29+
read: @escaping (_ iterator: inout AsyncBufferedIterator<BaseIterator>) async throws -> Element?
30+
) {
31+
self.baseIterator = baseIterator
32+
self.read = read
33+
}
34+
}
35+
36+
extension AsyncSequenceReader: AsyncSequence {
37+
/// The type of element produced by this asynchronous sequence.
38+
///
39+
/// The read sequence produces whatever type of element its base iterator produces.
40+
public typealias Element = BaseIterator.Element
41+
42+
/// The iterator that produces elements of the read sequence.
43+
public struct AsyncIterator: AsyncIteratorProtocol {
44+
@usableFromInline
45+
var readSequence: AsyncSequenceReader
46+
47+
@usableFromInline
48+
init(_ readSequence: AsyncSequenceReader) {
49+
self.readSequence = readSequence
50+
}
51+
52+
/// Produces the next element in the sequence.
53+
///
54+
/// This iterator calls `read()` with its base iterator, and lets that closure produce an appropriate result.
55+
@inlinable
56+
public mutating func next() async throws -> Element? {
57+
return try await readSequence.read(&readSequence.baseIterator)
58+
}
59+
}
60+
61+
@inlinable
62+
public func makeAsyncIterator() -> AsyncIterator {
63+
AsyncIterator(self)
64+
}
65+
}
66+
67+
#endif

Tests/AsyncSequenceReaderTests/AsyncSequenceReaderTests.swift

Lines changed: 212 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,218 @@ import XCTest
1010
@testable import AsyncSequenceReader
1111

1212
final class AsyncSequenceReaderTests: XCTestCase {
13-
func testExample() throws {
13+
// MARK: - Test Manual Iteration
14+
15+
func testReadSequenceFromStream() async throws {
16+
let testStream = AsyncStream<Int> { continuation in
17+
for value in 0..<10 {
18+
continuation.yield(value)
19+
}
20+
continuation.finish()
21+
}
1422

23+
var iterator = AsyncBufferedIterator(testStream.makeAsyncIterator())
24+
25+
let readSequenceA = AsyncSequenceReader(iterator) { iterator in
26+
let next = await iterator.next()
27+
return next
28+
}
29+
30+
var readSequenceAIterator = readSequenceA.makeAsyncIterator()
31+
32+
try await AsyncXCTAssertEqual(await readSequenceAIterator.next(), 0)
33+
try await AsyncXCTAssertEqual(await readSequenceAIterator.next(), 1)
34+
try await AsyncXCTAssertEqual(await readSequenceAIterator.next(), 2)
35+
try await AsyncXCTAssertEqual(await readSequenceAIterator.next(), 3)
36+
37+
iterator = readSequenceA.baseIterator
38+
39+
var totalRead: Int = 0
40+
let readSequenceB = AsyncSequenceReader(iterator) { iterator in
41+
guard totalRead < 4 else { return nil }
42+
let next = await iterator.next()
43+
totalRead += 1
44+
return next
45+
}
46+
47+
var readSequenceBIterator = readSequenceB.makeAsyncIterator()
48+
49+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), 4)
50+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), 5)
51+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), 6)
52+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), 7)
53+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), nil)
54+
55+
iterator = readSequenceB.baseIterator
56+
57+
await AsyncXCTAssertEqual(await iterator.next(), 8)
58+
await AsyncXCTAssertEqual(await iterator.next(), 9)
59+
await AsyncXCTAssertEqual(await iterator.next(), nil)
60+
}
61+
62+
func testReadSequenceFromTestSequence() async throws {
63+
let testStream = TestSequence(base: 0..<10)
64+
65+
var iterator = AsyncBufferedIterator(testStream.makeAsyncIterator())
66+
67+
let readSequenceA = AsyncSequenceReader(iterator) { iterator in
68+
let next = await iterator.next()
69+
return next
70+
}
71+
72+
var readSequenceAIterator = readSequenceA.makeAsyncIterator()
73+
74+
try await AsyncXCTAssertEqual(await readSequenceAIterator.next(), 0)
75+
try await AsyncXCTAssertEqual(await readSequenceAIterator.next(), 1)
76+
try await AsyncXCTAssertEqual(await readSequenceAIterator.next(), 2)
77+
try await AsyncXCTAssertEqual(await readSequenceAIterator.next(), 3)
78+
79+
iterator = readSequenceA.baseIterator
80+
81+
var totalRead: Int = 0
82+
let readSequenceB = AsyncSequenceReader(iterator) { iterator in
83+
guard totalRead < 4 else { return nil }
84+
let next = await iterator.next()
85+
totalRead += 1
86+
return next
87+
}
88+
89+
var readSequenceBIterator = readSequenceB.makeAsyncIterator()
90+
91+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), 4)
92+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), 5)
93+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), 6)
94+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), 7)
95+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), nil)
96+
97+
iterator = readSequenceB.baseIterator
98+
99+
await AsyncXCTAssertEqual(await iterator.next(), 8)
100+
await AsyncXCTAssertEqual(await iterator.next(), 9)
101+
await AsyncXCTAssertEqual(await iterator.next(), nil)
102+
}
103+
104+
func testReadSequenceFromThrowingTestSequence() async throws {
105+
let testStream = ThrowingTestSequence(base: 0..<10)
106+
107+
var iterator = AsyncBufferedIterator(testStream.makeAsyncIterator())
108+
109+
let readSequenceA = AsyncSequenceReader(iterator) { iterator in
110+
let next = try await iterator.next()
111+
return next
112+
}
113+
114+
var readSequenceAIterator = readSequenceA.makeAsyncIterator()
115+
116+
try await AsyncXCTAssertEqual(await readSequenceAIterator.next(), 0)
117+
try await AsyncXCTAssertEqual(await readSequenceAIterator.next(), 1)
118+
try await AsyncXCTAssertEqual(await readSequenceAIterator.next(), 2)
119+
try await AsyncXCTAssertEqual(await readSequenceAIterator.next(), 3)
120+
121+
iterator = readSequenceA.baseIterator
122+
123+
var totalRead: Int = 0
124+
let readSequenceB = AsyncSequenceReader(iterator) { iterator in
125+
guard totalRead < 4 else { return nil }
126+
let next = try await iterator.next()
127+
totalRead += 1
128+
return next
129+
}
130+
131+
var readSequenceBIterator = readSequenceB.makeAsyncIterator()
132+
133+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), 4)
134+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), 5)
135+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), 6)
136+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), 7)
137+
try await AsyncXCTAssertEqual(await readSequenceBIterator.next(), nil)
138+
139+
iterator = readSequenceB.baseIterator
140+
141+
try await AsyncXCTAssertEqual(await iterator.next(), 8)
142+
try await AsyncXCTAssertEqual(await iterator.next(), 9)
143+
try await AsyncXCTAssertEqual(await iterator.next(), nil)
144+
}
145+
146+
// MARK: - Test Transforms
147+
148+
func countCharacters<ReadSequence: AsyncReadSequence>(_ sequence: ReadSequence) async throws -> Int? where ReadSequence.Element == String {
149+
try await sequence.map { $0.count }.reduce(into: 0) { partialResult, next in
150+
partialResult += next
151+
}
152+
}
153+
154+
func makeReadFourElementSequence<BaseIterator>(_ iterator: inout AsyncBufferedIterator<BaseIterator>) -> AsyncSequenceReader<BaseIterator> {
155+
var totalRead: Int = 0
156+
return AsyncSequenceReader(iterator) { iterator in
157+
guard totalRead < 4 else { return nil }
158+
let next = try await iterator.next()
159+
totalRead += 1
160+
return next
161+
}
162+
}
163+
164+
func testTransformSequenceFromStream() async throws {
165+
let testStream = AsyncStream<String> { continuation in
166+
let data = ["A", "few", "words", "to", "consider", "today", "as", "this", "test", "runs"]
167+
for value in data {
168+
continuation.yield(value)
169+
}
170+
continuation.finish()
171+
}
172+
173+
var iterator = testStream.makeAsyncIterator()
174+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), 11)
175+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), 19)
176+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), 8)
177+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), nil)
178+
179+
var iteratorB = testStream.makeAsyncIterator()
180+
try await AsyncXCTAssertEqual(await iteratorB.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), nil)
181+
}
182+
183+
func testTransformSequenceFromBufferedStream() async throws {
184+
let testStream = AsyncStream<String> { continuation in
185+
let data = ["A", "few", "words", "to", "consider", "today", "as", "this", "test", "runs"]
186+
for value in data {
187+
continuation.yield(value)
188+
}
189+
continuation.finish()
190+
}
191+
192+
var iterator = AsyncBufferedIterator(testStream.makeAsyncIterator())
193+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), 11)
194+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), 19)
195+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), 8)
196+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), nil)
197+
198+
var iteratorB = AsyncBufferedIterator(testStream.makeAsyncIterator())
199+
try await AsyncXCTAssertEqual(await iteratorB.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), nil)
200+
}
201+
202+
func testTransformSequenceFromTestSequence() async throws {
203+
let testStream = TestSequence(base: ["A", "few", "words", "to", "consider", "today", "as", "this", "test", "runs"])
204+
205+
var iterator = testStream.makeAsyncIterator()
206+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), 11)
207+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), 19)
208+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), 8)
209+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), nil)
210+
211+
var iteratorB = testStream.makeAsyncIterator()
212+
try await AsyncXCTAssertEqual(await iteratorB.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), 11)
213+
}
214+
215+
func testTransformSequenceFromThrowingTestSequence() async throws {
216+
let testStream = ThrowingTestSequence(base: ["A", "few", "words", "to", "consider", "today", "as", "this", "test", "runs"])
217+
218+
var iterator = testStream.makeAsyncIterator()
219+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), 11)
220+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), 19)
221+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), 8)
222+
try await AsyncXCTAssertEqual(await iterator.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), nil)
223+
224+
var iteratorB = testStream.makeAsyncIterator()
225+
try await AsyncXCTAssertEqual(await iteratorB.transform(with: countCharacters, readSequenceFactory: makeReadFourElementSequence), 11)
15226
}
16227
}

0 commit comments

Comments
 (0)