Skip to content

Commit 6228b16

Browse files
committed
fix(mcp): resolve data race and reliability issues in user_collaboration tool
Three fixes for intermittent user_collaboration failures: 1. UserCollaborationTool: pendingResponses dictionary had a declared but unused NSLock. All reads/writes now go through lockedRead/lockedWrite helpers, preventing data races between Vapor's thread (submitUserResponse) and @MainActor polling (waitForUserResponse). 2. AgentOrchestrator: duplicate check for user response persistence was checking conversation.messages (old array) instead of messageBus.messages (source of truth), causing responses to sometimes not be persisted. 3. ChatWidget: collaboration UI state was cleared before the HTTP POST fired. On HTTP failure the response was silently lost. State now clears only after HTTP 200, with input text restored on failure for retry. Bump version to 20260313.2.
1 parent 9746a99 commit 6228b16

File tree

5 files changed

+92
-40
lines changed

5 files changed

+92
-40
lines changed

Info.plist

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
<key>CFBundlePackageType</key>
2020
<string>APPL</string>
2121
<key>CFBundleShortVersionString</key>
22-
<string>20260313.1</string>
22+
<string>20260313.2</string>
2323
<key>CFBundleVersion</key>
24-
<string>20260313.1</string>
24+
<string>20260313.2</string>
2525
<key>LSApplicationCategoryType</key>
2626
<string>public.app-category.productivity</string>
2727
<key>LSMinimumSystemVersion</key>

