2
2
//
3
3
// This source file is part of the Swift open source project
4
4
//
5
- // Copyright (c) 2022-2023 Apple Inc. and the Swift project authors
5
+ // Copyright (c) 2022-2025 Apple Inc. and the Swift project authors
6
6
// Licensed under Apache License v2.0 with Runtime Library Exception
7
7
//
8
8
// See https://swift.org/LICENSE.txt for license information
@@ -19,12 +19,13 @@ import NIO
19
19
// - Known issues:
20
20
// - no tests
21
21
// - most configurations have never run
22
- struct FileContentStream : AsyncSequence {
22
+ internal typealias FileContentStream = _FileContentStream
23
+ public struct _FileContentStream : AsyncSequence & Sendable {
23
24
public typealias Element = ByteBuffer
24
25
typealias Underlying = AsyncThrowingChannel < Element , Error >
25
26
26
27
public func makeAsyncIterator( ) -> AsyncIterator {
27
- AsyncIterator ( underlying: self . asyncChannel. makeAsyncIterator ( ) )
28
+ return AsyncIterator ( underlying: self . asyncChannel. makeAsyncIterator ( ) )
28
29
}
29
30
30
31
public struct AsyncIterator : AsyncIteratorProtocol {
@@ -33,21 +34,31 @@ struct FileContentStream: AsyncSequence {
33
34
var underlying : Underlying . AsyncIterator
34
35
35
36
public mutating func next( ) async throws -> ByteBuffer ? {
36
- try await self . underlying. next ( )
37
+ return try await self . underlying. next ( )
37
38
}
38
39
}
39
40
40
41
public struct IOError : Error {
41
42
public var errnoValue : CInt
42
43
43
44
public static func makeFromErrnoGlobal( ) -> IOError {
44
- IOError ( errnoValue: errno)
45
+ return IOError ( errnoValue: errno)
45
46
}
46
47
}
47
48
48
49
private let asyncChannel : AsyncThrowingChannel < ByteBuffer , Error >
49
50
50
- public init (
51
+ public static func makeReader(
52
+ fileDescriptor: CInt ,
53
+ eventLoop: EventLoop = MultiThreadedEventLoopGroup . singleton. any ( ) ,
54
+ blockingPool: NIOThreadPool = . singleton
55
+ ) async throws -> _FileContentStream {
56
+ return try await eventLoop. submit {
57
+ try FileContentStream ( fileDescriptor: fileDescriptor, eventLoop: eventLoop, blockingPool: blockingPool)
58
+ } . get ( )
59
+ }
60
+
61
+ internal init (
51
62
fileDescriptor: CInt ,
52
63
eventLoop: EventLoop ,
53
64
blockingPool: NIOThreadPool ? = nil
@@ -64,7 +75,7 @@ struct FileContentStream: AsyncSequence {
64
75
65
76
switch statInfo. st_mode & S_IFMT {
66
77
case S_IFREG:
67
- guard let blockingPool else {
78
+ guard let blockingPool = blockingPool else {
68
79
throw IOError ( errnoValue: EINVAL)
69
80
}
70
81
let fileHandle = NIOLoopBound (
@@ -86,7 +97,7 @@ struct FileContentStream: AsyncSequence {
86
97
. whenComplete { result in
87
98
try ! fileHandle. value. close ( )
88
99
switch result {
89
- case let . failure( error) :
100
+ case . failure( let error) :
90
101
asyncChannel. fail ( error)
91
102
case . success:
92
103
asyncChannel. finish ( )
@@ -140,7 +151,7 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler {
140
151
return data
141
152
case . error:
142
153
return nil
143
- case var . sending( queue) :
154
+ case . sending( var queue) :
144
155
queue. append ( data)
145
156
self = . sending( queue)
146
157
return nil
@@ -153,7 +164,7 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler {
153
164
preconditionFailure ( " didSendOne during .idle " )
154
165
case . error:
155
166
return nil
156
- case var . sending( queue) :
167
+ case . sending( var queue) :
157
168
if queue. isEmpty {
158
169
self = . idle
159
170
return nil
@@ -212,7 +223,7 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler {
212
223
eventLoop. makeFutureWithTask {
213
224
// note: We're _not_ on an EventLoop thread here
214
225
switch data {
215
- case let . chunk( data) :
226
+ case . chunk( let data) :
216
227
await sink. send ( data)
217
228
case . finish:
218
229
sink. finish ( )
@@ -255,18 +266,15 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler {
255
266
256
267
extension FileHandle {
257
268
func fileContentStream( eventLoop: EventLoop ) throws -> FileContentStream {
258
- let asyncBytes = try FileContentStream (
259
- fileDescriptor: self . fileDescriptor,
260
- eventLoop: eventLoop
261
- )
269
+ let asyncBytes = try FileContentStream ( fileDescriptor: self . fileDescriptor, eventLoop: eventLoop)
262
270
try self . close ( )
263
271
return asyncBytes
264
272
}
265
273
}
266
274
267
275
extension FileContentStream {
268
276
var lines : AsyncByteBufferLineSequence < FileContentStream > {
269
- AsyncByteBufferLineSequence (
277
+ return AsyncByteBufferLineSequence (
270
278
self ,
271
279
dropTerminator: true ,
272
280
maximumAllowableBufferSize: 1024 * 1024 ,
@@ -281,7 +289,7 @@ extension AsyncSequence where Element == ByteBuffer, Self: Sendable {
281
289
maximumAllowableBufferSize: Int = 1024 * 1024 ,
282
290
dropLastChunkIfNoNewline: Bool = false
283
291
) -> AsyncByteBufferLineSequence < Self > {
284
- AsyncByteBufferLineSequence (
292
+ return AsyncByteBufferLineSequence (
285
293
self ,
286
294
dropTerminator: dropTerminator,
287
295
maximumAllowableBufferSize: maximumAllowableBufferSize,
@@ -290,7 +298,7 @@ extension AsyncSequence where Element == ByteBuffer, Self: Sendable {
290
298
}
291
299
292
300
public var strings : AsyncMapSequence < Self , String > {
293
- self . map { String ( buffer: $0) }
301
+ return self . map { String ( buffer: $0) }
294
302
}
295
303
}
296
304
@@ -312,28 +320,26 @@ where Base: AsyncSequence, Base.Element == ByteBuffer {
312
320
313
321
struct Buffer {
314
322
private var buffer : [ ByteBuffer ] = [ ]
315
- private( set) var byteCount : Int = 0
323
+ internal private( set) var byteCount : Int = 0
316
324
317
325
mutating func append( _ buffer: ByteBuffer ) {
318
326
self . buffer. append ( buffer)
319
327
self . byteCount += buffer. readableBytes
320
328
}
321
329
322
330
func allButLast( ) -> ArraySlice < ByteBuffer > {
323
- self . buffer. dropLast ( )
331
+ return self . buffer. dropLast ( )
324
332
}
325
333
326
334
var byteCountButLast : Int {
327
- self . byteCount - ( self . buffer. last? . readableBytes ?? 0 )
335
+ return self . byteCount - ( self . buffer. last? . readableBytes ?? 0 )
328
336
}
329
337
330
338
var lastChunkView : ByteBufferView ? {
331
- self . buffer. last? . readableBytesView
339
+ return self . buffer. last? . readableBytesView
332
340
}
333
341
334
- mutating func concatenateEverything( upToLastChunkLengthToConsume lastLength: Int )
335
- -> ByteBuffer
336
- {
342
+ mutating func concatenateEverything( upToLastChunkLengthToConsume lastLength: Int ) -> ByteBuffer {
337
343
var output = ByteBuffer ( )
338
344
output. reserveCapacity ( lastLength + self . byteCountButLast)
339
345
@@ -359,7 +365,7 @@ where Base: AsyncSequence, Base.Element == ByteBuffer {
359
365
}
360
366
}
361
367
362
- init (
368
+ internal init (
363
369
underlying: Base . AsyncIterator ,
364
370
dropTerminator: Bool ,
365
371
maximumAllowableBufferSize: Int ,
@@ -446,7 +452,7 @@ where Base: AsyncSequence, Base.Element == ByteBuffer {
446
452
}
447
453
448
454
public func makeAsyncIterator( ) -> AsyncIterator {
449
- AsyncIterator (
455
+ return AsyncIterator (
450
456
underlying: self . underlying. makeAsyncIterator ( ) ,
451
457
dropTerminator: self . dropTerminator,
452
458
maximumAllowableBufferSize: self . maximumAllowableBufferSize,
0 commit comments