Skip to content

Commit 72fd6ce

Browse files
committed
Start implementation of stdin redirect from a stream reader
1 parent ed7195f commit 72fd6ce

File tree

6 files changed

+316
-47
lines changed

6 files changed

+316
-47
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import Foundation
2+
#if canImport(System)
3+
import System
4+
#else
5+
import SystemPackage
6+
#endif
7+
8+
import StreamReader
9+
10+
11+
12+
/**
13+
How to redirect the output file descriptors? */
14+
public enum InputRedirectMode {
15+
16+
/** The stream will be left as-is. */
17+
case none
18+
/** This is the equivalent of using ``InputRedirectMode/send(_:)`` with an empty data. */
19+
case fromNull
20+
/** Send the given data to the subprocess (done with a pipe). */
21+
case sendFromReader(DataReader)
22+
/**
23+
The stream should be redirected from this fd.
24+
If `giveOwnership` is true, the fd will be closed when the process has run.
25+
Otherwise it is your responsability to close it when needed. */
26+
case fromFd(FileDescriptor, giveOwnership: Bool)
27+
28+
public static func send(_ data: Data) -> Self {
29+
let reader = DataReader(data: data)
30+
return .sendFromReader(reader)
31+
}
32+
33+
}

Sources/ProcessInvocation/RedirectMode.swift renamed to Sources/ProcessInvocation/OutputRedirectMode.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import SystemPackage
99

