Skip to content

Commit b864ce4

Browse files
committed
Update Buffer to be based on DispatchData.Slice
1 parent c3e85bb commit b864ce4

File tree

6 files changed

+150
-208
lines changed

6 files changed

+150
-208
lines changed

Sources/Subprocess/AsyncBufferSequence.swift

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,23 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
3737
public typealias Element = Buffer
3838

3939
private let diskIO: DiskIO
40-
private var buffer: [UInt8]
41-
private var currentPosition: Int
42-
private var finished: Bool
40+
private var buffer: [Buffer]
4341

4442
internal init(diskIO: DiskIO) {
4543
self.diskIO = diskIO
4644
self.buffer = []
47-
self.currentPosition = 0
48-
self.finished = false
4945
}
5046

51-
public func next() async throws -> Buffer? {
52-
let data = try await self.diskIO.readChunk(
47+
public mutating func next() async throws -> Buffer? {
48+
// If we have more left in buffer, use that
49+
guard self.buffer.isEmpty else {
50+
return self.buffer.removeFirst()
51+
}
52+
// Read more data
53+
let data = try await self.diskIO.read(
5354
upToLength: readBufferSize
5455
)
55-
if data == nil {
56+
guard let data = data else {
5657
// We finished reading. Close the file descriptor now
5758
#if os(Windows)
5859
try self.diskIO.close()
@@ -61,7 +62,15 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
6162
#endif
6263
return nil
6364
}
64-
return data
65+
let createdBuffers = Buffer.createFrom(data)
66+
// In most (all?) cases there should be only one buffer
67+
// because DispatchData is mostly contiguous
68+
if _fastPath(createdBuffers.count == 1) {
69+
// No need to push to the stack
70+
return createdBuffers[0]
71+
}
72+
self.buffer = createdBuffers
73+
return self.buffer.removeFirst()
6574
}
6675
}
6776

@@ -144,7 +153,10 @@ extension AsyncBufferSequence {
144153
capacity: buffer.data.count
145154
)
146155
defer { temporary.deallocate() }
147-
let actualBytesCopied = buffer.data.copyBytes(to: temporary)
156+
let actualBytesCopied = buffer.data.copyBytes(
157+
to: temporary,
158+
count: buffer.data.count
159+
)
148160

149161
// Calculate how many CodePoint elements we have
150162
let elementCount = actualBytesCopied / MemoryLayout<Encoding.CodeUnit>.stride

Sources/Subprocess/Buffer.swift

Lines changed: 64 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
//
1010
//===----------------------------------------------------------------------===//
1111

12+
#if canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl)
1213
@preconcurrency internal import Dispatch
14+
#endif
1315

1416
#if SubprocessSpan
1517
@available(SubprocessSpan, *)
@@ -18,16 +20,32 @@ extension AsyncBufferSequence {
1820
/// An immutable collection of bytes
1921
public struct Buffer: Sendable {
2022
#if os(Windows)
21-
internal var data: [UInt8]
23+
internal let data: [UInt8]
2224

2325
internal init(data: [UInt8]) {
2426
self.data = data
2527
}
28+
29+
internal static func createFrom(_ data: [UInt8]) -> [Buffer] {
30+
return [.init(data: data)]
31+
}
2632
#else
27-
internal var data: DispatchData
33+
// We need to keep the backingData alive while Slice is alive
34+
internal let backingData: DispatchData
35+
internal let data: DispatchData.Slice
2836

29-
internal init(data: DispatchData) {
37+
internal init(data: DispatchData.Slice, backingData: DispatchData) {
3038
self.data = data
39+
self.backingData = backingData
40+
}
41+
42+
internal static func createFrom(_ data: DispatchData) -> [Buffer] {
43+
let slices = data.slices
44+
// In most (all?) cases data should only have one slice
45+
if _fastPath(slices.count == 1) {
46+
return [.init(data: slices[0], backingData: data)]
47+
}
48+
return slices.map{ .init(data: $0, backingData: data) }
3149
}
3250
#endif
3351
}
@@ -64,70 +82,20 @@ extension AsyncBufferSequence.Buffer {
6482
public func withUnsafeBytes<ResultType>(
6583
_ body: (UnsafeRawBufferPointer) throws -> ResultType
6684
) rethrows -> ResultType {
67-
#if os(Windows)
6885
return try self.data.withUnsafeBytes(body)
69-
#else
70-
// Although DispatchData was designed to be uncontiguous, in practice
71-
// we found that almost all DispatchData are contiguous. Therefore
72-
// we can access this body in O(1) most of the time.
73-
return try self.data.withUnsafeBytes { ptr in
74-
let bytes = UnsafeRawBufferPointer(start: ptr, count: self.data.count)
75-
return try body(bytes)
76-
}
77-
#endif
7886
}
7987

8088
#if SubprocessSpan
8189
// Access the storage backing this Buffer
8290
public var bytes: RawSpan {
8391
@lifetime(borrow self)
8492
borrowing get {
85-
var backing: SpanBacking?
86-
#if os(Windows)
87-
self.data.withUnsafeBufferPointer {
88-
backing = .pointer($0)
89-
}
90-
#else
91-
self.data.enumerateBytes { buffer, byteIndex, stop in
92-
if _fastPath(backing == nil) {
93-
// In practice, almost all `DispatchData` is contiguous
94-
backing = .pointer(buffer)
95-
} else {
96-
// This DispatchData is not contiguous. We need to copy
97-
// the bytes out
98-
let contents = Array(buffer)
99-
switch backing! {
100-
case .pointer(let ptr):
101-
// Convert the ptr to array
102-
let existing = Array(ptr)
103-
backing = .array(existing + contents)
104-
case .array(let array):
105-
backing = .array(array + contents)
106-
}
107-
}
108-
}
109-
#endif
110-
guard let backing = backing else {
111-
let empty = UnsafeRawBufferPointer(start: nil, count: 0)
112-
let span = RawSpan(_unsafeBytes: empty)
113-
return _overrideLifetime(of: span, to: self)
114-
}
115-
switch backing {
116-
case .pointer(let ptr):
117-
let span = RawSpan(_unsafeElements: ptr)
118-
return _overrideLifetime(of: span, to: self)
119-
case .array(let array):
120-
let span = array.span.bytes
121-
return _overrideLifetime(of: span, to: self)
122-
}
93+
let ptr = self.data.withUnsafeBytes { $0 }
94+
let bytes = RawSpan(_unsafeBytes: ptr)
95+
return _overrideLifetime(of: bytes, to: self)
12396
}
12497
}
12598
#endif // SubprocessSpan
126-
127-
private enum SpanBacking {
128-
case pointer(UnsafeBufferPointer<UInt8>)
129-
case array([UInt8])
130-
}
13199
}
132100

133101
// MARK: - Hashable, Equatable
@@ -144,51 +112,53 @@ extension AsyncBufferSequence.Buffer: Equatable, Hashable {
144112

145113
public func hash(into hasher: inout Hasher) {
146114
self.data.withUnsafeBytes { ptr in
147-
let bytes = UnsafeRawBufferPointer(
148-
start: ptr,
149-
count: self.data.count
150-
)
151-
hasher.combine(bytes: bytes)
115+
hasher.combine(bytes: ptr)
152116
}
153117
}
154118
#endif
155119
}
156120

157-
// MARK: - Initializers
158-
#if SubprocessSpan
159-
@available(SubprocessSpan, *)
160-
#endif
161-
extension String {
162-
/// Create a String with the given encoding from `Buffer`.
163-
/// - Parameters:
164-
/// - buffer: the buffer to copy from
165-
/// - encoding: the encoding to encode Self with
166-
public init?<Encoding: _UnicodeEncoding>(buffer: AsyncBufferSequence.Buffer, as encoding: Encoding.Type) {
167-
#if os(Windows)
168-
let source = buffer.data.map { Encoding.CodeUnit($0) }
169-
self = String(decoding: source, as: encoding)
170-
#else
171-
self = buffer.withUnsafeBytes { ptr in
172-
return String(
173-
decoding: ptr.bindMemory(to: Encoding.CodeUnit.self).lazy.map { $0 },
174-
as: encoding
175-
)
121+
// MARK: - DispatchData.Slice
122+
#if canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl)
123+
extension DispatchData {
124+
/// Unfortunately, `DispatchData.Region` is not available on Linux, hence our own wrapper
125+
internal struct Slice: @unchecked Sendable, RandomAccessCollection {
126+
typealias Element = UInt8
127+
128+
internal let bytes: UnsafeBufferPointer<UInt8>
129+
130+
internal var startIndex: Int { self.bytes.startIndex }
131+
internal var endIndex: Int { self.bytes.endIndex }
132+
133+
internal init(bytes: UnsafeBufferPointer<UInt8>) {
134+
self.bytes = bytes
135+
}
136+
137+
internal func withUnsafeBytes<ResultType>(_ body: (UnsafeRawBufferPointer) throws -> ResultType) rethrows -> ResultType {
138+
return try body(UnsafeRawBufferPointer(self.bytes))
139+
}
140+
141+
@discardableResult
142+
internal func copyBytes<DestinationType>(
143+
to ptr: UnsafeMutableBufferPointer<DestinationType>, count: Int
144+
) -> Int {
145+
self.bytes.copyBytes(to: ptr, count: count)
146+
}
147+
148+
subscript(position: Int) -> UInt8 {
149+
_read {
150+
yield self.bytes[position]
151+
}
176152
}
177-
#endif
178153
}
179-
}
180154

181-
#if SubprocessSpan
182-
@available(SubprocessSpan, *)
183-
#endif
184-
extension Array where Element == UInt8 {
185-
/// Create an Array from `Buffer`
186-
/// - Parameter buffer: the buffer to copy from
187-
public init(buffer: AsyncBufferSequence.Buffer) {
188-
#if os(Windows)
189-
self = buffer.data
190-
#else
191-
self = Array(buffer.data)
192-
#endif
155+
internal var slices: [Slice] {
156+
var slices = [Slice]()
157+
enumerateBytes { (bytes, index, stop) in
158+
slices.append(Slice(bytes: bytes))
159+
}
160+
return slices
193161
}
194162
}
163+
164+
#endif

Sources/Subprocess/IO/Output.swift

Lines changed: 25 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -161,29 +161,17 @@ public struct BytesOutput: OutputProtocol {
161161
internal func captureOutput(
162162
from diskIO: consuming TrackedPlatformDiskIO?
163163
) async throws -> [UInt8] {
164-
var diskIOBox: TrackedPlatformDiskIO? = consume diskIO
165-
return try await withCheckedThrowingContinuation { continuation in
166-
let _diskIO = diskIOBox.take()
167-
guard let _diskIO = _diskIO else {
168-
// Show not happen due to type system constraints
169-
fatalError("Trying to capture output without file descriptor")
170-
}
171-
_diskIO.readUntilEOF(upToLength: self.maxSize) { result in
172-
switch result {
173-
case .success(let data):
174-
// FIXME: remove workaround for
175-
// rdar://143992296
176-
// https://github.com/swiftlang/swift-subprocess/issues/3
177-
#if os(Windows)
178-
continuation.resume(returning: data)
179-
#else
180-
continuation.resume(returning: data.array())
181-
#endif
182-
case .failure(let error):
183-
continuation.resume(throwing: error)
184-
}
185-
}
164+
guard let diskIO = diskIO else {
165+
// Should not happen due to type system constraints
166+
fatalError("Trying to capture output without file descriptor")
186167
}
168+
#if os(Windows)
169+
let result = try await diskIO.fileDescriptor.read(upToLength: self.maxSize)
170+
#else
171+
let result = try await diskIO.dispatchIO.read(upToLength: self.maxSize)
172+
#endif
173+
174+
return result?.array() ?? []
187175
}
188176

189177
#if SubprocessSpan
@@ -315,34 +303,23 @@ extension OutputProtocol {
315303
if let bytesOutput = self as? BytesOutput {
316304
return try await bytesOutput.captureOutput(from: diskIO) as! Self.OutputType
317305
}
318-
var diskIOBox: TrackedPlatformDiskIO? = consume diskIO
319-
return try await withCheckedThrowingContinuation { continuation in
320-
if OutputType.self == Void.self {
321-
continuation.resume(returning: () as! OutputType)
322-
return
323-
}
324-
guard let _diskIO = diskIOBox.take() else {
325-
// Show not happen due to type system constraints
326-
fatalError("Trying to capture output without file descriptor")
327-
}
328306

329-
_diskIO.readUntilEOF(upToLength: self.maxSize) { result in
330-
do {
331-
switch result {
332-
case .success(let data):
333-
// FIXME: remove workaround for
334-
// rdar://143992296
335-
// https://github.com/swiftlang/swift-subprocess/issues/3
336-
let output = try self.output(from: data)
337-
continuation.resume(returning: output)
338-
case .failure(let error):
339-
continuation.resume(throwing: error)
340-
}
341-
} catch {
342-
continuation.resume(throwing: error)
343-
}
344-
}
307+
if OutputType.self == Void.self {
308+
return () as! OutputType
309+
}
310+
311+
guard let diskIO = diskIO else {
312+
// Should not happen due to type system constraints
313+
fatalError("Trying to capture output without file descriptor")
345314
}
315+
316+
#if os(Windows)
317+
let result = try await diskIO.fileDescriptor.read(upToLength: self.maxSize)
318+
#else
319+
let result = try await diskIO.dispatchIO.read(upToLength: self.maxSize)
320+
#endif
321+
322+
return try self.output(from: result ?? .empty)
346323
}
347324
}
348325

0 commit comments

Comments
 (0)