Skip to content

Commit a1f583a

Browse files
committed
fix: resolve message queue race condition during LLM processing
- Move queue check before status mutation logic to prevent race condition - Add continuous queue monitoring during pWaitFor to catch messages that arrive during processing - Process queued messages immediately when detected during wait period - Add comprehensive tests for queue race condition scenarios Fixes #8536
1 parent 5a3f911 commit a1f583a

File tree

2 files changed

+185
-22
lines changed

2 files changed

+185
-22
lines changed

src/core/task/Task.ts

Lines changed: 59 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -799,10 +799,36 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
799799
// The state is mutable if the message is complete and the task will
800800
// block (via the `pWaitFor`).
801801
const isBlocking = !(this.askResponse !== undefined || this.lastMessageTs !== askTs)
802-
const isMessageQueued = !this.messageQueueService.isEmpty()
803-
const isStatusMutable = !partial && isBlocking && !isMessageQueued
804802
let statusMutationTimeouts: NodeJS.Timeout[] = []
805803

804+
// Process any queued messages first
805+
let processedQueuedMessage = false
806+
if (!partial && isBlocking && !this.messageQueueService.isEmpty()) {
807+
console.log("Task#ask will process message queue")
808+
809+
const message = this.messageQueueService.dequeueMessage()
810+
811+
if (message) {
812+
processedQueuedMessage = true
813+
// Check if this is a tool approval ask that needs to be handled
814+
if (
815+
type === "tool" ||
816+
type === "command" ||
817+
type === "browser_action_launch" ||
818+
type === "use_mcp_server"
819+
) {
820+
// For tool approvals, we need to approve first, then send the message if there's text/images
821+
this.handleWebviewAskResponse("yesButtonClicked", message.text, message.images)
822+
} else {
823+
// For other ask types (like followup), fulfill the ask directly
824+
this.setMessageResponse(message.text, message.images)
825+
}
826+
}
827+
}
828+
829+
// Only set status mutations if we didn't process a queued message and there are no more queued messages
830+
const isStatusMutable = !partial && isBlocking && !processedQueuedMessage && this.messageQueueService.isEmpty()
831+
806832
if (isStatusMutable) {
807833
console.log(`Task#ask will block -> type: ${type}`)
808834

@@ -840,30 +866,41 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
840866
}, 1_000),
841867
)
842868
}
843-
} else if (isMessageQueued) {
844-
console.log("Task#ask will process message queue")
869+
}
845870

846-
const message = this.messageQueueService.dequeueMessage()
871+
// Wait for askResponse to be set, but also check for new queued messages periodically
872+
await pWaitFor(
873+
() => {
874+
// If response is ready, we're done
875+
if (this.askResponse !== undefined || this.lastMessageTs !== askTs) {
876+
return true
877+
}
847878

848-
if (message) {
849-
// Check if this is a tool approval ask that needs to be handled
850-
if (
851-
type === "tool" ||
852-
type === "command" ||
853-
type === "browser_action_launch" ||
854-
type === "use_mcp_server"
855-
) {
856-
// For tool approvals, we need to approve first, then send the message if there's text/images
857-
this.handleWebviewAskResponse("yesButtonClicked", message.text, message.images)
858-
} else {
859-
// For other ask types (like followup), fulfill the ask directly
860-
this.setMessageResponse(message.text, message.images)
879+
// Check if new messages were queued while waiting
880+
if (!this.messageQueueService.isEmpty()) {
881+
console.log("Task#ask detected new queued message while waiting")
882+
const message = this.messageQueueService.dequeueMessage()
883+
884+
if (message) {
885+
// Process the newly queued message
886+
if (
887+
type === "tool" ||
888+
type === "command" ||
889+
type === "browser_action_launch" ||
890+
type === "use_mcp_server"
891+
) {
892+
this.handleWebviewAskResponse("yesButtonClicked", message.text, message.images)
893+
} else {
894+
this.setMessageResponse(message.text, message.images)
895+
}
896+
return true
897+
}
861898
}
862-
}
863-
}
864899

865-
// Wait for askResponse to be set.
866-
await pWaitFor(() => this.askResponse !== undefined || this.lastMessageTs !== askTs, { interval: 100 })
900+
return false
901+
},
902+
{ interval: 100 },
903+
)
867904

