23
23
import Glibc
24
24
#elseif canImport(Android)
25
25
import Android
26
+ import posix_filesystem. sys_epoll
26
27
#elseif canImport(Musl)
27
28
import Musl
28
29
#endif
@@ -40,7 +41,7 @@ final class AsyncIO: Sendable {
40
41
41
42
typealias OutputStream = AsyncThrowingStream < AsyncBufferSequence . Buffer , any Error >
42
43
43
- private final class MonitorThreadContext {
44
+ private struct MonitorThreadContext : Sendable {
44
45
let epollFileDescriptor : CInt
45
46
let shutdownFileDescriptor : CInt
46
47
@@ -95,7 +96,7 @@ final class AsyncIO: Sendable {
95
96
events: EPOLLIN . rawValue,
96
97
data: epoll_data ( fd: shutdownFileDescriptor)
97
98
)
98
- var rc = epoll_ctl (
99
+ let rc = epoll_ctl (
99
100
epollFileDescriptor,
100
101
EPOLL_CTL_ADD,
101
102
shutdownFileDescriptor,
@@ -117,76 +118,71 @@ final class AsyncIO: Sendable {
117
118
epollFileDescriptor: epollFileDescriptor,
118
119
shutdownFileDescriptor: shutdownFileDescriptor
119
120
)
120
- let threadContext = Unmanaged . passRetained ( context)
121
- var thread : pthread_t = pthread_t ( )
122
- rc = pthread_create ( & thread, nil , { args in
123
- func reportError( _ error: SubprocessError ) {
124
- _registration. withLock { store in
125
- for continuation in store. values {
126
- continuation. finish ( throwing: error)
121
+ let thread : pthread_t
122
+ do {
123
+ thread = try pthread_create {
124
+ func reportError( _ error: SubprocessError ) {
125
+ _registration. withLock { store in
126
+ for continuation in store. values {
127
+ continuation. finish ( throwing: error)
128
+ }
127
129
}
128
130
}
129
- }
130
-
131
- let unmanaged = Unmanaged< MonitorThreadContext> . fromOpaque( args!)
132
- let context = unmanaged. takeRetainedValue ( )
133
-
134
- var events : [ epoll_event ] = Array (
135
- repeating: epoll_event ( events: 0 , data: epoll_data ( fd: 0 ) ) ,
136
- count: _epollEventSize
137
- )
138
131
139
- // Enter the monitor loop
140
- monitorLoop: while true {
141
- let eventCount = epoll_wait (
142
- context. epollFileDescriptor,
143
- & events,
144
- CInt ( events. count) ,
145
- - 1
132
+ var events : [ epoll_event ] = Array (
133
+ repeating: epoll_event ( events: 0 , data: epoll_data ( fd: 0 ) ) ,
134
+ count: _epollEventSize
146
135
)
147
- if eventCount < 0 {
148
- if errno == EINTR || errno == EAGAIN {
149
- continue // interrupted by signal; try again
150
- }
151
- // Report other errors
152
- let error = SubprocessError (
153
- code: . init( . asyncIOFailed(
154
- " epoll_wait failed " )
155
- ) ,
156
- underlyingError: . init( rawValue: errno)
157
- )
158
- reportError ( error)
159
- break monitorLoop
160
- }
161
136
162
- for index in 0 ..< Int ( eventCount) {
163
- let event = events [ index]
164
- let targetFileDescriptor = event. data. fd
165
- // Breakout the monitor loop if we received shutdown
166
- // from the shutdownFD
167
- if targetFileDescriptor == context. shutdownFileDescriptor {
168
- var buf : UInt64 = 0
169
- _ = _SubprocessCShims. read ( context. shutdownFileDescriptor, & buf, MemoryLayout< UInt64> . size)
137
+ // Enter the monitor loop
138
+ monitorLoop: while true {
139
+ let eventCount = epoll_wait (
140
+ context. epollFileDescriptor,
141
+ & events,
142
+ CInt ( events. count) ,
143
+ - 1
144
+ )
145
+ if eventCount < 0 {
146
+ if errno == EINTR || errno == EAGAIN {
147
+ continue // interrupted by signal; try again
148
+ }
149
+ // Report other errors
150
+ let error = SubprocessError (
151
+ code: . init( . asyncIOFailed(
152
+ " epoll_wait failed " )
153
+ ) ,
154
+ underlyingError: . init( rawValue: errno)
155
+ )
156
+ reportError ( error)
170
157
break monitorLoop
171
158
}
172
159
173
- // Notify the continuation
174
- let continuation = _registration. withLock { store -> SignalStream . Continuation ? in
175
- if let continuation = store [ targetFileDescriptor] {
176
- return continuation
160
+ for index in 0 ..< Int ( eventCount) {
161
+ let event = events [ index]
162
+ let targetFileDescriptor = event. data. fd
163
+ // Breakout the monitor loop if we received shutdown
164
+ // from the shutdownFD
165
+ if targetFileDescriptor == context. shutdownFileDescriptor {
166
+ var buf : UInt64 = 0
167
+ _ = _subprocess_read ( context. shutdownFileDescriptor, & buf, MemoryLayout< UInt64> . size)
168
+ break monitorLoop
177
169
}
178
- return nil
170
+
171
+ // Notify the continuation
172
+ let continuation = _registration. withLock { store -> SignalStream . Continuation ? in
173
+ if let continuation = store [ targetFileDescriptor] {
174
+ return continuation
175
+ }
176
+ return nil
177
+ }
178
+ continuation? . yield ( true )
179
179
}
180
- continuation? . yield ( true )
181
180
}
182
181
}
183
-
184
- return nil
185
- } , threadContext. toOpaque ( ) )
186
- guard rc == 0 else {
182
+ } catch let underlyingError {
187
183
let error = SubprocessError (
188
184
code: . init( . asyncIOFailed( " Failed to create monitor thread " ) ) ,
189
- underlyingError: . init ( rawValue : rc )
185
+ underlyingError: underlyingError
190
186
)
191
187
self . state = . failure( error)
192
188
return
@@ -211,14 +207,14 @@ final class AsyncIO: Sendable {
211
207
212
208
var one : UInt64 = 1
213
209
// Wake up the thread for shutdown
214
- _ = _SubprocessCShims . write ( currentState. shutdownFileDescriptor, & one, MemoryLayout< UInt64> . stride)
210
+ _ = _subprocess_write ( currentState. shutdownFileDescriptor, & one, MemoryLayout< UInt64> . stride)
215
211
// Cleanup the monitor thread
216
212
pthread_join ( currentState. monitorThread, nil )
217
213
var closeError : CInt = 0
218
- if _SubprocessCShims . close ( currentState. epollFileDescriptor) != 0 {
214
+ if _subprocess_close ( currentState. epollFileDescriptor) != 0 {
219
215
closeError = errno
220
216
}
221
- if _SubprocessCShims . close ( currentState. shutdownFileDescriptor) != 0 {
217
+ if _subprocess_close ( currentState. shutdownFileDescriptor) != 0 {
222
218
closeError = errno
223
219
}
224
220
if closeError != 0 {
@@ -231,7 +227,7 @@ final class AsyncIO: Sendable {
231
227
_ fileDescriptor: FileDescriptor ,
232
228
for event: Event
233
229
) -> SignalStream {
234
- return SignalStream { continuation in
230
+ return SignalStream { ( continuation: SignalStream . Continuation ) -> ( ) in
235
231
// If setup failed, nothing much we can do
236
232
switch self . state {
237
233
case . success( let state) :
@@ -261,9 +257,9 @@ final class AsyncIO: Sendable {
261
257
let targetEvent : EPOLL_EVENTS
262
258
switch event {
263
259
case . read:
264
- targetEvent = EPOLLIN
260
+ targetEvent = EPOLL_EVENTS ( EPOLLIN)
265
261
case . write:
266
- targetEvent = EPOLLOUT
262
+ targetEvent = EPOLL_EVENTS ( EPOLLOUT)
267
263
}
268
264
269
265
var event = epoll_event (
@@ -369,7 +365,7 @@ extension AsyncIO {
369
365
let offsetAddress = bufferPointer. baseAddress!. advanced ( by: readLength)
370
366
371
367
// Read directly into the buffer at the offset
372
- return _SubprocessCShims . read ( fileDescriptor. rawValue, offsetAddress, targetCount)
368
+ return _subprocess_read ( fileDescriptor. rawValue, offsetAddress, targetCount)
373
369
}
374
370
if bytesRead > 0 {
375
371
// Read some data
@@ -435,7 +431,7 @@ extension AsyncIO {
435
431
let written = bytes. withUnsafeBytes { ptr in
436
432
let remainingLength = ptr. count - writtenLength
437
433
let startPtr = ptr. baseAddress!. advanced ( by: writtenLength)
438
- return _SubprocessCShims . write ( fileDescriptor. rawValue, startPtr, remainingLength)
434
+ return _subprocess_write ( fileDescriptor. rawValue, startPtr, remainingLength)
439
435
}
440
436
if written > 0 {
441
437
writtenLength += written
@@ -478,7 +474,7 @@ extension AsyncIO {
478
474
let written = span. withUnsafeBytes { ptr in
479
475
let remainingLength = ptr. count - writtenLength
480
476
let startPtr = ptr. baseAddress!. advanced ( by: writtenLength)
481
- return _SubprocessCShims . write ( fileDescriptor. rawValue, startPtr, remainingLength)
477
+ return _subprocess_write ( fileDescriptor. rawValue, startPtr, remainingLength)
482
478
}
483
479
if written > 0 {
484
480
writtenLength += written
0 commit comments