Skip to content

Commit 783f239

Browse files
refactor: improve concurrency handling and error reporting in various managers
1 parent eedb36d commit 783f239

File tree

7 files changed

+105
-49
lines changed

7 files changed

+105
-49
lines changed

platforms/macos/Sources/AppState+KeyboardShortcuts.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ extension AppState {
66
/// Sets up global keyboard shortcuts.
77
func setupKeyboardShortcuts() {
88
KeyboardShortcuts.onKeyUp(for: .toggleMainWindow) { [weak self] in
9-
Task { @MainActor in
9+
// KeyboardShortcuts callbacks run on the main thread, so we can use assumeIsolated
10+
// This avoids creating a detached Task that outlives the callback context
11+
MainActor.assumeIsolated {
1012
self?.toggleMainWindow()
1113
}
1214
}

platforms/macos/Sources/Managers/PortForwardManager+Monitoring.swift

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ extension PortForwardManager {
2222
/// Checks all connections and reconnects if needed.
2323
func checkConnections() async {
2424
guard !isKillingProcesses else { return }
25-
for state in connections {
25+
// Take a snapshot to avoid data race during iteration
26+
let snapshot = connections
27+
for state in snapshot {
2628
guard state.config.isEnabled && state.config.autoReconnect else { continue }
2729

2830
if state.config.useDirectExec, state.config.proxyPort != nil {
@@ -45,7 +47,7 @@ extension PortForwardManager {
4547
// Reconnect on error
4648
if state.portForwardStatus == .connected && hasError {
4749
let wasConnected = state.isFullyConnected
48-
state.lastError = "kubectl error"
50+
state.lastError = "kubectl port-forward error on port \(localPort)"
4951
state.portForwardStatus = .disconnected
5052
state.proxyStatus = .disconnected
5153
if wasConnected {
@@ -128,7 +130,7 @@ extension PortForwardManager {
128130

129131
if state.proxyStatus == .connected && hasError {
130132
let wasConnected = state.isFullyConnected
131-
state.lastError = "Proxy error"
133+
state.lastError = "Proxy error on port \(state.config.proxyPort ?? 0)"
132134
state.portForwardStatus = .disconnected
133135
state.proxyStatus = .disconnected
134136
if wasConnected {

platforms/macos/Sources/Managers/PortForwardManager.swift

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,22 @@ final class PortForwardManager {
3232
connections.filter(\.isFullyConnected).count
3333
}
3434

35+
// MARK: - Helper Methods
36+
37+
/// Finds a connection state by its ID
38+
/// - Parameter id: The UUID of the connection to find
39+
/// - Returns: The connection state if found, nil otherwise
40+
func connection(for id: UUID) -> PortForwardConnectionState? {
41+
connections.first { $0.id == id }
42+
}
43+
44+
/// Finds the index of a connection by its ID
45+
/// - Parameter id: The UUID of the connection to find
46+
/// - Returns: The index if found, nil otherwise
47+
func connectionIndex(for id: UUID) -> Int? {
48+
connections.firstIndex { $0.id == id }
49+
}
50+
3551
init() {
3652
loadConnections()
3753
}
@@ -55,14 +71,14 @@ final class PortForwardManager {
5571
}
5672

5773
func removeConnection(_ id: UUID) {
58-
guard let index = connections.firstIndex(where: { $0.id == id }) else { return }
74+
guard let index = connectionIndex(for: id) else { return }
5975
stopConnection(id)
6076
connections.remove(at: index)
6177
saveConnections()
6278
}
6379

6480
func updateConnection(_ config: PortForwardConnectionConfig) {
65-
guard let index = connections.firstIndex(where: { $0.id == config.id }) else { return }
81+
guard let index = connectionIndex(for: config.id) else { return }
6682
let wasConnected = connections[index].isFullyConnected
6783
if wasConnected {
6884
stopConnection(config.id)
@@ -117,22 +133,31 @@ final class PortForwardManager {
117133

118134
func startConnection(_ id: UUID) {
119135
guard !isKillingProcesses else { return }
120-
guard let state = connections.first(where: { $0.id == id }) else { return }
136+
guard let state = connection(for: id) else { return }
121137
let config = state.config
122138

123139
// Reset intentional stop flag when starting
124140
state.isIntentionallyStopped = false
125141

126-
Task {
127-
await processManager.setLogHandler(for: id) { [weak state] message, type, isError in
142+
state.portForwardStatus = .connecting
143+
144+
// Set up handlers and start port forward in a single task to ensure proper ordering
145+
state.portForwardTask = Task { [weak self, weak state] in
146+
guard let self = self, let state = state else { return }
147+
148+
// Set log handler with proper weak capture
149+
let logHandler: LogHandler = { [weak state] message, type, isError in
150+
guard let state = state else { return }
128151
Task { @MainActor in
129-
state?.appendLog(message, type: type, isError: isError)
152+
state.appendLog(message, type: type, isError: isError)
130153
}
131154
}
155+
await self.processManager.setLogHandler(for: id, handler: logHandler)
132156

133-
await processManager.setPortConflictHandler(for: id) { [weak self, weak state] port in
157+
// Set port conflict handler with proper weak capture
158+
let conflictHandler: PortConflictHandler = { [weak self, weak state] port in
159+
guard let self = self, let state = state else { return }
134160
Task { @MainActor in
135-
guard let self = self, let state = state else { return }
136161
state.appendLog("Port \(port) in use, auto-recovering...", type: .portForward, isError: false)
137162

138163
await self.processManager.killProcessOnPort(port)
@@ -143,16 +168,14 @@ final class PortForwardManager {
143168
self.restartConnection(id)
144169
}
145170
}
146-
}
171+
await self.processManager.setPortConflictHandler(for: id, handler: conflictHandler)
147172

148-
state.portForwardStatus = .connecting
149-
state.portForwardTask = Task {
150-
await runPortForward(for: state, config: config)
173+
await self.runPortForward(for: state, config: config)
151174
}
152175
}
153176

154177
func stopConnection(_ id: UUID) {
155-
guard let state = connections.first(where: { $0.id == id }) else { return }
178+
guard let state = connection(for: id) else { return }
156179

157180
// Mark as intentionally stopped to avoid disconnect notification
158181
state.isIntentionallyStopped = true

platforms/macos/Sources/Managers/PortForwardProcessManager.swift

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,30 +36,33 @@ actor PortForwardProcessManager {
3636
let task = Task { [weak self] in
3737
let handle = pipe.fileHandleForReading
3838

39-
while true {
40-
let data = handle.availableData
41-
if data.isEmpty { break }
39+
// Use async bytes stream for non-blocking read
40+
// This avoids potential deadlock from blocking availableData calls
41+
do {
42+
for try await line in handle.bytes.lines {
43+
guard !Task.isCancelled else { break }
4244

43-
if let output = String(data: data, encoding: .utf8)?.trimmingCharacters(in: .whitespacesAndNewlines), !output.isEmpty {
44-
let lines = output.components(separatedBy: .newlines)
45-
for line in lines where !line.isEmpty {
46-
let isError = PortForwardOutputParser.isErrorLine(line)
45+
let trimmedLine = line.trimmingCharacters(in: .whitespacesAndNewlines)
46+
guard !trimmedLine.isEmpty else { continue }
4747

48-
if isError {
49-
await self?.markConnectionError(id: id)
50-
}
48+
let isError = PortForwardOutputParser.isErrorLine(trimmedLine)
5149

52-
if let port = PortForwardOutputParser.detectPortConflict(in: line) {
53-
if let handler = await self?.portConflictHandlers[id] {
54-
handler(port)
55-
}
56-
}
50+
if isError {
51+
await self?.markConnectionError(id: id)
52+
}
5753

58-
if let handler = await self?.logHandlers[id] {
59-
handler(line, type, isError)
54+
if let port = PortForwardOutputParser.detectPortConflict(in: trimmedLine) {
55+
if let handler = await self?.portConflictHandlers[id] {
56+
handler(port)
6057
}
6158
}
59+
60+
if let handler = await self?.logHandlers[id] {
61+
handler(trimmedLine, type, isError)
62+
}
6263
}
64+
} catch {
65+
// Stream ended or was cancelled - this is expected when process terminates
6366
}
6467
}
6568

platforms/macos/Sources/Managers/TunnelManager.swift

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,17 @@ final class TunnelManager {
2121
/// Cached installation status (observable for UI updates)
2222
private(set) var isCloudflaredInstalled: Bool = false
2323

24+
/// Task for cleaning up orphaned tunnels from previous sessions
25+
private var cleanupTask: Task<Void, Never>?
26+
2427
// MARK: - Initialization
2528

2629
init() {
2730
// Check initial installation status
2831
isCloudflaredInstalled = cloudflaredService.isInstalled
2932

3033
// Clean up any orphaned tunnel processes from previous crashed sessions
31-
Task {
34+
cleanupTask = Task {
3235
await cleanupOrphanedTunnels()
3336
}
3437
}
@@ -61,6 +64,15 @@ final class TunnelManager {
6164
/// - port: The local port to expose
6265
/// - portInfoId: Optional reference to the PortInfo this tunnel is for
6366
func startTunnel(for port: Int, portInfoId: UUID? = nil) {
67+
// Ensure cleanup has completed before starting new tunnels
68+
Task {
69+
await cleanupTask?.value
70+
await _startTunnelImpl(for: port, portInfoId: portInfoId)
71+
}
72+
}
73+
74+
/// Internal implementation of startTunnel after cleanup is complete
75+
private func _startTunnelImpl(for port: Int, portInfoId: UUID? = nil) async {
6476
// Check if tunnel already exists for this port
6577
if let existing = tunnels.first(where: { $0.port == port && $0.status != .error }) {
6678
// Already tunneling this port - just copy the URL if available
@@ -75,10 +87,13 @@ final class TunnelManager {
7587

7688
state.status = .starting
7789

78-
Task {
79-
await cloudflaredService.setURLHandler(for: state.id) { [weak self, weak state] url in
90+
Task { [weak self, weak state] in
91+
guard let self = self, let state = state else { return }
92+
93+
// Set URL handler with proper weak capture
94+
let urlHandler: @Sendable (String) -> Void = { [weak self, weak state] url in
95+
guard let state = state else { return }
8096
Task { @MainActor in
81-
guard let state = state else { return }
8297
state.tunnelURL = url
8398
state.status = .active
8499
state.startTime = Date()
@@ -93,30 +108,37 @@ final class TunnelManager {
93108
)
94109
}
95110
}
111+
await self.cloudflaredService.setURLHandler(for: state.id, handler: urlHandler)
96112

97-
await cloudflaredService.setErrorHandler(for: state.id) { [weak state] error in
113+
// Set error handler with proper weak capture
114+
let errorHandler: @Sendable (String) -> Void = { [weak state] error in
115+
guard let state = state else { return }
98116
Task { @MainActor in
99-
guard let state = state else { return }
100117
state.lastError = error
101118
if state.status != .active {
102119
state.status = .error
103120
}
104121
}
105122
}
123+
await self.cloudflaredService.setErrorHandler(for: state.id, handler: errorHandler)
106124

107125
do {
108-
let process = try await cloudflaredService.startTunnel(id: state.id, port: port)
126+
let process = try await self.cloudflaredService.startTunnel(id: state.id, port: port)
109127

110128
// Wait a bit to see if process starts successfully
111129
try? await Task.sleep(for: .seconds(3))
112130

113131
if !process.isRunning && state.status != .active {
114-
state.status = .error
115-
state.lastError = "Process terminated unexpectedly"
132+
await MainActor.run {
133+
state.status = .error
134+
state.lastError = "Process terminated unexpectedly"
135+
}
116136
}
117137
} catch {
118-
state.status = .error
119-
state.lastError = error.localizedDescription
138+
await MainActor.run {
139+
state.status = .error
140+
state.lastError = error.localizedDescription
141+
}
120142
}
121143
}
122144
}

platforms/macos/Sources/Managers/UpdateManager.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,10 @@ final class UpdateManager {
115115
.store(in: &cancellables)
116116
}
117117

118-
/// Storage for Combine cancellables
119-
@ObservationIgnored private var cancellables: Set<AnyCancellable> = []
118+
/// Storage for Combine cancellables.
119+
/// Note: These subscriptions live for the app's lifetime alongside the UpdateManager,
120+
/// so cleanup in deinit is not necessary. The subscriptions are intentionally retained.
121+
@ObservationIgnored private var cancellables = Set<AnyCancellable>()
120122

121123
// MARK: - Public Methods
122124

platforms/macos/Sources/PortScanner.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ actor PortScanner {
5252
let commands = await getProcessCommands()
5353
return parseLsofOutput(output, commands: commands)
5454
} catch {
55+
print("[PortScanner] Failed to scan ports: \(error.localizedDescription)")
5556
return []
5657
}
5758
}
@@ -106,6 +107,7 @@ actor PortScanner {
106107

107108
return commands
108109
} catch {
110+
print("[PortScanner] Failed to get process commands: \(error.localizedDescription)")
109111
return [:]
110112
}
111113
}
@@ -130,7 +132,7 @@ actor PortScanner {
130132
* @param commands - Dictionary of PID to full command string from ps
131133
* @returns Array of unique PortInfo objects, sorted by port number
132134
*/
133-
private func parseLsofOutput(_ output: String, commands: [Int: String]) -> [PortInfo] {
135+
nonisolated private func parseLsofOutput(_ output: String, commands: [Int: String]) -> [PortInfo] {
134136
var ports: [PortInfo] = []
135137
var seen: Set<String> = []
136138
let lines = output.components(separatedBy: .newlines)
@@ -208,7 +210,7 @@ actor PortScanner {
208210
* @param fd - File descriptor number
209211
* @returns PortInfo object or nil if parsing fails
210212
*/
211-
private func parseAddress(_ address: String, processName: String, pid: Int, user: String, command: String, fd: String) -> PortInfo? {
213+
nonisolated private func parseAddress(_ address: String, processName: String, pid: Int, user: String, command: String, fd: String) -> PortInfo? {
212214
let parts: [String]
213215

214216
if address.hasPrefix("[") {

0 commit comments

Comments
 (0)