Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,22 @@ let package = Package(
]
),
.systemLibrary(name: "SystemSQLite", pkgConfig: "sqlite3"),

// `AsyncProcess` modules and dependencies

.target(name: "CProcessSpawnSync"),
.target(
name: "ProcessSpawnSync",
dependencies: [
"CProcessSpawnSync",
.product(name: "Atomics", package: "swift-atomics"),
.product(name: "NIOConcurrencyHelpers", package: "swift-nio"),
]
),
.target(
name: "AsyncProcess",
dependencies: [
"ProcessSpawnSync",
.product(name: "Atomics", package: "swift-atomics"),
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
.product(name: "Logging", package: "swift-log"),
Expand Down
8 changes: 4 additions & 4 deletions Sources/AsyncProcess/ChunkSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift open source project
//
// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors
// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
Expand Down Expand Up @@ -33,17 +33,17 @@ public struct ChunkSequence: AsyncSequence & Sendable {

public func makeAsyncIterator() -> AsyncIterator {
// This will close the file handle.
AsyncIterator(try! self.fileHandle?.fileContentStream(eventLoop: self.group.any()))
return AsyncIterator(try! self.fileHandle?.fileContentStream(eventLoop: group.any()))
}

public typealias Element = ByteBuffer
public struct AsyncIterator: AsyncIteratorProtocol {
public typealias Element = ByteBuffer
typealias UnderlyingSequence = FileContentStream
internal typealias UnderlyingSequence = FileContentStream

private var underlyingIterator: UnderlyingSequence.AsyncIterator?

init(_ underlyingSequence: UnderlyingSequence?) {
internal init(_ underlyingSequence: UnderlyingSequence?) {
self.underlyingIterator = underlyingSequence?.makeAsyncIterator()
}

Expand Down
6 changes: 3 additions & 3 deletions Sources/AsyncProcess/EOFSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift open source project
//
// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors
// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
Expand All @@ -15,13 +15,13 @@ public struct EOFSequence<Element>: AsyncSequence & Sendable {

public struct AsyncIterator: AsyncIteratorProtocol {
public mutating func next() async throws -> Element? {
nil
return nil
}
}

public init(of type: Element.Type = Element.self) {}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator()
return AsyncIterator()
}
}
60 changes: 33 additions & 27 deletions Sources/AsyncProcess/FileContentStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift open source project
//
// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors
// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
Expand All @@ -19,12 +19,13 @@ import NIO
// - Known issues:
// - no tests
// - most configurations have never run
struct FileContentStream: AsyncSequence {
internal typealias FileContentStream = _FileContentStream
public struct _FileContentStream: AsyncSequence & Sendable {
public typealias Element = ByteBuffer
typealias Underlying = AsyncThrowingChannel<Element, Error>

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(underlying: self.asyncChannel.makeAsyncIterator())
return AsyncIterator(underlying: self.asyncChannel.makeAsyncIterator())
}

public struct AsyncIterator: AsyncIteratorProtocol {
Expand All @@ -33,21 +34,31 @@ struct FileContentStream: AsyncSequence {
var underlying: Underlying.AsyncIterator

public mutating func next() async throws -> ByteBuffer? {
try await self.underlying.next()
return try await self.underlying.next()
}
}

public struct IOError: Error {
public var errnoValue: CInt

public static func makeFromErrnoGlobal() -> IOError {
IOError(errnoValue: errno)
return IOError(errnoValue: errno)
}
}

private let asyncChannel: AsyncThrowingChannel<ByteBuffer, Error>

public init(
public static func makeReader(
fileDescriptor: CInt,
eventLoop: EventLoop = MultiThreadedEventLoopGroup.singleton.any(),
blockingPool: NIOThreadPool = .singleton
) async throws -> _FileContentStream {
return try await eventLoop.submit {
try FileContentStream(fileDescriptor: fileDescriptor, eventLoop: eventLoop, blockingPool: blockingPool)
}.get()
}

internal init(
fileDescriptor: CInt,
eventLoop: EventLoop,
blockingPool: NIOThreadPool? = nil
Expand All @@ -64,7 +75,7 @@ struct FileContentStream: AsyncSequence {

switch statInfo.st_mode & S_IFMT {
case S_IFREG:
guard let blockingPool else {
guard let blockingPool = blockingPool else {
throw IOError(errnoValue: EINVAL)
}
let fileHandle = NIOLoopBound(
Expand All @@ -86,7 +97,7 @@ struct FileContentStream: AsyncSequence {
.whenComplete { result in
try! fileHandle.value.close()
switch result {
case let .failure(error):
case .failure(let error):
asyncChannel.fail(error)
case .success:
asyncChannel.finish()
Expand Down Expand Up @@ -140,7 +151,7 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler {
return data
case .error:
return nil
case var .sending(queue):
case .sending(var queue):
queue.append(data)
self = .sending(queue)
return nil
Expand All @@ -153,7 +164,7 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler {
preconditionFailure("didSendOne during .idle")
case .error:
return nil
case var .sending(queue):
case .sending(var queue):
if queue.isEmpty {
self = .idle
return nil
Expand Down Expand Up @@ -212,7 +223,7 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler {
eventLoop.makeFutureWithTask {
// note: We're _not_ on an EventLoop thread here
switch data {
case let .chunk(data):
case .chunk(let data):
await sink.send(data)
case .finish:
sink.finish()
Expand Down Expand Up @@ -255,18 +266,15 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler {

extension FileHandle {
func fileContentStream(eventLoop: EventLoop) throws -> FileContentStream {
let asyncBytes = try FileContentStream(
fileDescriptor: self.fileDescriptor,
eventLoop: eventLoop
)
let asyncBytes = try FileContentStream(fileDescriptor: self.fileDescriptor, eventLoop: eventLoop)
try self.close()
return asyncBytes
}
}

extension FileContentStream {
var lines: AsyncByteBufferLineSequence<FileContentStream> {
AsyncByteBufferLineSequence(
return AsyncByteBufferLineSequence(
self,
dropTerminator: true,
maximumAllowableBufferSize: 1024 * 1024,
Expand All @@ -281,7 +289,7 @@ extension AsyncSequence where Element == ByteBuffer, Self: Sendable {
maximumAllowableBufferSize: Int = 1024 * 1024,
dropLastChunkIfNoNewline: Bool = false
) -> AsyncByteBufferLineSequence<Self> {
AsyncByteBufferLineSequence(
return AsyncByteBufferLineSequence(
self,
dropTerminator: dropTerminator,
maximumAllowableBufferSize: maximumAllowableBufferSize,
Expand All @@ -290,7 +298,7 @@ extension AsyncSequence where Element == ByteBuffer, Self: Sendable {
}

public var strings: AsyncMapSequence<Self, String> {
self.map { String(buffer: $0) }
return self.map { String(buffer: $0) }
}
}

Expand All @@ -312,28 +320,26 @@ where Base: AsyncSequence, Base.Element == ByteBuffer {

struct Buffer {
private var buffer: [ByteBuffer] = []
private(set) var byteCount: Int = 0
internal private(set) var byteCount: Int = 0

mutating func append(_ buffer: ByteBuffer) {
self.buffer.append(buffer)
self.byteCount += buffer.readableBytes
}

func allButLast() -> ArraySlice<ByteBuffer> {
self.buffer.dropLast()
return self.buffer.dropLast()
}

var byteCountButLast: Int {
self.byteCount - (self.buffer.last?.readableBytes ?? 0)
return self.byteCount - (self.buffer.last?.readableBytes ?? 0)
}

var lastChunkView: ByteBufferView? {
self.buffer.last?.readableBytesView
return self.buffer.last?.readableBytesView
}

mutating func concatenateEverything(upToLastChunkLengthToConsume lastLength: Int)
-> ByteBuffer
{
mutating func concatenateEverything(upToLastChunkLengthToConsume lastLength: Int) -> ByteBuffer {
var output = ByteBuffer()
output.reserveCapacity(lastLength + self.byteCountButLast)

Expand All @@ -359,7 +365,7 @@ where Base: AsyncSequence, Base.Element == ByteBuffer {
}
}

init(
internal init(
underlying: Base.AsyncIterator,
dropTerminator: Bool,
maximumAllowableBufferSize: Int,
Expand Down Expand Up @@ -446,7 +452,7 @@ where Base: AsyncSequence, Base.Element == ByteBuffer {
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(
return AsyncIterator(
underlying: self.underlying.makeAsyncIterator(),
dropTerminator: self.dropTerminator,
maximumAllowableBufferSize: self.maximumAllowableBufferSize,
Expand Down
2 changes: 1 addition & 1 deletion Sources/AsyncProcess/NIOAsyncPipeWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift open source project
//
// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors
// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
Expand Down
Loading