Skip to content

Commit 7c7ed05

Browse files
authored
Add a public method to wait for the relay to complete (#529)
- Adds a public method to wait for the relay to complete when both directions close - Improves error handling - Moves `UnixSocketRelayManager` into a separate file - Renames `SocketRelay` to `UnixSocketRelay` - Adds logs at the info level after converting other logs to the debug level previously
1 parent 71a0b45 commit 7c7ed05

File tree

4 files changed

+145
-83
lines changed

4 files changed

+145
-83
lines changed

Sources/Containerization/UnixSocketRelay.swift

Lines changed: 16 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -21,63 +21,7 @@ import Foundation
2121
import Logging
2222
import Synchronization
2323

24-
package actor UnixSocketRelayManager {
25-
private let vm: any VirtualMachineInstance
26-
private var relays: [String: SocketRelay]
27-
private let q: DispatchQueue
28-
private let log: Logger?
29-
30-
init(vm: any VirtualMachineInstance, log: Logger? = nil) {
31-
self.vm = vm
32-
self.relays = [:]
33-
self.q = DispatchQueue(label: "com.apple.containerization.socket-relay")
34-
self.log = log
35-
}
36-
}
37-
38-
extension UnixSocketRelayManager {
39-
func start(port: UInt32, socket: UnixSocketConfiguration) async throws {
40-
guard self.relays[socket.id] == nil else {
41-
throw ContainerizationError(
42-
.invalidState,
43-
message: "socket relay \(socket.id) already started"
44-
)
45-
}
46-
47-
let socketRelay = try SocketRelay(
48-
port: port,
49-
socket: socket,
50-
vm: self.vm,
51-
queue: self.q,
52-
log: self.log
53-
)
54-
55-
do {
56-
self.relays[socket.id] = socketRelay
57-
try await socketRelay.start()
58-
} catch {
59-
self.relays.removeValue(forKey: socket.id)
60-
}
61-
}
62-
63-
func stop(socket: UnixSocketConfiguration) async throws {
64-
guard let storedRelay = self.relays.removeValue(forKey: socket.id) else {
65-
throw ContainerizationError(
66-
.notFound,
67-
message: "failed to stop socket relay"
68-
)
69-
}
70-
try storedRelay.stop()
71-
}
72-
73-
func stopAll() async throws {
74-
for (_, relay) in self.relays {
75-
try relay.stop()
76-
}
77-
}
78-
}
79-
80-
package final class SocketRelay: Sendable {
24+
package final class UnixSocketRelay: Sendable {
8125
private let port: UInt32
8226
private let configuration: UnixSocketConfiguration
8327
private let log: Logger?
@@ -107,11 +51,11 @@ package final class SocketRelay: Sendable {
10751
}
10852

10953
deinit {
110-
self.state.withLock { $0.t?.cancel() }
54+
state.withLock { $0.t?.cancel() }
11155
}
11256
}
11357

114-
extension SocketRelay {
58+
extension UnixSocketRelay {
11559
func start() async throws {
11660
switch configuration.direction {
11761
case .outOf:
@@ -122,7 +66,7 @@ extension SocketRelay {
12266
}
12367

12468
func stop() throws {
125-
try self.state.withLock {
69+
try state.withLock {
12670
guard let t = $0.t else {
12771
throw ContainerizationError(
12872
.invalidState,
@@ -148,7 +92,7 @@ extension SocketRelay {
14892
}
14993

15094
private func setupHostVsockDial() async throws {
151-
let hostConn = self.configuration.destination
95+
let hostConn = configuration.destination
15296

15397
let socketType = try UnixType(
15498
path: hostConn.path,
@@ -161,10 +105,10 @@ extension SocketRelay {
161105
"listening on host UDS",
162106
metadata: [
163107
"path": "\(hostConn.path)",
164-
"vport": "\(self.port)",
108+
"vport": "\(port)",
165109
])
166110
let connectionStream = try hostSocket.acceptStream(closeOnDeinit: false)
167-
self.state.withLock {
111+
state.withLock {
168112
$0.t = Task {
169113
do {
170114
for try await connection in connectionStream {
@@ -184,19 +128,17 @@ extension SocketRelay {
184128
}
185129

186130
private func setupHostVsockListener() throws {
187-
let hostPath = self.configuration.source
188-
let port = self.port
189-
let log = self.log
131+
let hostPath = configuration.source
190132

191-
let listener = try self.vm.listen(self.port)
133+
let listener = try vm.listen(port)
192134
log?.info(
193135
"listening on guest vsock",
194136
metadata: [
195137
"path": "\(hostPath)",
196138
"vport": "\(port)",
197139
])
198140

199-
self.state.withLock {
141+
state.withLock {
200142
$0.listener = listener
201143
$0.t = Task {
202144
do {
@@ -205,12 +147,12 @@ extension SocketRelay {
205147
try await self.handleGuestVsockConn(
206148
vsockConn: connection,
207149
hostConnectionPath: hostPath,
208-
port: port,
209-
log: log
150+
port: self.port,
151+
log: self.log
210152
)
211153
}
212154
} catch {
213-
log?.error("failed to setup relay between vsock \(port) and \(hostPath.path): \(error)")
155+
self.log?.error("failed to setup relay between vsock \(self.port) and \(hostPath.path): \(error)")
214156
}
215157
}
216158
}
@@ -282,11 +224,11 @@ extension SocketRelay {
282224
let relay = BidirectionalRelay(
283225
fd1: hostFd,
284226
fd2: guestFd,
285-
queue: self.q,
286-
log: self.log
227+
queue: q,
228+
log: log
287229
)
288230

289-
self.state.withLock {
231+
state.withLock {
290232
$0.activeRelays[relayID] = relay
291233
}
292234

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
//===----------------------------------------------------------------------===//
2+
// Copyright © 2026 Apple Inc. and the Containerization project authors.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// https://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//===----------------------------------------------------------------------===//
16+
17+
import ContainerizationError
18+
import Foundation
19+
import Logging
20+
21+
package actor UnixSocketRelayManager {
22+
private let vm: any VirtualMachineInstance
23+
private var relays: [String: UnixSocketRelay]
24+
private let q: DispatchQueue
25+
private let log: Logger?
26+
27+
init(vm: any VirtualMachineInstance, log: Logger? = nil) {
28+
self.vm = vm
29+
self.relays = [:]
30+
self.q = DispatchQueue(label: "com.apple.containerization.socket-relay")
31+
self.log = log
32+
}
33+
}
34+
35+
extension UnixSocketRelayManager {
36+
func start(port: UInt32, socket: UnixSocketConfiguration) async throws {
37+
guard relays[socket.id] == nil else {
38+
throw ContainerizationError(
39+
.invalidState,
40+
message: "socket relay \(socket.id) already started"
41+
)
42+
}
43+
44+
let relay = try UnixSocketRelay(
45+
port: port,
46+
socket: socket,
47+
vm: vm,
48+
queue: q,
49+
log: log
50+
)
51+
52+
do {
53+
relays[socket.id] = relay
54+
try await relay.start()
55+
} catch {
56+
relays.removeValue(forKey: socket.id)
57+
}
58+
}
59+
60+
func stop(socket: UnixSocketConfiguration) async throws {
61+
guard let storedRelay = relays.removeValue(forKey: socket.id) else {
62+
throw ContainerizationError(
63+
.notFound,
64+
message: "failed to stop socket relay"
65+
)
66+
}
67+
try storedRelay.stop()
68+
}
69+
70+
func stopAll() async throws {
71+
for (_, relay) in relays {
72+
try relay.stop()
73+
}
74+
}
75+
}

Sources/ContainerizationOS/Socket/BidirectionalRelay.swift

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,14 @@ public final class BidirectionalRelay: Sendable {
3232
let source2: DispatchSourceRead
3333
}
3434

35+
private enum CompletionState {
36+
case pending
37+
case waiting(CheckedContinuation<Void, Never>)
38+
case completed
39+
}
40+
3541
private let state: Mutex<ConnectionSources?>
42+
private let completionState: Mutex<CompletionState>
3643

3744
// The buffers aren't used concurrently.
3845
private nonisolated(unsafe) let buffer1: UnsafeMutableBufferPointer<UInt8>
@@ -56,6 +63,7 @@ public final class BidirectionalRelay: Sendable {
5663
self.queue = queue ?? DispatchQueue(label: "com.apple.containerization.bidirectional-relay")
5764
self.log = log
5865
self.state = Mutex(nil)
66+
self.completionState = Mutex(.pending)
5967

6068
let pageSize = Int(getpagesize())
6169
self.buffer1 = UnsafeMutableBufferPointer<UInt8>.allocate(capacity: pageSize)
@@ -134,6 +142,22 @@ public final class BidirectionalRelay: Sendable {
134142
}
135143
}
136144

145+
/// Waits for the relay to complete.
146+
public func waitForCompletion() async {
147+
await withCheckedContinuation { c in
148+
completionState.withLock { state in
149+
switch state {
150+
case .pending:
151+
state = .waiting(c)
152+
case .waiting:
153+
fatalError("waitForCompletion called multiple times")
154+
case .completed:
155+
c.resume()
156+
}
157+
}
158+
}
159+
}
160+
137161
private func fdCopyHandler(
138162
buffer: UnsafeMutableBufferPointer<UInt8>,
139163
source: DispatchSourceRead,
@@ -253,5 +277,11 @@ public final class BidirectionalRelay: Sendable {
253277
)
254278
close(fd1)
255279
close(fd2)
280+
completionState.withLock { state in
281+
if case .waiting(let c) = state {
282+
c.resume()
283+
}
284+
state = .completed
285+
}
256286
}
257287
}

vminitd/Sources/vminitd/Server+GRPC.swift

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -198,19 +198,20 @@ extension Initd: Com_Apple_Containerization_Sandbox_V3_SandboxContextAsyncProvid
198198
"action": "\(request.action)",
199199
])
200200

201-
do {
202-
let proxy = VsockProxy(
203-
id: request.id,
204-
action: request.action == .into ? .dial : .listen,
205-
port: request.vsockPort,
206-
path: URL(fileURLWithPath: request.guestPath),
207-
udsPerms: request.guestSocketPermissions,
208-
log: log
209-
)
201+
let proxy = VsockProxy(
202+
id: request.id,
203+
action: request.action == .into ? .dial : .listen,
204+
port: request.vsockPort,
205+
path: URL(fileURLWithPath: request.guestPath),
206+
udsPerms: request.guestSocketPermissions,
207+
log: log
208+
)
210209

210+
do {
211211
try await proxy.start()
212212
try await state.add(proxy: proxy)
213213
} catch {
214+
try? await proxy.close()
214215
log.error(
215216
"proxyVsock",
216217
metadata: [
@@ -222,6 +223,14 @@ extension Initd: Com_Apple_Containerization_Sandbox_V3_SandboxContextAsyncProvid
222223
)
223224
}
224225

226+
log.info(
227+
"proxyVsock started",
228+
metadata: [
229+
"id": "\(request.id)",
230+
"port": "\(request.vsockPort)",
231+
"guestPath": "\(request.guestPath)",
232+
])
233+
225234
return .init()
226235
}
227236

@@ -250,6 +259,12 @@ extension Initd: Com_Apple_Containerization_Sandbox_V3_SandboxContextAsyncProvid
250259
)
251260
}
252261

262+
log.info(
263+
"stopVsockProxy completed",
264+
metadata: [
265+
"id": "\(request.id)"
266+
])
267+
253268
return .init()
254269
}
255270

0 commit comments

Comments
 (0)