@@ -24,16 +24,25 @@ public struct _FileContentStream: AsyncSequence & Sendable {
24
24
public typealias Element = ByteBuffer
25
25
typealias Underlying = AsyncThrowingChannel < Element , Error >
26
26
27
- public func makeAsyncIterator( ) -> AsyncIterator {
28
- return AsyncIterator ( underlying: self . asyncChannel. makeAsyncIterator ( ) )
29
- }
30
-
31
- public struct AsyncIterator : AsyncIteratorProtocol {
27
+ public final class AsyncIterator : AsyncIteratorProtocol {
32
28
public typealias Element = ByteBuffer
33
29
30
+ deinit {
31
+ // This is painful and so wrong but unfortunately, our iterators don't have a cancel signal, so the only
32
+ // thing we can do is hope for `deinit` to be invoked :(.
33
+ // AsyncIteratorProtocol also doesn't support `~Copyable` so we also have to make this a class.
34
+ self . channel? . close ( promise: nil )
35
+ }
36
+
37
+ init ( underlying: Underlying . AsyncIterator , channel: ( any Channel ) ? ) {
38
+ self . underlying = underlying
39
+ self . channel = channel
40
+ }
41
+
34
42
var underlying : Underlying . AsyncIterator
43
+ let channel : ( any Channel ) ?
35
44
36
- public mutating func next( ) async throws -> ByteBuffer ? {
45
+ public func next( ) async throws -> ByteBuffer ? {
37
46
return try await self . underlying. next ( )
38
47
}
39
48
}
@@ -47,22 +56,41 @@ public struct _FileContentStream: AsyncSequence & Sendable {
47
56
}
48
57
49
58
private let asyncChannel : AsyncThrowingChannel < ByteBuffer , Error >
59
+ private let channel : ( any Channel ) ?
60
+
61
+ internal func isSameAs( _ other: FileContentStream ) -> Bool {
62
+ return ( self . asyncChannel === other. asyncChannel) && ( self . channel === other. channel)
63
+ }
64
+
65
+ public func makeAsyncIterator( ) -> AsyncIterator {
66
+ return AsyncIterator (
67
+ underlying: self . asyncChannel. makeAsyncIterator ( ) ,
68
+ channel: self . channel
69
+ )
70
+ }
71
+
72
+ public func close( ) async throws {
73
+ self . asyncChannel. finish ( )
74
+ do {
75
+ try await self . channel? . close ( ) . get ( )
76
+ } catch ChannelError . alreadyClosed {
77
+ // That's okay
78
+ }
79
+ }
50
80
51
81
public static func makeReader(
52
82
fileDescriptor: CInt ,
53
83
eventLoop: EventLoop = MultiThreadedEventLoopGroup . singleton. any ( ) ,
54
84
blockingPool: NIOThreadPool = . singleton
55
85
) async throws -> _FileContentStream {
56
- return try await eventLoop. submit {
57
- try FileContentStream ( fileDescriptor: fileDescriptor, eventLoop: eventLoop, blockingPool: blockingPool)
58
- } . get ( )
86
+ try await FileContentStream ( fileDescriptor: fileDescriptor, eventLoop: eventLoop, blockingPool: blockingPool)
59
87
}
60
88
61
89
internal init (
62
90
fileDescriptor: CInt ,
63
91
eventLoop: EventLoop ,
64
92
blockingPool: NIOThreadPool ? = nil
65
- ) throws {
93
+ ) async throws {
66
94
var statInfo : stat = . init( )
67
95
let statError = fstat ( fileDescriptor, & statInfo)
68
96
if statError != 0 {
@@ -79,7 +107,7 @@ public struct _FileContentStream: AsyncSequence & Sendable {
79
107
throw IOError ( errnoValue: EINVAL)
80
108
}
81
109
let fileHandle = NIOLoopBound (
82
- NIOFileHandle ( descriptor : dupedFD) ,
110
+ NIOFileHandle ( _deprecatedTakingOwnershipOfDescriptor : dupedFD) ,
83
111
eventLoop: eventLoop
84
112
)
85
113
NonBlockingFileIO ( threadPool: blockingPool)
@@ -103,23 +131,36 @@ public struct _FileContentStream: AsyncSequence & Sendable {
103
131
asyncChannel. finish ( )
104
132
}
105
133
}
134
+ self . channel = nil
106
135
case S_IFSOCK:
107
- _ = ClientBootstrap ( group: eventLoop)
136
+ self . channel = try await ClientBootstrap ( group: eventLoop)
108
137
. channelInitializer { channel in
109
- channel. pipeline. addHandler ( ReadIntoAsyncChannelHandler ( sink: asyncChannel) )
138
+ do {
139
+ try channel. pipeline. syncOperations. addHandler ( ReadIntoAsyncChannelHandler ( sink: asyncChannel) )
140
+ return channel. eventLoop. makeSucceededFuture ( ( ) )
141
+ } catch {
142
+ return channel. eventLoop. makeFailedFuture ( error)
143
+ }
110
144
}
111
145
. withConnectedSocket ( dupedFD)
146
+ . get ( )
112
147
case S_IFIFO:
113
- NIOPipeBootstrap ( group: eventLoop)
148
+ self . channel = try await NIOPipeBootstrap ( group: eventLoop)
114
149
. channelInitializer { channel in
115
- channel. pipeline. addHandler ( ReadIntoAsyncChannelHandler ( sink: asyncChannel) )
150
+ do {
151
+ try channel. pipeline. syncOperations. addHandler ( ReadIntoAsyncChannelHandler ( sink: asyncChannel) )
152
+ return channel. eventLoop. makeSucceededFuture ( ( ) )
153
+ } catch {
154
+ return channel. eventLoop. makeFailedFuture ( error)
155
+ }
116
156
}
117
157
. takingOwnershipOfDescriptor (
118
158
input: dupedFD
119
159
)
120
- . whenSuccess { channel in
160
+ . map { channel in
121
161
channel. close ( mode: . output, promise: nil )
122
- }
162
+ return channel
163
+ } . get ( )
123
164
case S_IFDIR:
124
165
throw IOError ( errnoValue: EISDIR)
125
166
case S_IFBLK, S_IFCHR, S_IFLNK:
@@ -265,8 +306,8 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler {
265
306
}
266
307
267
308
extension FileHandle {
268
- func fileContentStream( eventLoop: EventLoop ) throws -> FileContentStream {
269
- let asyncBytes = try FileContentStream ( fileDescriptor: self . fileDescriptor, eventLoop: eventLoop)
309
+ func fileContentStream( eventLoop: EventLoop ) async throws -> FileContentStream {
310
+ let asyncBytes = try await FileContentStream ( fileDescriptor: self . fileDescriptor, eventLoop: eventLoop)
270
311
try self . close ( )
271
312
return asyncBytes
272
313
}
0 commit comments