1010
/**
1111
How to redirect the output file descriptors? */
12-
public enum RedirectMode {
12+
public enum OutputRedirectMode {
1313

1414
/**
1515
The stream will be left as-is:
@@ -19,7 +19,7 @@ public enum RedirectMode {
1919
case capture
2020
/**
2121
The stream should be redirected to this fd.
22-
If `giveOwnership` is true, the fd will be closed when the process is run.
22+
If `giveOwnership` is true, the fd will be closed when the process has run.
2323
Otherwise it is your responsability to close it when needed. */
2424
case toFd(FileDescriptor, giveOwnership: Bool)
2525

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import Foundation
2+
#if canImport(System)
3+
import System
4+
#else
5+
import SystemPackage
6+
#endif
7+
8+
import SignalHandling
9+
10+
11+
12+
extension ProcessInvocation {
13+
14+
@available(*, deprecated, message: "Use the new InputRedirectMode for stdin redirect.")
15+
public init(
16+
_ executable: FilePath, _ args: String..., usePATH: Bool = true, customPATH: [FilePath]?? = nil,
17+
workingDirectory: URL? = nil, environment: [String: String]? = nil,
18+
stdin: FileDescriptor?, stdoutRedirect: OutputRedirectMode = .capture, stderrRedirect: OutputRedirectMode = .capture,
19+
signalsToProcess: Set<Signal> = Signal.toForwardToSubprocesses,
20+
signalHandling: @escaping (Signal) -> SignalHandling = { .default(for: $0) },
21+
fileDescriptorsToSend: [FileDescriptor /* Value in **child** */: FileDescriptor /* Value in **parent** */] = [:],
22+
additionalOutputFileDescriptors: Set<FileDescriptor> = [],
23+
lineSeparators: LineSeparators = .default,
24+
shouldContinueStreamHandler: ((_ line: RawLineWithSource, _ process: Process) -> Bool)? = nil,
25+
expectedTerminations: [(Int32, Process.TerminationReason)]?? = nil
26+
) {
27+
self.init(
28+
executable, args: args, usePATH: usePATH, customPATH: customPATH,
29+
workingDirectory: workingDirectory, environment: environment,
30+
stdinRedirect: stdin.flatMap{ .fromFd($0, giveOwnership: false) } ?? .fromNull, stdoutRedirect: stdoutRedirect, stderrRedirect: stderrRedirect,
31+
signalsToProcess: signalsToProcess,
32+
signalHandling: signalHandling,
33+
fileDescriptorsToSend: fileDescriptorsToSend,
34+
additionalOutputFileDescriptors: additionalOutputFileDescriptors,
35+
lineSeparators: lineSeparators,
36+
shouldContinueStreamHandler: shouldContinueStreamHandler,
37+
expectedTerminations: expectedTerminations
38+
)
39+
}
40+
41+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import Foundation
2+
#if canImport(System)
3+
import System
4+
#else
5+
import SystemPackage
6+
#endif
7+
8+
import StreamReader
9+
10+
11+
12+
public extension ProcessInvocation {
13+
14+
/**
15+
Returns a simple pipe.
16+
Different than using the `Pipe()` object from Foundation because you get control on when the fds are closed.
17+
18+
- Important: The `FileDescriptor`s returned **must** be closed manually. */
19+
static func unownedPipe() throws -> (fdRead: FileDescriptor, fdWrite: FileDescriptor) {
20+
let pipepointer = UnsafeMutablePointer<CInt>.allocate(capacity: 2)
21+
defer {pipepointer.deallocate()}
22+
pipepointer.initialize(to: -1)
23+
24+
guard pipe(pipepointer) == 0 else {
25+
throw Err.systemError(Errno(rawValue: errno))
26+
}
27+
28+
let fdRead = pipepointer.advanced(by: 0).pointee
29+
let fdWrite = pipepointer.advanced(by: 1).pointee
30+
assert(fdRead != -1 && fdWrite != -1)
31+
32+
return (FileDescriptor(rawValue: fdRead), FileDescriptor(rawValue: fdWrite))
33+
}
34+
35+
/**
36+
Returns a the read end of a pipe that will stream everything from the stream reader.
37+
38+
The data will be read from the stream and sent along to the pipe.
39+
At most `maxCacheSize` bytes will be kept in memory from the stream (assuming the stream does not have a bigger internal cache).
40+
41+
If at any point reading from the stream fails, an error is logged and the write end of the pipe is closed, effectively ending the data sent in the read end.
42+
43+
- Important: If the fd is sent to a subprocess, the fd must be closed after the subprocess is launched. */
44+
static func readFdOfPipeForStreaming(dataFromReader reader: StreamReader, maxCacheSize: Int = .max) throws -> FileDescriptor {
45+
let pipe = try ProcessInvocation.unownedPipe()
46+
47+
/* If the reader is a DataReader and the source data is 0 we can skip the writing and directly close the write fd.
48+
* We only check this special case as reading from the reader can be a blocking operation and we want to avoid that in an init. */
49+
if (reader as? DataReader)?.sourceDataSize != 0 {
50+
let fhWrite = FileHandle(fileDescriptor: pipe.fdWrite.rawValue)
51+
fhWrite.writeabilityHandler = { fh in
52+
let closeFH = {
53+
fhWrite.writeabilityHandler = nil
54+
if Darwin.close(fh.fileDescriptor) == -1 {
55+
Conf.logger?.error("Failed closing write end of fd for pipe to swift; pipe might stay open forever.", metadata: ["errno": "\(errno)", "errno-str": "\(Errno(rawValue: errno).localizedDescription)"])
56+
}
57+
}
58+
59+
do {
60+
try reader.peekData(size: Swift.max(0, maxCacheSize - (reader.currentStreamReadPosition - reader.currentReadPosition)), allowReadingLess: true, { _ in })
61+
try reader.peekData(size: reader.currentStreamReadPosition - reader.currentReadPosition, allowReadingLess: false, { bytes in
62+
let (writtenNow, readError) = {
63+
guard bytes.count > 0 else {
64+
return (0, Int32(0))
65+
}
66+
67+
var ret: Int
68+
repeat {
69+
Conf.logger?.trace("Trying to write on write end of pipe.", metadata: ["bytes_count": "\(bytes.count)"])
70+
ret = Darwin.write(fh.fileDescriptor, bytes.baseAddress!, bytes.count)
71+
} while ret == -1 && errno == EINTR
72+
return (ret, errno)
73+
}()
74+
75+
if writtenNow > 0 {
76+
do {try reader.readData(size: writtenNow, allowReadingLess: false, { _ in /*nop: we only update the current read position.*/ })}
77+
catch {Conf.logger?.critical("Invalid StreamReader (or internal logic error)! Reading from the stream failed but the data should already be in the buffer.")}
78+
} else if writtenNow < 0 {
79+
if [EAGAIN, EWOULDBLOCK].contains(readError) {
80+
/* We ignore the write error and let the writeabilityHandler call us back (let’s hope it will!). */
81+
Conf.logger?.debug("Failed write end of fd for pipe to swift with temporary error (EAGAIN or EWOULDBLOCK). We wait for the writeabilityHandler to be called again.", metadata: ["errno": "\(readError)", "errno-str": "\(Errno(rawValue: readError).localizedDescription)"])
82+
} else {
83+
Conf.logger?.error("Failed writing in fd for pipe. We close the stream now.", metadata: ["errno": "\(readError)", "errno-str": "\(Errno(rawValue: readError).localizedDescription)"])
84+
closeFH()
85+
}
86+
}
87+
88+
if reader.streamHasReachedEOF, reader.currentReadPosition == reader.currentStreamReadPosition {
89+
/* We have reached the end of the stream; let’s close the stream! */
90+
Conf.logger?.trace("Closing write end of pipe fd.")
91+
closeFH()
92+
}
93+
})
94+
} catch {
95+
Conf.logger?.error("Failed reading from the stream for writing to pipe. Bailing.", metadata: ["error": "\(error)"])
96+
closeFH()
97+
}
98+
}
99+
} else {
100+
try pipe.fdWrite.close()
101+
}
102+
103+
return pipe.fdRead
104+
}
105+
106+
static func readFdOfPipeForStreaming(data: Data) throws -> FileDescriptor {
107+
return try readFdOfPipeForStreaming(dataFromReader: DataReader(data: data))
108+
}
109+
110+
}

Sources/ProcessInvocation/ProcessInvocation.swift

Lines changed: 39 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ import CMacroExports
4040
Some signals are forwarded by default.
4141

4242
IMHO the signal forwarding method, though a bit more complex (in this case, a lot of the complexity is hidden by this object),
43-
is better than using the same PGID than the parent for the child.
43+
is better than using the same PGID than the parent for the child.
4444
In a shell, if a long running process is launched from a bash script, and said bash script is killed using a signal
45-
(but from another process sending a signal, not from the tty), the child won’t be killed!
45+
(but from another process sending a signal, not from the tty), the child won’t be killed!
4646
Using signal forwarding, it will.
4747

4848
Some interesting links:
@@ -62,10 +62,10 @@ import CMacroExports
6262
It is true on the `main` branch though (2021-04-01).
6363

6464
- Important: All of the `additionalOutputFileDescriptors` are closed when the end of their respective stream are reached
65-
(i.e. the function takes “ownership” of the file descriptors).
65+
(i.e. the function takes “ownership” of the file descriptors).
6666
Maybe later we’ll add an option not to close at end of the stream.
6767
Additionally on Linux the fds will be set non-blocking
68-
(clients should not care as they have given up ownership of the fd, but it’s still good to know IMHO).
68+
(clients should not care as they have given up ownership of the fd, but it’s still good to know IMHO).
6969

7070
- Important: AFAICT the absolute ref for `PATH` resolution is [from exec function in FreeBSD source](https://opensource.apple.com/source/Libc/Libc-1439.100.3/gen/FreeBSD/exec.c.auto.html) (end of file).
7171
Sadly `Process` does not report the actual errors and seem to always report “File not found” errors when the executable cannot be run.
@@ -140,9 +140,9 @@ public struct ProcessInvocation : AsyncSequence {
140140
public var workingDirectory: URL? = nil
141141
public var environment: [String: String]? = nil
142142

143-
public var stdin: FileDescriptor? = nil
144-
public var stdoutRedirect: RedirectMode = .capture
145-
public var stderrRedirect: RedirectMode = .capture
143+
public var stdinRedirect: InputRedirectMode = .none
144+
public var stdoutRedirect: OutputRedirectMode = .capture
145+
public var stderrRedirect: OutputRedirectMode = .capture
146146

147147
public var signalsToProcess: Set<Signal> = Signal.toForwardToSubprocesses
148148
public var signalHandling: (Signal) -> SignalHandling
@@ -214,7 +214,7 @@ public struct ProcessInvocation : AsyncSequence {
214214
public init(
215215
_ executable: FilePath, _ args: String..., usePATH: Bool = true, customPATH: [FilePath]?? = nil,
216216
workingDirectory: URL? = nil, environment: [String: String]? = nil,
217-
stdin: FileDescriptor? = nil, stdoutRedirect: RedirectMode = .capture, stderrRedirect: RedirectMode = .capture,
217+
stdinRedirect: InputRedirectMode = .fromNull, stdoutRedirect: OutputRedirectMode = .capture, stderrRedirect: OutputRedirectMode = .capture,
218218
signalsToProcess: Set<Signal> = Signal.toForwardToSubprocesses,
219219
signalHandling: @escaping (Signal) -> SignalHandling = { .default(for: $0) },
220220
fileDescriptorsToSend: [FileDescriptor /* Value in **child** */: FileDescriptor /* Value in **parent** */] = [:],
@@ -226,7 +226,7 @@ public struct ProcessInvocation : AsyncSequence {
226226
self.init(
227227
executable, args: args, usePATH: usePATH, customPATH: customPATH,
228228
workingDirectory: workingDirectory, environment: environment,
229-
stdin: stdin, stdoutRedirect: stdoutRedirect, stderrRedirect: stderrRedirect,
229+
stdinRedirect: stdinRedirect, stdoutRedirect: stdoutRedirect, stderrRedirect: stderrRedirect,
230230
signalsToProcess: signalsToProcess,
231231
signalHandling: signalHandling,
232232
fileDescriptorsToSend: fileDescriptorsToSend,
@@ -249,7 +249,7 @@ public struct ProcessInvocation : AsyncSequence {
249249
public init(
250250
_ executable: FilePath, args: [String], usePATH: Bool = true, customPATH: [FilePath]?? = nil,
251251
workingDirectory: URL? = nil, environment: [String: String]? = nil,
252-
stdin: FileDescriptor? = nil, stdoutRedirect: RedirectMode = .capture, stderrRedirect: RedirectMode = .capture,
252+
stdinRedirect: InputRedirectMode = .fromNull, stdoutRedirect: OutputRedirectMode = .capture, stderrRedirect: OutputRedirectMode = .capture,
253253
signalsToProcess: Set<Signal> = Signal.toForwardToSubprocesses,
254254
signalHandling: @escaping (Signal) -> SignalHandling = { .default(for: $0) },
255255
fileDescriptorsToSend: [FileDescriptor /* Value in **child** */: FileDescriptor /* Value in **parent** */] = [:],
@@ -266,7 +266,7 @@ public struct ProcessInvocation : AsyncSequence {
266266
self.workingDirectory = workingDirectory
267267
self.environment = environment
268268

269-
self.stdin = stdin
269+
self.stdinRedirect = stdinRedirect
270270
self.stdoutRedirect = stdoutRedirect
271271
self.stderrRedirect = stderrRedirect
272272

@@ -391,8 +391,9 @@ public struct ProcessInvocation : AsyncSequence {
391391
You retrieve the process and a dispatch group you can wait on to be notified when the process and all of its outputs are done.
392392
You can also set the termination handler of the process, but you should wait on the dispatch group to be sure all of the outputs have finished streaming. */
393393
public func invoke(outputHandler: @escaping (_ result: Result<RawLineWithSource, Error>, _ signalEndOfInterestForStream: () -> Void, _ process: Process) -> Void, terminationHandler: ((_ process: Process) -> Void)? = nil) throws -> (Process, DispatchGroup) {
394+
assert(!fileDescriptorsToSend.values.contains(.standardInput), "Standard input must be modified using stdinRedirect")
394395
assert(!fileDescriptorsToSend.values.contains(.standardOutput), "Standard output must be modified using stdoutRedirect")
395-
assert(!fileDescriptorsToSend.values.contains(.standardError), "Standard error must be modified using stderrRedirect")
396+
assert(!fileDescriptorsToSend.values.contains(.standardError), "Standard error must be modified using stderrRedirect")
396397

397398
let g = DispatchGroup()
398399
#if canImport(eXtenderZ)
@@ -564,8 +565,17 @@ public struct ProcessInvocation : AsyncSequence {
564565

565566
let actualExecutablePath: FilePath
566567
if fileDescriptorsToSend.isEmpty {
567-
/* We add closeOnDealloc:false to be explicit, but it’s the default. */
568-
p.standardInput = stdin.flatMap{ FileHandle(fileDescriptor: $0.rawValue, closeOnDealloc: false) }
568+
switch stdinRedirect {
569+
case .none: (/*nop*/)
570+
case .fromNull: p.standardInput = nil
571+
case .sendFromReader(let reader):
572+
let fd = try Self.readFdOfPipeForStreaming(dataFromReader: reader, maxCacheSize: 32 * 1024 * 1024)
573+
p.standardInput = FileHandle(fileDescriptor: fd.rawValue, closeOnDealloc: false)
574+
fdsToCloseAfterRun.insert(fd)
575+
case .fromFd(let fd, let shouldClose):
576+
p.standardInput = FileHandle(fileDescriptor: fd.rawValue, closeOnDealloc: false)
577+
if shouldClose {fdsToCloseAfterRun.insert(fd)}
578+
}
569579
p.arguments = args
570580
actualExecutablePath = executable
571581
forcedPreprendedPATH = nil
@@ -585,11 +595,11 @@ public struct ProcessInvocation : AsyncSequence {
585595
}
586596

587597
/* The socket to send the fd.
588-
* The tuple thingy _should_ be _in effect_ equivalent to the C version `int sv[2] = {-1, -1};`.
589-
* <https://forums.swift.org/t/guarantee-in-memory-tuple-layout-or-dont/40122>
598+
* The tuple thingy _should_ be _in effect_ equivalent to the C version `int sv[2] = {-1, -1};`:
599+
* <https://forums.swift.org/t/guarantee-in-memory-tuple-layout-or-dont/40122>
590600
* Stride and alignment should be the equal for CInt.
591-
* Funnily, it seems to only work in debug compilation, not in release…
592-
* var sv: (CInt, CInt) = (-1, -1) */
601+
* Funnily, it seems to only work in debug compilation, not in release… */
602+
// var sv: (CInt, CInt) = (-1, -1) */
593603
let sv = UnsafeMutablePointer<CInt>.allocate(capacity: 2)
594604
sv.initialize(repeating: -1, count: 2)
595605
defer {sv.deallocate()}
@@ -620,10 +630,17 @@ public struct ProcessInvocation : AsyncSequence {
620630
p.standardInput = FileHandle(fileDescriptor: fd1.rawValue, closeOnDealloc: false)
621631
fdToSendFds = fd0
622632

623-
if fileDescriptorsToSend[FileDescriptor.standardInput] == nil {
624-
/* We must add stdin in the list of file descriptors to send so that
625-
* stdin is restored to its original value when the final process is exec’d from the bridge. */
626-
fileDescriptorsToSend[FileDescriptor.standardInput] = FileDescriptor.standardInput
633+
/* We must send the modified stdin in the list of file descriptors to send! */
634+
switch stdinRedirect {
635+
case .none: fileDescriptorsToSend[.standardInput] = .standardInput
636+
case .fromNull: (/*nop*/)
637+
case .sendFromReader(let reader):
638+
let fd = try Self.readFdOfPipeForStreaming(dataFromReader: reader, maxCacheSize: 32 * 1024 * 1024)
639+
fileDescriptorsToSend[.standardInput] = fd
640+
// fdsToCloseAfterRun.insert(fd)
641+
case .fromFd(let fd, let shouldClose):
642+
fileDescriptorsToSend[.standardInput] = fd
643+
// if shouldClose {fdsToCloseAfterRun.insert(fd)}
627644
}
628645
}
629646

@@ -772,26 +789,6 @@ public struct ProcessInvocation : AsyncSequence {
772789
return (p, g)
773790
}
774791

775-
/**
776-
Returns a simple pipe. Different than using the `Pipe()` object from Foundation because you get control on when the fds are closed.
777-
778-
- Important: The `FileDescriptor`s returned **must** be closed manually. */
779-
public static func unownedPipe() throws -> (fdRead: FileDescriptor, fdWrite: FileDescriptor) {
780-
let pipepointer = UnsafeMutablePointer<CInt>.allocate(capacity: 2)
781-
defer {pipepointer.deallocate()}
782-
pipepointer.initialize(to: -1)
783-
784-
guard pipe(pipepointer) == 0 else {
785-
throw Err.systemError(Errno(rawValue: errno))
786-
}
787-
788-
let fdRead = pipepointer.advanced(by: 0).pointee
789-
let fdWrite = pipepointer.advanced(by: 1).pointee
790-
assert(fdRead != -1 && fdWrite != -1)
791-
792-
return (FileDescriptor(rawValue: fdRead), FileDescriptor(rawValue: fdWrite))
793-
}
794-
795792
public struct Iterator : AsyncIteratorProtocol {
796793

797794
public typealias Element = RawLineWithSource

0 commit comments

Comments
 (0)