From b1b3d8abc918ee9f7bf96c9523398aa3ececc9e7 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 16 Aug 2021 13:47:25 +0200 Subject: [PATCH 1/2] Add AsyncSequence helpers --- .../_NIOConcurrency/AsyncAwaitSupport.swift | 1 - .../AsyncSequenceSupport.swift | 141 ++++++++++++++++++ 2 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 Sources/_NIOConcurrency/AsyncSequenceSupport.swift diff --git a/Sources/_NIOConcurrency/AsyncAwaitSupport.swift b/Sources/_NIOConcurrency/AsyncAwaitSupport.swift index 6901905fed..745d3c2e3d 100644 --- a/Sources/_NIOConcurrency/AsyncAwaitSupport.swift +++ b/Sources/_NIOConcurrency/AsyncAwaitSupport.swift @@ -15,7 +15,6 @@ import NIOCore #if compiler(>=5.5) -import _Concurrency extension EventLoopFuture { /// Get the value/error from an `EventLoopFuture` in an `async` context. diff --git a/Sources/_NIOConcurrency/AsyncSequenceSupport.swift b/Sources/_NIOConcurrency/AsyncSequenceSupport.swift new file mode 100644 index 0000000000..f0a9e8805e --- /dev/null +++ b/Sources/_NIOConcurrency/AsyncSequenceSupport.swift @@ -0,0 +1,141 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2021 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if swift(>=5.5) +import struct NIO.ByteBuffer + +@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) +public struct ByteBufferToUInt8AsyncSequence: AsyncSequence where Upstream.Element == ByteBuffer { + public typealias Element = UInt8 + public typealias AsyncIterator = Iterator + + public struct Iterator: AsyncIteratorProtocol { + /*private but*/ @usableFromInline var state: State + + @usableFromInline + enum State { + case hasBuffer(ByteBuffer, Upstream.AsyncIterator) + case askForMore(Upstream.AsyncIterator) + case finished + case modifying + } + + @usableFromInline + init(_ upstream: Upstream.AsyncIterator) { + self.state = .askForMore(upstream) + } + + @inlinable + public mutating func next() async throws -> Element? { + switch self.state { + case .askForMore(var upstream): + self.state = .modifying + + while true { + switch try await upstream.next() { + case .some(let nextBuffer) where nextBuffer.readableBytes == 0: + break + + case .some(var nextBuffer): + assert(nextBuffer.readableBytes > 0) + let result = nextBuffer.readInteger(as: UInt8.self) + if nextBuffer.readableBytes > 0 { + self.state = .hasBuffer(nextBuffer, upstream) + } else { + self.state = .askForMore(upstream) + } + return result + + case .none: + self.state = .finished + return nil + } + } + + case .hasBuffer(var buffer, let upstream): + assert(buffer.readableBytes > 0) + self.state = .modifying + + let result = buffer.readInteger(as: UInt8.self) + if buffer.readableBytes > 0 { + self.state = .hasBuffer(buffer, upstream) + } else { + self.state = .askForMore(upstream) + } + return result + + case .finished: + return nil + + case .modifying: + preconditionFailure("Invalid state: \(self.state)") + } + } + } + + @inlinable + public func makeAsyncIterator() -> Iterator { + Iterator(self.upstream.makeAsyncIterator()) + } + + @usableFromInline + let upstream: Upstream + + /*private but*/ @usableFromInline init(_ upstream: Upstream) { + self.upstream = upstream + } +} + +@usableFromInline +struct TooManyBytesError: Error { + @usableFromInline + init() {} +} + +@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) +extension AsyncSequence where Element == ByteBuffer { + /// Transform an AsyncSequence of ByteBuffers into an AsyncSequence of single bytes. + @inlinable + public func toBytes() -> ByteBufferToUInt8AsyncSequence { + ByteBufferToUInt8AsyncSequence(self) + } + + /// Consume an ``Swift/AsyncSequence`` of ``NIO/ByteBuffer``s into a single `ByteBuffer`. + /// + /// - Parameter maxBytes: The maximum number of bytes that the result ByteBuffer is allowed to have. + /// - Returns: A ``NIO/ByteBuffer`` that holds all the bytes of the AsyncSequence + @inlinable + public func consume(maxBytes: Int) async throws -> ByteBuffer? { + var iterator = self.makeAsyncIterator() + guard var buffer = try await iterator.next() else { + return nil + } + + var receivedBytes = buffer.readableBytes + if receivedBytes > maxBytes { + throw TooManyBytesError() + } + + while var next = try await iterator.next() { + receivedBytes += next.readableBytes + if receivedBytes > maxBytes { + throw TooManyBytesError() + } + + buffer.writeBuffer(&next) + } + return buffer + } +} +#endif From 7d096133a79a19cab730886dbe9c6a6a102bb717 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 16 Aug 2021 18:12:49 +0200 Subject: [PATCH 2/2] Code review --- .../AsyncSequenceSupport.swift | 75 +++++++++---------- 1 file changed, 35 insertions(+), 40 deletions(-) diff --git a/Sources/_NIOConcurrency/AsyncSequenceSupport.swift b/Sources/_NIOConcurrency/AsyncSequenceSupport.swift index f0a9e8805e..5426bc9231 100644 --- a/Sources/_NIOConcurrency/AsyncSequenceSupport.swift +++ b/Sources/_NIOConcurrency/AsyncSequenceSupport.swift @@ -16,10 +16,23 @@ import struct NIO.ByteBuffer @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) -public struct ByteBufferToUInt8AsyncSequence: AsyncSequence where Upstream.Element == ByteBuffer { +public struct NIOByteBufferToUInt8AsyncSequence: AsyncSequence where Upstream.Element == ByteBuffer { public typealias Element = UInt8 public typealias AsyncIterator = Iterator + @usableFromInline + let upstream: Upstream + + @inlinable + init(_ upstream: Upstream) { + self.upstream = upstream + } + + @inlinable + public func makeAsyncIterator() -> Iterator { + Iterator(self.upstream.makeAsyncIterator()) + } + public struct Iterator: AsyncIteratorProtocol { /*private but*/ @usableFromInline var state: State @@ -28,10 +41,18 @@ public struct ByteBufferToUInt8AsyncSequence: AsyncSequ case hasBuffer(ByteBuffer, Upstream.AsyncIterator) case askForMore(Upstream.AsyncIterator) case finished - case modifying + + @inlinable + init(buffer: ByteBuffer, upstream: Upstream.AsyncIterator) { + if buffer.readableBytes > 0 { + self = .hasBuffer(buffer, upstream) + } else { + self = .askForMore(upstream) + } + } } - @usableFromInline + @inlinable init(_ upstream: Upstream.AsyncIterator) { self.state = .askForMore(upstream) } @@ -40,21 +61,17 @@ public struct ByteBufferToUInt8AsyncSequence: AsyncSequ public mutating func next() async throws -> Element? { switch self.state { case .askForMore(var upstream): - self.state = .modifying - while true { switch try await upstream.next() { case .some(let nextBuffer) where nextBuffer.readableBytes == 0: - break + // we received an empty buffer. for this reason, let's continue and get the + // next buffer fro, the sequence + continue case .some(var nextBuffer): assert(nextBuffer.readableBytes > 0) let result = nextBuffer.readInteger(as: UInt8.self) - if nextBuffer.readableBytes > 0 { - self.state = .hasBuffer(nextBuffer, upstream) - } else { - self.state = .askForMore(upstream) - } + self.state = .init(buffer: nextBuffer, upstream: upstream) return result case .none: @@ -65,50 +82,28 @@ public struct ByteBufferToUInt8AsyncSequence: AsyncSequ case .hasBuffer(var buffer, let upstream): assert(buffer.readableBytes > 0) - self.state = .modifying - let result = buffer.readInteger(as: UInt8.self) - if buffer.readableBytes > 0 { - self.state = .hasBuffer(buffer, upstream) - } else { - self.state = .askForMore(upstream) - } + self.state = .init(buffer: buffer, upstream: upstream) return result case .finished: return nil - - case .modifying: - preconditionFailure("Invalid state: \(self.state)") } } } - @inlinable - public func makeAsyncIterator() -> Iterator { - Iterator(self.upstream.makeAsyncIterator()) - } - - @usableFromInline - let upstream: Upstream - - /*private but*/ @usableFromInline init(_ upstream: Upstream) { - self.upstream = upstream - } } -@usableFromInline -struct TooManyBytesError: Error { - @usableFromInline - init() {} +public struct NIOTooManyBytesError: Error { + public init() {} } @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) extension AsyncSequence where Element == ByteBuffer { /// Transform an AsyncSequence of ByteBuffers into an AsyncSequence of single bytes. @inlinable - public func toBytes() -> ByteBufferToUInt8AsyncSequence { - ByteBufferToUInt8AsyncSequence(self) + public func toBytes() -> NIOByteBufferToUInt8AsyncSequence { + NIOByteBufferToUInt8AsyncSequence(self) } /// Consume an ``Swift/AsyncSequence`` of ``NIO/ByteBuffer``s into a single `ByteBuffer`. @@ -124,13 +119,13 @@ extension AsyncSequence where Element == ByteBuffer { var receivedBytes = buffer.readableBytes if receivedBytes > maxBytes { - throw TooManyBytesError() + throw NIOTooManyBytesError() } while var next = try await iterator.next() { receivedBytes += next.readableBytes if receivedBytes > maxBytes { - throw TooManyBytesError() + throw NIOTooManyBytesError() } buffer.writeBuffer(&next)