868905
if (this.lastMessageTs !== askTs) {
869906
// Could happen if we send multiple asks in a row i.e. with

src/core/task/__tests__/Task.spec.ts

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1776,4 +1776,130 @@ describe("Cline", () => {
17761776
consoleErrorSpy.mockRestore()
17771777
})
17781778
})
1779+
1780+
describe("Message Queue Race Condition Fix", () => {
1781+
it("should process messages from queue when available", async () => {
1782+
const task = new Task({
1783+
provider: mockProvider,
1784+
apiConfiguration: mockApiConfig,
1785+
task: "test task",
1786+
startTask: false,
1787+
})
1788+
1789+
// Add a message to the queue
1790+
task.messageQueueService.addMessage("queued message", ["image.png"])
1791+
1792+
// Call ask which should process the queued message
1793+
const result = await task.ask("followup", "Initial question")
1794+
1795+
// Verify the queued message was processed
1796+
expect(result.response).toBe("messageResponse")
1797+
expect(result.text).toBe("queued message")
1798+
expect(result.images).toEqual(["image.png"])
1799+
1800+
// Verify queue is now empty
1801+
expect(task.messageQueueService.isEmpty()).toBe(true)
1802+
})
1803+
1804+
it("should handle tool approval messages from queue", async () => {
1805+
const task = new Task({
1806+
provider: mockProvider,
1807+
apiConfiguration: mockApiConfig,
1808+
task: "test task",
1809+
startTask: false,
1810+
})
1811+
1812+
// Add a message to the queue
1813+
task.messageQueueService.addMessage("approve with context", ["image.png"])
1814+
1815+
// Call ask for tool approval - should auto-approve with queued message
1816+
const result = await task.ask("tool", "Do you want to use this tool?")
1817+
1818+
// Verify the queued message was processed as tool approval
1819+
expect(result.response).toBe("yesButtonClicked")
1820+
expect(result.text).toBe("approve with context")
1821+
expect(result.images).toEqual(["image.png"])
1822+
1823+
// Verify queue is now empty
1824+
expect(task.messageQueueService.isEmpty()).toBe(true)
1825+
})
1826+
1827+
it("should check for new messages during wait period", async () => {
1828+
const task = new Task({
1829+
provider: mockProvider,
1830+
apiConfiguration: mockApiConfig,
1831+
task: "test task",
1832+
startTask: false,
1833+
})
1834+
1835+
// Mock pWaitFor to simulate adding a message during the wait
1836+
const originalPWaitFor = (await import("p-wait-for")).default
1837+
let conditionCheckCount = 0
1838+
vi.mocked(originalPWaitFor).mockImplementation(async (condition, options) => {
1839+
// Simulate checking the condition multiple times
1840+
while (true) {
1841+
conditionCheckCount++
1842+
1843+
// On the second check, add a message to the queue
1844+
if (conditionCheckCount === 2) {
1845+
task.messageQueueService.addMessage("delayed message")
1846+
// The condition should now detect the message and process it
1847+
task.setMessageResponse("delayed message")
1848+
}
1849+
1850+
// Check the condition
1851+
const result = await condition()
1852+
if (result) {
1853+
return
1854+
}
1855+
1856+
// Prevent infinite loop
1857+
if (conditionCheckCount > 5) {
1858+
// Force completion
1859+
task.setMessageResponse("forced completion")
1860+
return
1861+
}
1862+
1863+
await new Promise((resolve) => setTimeout(resolve, 10))
1864+
}
1865+
})
1866+
1867+
// Call ask - initially no messages in queue
1868+
const result = await task.ask("followup", "Question")
1869+
1870+
// Should have processed the message that was added during wait
1871+
expect(result.response).toBe("messageResponse")
1872+
expect(result.text).toBe("delayed message")
1873+
1874+
// Verify condition was checked multiple times
1875+
expect(conditionCheckCount).toBeGreaterThan(1)
1876+
})
1877+
1878+
it("should handle multiple messages in queue", async () => {
1879+
const task = new Task({
1880+
provider: mockProvider,
1881+
apiConfiguration: mockApiConfig,
1882+
task: "test task",
1883+
startTask: false,
1884+
})
1885+
1886+
// Add multiple messages to the queue
1887+
task.messageQueueService.addMessage("first message")
1888+
task.messageQueueService.addMessage("second message")
1889+
1890+
// First ask should process first message
1891+
const result1 = await task.ask("followup", "Question 1")
1892+
expect(result1.text).toBe("first message")
1893+
1894+
// Queue should still have one message
1895+
expect(task.messageQueueService.isEmpty()).toBe(false)
1896+
1897+
// Second ask should process second message
1898+
const result2 = await task.ask("followup", "Question 2")
1899+
expect(result2.text).toBe("second message")
1900+
1901+
// Queue should now be empty
1902+
expect(task.messageQueueService.isEmpty()).toBe(true)
1903+
})
1904+
})
17791905
})

0 commit comments

Comments
 (0)