Skip to content

Commit 538ea78

Browse files
run cleanupRelay only once in TerminalIO and StandardIO (#153)
1 parent 958a83b commit 538ea78

File tree

5 files changed

+46
-23
lines changed

5 files changed

+46
-23
lines changed

Sources/Integration/ProcessTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ extension IntegrationSuite {
205205

206206
let status = try await exec.wait()
207207
if status != 0 {
208-
throw IntegrationError.assert(msg: "process \(idx) status for \(status) != 0")
208+
throw IntegrationError.assert(msg: "process \(idx) status \(status) != 0")
209209
}
210210
var hasher = SHA256()
211211
hasher.update(data: buffer.data)

vminitd/Sources/vminitd/ManagedProcess.swift

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,11 @@ final class ManagedProcess: Sendable {
131131
extension ManagedProcess {
132132
func start() throws -> Int32 {
133133
try self.lock.withLock {
134-
log.debug("starting managed process")
134+
log.debug(
135+
"starting managed process",
136+
metadata: [
137+
"id": "\(id)"
138+
])
135139

136140
// Start the underlying process.
137141
try process.start()
@@ -154,7 +158,8 @@ extension ManagedProcess {
154158
log.debug(
155159
"started managed process",
156160
metadata: [
157-
"pid": "\(i)"
161+
"pid": "\(i)",
162+
"id": "\(id)",
158163
])
159164

160165
return i

vminitd/Sources/vminitd/ProcessSupervisor.swift

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,15 @@ actor ProcessSupervisor {
6464
let exited = Reaper.reap()
6565
self.log?.debug("finished wait4 of \(exited.count) processes")
6666

67-
for proc in processes {
68-
let pid = proc.pid
69-
self.log?.debug("checking for exit of managed process", metadata: ["pid": "\(pid)", "exits": "\(exited)"])
67+
self.log?.debug("checking for exit of managed process", metadata: ["exits": "\(exited)", "processes": "\(processes.count)"])
68+
let exitedProcesses = self.processes.filter { proc in
69+
exited.contains { pid, _ in
70+
proc.pid == pid
71+
}
72+
}
7073

74+
for proc in exitedProcesses {
75+
let pid = proc.pid
7176
if pid <= 0 {
7277
continue
7378
}
@@ -80,7 +85,6 @@ actor ProcessSupervisor {
8085
"status": "\(status)",
8186
"count": "\(processes.count - 1)",
8287
])
83-
8488
proc.setExit(status)
8589
self.processes.removeAll(where: { $0.pid == pid })
8690
}
@@ -95,7 +99,6 @@ actor ProcessSupervisor {
9599

96100
do {
97101
self.processes.append(process)
98-
99102
return try process.start()
100103
} catch {
101104
self.log?.error("process start failed \(error)", metadata: ["process-id": "\(process.id)"])

vminitd/Sources/vminitd/StandardIO.swift

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ final class StandardIO: ManagedProcess.IO & Sendable {
7878
port: stdinPort,
7979
cid: VsockType.hostCID
8080
)
81-
let stdinSocket = try Socket(type: type)
81+
let stdinSocket = try Socket(type: type, closeOnDeinit: false)
8282
try stdinSocket.connect()
8383
self.stdinSocket = stdinSocket
8484

@@ -93,7 +93,8 @@ final class StandardIO: ManagedProcess.IO & Sendable {
9393
port: stdoutPort,
9494
cid: VsockType.hostCID
9595
)
96-
let stdoutSocket = try Socket(type: type)
96+
// These fd's get closed when cleanupRelay is called
97+
let stdoutSocket = try Socket(type: type, closeOnDeinit: false)
9798
try stdoutSocket.connect()
9899
self.stdoutSocket = stdoutSocket
99100

@@ -108,7 +109,7 @@ final class StandardIO: ManagedProcess.IO & Sendable {
108109
port: stderrPort,
109110
cid: VsockType.hostCID
110111
)
111-
let stderrSocket = try Socket(type: type)
112+
let stderrSocket = try Socket(type: type, closeOnDeinit: false)
112113
try stderrSocket.connect()
113114
self.stderrSocket = stderrSocket
114115

@@ -125,12 +126,19 @@ final class StandardIO: ManagedProcess.IO & Sendable {
125126
func relay(readFromFd: Int32, writeToFd: Int32) throws {
126127
let readFrom = OSFile(fd: readFromFd)
127128
let writeTo = OSFile(fd: writeToFd)
128-
// `buf` isn't used concurrently.
129+
// `buf` and `didCleanup` aren't used concurrently.
129130
nonisolated(unsafe) let buf = UnsafeMutableBufferPointer<UInt8>.allocate(capacity: Int(getpagesize()))
131+
nonisolated(unsafe) var didCleanup = false
132+
133+
let cleanupRelay: @Sendable () -> Void = {
134+
if didCleanup { return }
135+
didCleanup = true
136+
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
137+
}
130138

131139
try ProcessSupervisor.default.poller.add(readFromFd, mask: EPOLLIN) { mask in
132140
if mask.isHangup && !mask.readyToRead {
133-
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
141+
cleanupRelay()
134142
return
135143
}
136144
// Loop so that in the case that someone wrote > buf.count down the pipe
@@ -146,7 +154,7 @@ final class StandardIO: ManagedProcess.IO & Sendable {
146154
let w = writeTo.write(view)
147155
if w.wrote != r.read {
148156
self.log?.error("stopping relay: short write for stdio")
149-
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
157+
cleanupRelay()
150158
return
151159
}
152160
}
@@ -156,13 +164,13 @@ final class StandardIO: ManagedProcess.IO & Sendable {
156164
self.log?.error("failed with errno \(errno) while reading for fd \(readFromFd)")
157165
fallthrough
158166
case .eof:
159-
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
167+
cleanupRelay()
160168
self.log?.debug("closing relay for \(readFromFd)")
161169
return
162170
case .again:
163171
// We read all we could, exit.
164172
if mask.isHangup {
165-
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
173+
cleanupRelay()
166174
}
167175
return
168176
default:

vminitd/Sources/vminitd/TerminalIO.swift

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ final class TerminalIO: ManagedProcess.IO & Sendable {
6565
port: stdinPort,
6666
cid: VsockType.hostCID
6767
)
68-
let stdinSocket = try Socket(type: type)
68+
let stdinSocket = try Socket(type: type, closeOnDeinit: false)
6969
try stdinSocket.connect()
7070
self.stdinSocket = stdinSocket
7171

@@ -80,7 +80,7 @@ final class TerminalIO: ManagedProcess.IO & Sendable {
8080
port: stdoutPort,
8181
cid: VsockType.hostCID
8282
)
83-
let stdoutSocket = try Socket(type: type)
83+
let stdoutSocket = try Socket(type: type, closeOnDeinit: false)
8484
try stdoutSocket.connect()
8585
self.stdoutSocket = stdoutSocket
8686

@@ -94,12 +94,19 @@ final class TerminalIO: ManagedProcess.IO & Sendable {
9494
func relay(readFromFd: Int32, writeToFd: Int32) throws {
9595
let readFrom = OSFile(fd: readFromFd)
9696
let writeTo = OSFile(fd: writeToFd)
97-
// `buf` isn't used concurrently.
97+
// `buf` and `didCleanup` aren't used concurrently.
9898
nonisolated(unsafe) let buf = UnsafeMutableBufferPointer<UInt8>.allocate(capacity: Int(getpagesize()))
99+
nonisolated(unsafe) var didCleanup = false
100+
101+
let cleanupRelay: @Sendable () -> Void = {
102+
if didCleanup { return }
103+
didCleanup = true
104+
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
105+
}
99106

100107
try ProcessSupervisor.default.poller.add(readFromFd, mask: EPOLLIN) { mask in
101108
if mask.isHangup && !mask.readyToRead {
102-
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
109+
cleanupRelay()
103110
return
104111
}
105112
// Loop so that in the case that someone wrote > buf.count down the pipe
@@ -115,7 +122,7 @@ final class TerminalIO: ManagedProcess.IO & Sendable {
115122
let w = writeTo.write(view)
116123
if w.wrote != r.read {
117124
self.log?.error("stopping relay: short write for stdio")
118-
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
125+
cleanupRelay()
119126
return
120127
}
121128
}
@@ -125,13 +132,13 @@ final class TerminalIO: ManagedProcess.IO & Sendable {
125132
self.log?.error("failed with errno \(errno) while reading for fd \(readFromFd)")
126133
fallthrough
127134
case .eof:
128-
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
135+
cleanupRelay()
129136
self.log?.debug("closing relay for \(readFromFd)")
130137
return
131138
case .again:
132139
// We read all we could, exit.
133140
if mask.isHangup {
134-
self.cleanupRelay(readFd: readFromFd, writeFd: writeToFd, buffer: buf, log: self.log)
141+
cleanupRelay()
135142
}
136143
return
137144
default:

0 commit comments

Comments
 (0)