Skip to content

Commit 58c8ebd

Browse files
committed
Fixed communication with Docker Engine API.
1 parent 712d1b6 commit 58c8ebd

File tree

1 file changed

+98
-67
lines changed

1 file changed

+98
-67
lines changed
Lines changed: 98 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import Foundation
2-
import Network
3-
import Synchronization
42

53
///
64
/// Minimal HTTP/1.1 client that speaks to the Docker Engine API
75
/// over the Unix domain socket that Docker Desktop exposes on macOS.
86
///
9-
struct DockerEngineClient {
7+
/// Uses POSIX sockets directly; `NWConnection` cannot drive a Unix-domain
8+
/// socket with TCP parameters on macOS and fails with `ENETDOWN`.
9+
///
10+
struct DockerEngineClient: Sendable {
1011
/// The file-system path to the Docker Engine Unix domain socket.
1112
let socketPath: String
1213

@@ -62,85 +63,87 @@ struct DockerEngineClient {
6263

6364
let socketPath = self.socketPath // capture value, not self
6465

65-
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<(statusCode: Int, body: Data), Error>) in
66-
let connection = NWConnection(to: .unix(path: socketPath), using: .tcp)
67-
68-
// Mutex-protected flag ensures continuation.resume is called exactly once
69-
// even if NWConnection delivers overlapping state/send callbacks.
70-
let done = Mutex<Bool>(false)
71-
72-
@Sendable func resumeOnce(with result: Result<(statusCode: Int, body: Data), Error>) {
73-
let shouldResume = done.withLock { flag -> Bool in
74-
guard !flag else { return false }
75-
flag = true
76-
return true
66+
return try await withCheckedThrowingContinuation { continuation in
67+
// Dispatch to a GCD thread so the blocking POSIX calls
68+
// don't stall the Swift concurrency cooperative thread pool.
69+
DispatchQueue.global(qos: .utility).async {
70+
do {
71+
let result = try dockerSocketRequest(socketPath: socketPath, requestData: requestData)
72+
continuation.resume(returning: result)
73+
} catch {
74+
continuation.resume(throwing: error)
7775
}
78-
guard shouldResume else { return }
79-
connection.cancel()
80-
continuation.resume(with: result)
8176
}
77+
}
78+
}
79+
}
8280

83-
connection.stateUpdateHandler = { state in
84-
switch state {
85-
case .ready:
86-
connection.send(content: requestData, completion: .contentProcessed { error in
87-
if let error {
88-
resumeOnce(with: .failure(error))
89-
return
90-
}
91-
receiveAll(connection: connection, accumulated: Data()) { result in
92-
switch result {
93-
case let .success(responseData):
94-
if let parsed = parseHTTPResponse(responseData) {
95-
resumeOnce(with: .success(parsed))
96-
} else {
97-
resumeOnce(with: .failure(DockerClientError.invalidResponse))
98-
}
99-
case let .failure(err):
100-
resumeOnce(with: .failure(err))
101-
}
102-
}
103-
})
104-
case let .failed(error):
105-
resumeOnce(with: .failure(error))
106-
case .cancelled, .setup, .preparing, .waiting:
107-
break
108-
@unknown default:
109-
break
110-
}
111-
}
81+
// MARK: - POSIX-socket transport (file-private free functions)
82+
83+
private func dockerSocketRequest(
84+
socketPath: String,
85+
requestData: Data
86+
) throws -> (statusCode: Int, body: Data) {
11287

113-
connection.start(queue: .global(qos: .utility))
88+
// ── 1. Open a Unix-domain stream socket ─────────────────────────────────
89+
let fd = socket(AF_UNIX, SOCK_STREAM, 0)
90+
guard fd >= 0 else {
91+
throw NSError(domain: NSPOSIXErrorDomain, code: Int(errno))
92+
}
93+
defer { close(fd) }
94+
95+
// ── 2. Connect to the Docker socket path ─────────────────────────────────
96+
var addr = sockaddr_un()
97+
addr.sun_family = sa_family_t(AF_UNIX)
98+
socketPath.withCString { src in
99+
withUnsafeMutableBytes(of: &addr.sun_path) { bytes in
100+
_ = memcpy(bytes.baseAddress!, src, strlen(src) + 1)
114101
}
115102
}
116-
}
103+
let connectResult = withUnsafePointer(to: &addr) { addrPtr in
104+
addrPtr.withMemoryRebound(to: sockaddr.self, capacity: 1) {
105+
connect(fd, $0, socklen_t(MemoryLayout<sockaddr_un>.size))
106+
}
107+
}
108+
guard connectResult == 0 else {
109+
throw NSError(domain: NSPOSIXErrorDomain, code: Int(errno))
110+
}
117111

118-
// MARK: - Private helpers (file-private free functions)
119-
120-
private func receiveAll(
121-
connection: NWConnection,
122-
accumulated: Data,
123-
completion: @escaping @Sendable (Result<Data, Error>) -> Void
124-
) {
125-
connection.receive(minimumIncompleteLength: 1, maximumLength: 65536) { data, _, isComplete, error in
126-
var updated = accumulated
127-
if let data { updated.append(data) }
128-
129-
if let error {
130-
completion(.failure(error))
131-
} else if isComplete {
132-
completion(.success(updated))
133-
} else {
134-
receiveAll(connection: connection, accumulated: updated, completion: completion)
112+
// ── 3. Send the full HTTP request ────────────────────────────────────────
113+
var totalSent = 0
114+
while totalSent < requestData.count {
115+
let sent = requestData.withUnsafeBytes { ptr in
116+
send(fd, ptr.baseAddress!.advanced(by: totalSent), ptr.count - totalSent, 0)
117+
}
118+
guard sent > 0 else {
119+
throw NSError(domain: NSPOSIXErrorDomain, code: Int(errno))
135120
}
121+
totalSent += sent
122+
}
123+
124+
// ── 4. Read until the server closes the connection (Connection: close) ───
125+
var responseData = Data()
126+
var buffer = [UInt8](repeating: 0, count: 8_192)
127+
while true {
128+
let n = buffer.withUnsafeMutableBytes { ptr in
129+
recv(fd, ptr.baseAddress!, ptr.count, 0)
130+
}
131+
if n <= 0 { break }
132+
responseData.append(buffer, count: n)
133+
}
134+
135+
// ── 5. Parse the HTTP response ───────────────────────────────────────────
136+
guard let parsed = parseHTTPResponse(responseData) else {
137+
throw DockerClientError.invalidResponse
136138
}
139+
return parsed
137140
}
138141

139142
private func parseHTTPResponse(_ data: Data) -> (statusCode: Int, body: Data)? {
140143
guard let separatorRange = data.range(of: Data("\r\n\r\n".utf8)) else { return nil }
141144

142145
let headerData = data[..<separatorRange.lowerBound]
143-
let body = Data(data[separatorRange.upperBound...])
146+
var body = Data(data[separatorRange.upperBound...])
144147

145148
guard
146149
let headerText = String(data: headerData, encoding: .utf8),
@@ -151,5 +154,33 @@ private func parseHTTPResponse(_ data: Data) -> (statusCode: Int, body: Data)? {
151154
let parts = statusLine.split(separator: " ", maxSplits: 2)
152155
guard parts.count >= 2, let statusCode = Int(parts[1]) else { return nil }
153156

157+
if headerText.lowercased().contains("transfer-encoding: chunked") {
158+
body = decodeChunked(body)
159+
}
160+
154161
return (statusCode: statusCode, body: body)
155162
}
163+
164+
private func decodeChunked(_ data: Data) -> Data {
165+
var result = Data()
166+
var position = data.startIndex
167+
let crlf = Data("\r\n".utf8)
168+
169+
while position < data.endIndex {
170+
guard let crlfRange = data[position...].range(of: crlf) else { break }
171+
172+
let sizeHex = data[position..<crlfRange.lowerBound]
173+
guard
174+
let sizeString = String(data: sizeHex, encoding: .ascii),
175+
let chunkSize = Int(sizeString.trimmingCharacters(in: .whitespaces), radix: 16),
176+
chunkSize > 0
177+
else { break }
178+
179+
let chunkStart = crlfRange.upperBound
180+
guard let chunkEnd = data.index(chunkStart, offsetBy: chunkSize, limitedBy: data.endIndex) else { break }
181+
result.append(data[chunkStart..<chunkEnd])
182+
position = data.index(chunkEnd, offsetBy: 2, limitedBy: data.endIndex) ?? data.endIndex
183+
}
184+
185+
return result
186+
}

0 commit comments

Comments
 (0)