Skip to content

Commit b1b3d8a

Browse files
committed
Add AsyncSequence helpers
1 parent b37485e commit b1b3d8a

File tree

2 files changed

+141
-1
lines changed

2 files changed

+141
-1
lines changed

Sources/_NIOConcurrency/AsyncAwaitSupport.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import NIOCore
1616

1717
#if compiler(>=5.5)
18-
import _Concurrency
1918

2019
extension EventLoopFuture {
2120
/// Get the value/error from an `EventLoopFuture` in an `async` context.
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
#if swift(>=5.5)
16+
import struct NIO.ByteBuffer
17+
18+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
19+
public struct ByteBufferToUInt8AsyncSequence<Upstream: AsyncSequence>: AsyncSequence where Upstream.Element == ByteBuffer {
20+
public typealias Element = UInt8
21+
public typealias AsyncIterator = Iterator
22+
23+
public struct Iterator: AsyncIteratorProtocol {
24+
/*private but*/ @usableFromInline var state: State
25+
26+
@usableFromInline
27+
enum State {
28+
case hasBuffer(ByteBuffer, Upstream.AsyncIterator)
29+
case askForMore(Upstream.AsyncIterator)
30+
case finished
31+
case modifying
32+
}
33+
34+
@usableFromInline
35+
init(_ upstream: Upstream.AsyncIterator) {
36+
self.state = .askForMore(upstream)
37+
}
38+
39+
@inlinable
40+
public mutating func next() async throws -> Element? {
41+
switch self.state {
42+
case .askForMore(var upstream):
43+
self.state = .modifying
44+
45+
while true {
46+
switch try await upstream.next() {
47+
case .some(let nextBuffer) where nextBuffer.readableBytes == 0:
48+
break
49+
50+
case .some(var nextBuffer):
51+
assert(nextBuffer.readableBytes > 0)
52+
let result = nextBuffer.readInteger(as: UInt8.self)
53+
if nextBuffer.readableBytes > 0 {
54+
self.state = .hasBuffer(nextBuffer, upstream)
55+
} else {
56+
self.state = .askForMore(upstream)
57+
}
58+
return result
59+
60+
case .none:
61+
self.state = .finished
62+
return nil
63+
}
64+
}
65+
66+
case .hasBuffer(var buffer, let upstream):
67+
assert(buffer.readableBytes > 0)
68+
self.state = .modifying
69+
70+
let result = buffer.readInteger(as: UInt8.self)
71+
if buffer.readableBytes > 0 {
72+
self.state = .hasBuffer(buffer, upstream)
73+
} else {
74+
self.state = .askForMore(upstream)
75+
}
76+
return result
77+
78+
case .finished:
79+
return nil
80+
81+
case .modifying:
82+
preconditionFailure("Invalid state: \(self.state)")
83+
}
84+
}
85+
}
86+
87+
@inlinable
88+
public func makeAsyncIterator() -> Iterator {
89+
Iterator(self.upstream.makeAsyncIterator())
90+
}
91+
92+
@usableFromInline
93+
let upstream: Upstream
94+
95+
/*private but*/ @usableFromInline init(_ upstream: Upstream) {
96+
self.upstream = upstream
97+
}
98+
}
99+
100+
@usableFromInline
101+
struct TooManyBytesError: Error {
102+
@usableFromInline
103+
init() {}
104+
}
105+
106+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
107+
extension AsyncSequence where Element == ByteBuffer {
108+
/// Transform an AsyncSequence of ByteBuffers into an AsyncSequence of single bytes.
109+
@inlinable
110+
public func toBytes() -> ByteBufferToUInt8AsyncSequence<Self> {
111+
ByteBufferToUInt8AsyncSequence(self)
112+
}
113+
114+
/// Consume an ``Swift/AsyncSequence`` of ``NIO/ByteBuffer``s into a single `ByteBuffer`.
115+
///
116+
/// - Parameter maxBytes: The maximum number of bytes that the result ByteBuffer is allowed to have.
117+
/// - Returns: A ``NIO/ByteBuffer`` that holds all the bytes of the AsyncSequence
118+
@inlinable
119+
public func consume(maxBytes: Int) async throws -> ByteBuffer? {
120+
var iterator = self.makeAsyncIterator()
121+
guard var buffer = try await iterator.next() else {
122+
return nil
123+
}
124+
125+
var receivedBytes = buffer.readableBytes
126+
if receivedBytes > maxBytes {
127+
throw TooManyBytesError()
128+
}
129+
130+
while var next = try await iterator.next() {
131+
receivedBytes += next.readableBytes
132+
if receivedBytes > maxBytes {
133+
throw TooManyBytesError()
134+
}
135+
136+
buffer.writeBuffer(&next)
137+
}
138+
return buffer
139+
}
140+
}
141+
#endif

0 commit comments

Comments
 (0)