Skip to content

Commit e8d072f

Browse files
committed
ChunkedAsyncSequence
1 parent 653b680 commit e8d072f

File tree

6 files changed

+120
-11
lines changed

6 files changed

+120
-11
lines changed

Sources/ChunkedAsyncSequence.swift

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
//
2+
// ChunkedAsyncSequence.swift
3+
// FlyingFox
4+
//
5+
// Created by Simon Whitty on 20/02/2022.
6+
// Copyright © 2022 Simon Whitty. All rights reserved.
7+
//
8+
// Distributed under the permissive MIT license
9+
// Get the latest version from here:
10+
//
11+
// https://github.com/swhitty/FlyingFox
12+
//
13+
// Permission is hereby granted, free of charge, to any person obtaining a copy
14+
// of this software and associated documentation files (the "Software"), to deal
15+
// in the Software without restriction, including without limitation the rights
16+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17+
// copies of the Software, and to permit persons to whom the Software is
18+
// furnished to do so, subject to the following conditions:
19+
//
20+
// The above copyright notice and this permission notice shall be included in all
21+
// copies or substantial portions of the Software.
22+
//
23+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
24+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
26+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
28+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29+
// SOFTWARE.
30+
//
31+
32+
protocol ChuckedAsyncSequence: AsyncSequence {
33+
34+
/// Retrieve next n elements can be requested from an AsyncSequence.
35+
/// - Parameter count: Number of elements to be read from the sequence
36+
/// - Returns: Returns all the requested number or elements or nil when the
37+
/// sequence ends before all of requested elements are retrieved.
38+
func next(count: Int) async throws -> [Element]?
39+
}
40+
41+
extension ChuckedAsyncSequence {
42+
43+
/// Default implementation that does not read elements in chunks, but slowly
44+
/// reads the chunk one element at a time.
45+
func next(count: Int) async throws -> [Element]? {
46+
var buffer = [Element]()
47+
var iterator = makeAsyncIterator()
48+
49+
while buffer.count < count,
50+
let element = try await iterator.next() {
51+
buffer.append(element)
52+
}
53+
54+
return buffer.count == count ? buffer : nil
55+
}
56+
}

Sources/HTTPConnection.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ struct HTTPConnection {
4242
}
4343

4444
// some AsyncSequence<HTTPRequest>
45-
var requests: HTTPRequestSequence<ClosureSequence<UInt8>> {
45+
var requests: HTTPRequestSequence<ByteSequence> {
4646
HTTPRequestSequence(bytes: socket.bytes)
4747
}
4848

@@ -55,7 +55,7 @@ struct HTTPConnection {
5555
}
5656
}
5757

58-
struct HTTPRequestSequence<S: AsyncSequence>: AsyncSequence, AsyncIteratorProtocol where S.Element == UInt8 {
58+
struct HTTPRequestSequence<S: ChuckedAsyncSequence>: AsyncSequence, AsyncIteratorProtocol where S.Element == UInt8 {
5959
typealias Element = HTTPRequest
6060
private let bytes: S
6161

Sources/HTTPRequestDecoder.swift

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import Foundation
3333

3434
struct HTTPRequestDecoder {
3535

36-
static func decodeRequest<S>(from bytes: S) async throws -> HTTPRequest where S: AsyncSequence, S.Element == UInt8 {
36+
static func decodeRequest<S>(from bytes: S) async throws -> HTTPRequest where S: ChuckedAsyncSequence, S.Element == UInt8 {
3737
let status = try await bytes.takeLine()
3838
let comps = status
3939
.trimmingCharacters(in: .whitespacesAndNewlines)
@@ -82,15 +82,16 @@ struct HTTPRequestDecoder {
8282
return (HTTPHeader(name), value)
8383
}
8484

85-
static func readBody<S: AsyncSequence>(from bytes: S, length: String?) async throws -> Data where S.Element == UInt8 {
85+
static func readBody<S: ChuckedAsyncSequence>(from bytes: S, length: String?) async throws -> Data where S.Element == UInt8 {
8686
guard let length = length.flatMap(Int.init) else {
8787
return Data()
8888
}
8989

90-
return try await bytes
91-
.collectUntil { $0.count == length }
92-
.map { Data($0) }
93-
.first()
90+
guard let buffer = try await bytes.next(count: length) else {
91+
throw Error("ChuckedAsyncSequence prematurely ended")
92+
}
93+
94+
return Data(buffer)
9495
}
9596
}
9697

Sources/Socket/AsyncSocket.swift

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,21 @@ struct AsyncSocket: Sendable {
7373
} while true
7474
}
7575

76+
func read(bytes: Int) async throws -> [UInt8] {
77+
var buffer = [UInt8]()
78+
while buffer.count < bytes {
79+
let toRead = min(bytes - buffer.count, 8192)
80+
do {
81+
try buffer.append(contentsOf: socket.read(atMost: toRead))
82+
} catch SocketError.blocked {
83+
try await pool.suspend(untilReady: socket)
84+
} catch {
85+
throw error
86+
}
87+
}
88+
return buffer
89+
}
90+
7691
func write(_ data: Data) async throws {
7792
var sent = data.startIndex
7893
while sent < data.endIndex {
@@ -104,11 +119,27 @@ struct AsyncSocket: Sendable {
104119
} while true
105120
}
106121

107-
var bytes: ClosureSequence<UInt8> {
108-
ClosureSequence(closure: read)
122+
var bytes: ByteSequence {
123+
ByteSequence(socket: self)
109124
}
110125

111126
var sockets: ClosureSequence<AsyncSocket> {
112127
ClosureSequence(closure: accept)
113128
}
114129
}
130+
131+
struct ByteSequence: ChuckedAsyncSequence, AsyncIteratorProtocol {
132+
typealias Element = UInt8
133+
134+
let socket: AsyncSocket
135+
136+
func makeAsyncIterator() -> ByteSequence { self }
137+
138+
mutating func next() async throws -> UInt8? {
139+
try await socket.read()
140+
}
141+
142+
func next(count: Int) async throws -> [UInt8]? {
143+
try await socket.read(bytes: count)
144+
}
145+
}

Sources/Socket/Socket.swift

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,25 @@ struct Socket: Sendable, Hashable {
135135
}
136136
}
137137

138+
func read(atMost length: Int) throws -> [UInt8] {
139+
try [UInt8](unsafeUninitializedCapacity: length) { buffer, count in
140+
count = try read(into: &buffer, length: length)
141+
}
142+
}
143+
144+
private func read(into buffer: inout UnsafeMutableBufferPointer<UInt8>, length: Int) throws -> Int {
145+
let count = Socket.read(file, buffer.baseAddress, length)
146+
if count == 0 {
147+
throw SocketError.disconnected
148+
} else if count > 0 {
149+
return count
150+
} else if errno == EWOULDBLOCK {
151+
throw SocketError.blocked
152+
} else {
153+
throw SocketError.makeFailed("Read")
154+
}
155+
}
156+
138157
func write(_ data: Data, from index: Data.Index) throws -> Data.Index {
139158
guard index < data.endIndex else { return data.endIndex }
140159
return try data.withUnsafeBytes {

Tests/ConsumingAsyncSequence.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
// SOFTWARE.
3030
//
3131

32-
final class ConsumingAsyncSequence<Element>: AsyncSequence, AsyncIteratorProtocol {
32+
@testable import FlyingFox
33+
34+
final class ConsumingAsyncSequence<Element>: ChuckedAsyncSequence, AsyncIteratorProtocol {
3335

3436
private var iterator: AnySequence<Element>.Iterator
3537

0 commit comments

Comments
 (0)