Resources/whats-new.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"releases": [
33
{
4-
"version": "20260313.1",
4+
"version": "20260313.2",
55
"release_date": "March 13, 2026",
66
"highlights": [
77
{

Sources/APIFramework/AgentOrchestrator.swift

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2134,12 +2134,14 @@ public class AgentOrchestrator: ObservableObject, IterationController {
21342134
])
21352135

21362136
/// PERSIST MESSAGE: Add user's response to conversation (PINNED for context preservation)
2137+
/// Must use Task @MainActor for Swift 6 concurrency compliance.
21372138
if let convId = conversationId {
21382139
Task { @MainActor in
21392140
if let conversation = self.conversationManager.conversations.first(where: { $0.id == convId }) {
2140-
let isDuplicate = conversation.messages.contains(where: {
2141+
/// Check MessageBus messages (source of truth) not conversation.messages
2142+
let isDuplicate = conversation.messageBus?.messages.contains(where: {
21412143
$0.isFromUser && $0.content == userInput
2142-
})
2144+
}) ?? false
21432145

21442146
if !isDuplicate {
21452147
let messageId = conversation.messageBus?.addUserMessage(
@@ -2152,6 +2154,10 @@ public class AgentOrchestrator: ObservableObject, IterationController {
21522154
"messageId": .string(messageId?.uuidString ?? "unknown"),
21532155
"conversationId": .string(convId.uuidString)
21542156
])
2157+
} else {
2158+
self.logger.debug("USER_COLLAB: Skipping duplicate user response persistence", metadata: [
2159+
"toolCallId": .string(toolCallId)
2160+
])
21552161
}
21562162
}
21572163
}

Sources/MCPFramework/Tools/UserCollaborationTool.swift

Lines changed: 60 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,31 @@ public class UserCollaborationTool: MCPTool, @unchecked Sendable {
7373
private let logger = Logger(label: "com.sam.mcp.UserCollaborationTool")
7474

7575
/// Shared state for pending user responses. Key: toolCallId (UUID string), Value: PendingResponse.
76-
nonisolated(unsafe) private static var pendingResponses: [String: PendingResponse] = [:]
76+
/// Thread-safe access: ALL reads/writes MUST go through lockedRead/lockedWrite helpers.
77+
nonisolated(unsafe) private static var _pendingResponses: [String: PendingResponse] = [:]
7778
private static let responseLock = NSLock()
7879

80+
/// Thread-safe read from pendingResponses.
81+
private static func lockedRead<T>(_ body: ([String: PendingResponse]) -> T) -> T {
82+
responseLock.lock()
83+
defer { responseLock.unlock() }
84+
return body(_pendingResponses)
85+
}
86+
87+
/// Thread-safe write to pendingResponses.
88+
private static func lockedWrite(_ body: (inout [String: PendingResponse]) -> Void) {
89+
responseLock.lock()
90+
defer { responseLock.unlock() }
91+
body(&_pendingResponses)
92+
}
93+
94+
/// Thread-safe write with return value.
95+
private static func lockedWrite<T>(_ body: (inout [String: PendingResponse]) -> T) -> T {
96+
responseLock.lock()
97+
defer { responseLock.unlock() }
98+
return body(&_pendingResponses)
99+
}
100+
79101
public init() {}
80102

81103
public func initialize() async throws {
@@ -153,7 +175,7 @@ public class UserCollaborationTool: MCPTool, @unchecked Sendable {
153175
requestedAt: Date()
154176
)
155177

156-
Self.pendingResponses[toolCallId] = pending
178+
Self.lockedWrite { $0[toolCallId] = pending }
157179

158180
/// Send SSE event to notify UI. NOTE: This will be handled by the streaming handler in APIHandler. The event format: { type: "user_input_required", toolCallId, prompt, context? }.
159181
await notifyUIForInput(toolCallId: toolCallId, prompt: prompt, context: userContext, conversationId: context.conversationId)
@@ -170,7 +192,7 @@ public class UserCollaborationTool: MCPTool, @unchecked Sendable {
170192
)
171193

172194
/// Clean up pending response.
173-
Self.pendingResponses.removeValue(forKey: toolCallId)
195+
Self.lockedWrite { $0.removeValue(forKey: toolCallId) }
174196

175197
return result
176198
}
@@ -189,18 +211,25 @@ public class UserCollaborationTool: MCPTool, @unchecked Sendable {
189211

190212
/// Log every 10 polls (every second) to track that we're still waiting
191213
if pollCount % 10 == 0 {
192-
let elapsed = Date().timeIntervalSince(startTime)
214+
let elapsed = Date().timeIntervalSince(startTime)
215+
let (hasPending, hasResponse) = Self.lockedRead { dict in
216+
(dict[toolCallId] != nil, dict[toolCallId]?.userResponse != nil)
217+
}
193218
logger.debug("COLLAB_DEBUG: Still waiting for user response", metadata: [
194219
"toolCallId": .string(toolCallId),
195220
"pollCount": .stringConvertible(pollCount),
196221
"elapsedSeconds": .stringConvertible(elapsed),
197-
"hasPending": .stringConvertible(Self.pendingResponses[toolCallId] != nil),
198-
"hasResponse": .stringConvertible(Self.pendingResponses[toolCallId]?.userResponse != nil)
222+
"hasPending": .stringConvertible(hasPending),
223+
"hasResponse": .stringConvertible(hasResponse)
199224
])
200225
}
201-
202-
/// Check if response received.
203-
if let pending = Self.pendingResponses[toolCallId], let response = pending.userResponse {
226+
227+
/// Check if response received (thread-safe read).
228+
let maybeResponse = Self.lockedRead { dict -> String? in
229+
dict[toolCallId]?.userResponse
230+
}
231+
232+
if let response = maybeResponse {
204233

205234
let waitTime = Date().timeIntervalSince(startTime)
206235
logger.info("USER_RESPONDED: User response received after \(String(format: "%.2f", waitTime))s", metadata: [
@@ -283,37 +312,42 @@ public class UserCollaborationTool: MCPTool, @unchecked Sendable {
283312
/// Submit user response for a pending tool call (called from API endpoint).
284313
public static func submitUserResponse(toolCallId: String, userInput: String) -> Bool {
285314
let logger = Logger(label: "com.sam.mcp.UserCollaborationTool.submitResponse")
315+
let pendingCount = lockedRead { $0.count }
316+
let hasPending = lockedRead { $0[toolCallId] != nil }
286317
logger.info("COLLAB_DEBUG: submitUserResponse called", metadata: [
287318
"toolCallId": .string(toolCallId),
288319
"userInputLength": .stringConvertible(userInput.count),
289-
"pendingCount": .stringConvertible(pendingResponses.count),
290-
"hasPending": .stringConvertible(pendingResponses[toolCallId] != nil)
320+
"pendingCount": .stringConvertible(pendingCount),
321+
"hasPending": .stringConvertible(hasPending)
291322
])
292323

293-
guard var pending = pendingResponses[toolCallId] else {
324+
/// Thread-safe update: read, modify, write back under lock.
325+
let result: (found: Bool, conversationId: UUID?) = lockedWrite { dict in
326+
guard var pending = dict[toolCallId] else {
327+
return (false, nil)
328+
}
329+
pending.userResponse = userInput
330+
pending.respondedAt = Date()
331+
dict[toolCallId] = pending
332+
return (true, pending.conversationId)
333+
}
334+
335+
guard result.found else {
336+
let availableIds = lockedRead { Array($0.keys).joined(separator: ", ") }
294337
logger.error("COLLAB_DEBUG: No pending response found for toolCallId", metadata: [
295338
"toolCallId": .string(toolCallId),
296-
"availableToolCallIds": .string(Array(pendingResponses.keys).joined(separator: ", "))
339+
"availableToolCallIds": .string(availableIds)
297340
])
298341
return false
299342
}
300343

301-
logger.debug("COLLAB_DEBUG: Updating pending response", metadata: [
302-
"toolCallId": .string(toolCallId),
303-
"waitTimeSeconds": .stringConvertible(Date().timeIntervalSince(pending.requestedAt))
304-
])
305-
306-
pending.userResponse = userInput
307-
pending.respondedAt = Date()
308-
pendingResponses[toolCallId] = pending
309-
310-
logger.debug("COLLAB_DEBUG: Pending response updated, posting notification")
344+
logger.debug("COLLAB_DEBUG: Pending response updated under lock, posting notification")
311345

312346
/// Notify that user response was received so AgentOrchestrator can emit it as streaming chunk
313347
ToolNotificationCenter.shared.postUserResponseReceived(
314348
toolCallId: toolCallId,
315349
userInput: userInput,
316-
conversationId: pending.conversationId
350+
conversationId: result.conversationId
317351
)
318352

319353
logger.info("COLLAB_DEBUG: Notification posted successfully", metadata: [
@@ -325,12 +359,12 @@ public class UserCollaborationTool: MCPTool, @unchecked Sendable {
325359

326360
/// Get pending response info (for debugging/monitoring).
327361
public static func getPendingResponse(toolCallId: String) -> PendingResponse? {
328-
return pendingResponses[toolCallId]
362+
return lockedRead { $0[toolCallId] }
329363
}
330364

331365
/// Get all pending responses (for debugging/monitoring).
332366
public static func getAllPendingResponses() -> [PendingResponse] {
333-
return Array(pendingResponses.values)
367+
return lockedRead { Array($0.values) }
334368
}
335369
}
336370

Sources/UserInterface/Chat/ChatWidget.swift

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3352,13 +3352,10 @@ public struct ChatWidget: View {
33523352

33533353
logger.info("USER_COLLAB: Submitting user response for collaboration tool call: \(toolCallId)")
33543354

3355-
/// Clear message text and reset collaboration state immediately
3356-
/// The user's response will appear via streaming from AgentOrchestrator
3355+
/// Clear message text immediately for UX responsiveness, but keep collaboration
3356+
/// state until HTTP succeeds so we can retry or restore on failure.
3357+
let savedInput = messageText
33573358
messageText = ""
3358-
isAwaitingUserInput = false
3359-
userCollaborationPrompt = ""
3360-
userCollaborationContext = nil
3361-
userCollaborationToolCallId = nil
33623359

33633360
/// Submit response to API endpoint
33643361
/// API will add to MessageBus and AgentOrchestrator will emit as streaming chunk
@@ -3381,14 +3378,29 @@ public struct ChatWidget: View {
33813378
let (_, response) = try await URLSession.shared.data(for: request)
33823379

33833380
guard let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 200 else {
3384-
logger.error("USER_COLLAB: Failed to submit user response - HTTP error")
3381+
logger.error("USER_COLLAB: Failed to submit user response - HTTP \((response as? HTTPURLResponse)?.statusCode ?? -1)")
3382+
/// Restore input so user can retry
3383+
await MainActor.run {
3384+
self.messageText = savedInput
3385+
}
33853386
return
33863387
}
33873388

3388-
logger.debug("User response submitted successfully")
3389-
/// Streaming will automatically resume after tool unblocks.
3389+
logger.debug("User response submitted successfully - clearing collaboration state")
3390+
3391+
/// Clear collaboration state only after successful submission
3392+
await MainActor.run {
3393+
self.isAwaitingUserInput = false
3394+
self.userCollaborationPrompt = ""
3395+
self.userCollaborationContext = nil
3396+
self.userCollaborationToolCallId = nil
3397+
}
33903398
} catch {
33913399
logger.error("Failed to submit user response: \(error)")
3400+
/// Restore input so user can retry
3401+
await MainActor.run {
3402+
self.messageText = savedInput
3403+
}
33923404
}
33933405
}
33943406
}

0 commit comments

Comments
 (0)