@@ -67,10 +67,13 @@ struct FileContentStream: AsyncSequence {
67
67
guard let blockingPool else {
68
68
throw IOError ( errnoValue: EINVAL)
69
69
}
70
- let fileHandle = NIOFileHandle ( descriptor: dupedFD)
70
+ let fileHandle = NIOLoopBound (
71
+ NIOFileHandle ( descriptor: dupedFD) ,
72
+ eventLoop: eventLoop
73
+ )
71
74
NonBlockingFileIO ( threadPool: blockingPool)
72
75
. readChunked (
73
- fileHandle: fileHandle,
76
+ fileHandle: fileHandle. value ,
74
77
byteCount: . max,
75
78
allocator: ByteBufferAllocator ( ) ,
76
79
eventLoop: eventLoop,
@@ -81,7 +84,7 @@ struct FileContentStream: AsyncSequence {
81
84
}
82
85
)
83
86
. whenComplete { result in
84
- try ! fileHandle. close ( )
87
+ try ! fileHandle. value . close ( )
85
88
switch result {
86
89
case let . failure( error) :
87
90
asyncChannel. fail ( error)
@@ -96,20 +99,16 @@ struct FileContentStream: AsyncSequence {
96
99
}
97
100
. withConnectedSocket ( dupedFD)
98
101
case S_IFIFO:
99
- let deadPipe = Pipe ( )
100
102
NIOPipeBootstrap ( group: eventLoop)
101
103
. channelInitializer { channel in
102
104
channel. pipeline. addHandler ( ReadIntoAsyncChannelHandler ( sink: asyncChannel) )
103
105
}
104
- . takingOwnershipOfDescriptors (
105
- input: dupedFD,
106
- output: dup ( deadPipe. fileHandleForWriting. fileDescriptor)
106
+ . takingOwnershipOfDescriptor (
107
+ input: dupedFD
107
108
)
108
109
. whenSuccess { channel in
109
110
channel. close ( mode: . output, promise: nil )
110
111
}
111
- try ! deadPipe. fileHandleForReading. close ( )
112
- try ! deadPipe. fileHandleForWriting. close ( )
113
112
case S_IFDIR:
114
113
throw IOError ( errnoValue: EISDIR)
115
114
case S_IFBLK, S_IFCHR, S_IFLNK:
@@ -206,25 +205,30 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler {
206
205
private func sendOneItem( _ data: ReceivedEvent , context: ChannelHandlerContext ) {
207
206
context. eventLoop. assertInEventLoop ( )
208
207
assert ( self . shouldRead == false , " sendOneItem in unexpected state \( self . state) " )
209
- context. eventLoop. makeFutureWithTask {
208
+ let eventLoop = context. eventLoop
209
+ let sink = self . sink
210
+ let `self` = NIOLoopBound ( self , eventLoop: context. eventLoop)
211
+ let context = NIOLoopBound ( context, eventLoop: context. eventLoop)
212
+ eventLoop. makeFutureWithTask {
213
+ // note: We're _not_ on an EventLoop thread here
210
214
switch data {
211
215
case let . chunk( data) :
212
- await self . sink. send ( data)
216
+ await sink. send ( data)
213
217
case . finish:
214
- self . sink. finish ( )
218
+ sink. finish ( )
215
219
}
216
220
} . map {
217
- if let moreToSend = self . state. didSendOne ( ) {
218
- self . sendOneItem ( moreToSend, context: context)
221
+ if let moreToSend = self . value . state. didSendOne ( ) {
222
+ self . value . sendOneItem ( moreToSend, context: context. value )
219
223
} else {
220
- if self . heldUpRead {
221
- context . eventLoop. execute {
222
- context. read ( )
224
+ if self . value . heldUpRead {
225
+ eventLoop. execute {
226
+ context. value . read ( )
223
227
}
224
228
}
225
229
}
226
230
} . whenFailure { error in
227
- self . state. fail ( error)
231
+ self . value . state. fail ( error)
228
232
}
229
233
}
230
234
@@ -268,7 +272,7 @@ extension FileContentStream {
268
272
}
269
273
}
270
274
271
- public extension AsyncSequence where Element == ByteBuffer {
275
+ public extension AsyncSequence where Element == ByteBuffer , Self : Sendable {
272
276
func splitIntoLines(
273
277
dropTerminator: Bool = true ,
274
278
maximumAllowableBufferSize: Int = 1024 * 1024 ,
0 commit comments