Skip to content

Commit 2b07e89

Browse files
authored
Relocate ProcessIO to ContainerClient. (apple#681)
- Makes ProcessIO more generally available. - `Application.handleProcess(processIO:process:)` becomes `processIO.handleProcess(process:log:)`. ## Motivation and Context This code shouldn't be in the CLI, it should be part of ContainerClient.
1 parent 0fe44a0 commit 2b07e89

File tree

5 files changed

+384
-365
lines changed

5 files changed

+384
-365
lines changed
Lines changed: 381 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,381 @@
1+
//===----------------------------------------------------------------------===//
2+
// Copyright © 2025 Apple Inc. and the container project authors. All rights reserved.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// https://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//===----------------------------------------------------------------------===//
16+
17+
import ContainerizationExtras
18+
import ContainerizationOS
19+
import Foundation
20+
import Logging
21+
22+
public struct ProcessIO: Sendable {
23+
let stdin: Pipe?
24+
let stdout: Pipe?
25+
let stderr: Pipe?
26+
var ioTracker: IoTracker?
27+
28+
static let signalSet: [Int32] = [
29+
SIGTERM,
30+
SIGINT,
31+
SIGUSR1,
32+
SIGUSR2,
33+
SIGWINCH,
34+
]
35+
36+
public struct IoTracker: Sendable {
37+
let stream: AsyncStream<Void>
38+
let cont: AsyncStream<Void>.Continuation
39+
let configuredStreams: Int
40+
}
41+
42+
public let stdio: [FileHandle?]
43+
44+
public let console: Terminal?
45+
46+
public static func create(tty: Bool, interactive: Bool, detach: Bool) throws -> ProcessIO {
47+
let current: Terminal? = try {
48+
if !tty || !interactive {
49+
return nil
50+
}
51+
let current = try Terminal.current
52+
try current.setraw()
53+
return current
54+
}()
55+
56+
var stdio = [FileHandle?](repeating: nil, count: 3)
57+
58+
let stdin: Pipe? = {
59+
if !interactive {
60+
return nil
61+
}
62+
return Pipe()
63+
}()
64+
65+
if let stdin {
66+
let pin = FileHandle.standardInput
67+
let stdinOSFile = OSFile(fd: pin.fileDescriptor)
68+
let pipeOSFile = OSFile(fd: stdin.fileHandleForWriting.fileDescriptor)
69+
try stdinOSFile.makeNonBlocking()
70+
nonisolated(unsafe) let buf = UnsafeMutableBufferPointer<UInt8>.allocate(capacity: Int(getpagesize()))
71+
72+
pin.readabilityHandler = { _ in
73+
Self.streamStdin(
74+
from: stdinOSFile,
75+
to: pipeOSFile,
76+
buffer: buf,
77+
) {
78+
pin.readabilityHandler = nil
79+
buf.deallocate()
80+
try? stdin.fileHandleForWriting.close()
81+
}
82+
}
83+
stdio[0] = stdin.fileHandleForReading
84+
}
85+
86+
let stdout: Pipe? = {
87+
if detach {
88+
return nil
89+
}
90+
return Pipe()
91+
}()
92+
93+
var configuredStreams = 0
94+
let (stream, cc) = AsyncStream<Void>.makeStream()
95+
if let stdout {
96+
configuredStreams += 1
97+
let pout: FileHandle = {
98+
if let current {
99+
return current.handle
100+
}
101+
return .standardOutput
102+
}()
103+
104+
let rout = stdout.fileHandleForReading
105+
rout.readabilityHandler = { handle in
106+
let data = handle.availableData
107+
if data.isEmpty {
108+
rout.readabilityHandler = nil
109+
cc.yield()
110+
return
111+
}
112+
try! pout.write(contentsOf: data)
113+
}
114+
stdio[1] = stdout.fileHandleForWriting
115+
}
116+
117+
let stderr: Pipe? = {
118+
if detach || tty {
119+
return nil
120+
}
121+
return Pipe()
122+
}()
123+
if let stderr {
124+
configuredStreams += 1
125+
let perr: FileHandle = .standardError
126+
let rerr = stderr.fileHandleForReading
127+
rerr.readabilityHandler = { handle in
128+
let data = handle.availableData
129+
if data.isEmpty {
130+
rerr.readabilityHandler = nil
131+
cc.yield()
132+
return
133+
}
134+
try! perr.write(contentsOf: data)
135+
}
136+
stdio[2] = stderr.fileHandleForWriting
137+
}
138+
139+
var ioTracker: IoTracker? = nil
140+
if configuredStreams > 0 {
141+
ioTracker = .init(stream: stream, cont: cc, configuredStreams: configuredStreams)
142+
}
143+
144+
return .init(
145+
stdin: stdin,
146+
stdout: stdout,
147+
stderr: stderr,
148+
ioTracker: ioTracker,
149+
stdio: stdio,
150+
console: current
151+
)
152+
}
153+
154+
public func handleProcess(process: ClientProcess, log: Logger) async throws -> Int32 {
155+
let signals = AsyncSignalHandler.create(notify: Self.signalSet)
156+
return try await withThrowingTaskGroup(of: Int32?.self, returning: Int32.self) { group in
157+
let waitAdded = group.addTaskUnlessCancelled {
158+
let code = try await process.wait()
159+
try await wait()
160+
return code
161+
}
162+
163+
guard waitAdded else {
164+
group.cancelAll()
165+
return -1
166+
}
167+
168+
try await process.start()
169+
try closeAfterStart()
170+
171+
if let current = console {
172+
let size = try current.size
173+
// It's supremely possible the process could've exited already. We shouldn't treat
174+
// this as fatal.
175+
try? await process.resize(size)
176+
_ = group.addTaskUnlessCancelled {
177+
let winchHandler = AsyncSignalHandler.create(notify: [SIGWINCH])
178+
for await _ in winchHandler.signals {
179+
do {
180+
try await process.resize(try current.size)
181+
} catch {
182+
log.error(
183+
"failed to send terminal resize event",
184+
metadata: [
185+
"error": "\(error)"
186+
]
187+
)
188+
}
189+
}
190+
return nil
191+
}
192+
} else {
193+
_ = group.addTaskUnlessCancelled {
194+
for await sig in signals.signals {
195+
do {
196+
try await process.kill(sig)
197+
} catch {
198+
log.error(
199+
"failed to send signal",
200+
metadata: [
201+
"signal": "\(sig)",
202+
"error": "\(error)",
203+
]
204+
)
205+
}
206+
}
207+
return nil
208+
}
209+
}
210+
211+
while true {
212+
let result = try await group.next()
213+
if result == nil {
214+
return -1
215+
}
216+
let status = result!
217+
if let status {
218+
group.cancelAll()
219+
return status
220+
}
221+
}
222+
return -1
223+
}
224+
}
225+
226+
public func closeAfterStart() throws {
227+
try stdin?.fileHandleForReading.close()
228+
try stdout?.fileHandleForWriting.close()
229+
try stderr?.fileHandleForWriting.close()
230+
}
231+
232+
public func close() throws {
233+
try console?.reset()
234+
}
235+
236+
public func wait() async throws {
237+
guard let ioTracker = self.ioTracker else {
238+
return
239+
}
240+
do {
241+
try await Timeout.run(seconds: 3) {
242+
var counter = ioTracker.configuredStreams
243+
for await _ in ioTracker.stream {
244+
counter -= 1
245+
if counter == 0 {
246+
ioTracker.cont.finish()
247+
break
248+
}
249+
}
250+
}
251+
} catch {
252+
throw error
253+
}
254+
}
255+
256+
static func streamStdin(
257+
from: OSFile,
258+
to: OSFile,
259+
buffer: UnsafeMutableBufferPointer<UInt8>,
260+
onErrorOrEOF: () -> Void,
261+
) {
262+
while true {
263+
let (bytesRead, action) = from.read(buffer)
264+
if bytesRead > 0 {
265+
let view = UnsafeMutableBufferPointer(
266+
start: buffer.baseAddress,
267+
count: bytesRead
268+
)
269+
270+
let (bytesWritten, _) = to.write(view)
271+
if bytesWritten != bytesRead {
272+
onErrorOrEOF()
273+
return
274+
}
275+
}
276+
277+
switch action {
278+
case .error(_), .eof, .brokenPipe:
279+
onErrorOrEOF()
280+
return
281+
case .again:
282+
return
283+
case .success:
284+
break
285+
}
286+
}
287+
}
288+
}
289+
290+
public struct OSFile: Sendable {
291+
private let fd: Int32
292+
293+
public enum IOAction: Equatable {
294+
case eof
295+
case again
296+
case success
297+
case brokenPipe
298+
case error(_ errno: Int32)
299+
}
300+
301+
public init(fd: Int32) {
302+
self.fd = fd
303+
}
304+
305+
public init(handle: FileHandle) {
306+
self.fd = handle.fileDescriptor
307+
}
308+
309+
func makeNonBlocking() throws {
310+
let flags = fcntl(fd, F_GETFL)
311+
guard flags != -1 else {
312+
throw POSIXError.fromErrno()
313+
}
314+
315+
if fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 {
316+
throw POSIXError.fromErrno()
317+
}
318+
}
319+
320+
func write(_ buffer: UnsafeMutableBufferPointer<UInt8>) -> (wrote: Int, action: IOAction) {
321+
if buffer.count == 0 {
322+
return (0, .success)
323+
}
324+
325+
var bytesWrote: Int = 0
326+
while true {
327+
let n = Darwin.write(
328+
self.fd,
329+
buffer.baseAddress!.advanced(by: bytesWrote),
330+
buffer.count - bytesWrote
331+
)
332+
if n == -1 {
333+
if errno == EAGAIN || errno == EIO {
334+
return (bytesWrote, .again)
335+
}
336+
return (bytesWrote, .error(errno))
337+
}
338+
339+
if n == 0 {
340+
return (bytesWrote, .brokenPipe)
341+
}
342+
343+
bytesWrote += n
344+
if bytesWrote < buffer.count {
345+
continue
346+
}
347+
return (bytesWrote, .success)
348+
}
349+
}
350+
351+
func read(_ buffer: UnsafeMutableBufferPointer<UInt8>) -> (read: Int, action: IOAction) {
352+
if buffer.count == 0 {
353+
return (0, .success)
354+
}
355+
356+
var bytesRead: Int = 0
357+
while true {
358+
let n = Darwin.read(
359+
self.fd,
360+
buffer.baseAddress!.advanced(by: bytesRead),
361+
buffer.count - bytesRead
362+
)
363+
if n == -1 {
364+
if errno == EAGAIN || errno == EIO {
365+
return (bytesRead, .again)
366+
}
367+
return (bytesRead, .error(errno))
368+
}
369+
370+
if n == 0 {
371+
return (bytesRead, .eof)
372+
}
373+
374+
bytesRead += n
375+
if bytesRead < buffer.count {
376+
continue
377+
}
378+
return (bytesRead, .success)
379+
}
380+
}
381+
}

0 commit comments

Comments
 (0)