Skip to content

Commit 971281a

Browse files
committed
Various bug fixes have been made throughout the code base:
- Instead of calling `[UInt8].removeFirst` on `LineSequence`’s `underlyingBuffer`, we now use an index-based approach. This change improves overall streaming performance by eliminating the `O(n)` `.removeFirst()` operation. - The `safelyCloseMultiple` function has been updated to handle cases where the same `IODescriptor` is passed to multiple input/outputs. - On Linux, `_highest_possibly_open_fd_dir_linux()` has been updated to `_close_open_fd_linux()` to more efficiently emulate `POSIX_SPAWN_CLOEXEC_DEFAULT`.
1 parent bf17fa2 commit 971281a

File tree

4 files changed

+122
-55
lines changed

4 files changed

+122
-55
lines changed

Sources/Subprocess/AsyncBufferSequence.swift

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ extension AsyncBufferSequence {
143143
private var source: AsyncBufferSequence.AsyncIterator
144144
private var buffer: [Encoding.CodeUnit]
145145
private var underlyingBuffer: [Encoding.CodeUnit]
146+
private var underlyingBufferIndex: Array<Encoding.CodeUnit>.Index
146147
private var leftover: Encoding.CodeUnit?
147148
private var eofReached: Bool
148149
private let bufferingPolicy: BufferingPolicy
@@ -154,6 +155,7 @@ extension AsyncBufferSequence {
154155
self.source = underlyingIterator
155156
self.buffer = []
156157
self.underlyingBuffer = []
158+
self.underlyingBufferIndex = self.underlyingBuffer.startIndex
157159
self.leftover = nil
158160
self.eofReached = false
159161
self.bufferingPolicy = bufferingPolicy
@@ -208,13 +210,16 @@ extension AsyncBufferSequence {
208210
}
209211

210212
func nextFromSource() async throws -> Encoding.CodeUnit? {
211-
if underlyingBuffer.isEmpty {
213+
if underlyingBufferIndex >= underlyingBuffer.count {
212214
guard let buf = try await loadBuffer() else {
213215
return nil
214216
}
215217
underlyingBuffer = buf
218+
underlyingBufferIndex = buf.startIndex
216219
}
217-
return underlyingBuffer.removeFirst()
220+
let result = underlyingBuffer[underlyingBufferIndex]
221+
underlyingBufferIndex = underlyingBufferIndex.advanced(by: 1)
222+
return result
218223
}
219224

220225
func nextCodeUnit() async throws -> Encoding.CodeUnit? {

Sources/Subprocess/Configuration.swift

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -187,33 +187,73 @@ extension Configuration {
187187
) throws {
188188
var possibleError: (any Swift.Error)? = nil
189189

190+
// To avoid closing the same descriptor multiple times,
191+
// keep track of the list of descriptors that we have
192+
// already closed. If a `IODescriptor.Descriptor` is
193+
// already closed, mark that `IODescriptor` as closed
194+
// as opposed to actually try to close it.
195+
var remainingSet: Set<IODescriptor.Descriptor> = Set(
196+
optionalSequence: [
197+
inputRead?.descriptor,
198+
inputWrite?.descriptor,
199+
outputRead?.descriptor,
200+
outputWrite?.descriptor,
201+
errorRead?.descriptor,
202+
errorWrite?.descriptor,
203+
]
204+
)
205+
190206
do {
191-
try inputRead?.safelyClose()
207+
if remainingSet.tryRemove(inputRead?.descriptor) {
208+
try inputRead?.safelyClose()
209+
} else {
210+
try inputRead?.markAsClosed()
211+
}
192212
} catch {
193213
possibleError = error
194214
}
195215
do {
196-
try inputWrite?.safelyClose()
216+
if remainingSet.tryRemove(inputWrite?.descriptor) {
217+
try inputWrite?.safelyClose()
218+
} else {
219+
try inputWrite?.markAsClosed()
220+
}
197221
} catch {
198222
possibleError = error
199223
}
200224
do {
201-
try outputRead?.safelyClose()
225+
if remainingSet.tryRemove(outputRead?.descriptor) {
226+
try outputRead?.safelyClose()
227+
} else {
228+
try outputRead?.markAsClosed()
229+
}
202230
} catch {
203231
possibleError = error
204232
}
205233
do {
206-
try outputWrite?.safelyClose()
234+
if remainingSet.tryRemove(outputWrite?.descriptor) {
235+
try outputWrite?.safelyClose()
236+
} else {
237+
try outputWrite?.markAsClosed()
238+
}
207239
} catch {
208240
possibleError = error
209241
}
210242
do {
211-
try errorRead?.safelyClose()
243+
if remainingSet.tryRemove(errorRead?.descriptor) {
244+
try errorRead?.safelyClose()
245+
} else {
246+
try errorRead?.markAsClosed()
247+
}
212248
} catch {
213249
possibleError = error
214250
}
215251
do {
216-
try errorWrite?.safelyClose()
252+
if remainingSet.tryRemove(errorWrite?.descriptor) {
253+
try errorWrite?.safelyClose()
254+
} else {
255+
try errorWrite?.markAsClosed()
256+
}
217257
} catch {
218258
possibleError = error
219259
}
@@ -729,6 +769,10 @@ internal struct IODescriptor: ~Copyable {
729769
#endif
730770
}
731771

772+
internal mutating func markAsClosed() throws {
773+
self.closeWhenDone = false
774+
}
775+
732776
deinit {
733777
guard self.closeWhenDone else {
734778
return
@@ -1077,3 +1121,17 @@ extension _OrderedSet: Sequence {
10771121
return self.elements.makeIterator()
10781122
}
10791123
}
1124+
1125+
extension Set {
1126+
init<S>(optionalSequence sequence: S) where S: Sequence, S.Element == Optional<Self.Element> {
1127+
let sequence: [Self.Element] = sequence.compactMap(\.self)
1128+
self.init(sequence)
1129+
}
1130+
1131+
mutating func tryRemove(_ element: Self.Element?) -> Bool {
1132+
guard let element else {
1133+
return false
1134+
}
1135+
return self.remove(element) != nil
1136+
}
1137+
}

Sources/_SubprocessCShims/process_shims.c

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -431,68 +431,70 @@ static int _positive_int_parse(const char *str) {
431431
}
432432

433433
#if defined(__linux__)
434-
// Linux-specific version that uses syscalls directly and doesn't allocate heap memory.
434+
// Close all file descriptors listed under `fd_dir`
435435
// Safe to use after vfork() and before execve()
436-
static int _highest_possibly_open_fd_dir_linux(const char *fd_dir) {
437-
int highest_fd_so_far = 0;
438-
int dir_fd = open(fd_dir, O_RDONLY);
439-
if (dir_fd < 0) {
440-
// errno set by `open`.
441-
return -1;
442-
}
443-
436+
static void _close_open_fd_linux(const char *fd_dir) {
444437
// Buffer for directory entries - allocated on stack, no heap allocation
445438
char buffer[4096] = {0};
439+
int fds_to_close[4096] = {0};
446440

447441
while (1) {
442+
int dir_fd = open(fd_dir, O_RDONLY);
443+
if (dir_fd < 0) {
444+
return;
445+
}
446+
447+
int found_any_fds = 0;
448+
448449
long bytes_read = _getdents64(dir_fd, (struct linux_dirent64 *)buffer, sizeof(buffer));
449450
if (bytes_read < 0) {
451+
close(dir_fd);
450452
if (errno == EINTR) {
451453
continue;
452454
} else {
453-
// `errno` set by _getdents64.
454-
highest_fd_so_far = -1;
455-
close(dir_fd);
456-
return highest_fd_so_far;
455+
return;
457456
}
458457
}
459458
if (bytes_read == 0) {
460459
close(dir_fd);
461-
return highest_fd_so_far;
460+
return;
462461
}
463462
long offset = 0;
463+
int fds_to_close_index = 0;
464464
while (offset < bytes_read) {
465465
struct linux_dirent64 *entry = (struct linux_dirent64 *)(buffer + offset);
466466

467467
// Skip "." and ".." entries
468468
if (entry->d_name[0] != '.') {
469-
int number = _positive_int_parse(entry->d_name);
470-
if (number > highest_fd_so_far) {
471-
highest_fd_so_far = number;
469+
int fd = _positive_int_parse(entry->d_name);
470+
if (fd != dir_fd && fd > STDERR_FILENO) {
471+
fds_to_close[fds_to_close_index] = fd;
472+
fds_to_close_index += 1;
473+
found_any_fds = 1;
472474
}
473475
}
474476

475477
offset += entry->d_reclen;
476478
}
477-
}
478479

479-
close(dir_fd);
480-
return highest_fd_so_far;
480+
// After we finish iteration, close the fds
481+
for (int i = 0; i < fds_to_close_index; i++) {
482+
close(fds_to_close[i]);
483+
}
484+
// Close directory fd
485+
close(dir_fd);
486+
487+
// If we didn't find any FDs (other than dir_fd), we're done
488+
if (found_any_fds == 0) {
489+
return;
490+
}
491+
}
481492
}
482493
#endif
483494

484-
// This function is only used on systems with Linux kernel 5.9 or lower.
485-
// On newer systems, `close_range` is used instead.
495+
// This function is only used on non-Linux systems.
486496
static int _highest_possibly_open_fd(void) {
487-
#if defined(__linux__)
488-
int hi = _highest_possibly_open_fd_dir_linux("/dev/fd");
489-
if (hi < 0) {
490-
hi = sysconf(_SC_OPEN_MAX);
491-
}
492-
#else
493-
int hi = sysconf(_SC_OPEN_MAX);
494-
#endif
495-
return hi;
497+
return sysconf(_SC_OPEN_MAX);
496498
}
497499

498500
int _subprocess_fork_exec(
@@ -692,13 +694,18 @@ int _subprocess_fork_exec(
692694
rc = closefrom(pipefd[1] + 1);
693695
#endif
694696
if (rc != 0) {
697+
#if defined(__linux__)
698+
_close_open_fd_linux("/dev/fd");
699+
#else
695700
// close_range failed (or doesn't exist), fall back to close()
696-
for (int fd = STDERR_FILENO + 1; fd <= _highest_possibly_open_fd(); fd++) {
701+
int highest_open_fd = _highest_possibly_open_fd();
702+
for (int fd = STDERR_FILENO + 1; fd <= highest_open_fd; fd++) {
697703
// We must NOT close pipefd[1] for writing errors
698704
if (fd != pipefd[1]) {
699705
close(fd);
700706
}
701707
}
708+
#endif
702709
}
703710

704711
// Finally, exec

Tests/SubprocessTests/IntegrationTests.swift

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1337,22 +1337,19 @@ extension SubprocessIntegrationTests {
13371337
options: .create,
13381338
permissions: [.ownerReadWrite, .groupReadWrite]
13391339
)
1340-
let echoResult = try await outputFile.closeAfter {
1341-
let echoResult = try await _run(
1342-
setup,
1343-
input: .none,
1344-
output: .fileDescriptor(
1345-
outputFile,
1346-
closeAfterSpawningProcess: false
1347-
),
1348-
error: .fileDescriptor(
1349-
outputFile,
1350-
closeAfterSpawningProcess: false
1351-
)
1340+
let echoResult = try await _run(
1341+
setup,
1342+
input: .none,
1343+
output: .fileDescriptor(
1344+
outputFile,
1345+
closeAfterSpawningProcess: true
1346+
),
1347+
error: .fileDescriptor(
1348+
outputFile,
1349+
closeAfterSpawningProcess: true
13521350
)
1353-
#expect(echoResult.terminationStatus.isSuccess)
1354-
return echoResult
1355-
}
1351+
)
1352+
#expect(echoResult.terminationStatus.isSuccess)
13561353
let outputData: Data = try Data(
13571354
contentsOf: URL(filePath: outputFilePath.string)
13581355
)

0 commit comments

Comments
 (0)