Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 59 additions & 22 deletions src/core/task/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -799,10 +799,36 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
// The state is mutable if the message is complete and the task will
// block (via the `pWaitFor`).
const isBlocking = !(this.askResponse !== undefined || this.lastMessageTs !== askTs)
const isMessageQueued = !this.messageQueueService.isEmpty()
const isStatusMutable = !partial && isBlocking && !isMessageQueued
let statusMutationTimeouts: NodeJS.Timeout[] = []

// Process any queued messages first
let processedQueuedMessage = false
if (!partial && isBlocking && !this.messageQueueService.isEmpty()) {
console.log("Task#ask will process message queue")

const message = this.messageQueueService.dequeueMessage()

if (message) {
processedQueuedMessage = true
// Check if this is a tool approval ask that needs to be handled
if (
type === "tool" ||
type === "command" ||
type === "browser_action_launch" ||
type === "use_mcp_server"
) {
// For tool approvals, we need to approve first, then send the message if there's text/images
this.handleWebviewAskResponse("yesButtonClicked", message.text, message.images)
} else {
// For other ask types (like followup), fulfill the ask directly
this.setMessageResponse(message.text, message.images)
}
}
}

// Only set status mutations if we didn't process a queued message and there are no more queued messages
const isStatusMutable = !partial && isBlocking && !processedQueuedMessage && this.messageQueueService.isEmpty()

if (isStatusMutable) {
console.log(`Task#ask will block -> type: ${type}`)

Expand Down Expand Up @@ -840,30 +866,41 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
}, 1_000),
)
}
} else if (isMessageQueued) {
console.log("Task#ask will process message queue")
}

const message = this.messageQueueService.dequeueMessage()
// Wait for askResponse to be set, but also check for new queued messages periodically
await pWaitFor(
() => {
// If response is ready, we're done
if (this.askResponse !== undefined || this.lastMessageTs !== askTs) {
return true
}

if (message) {
// Check if this is a tool approval ask that needs to be handled
if (
type === "tool" ||
type === "command" ||
type === "browser_action_launch" ||
type === "use_mcp_server"
) {
// For tool approvals, we need to approve first, then send the message if there's text/images
this.handleWebviewAskResponse("yesButtonClicked", message.text, message.images)
} else {
// For other ask types (like followup), fulfill the ask directly
this.setMessageResponse(message.text, message.images)
// Check if new messages were queued while waiting
if (!this.messageQueueService.isEmpty()) {
console.log("Task#ask detected new queued message while waiting")
const message = this.messageQueueService.dequeueMessage()

if (message) {
// Process the newly queued message
if (
type === "tool" ||
type === "command" ||
type === "browser_action_launch" ||
type === "use_mcp_server"
) {
this.handleWebviewAskResponse("yesButtonClicked", message.text, message.images)
} else {
this.setMessageResponse(message.text, message.images)
}
return true
}
}
}
}

// Wait for askResponse to be set.
await pWaitFor(() => this.askResponse !== undefined || this.lastMessageTs !== askTs, { interval: 100 })
return false
},
{ interval: 100 },
)

if (this.lastMessageTs !== askTs) {
// Could happen if we send multiple asks in a row i.e. with
Expand Down
126 changes: 126 additions & 0 deletions src/core/task/__tests__/Task.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1776,4 +1776,130 @@ describe("Cline", () => {
consoleErrorSpy.mockRestore()
})
})

describe("Message Queue Race Condition Fix", () => {
it("should process messages from queue when available", async () => {
const task = new Task({
provider: mockProvider,
apiConfiguration: mockApiConfig,
task: "test task",
startTask: false,
})

// Add a message to the queue
task.messageQueueService.addMessage("queued message", ["image.png"])

// Call ask which should process the queued message
const result = await task.ask("followup", "Initial question")

// Verify the queued message was processed
expect(result.response).toBe("messageResponse")
expect(result.text).toBe("queued message")
expect(result.images).toEqual(["image.png"])

// Verify queue is now empty
expect(task.messageQueueService.isEmpty()).toBe(true)
})

it("should handle tool approval messages from queue", async () => {
const task = new Task({
provider: mockProvider,
apiConfiguration: mockApiConfig,
task: "test task",
startTask: false,
})

// Add a message to the queue
task.messageQueueService.addMessage("approve with context", ["image.png"])

// Call ask for tool approval - should auto-approve with queued message
const result = await task.ask("tool", "Do you want to use this tool?")

// Verify the queued message was processed as tool approval
expect(result.response).toBe("yesButtonClicked")
expect(result.text).toBe("approve with context")
expect(result.images).toEqual(["image.png"])

// Verify queue is now empty
expect(task.messageQueueService.isEmpty()).toBe(true)
})

it("should check for new messages during wait period", async () => {
const task = new Task({
provider: mockProvider,
apiConfiguration: mockApiConfig,
task: "test task",
startTask: false,
})

// Mock pWaitFor to simulate adding a message during the wait
const originalPWaitFor = (await import("p-wait-for")).default
let conditionCheckCount = 0
vi.mocked(originalPWaitFor).mockImplementation(async (condition, options) => {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the test 'should check for new messages during wait period', the p-wait-for implementation is overridden but not restored. Consider restoring the original implementation after the test to avoid side‐effects on subsequent tests.

// Simulate checking the condition multiple times
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 — Test isolation: mockImplementation overrides the globally mocked p-wait-for for subsequent tests. Prefer mockImplementationOnce for this specific case or restore the mock in afterEach to avoid bleed-over.

while (true) {
conditionCheckCount++

// On the second check, add a message to the queue
if (conditionCheckCount === 2) {
task.messageQueueService.addMessage("delayed message")
// The condition should now detect the message and process it
task.setMessageResponse("delayed message")
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — Test doesn't exercise the intended queue-monitoring path. This line directly sets askResponse via setMessageResponse(), so pWaitFor() returns true without the ask() loop detecting and processing the newly queued message. Remove this direct call and let ask() consume the queued message, then assert queue empties and response/text come from the queue.

// Check the condition
const result = await condition()
if (result) {
return
}

// Prevent infinite loop
if (conditionCheckCount > 5) {
// Force completion
task.setMessageResponse("forced completion")
return
}

await new Promise((resolve) => setTimeout(resolve, 10))
}
})

// Call ask - initially no messages in queue
const result = await task.ask("followup", "Question")

// Should have processed the message that was added during wait
expect(result.response).toBe("messageResponse")
expect(result.text).toBe("delayed message")

// Verify condition was checked multiple times
expect(conditionCheckCount).toBeGreaterThan(1)
})

it("should handle multiple messages in queue", async () => {
const task = new Task({
provider: mockProvider,
apiConfiguration: mockApiConfig,
task: "test task",
startTask: false,
})

// Add multiple messages to the queue
task.messageQueueService.addMessage("first message")
task.messageQueueService.addMessage("second message")

// First ask should process first message
const result1 = await task.ask("followup", "Question 1")
expect(result1.text).toBe("first message")

// Queue should still have one message
expect(task.messageQueueService.isEmpty()).toBe(false)

// Second ask should process second message
const result2 = await task.ask("followup", "Question 2")
expect(result2.text).toBe("second message")

// Queue should now be empty
expect(task.messageQueueService.isEmpty()).toBe(true)
})
})
})
Loading