Skip to content

Commit ebf94a8

Browse files
committed
fix: prevent queued messages from being inserted into ongoing sessions
Fixes #7084 by implementing a message queue system that ensures messages sent during an ongoing chat session are properly queued and processed sequentially after the current message completes. Changes: - Added message queue and processing state tracking to Task class - Implemented queue logic in handleWebviewAskResponse - Added processNextQueuedMessage for sequential processing - Updated ExtensionMessage interface to support queue notifications - Added comprehensive tests for message queueing functionality
1 parent dcbb7a6 commit ebf94a8

File tree

3 files changed

+330
-22
lines changed

3 files changed

+330
-22
lines changed

src/core/task/Task.ts

Lines changed: 84 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
131131
readonly taskNumber: number
132132
readonly workspacePath: string
133133

134+
// Message queue for handling user messages
135+
private messageQueue: Array<{ text: string; images?: string[] }> = []
136+
private isProcessingMessage: boolean = false
137+
134138
/**
135139
* The mode associated with this task. Persisted across sessions
136140
* to maintain user context when reopening tasks from history.
@@ -742,11 +746,48 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
742746
}
743747

744748
handleWebviewAskResponse(askResponse: ClineAskResponse, text?: string, images?: string[]) {
749+
// If we're currently processing a message and this is a new user message,
750+
// queue it instead of processing immediately
751+
if (this.isProcessingMessage && askResponse === "messageResponse" && (text || images?.length)) {
752+
this.messageQueue.push({ text: text || "", images })
753+
this.providerRef
754+
.deref()
755+
?.log(`[Task#handleWebviewAskResponse] Message queued. Queue size: ${this.messageQueue.length}`)
756+
757+
// Notify the user that their message has been queued
758+
this.providerRef.deref()?.postMessageToWebview({
759+
type: "messageQueued",
760+
queueSize: this.messageQueue.length,
761+
})
762+
return
763+
}
764+
745765
this.askResponse = askResponse
746766
this.askResponseText = text
747767
this.askResponseImages = images
748768
}
749769

770+
// Process the next message in the queue
771+
private async processNextQueuedMessage() {
772+
if (this.messageQueue.length === 0 || this.isProcessingMessage) {
773+
return
774+
}
775+
776+
const nextMessage = this.messageQueue.shift()
777+
if (!nextMessage) {
778+
return
779+
}
780+
781+
this.providerRef
782+
.deref()
783+
?.log(
784+
`[Task#processNextQueuedMessage] Processing queued message. Remaining in queue: ${this.messageQueue.length}`,
785+
)
786+
787+
// Submit the queued message as a new user message
788+
this.submitUserMessage(nextMessage.text, nextMessage.images)
789+
}
790+
750791
public submitUserMessage(text: string, images?: string[]): void {
751792
try {
752793
const trimmed = (text ?? "").trim()
@@ -1409,30 +1450,37 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
14091450
let includeFileDetails = true
14101451

14111452
this.emit(RooCodeEventName.TaskStarted)
1453+
this.isProcessingMessage = true
14121454

1413-
while (!this.abort) {
1414-
const didEndLoop = await this.recursivelyMakeClineRequests(nextUserContent, includeFileDetails)
1415-
includeFileDetails = false // We only need file details the first time.
1416-
1417-
// The way this agentic loop works is that cline will be given a
1418-
// task that he then calls tools to complete. Unless there's an
1419-
// attempt_completion call, we keep responding back to him with his
1420-
// tool's responses until he either attempt_completion or does not
1421-
// use anymore tools. If he does not use anymore tools, we ask him
1422-
// to consider if he's completed the task and then call
1423-
// attempt_completion, otherwise proceed with completing the task.
1424-
// There is a MAX_REQUESTS_PER_TASK limit to prevent infinite
1425-
// requests, but Cline is prompted to finish the task as efficiently
1426-
// as he can.
1427-
1428-
if (didEndLoop) {
1429-
// For now a task never 'completes'. This will only happen if
1430-
// the user hits max requests and denies resetting the count.
1431-
break
1432-
} else {
1433-
nextUserContent = [{ type: "text", text: formatResponse.noToolsUsed() }]
1434-
this.consecutiveMistakeCount++
1455+
try {
1456+
while (!this.abort) {
1457+
const didEndLoop = await this.recursivelyMakeClineRequests(nextUserContent, includeFileDetails)
1458+
includeFileDetails = false // We only need file details the first time.
1459+
1460+
// The way this agentic loop works is that cline will be given a
1461+
// task that he then calls tools to complete. Unless there's an
1462+
// attempt_completion call, we keep responding back to him with his
1463+
// tool's responses until he either attempt_completion or does not
1464+
// use anymore tools. If he does not use anymore tools, we ask him
1465+
// to consider if he's completed the task and then call
1466+
// attempt_completion, otherwise proceed with completing the task.
1467+
// There is a MAX_REQUESTS_PER_TASK limit to prevent infinite
1468+
// requests, but Cline is prompted to finish the task as efficiently
1469+
// as he can.
1470+
1471+
if (didEndLoop) {
1472+
// For now a task never 'completes'. This will only happen if
1473+
// the user hits max requests and denies resetting the count.
1474+
break
1475+
} else {
1476+
nextUserContent = [{ type: "text", text: formatResponse.noToolsUsed() }]
1477+
this.consecutiveMistakeCount++
1478+
}
14351479
}
1480+
} finally {
1481+
this.isProcessingMessage = false
1482+
// Process any queued messages after the current task completes
1483+
await this.processNextQueuedMessage()
14361484
}
14371485
}
14381486

@@ -1444,6 +1492,9 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
14441492
throw new Error(`[RooCode#recursivelyMakeRooRequests] task ${this.taskId}.${this.instanceId} aborted`)
14451493
}
14461494

1495+
// Mark that we're processing a message
1496+
this.isProcessingMessage = true
1497+
14471498
if (this.consecutiveMistakeLimit > 0 && this.consecutiveMistakeCount >= this.consecutiveMistakeLimit) {
14481499
const { response, text, images } = await this.ask(
14491500
"mistake_limit_reached",
@@ -1889,8 +1940,19 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
18891940
})
18901941
}
18911942

1943+
// Mark processing complete and check for queued messages
1944+
this.isProcessingMessage = false
1945+
1946+
// Process next queued message if any
1947+
if (this.messageQueue.length > 0) {
1948+
await this.processNextQueuedMessage()
1949+
}
1950+
18921951
return didEndLoop // Will always be false for now.
18931952
} catch (error) {
1953+
// Mark processing complete even on error
1954+
this.isProcessingMessage = false
1955+
18941956
// This should never happen since the only thing that can throw an
18951957
// error is the attemptApiRequest, which is wrapped in a try catch
18961958
// that sends an ask where if noButtonClicked, will clear current

src/core/task/__tests__/Task.spec.ts

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1613,5 +1613,249 @@ describe("Cline", () => {
16131613
consoleErrorSpy.mockRestore()
16141614
})
16151615
})
1616+
1617+
describe("Message Queueing", () => {
1618+
let task: Task
1619+
let mockProvider: any
1620+
1621+
beforeEach(() => {
1622+
mockProvider = {
1623+
context: {
1624+
globalStorageUri: { fsPath: "/test/storage" },
1625+
},
1626+
getState: vi.fn().mockResolvedValue({}),
1627+
postMessageToWebview: vi.fn().mockResolvedValue(undefined),
1628+
postStateToWebview: vi.fn().mockResolvedValue(undefined),
1629+
updateTaskHistory: vi.fn().mockResolvedValue(undefined),
1630+
log: vi.fn(), // Add log method to mock provider
1631+
}
1632+
1633+
task = new Task({
1634+
provider: mockProvider,
1635+
apiConfiguration: mockApiConfig,
1636+
task: "test task",
1637+
startTask: false,
1638+
})
1639+
})
1640+
1641+
it("should queue messages when already processing", async () => {
1642+
// Access private properties via any cast for testing
1643+
const taskAny = task as any
1644+
1645+
// Set processing state
1646+
taskAny.isProcessingMessage = true
1647+
1648+
// Spy on postMessageToWebview
1649+
const postMessageSpy = vi.spyOn(mockProvider, "postMessageToWebview")
1650+
1651+
// Try to handle a new message while processing
1652+
await task.handleWebviewAskResponse("messageResponse", "new message while processing")
1653+
1654+
// Verify message was queued
1655+
expect(taskAny.messageQueue).toHaveLength(1)
1656+
expect(taskAny.messageQueue[0]).toEqual({
1657+
text: "new message while processing",
1658+
images: undefined,
1659+
})
1660+
1661+
// Verify queue notification was sent
1662+
expect(postMessageSpy).toHaveBeenCalledWith({
1663+
type: "messageQueued",
1664+
queueSize: 1,
1665+
})
1666+
})
1667+
1668+
it("should process messages immediately when not processing", async () => {
1669+
// Access private properties via any cast
1670+
const taskAny = task as any
1671+
1672+
// Directly test the queueing logic
1673+
taskAny.isProcessingMessage = false
1674+
1675+
// Call handleWebviewAskResponse
1676+
await task.handleWebviewAskResponse("messageResponse", "test message")
1677+
1678+
// Since we're not processing, it should start processing immediately
1679+
// The message should not be in the queue
1680+
expect(taskAny.messageQueue).toHaveLength(0)
1681+
})
1682+
1683+
it("should process queued messages in FIFO order", async () => {
1684+
// Access private properties via any cast
1685+
const taskAny = task as any
1686+
1687+
// Set processing state
1688+
taskAny.isProcessingMessage = true
1689+
1690+
// Queue multiple messages
1691+
await task.handleWebviewAskResponse("messageResponse", "first message")
1692+
await task.handleWebviewAskResponse("messageResponse", "second message")
1693+
await task.handleWebviewAskResponse("messageResponse", "third message")
1694+
1695+
// Verify all messages were queued in order
1696+
expect(taskAny.messageQueue).toHaveLength(3)
1697+
expect(taskAny.messageQueue[0].text).toBe("first message")
1698+
expect(taskAny.messageQueue[1].text).toBe("second message")
1699+
expect(taskAny.messageQueue[2].text).toBe("third message")
1700+
1701+
// Reset processing state and remove first message to simulate processing
1702+
taskAny.isProcessingMessage = false
1703+
const firstMessage = taskAny.messageQueue.shift()
1704+
1705+
// Verify FIFO order is maintained
1706+
expect(firstMessage.text).toBe("first message")
1707+
expect(taskAny.messageQueue).toHaveLength(2)
1708+
expect(taskAny.messageQueue[0].text).toBe("second message")
1709+
})
1710+
1711+
it("should handle empty queue gracefully", async () => {
1712+
// Access private properties via any cast
1713+
const taskAny = task as any
1714+
1715+
// Ensure queue is empty
1716+
taskAny.messageQueue = []
1717+
1718+
// Mock initiateTaskLoop
1719+
const initiateTaskLoopSpy = vi.spyOn(taskAny, "initiateTaskLoop").mockResolvedValue(undefined)
1720+
1721+
// Try to process next queued message
1722+
await taskAny.processNextQueuedMessage()
1723+
1724+
// Verify no processing occurred
1725+
expect(initiateTaskLoopSpy).not.toHaveBeenCalled()
1726+
expect(taskAny.isProcessingMessage).toBe(false)
1727+
})
1728+
1729+
it("should set processing state correctly during message handling", async () => {
1730+
// Access private properties via any cast
1731+
const taskAny = task as any
1732+
1733+
// Initially not processing
1734+
expect(taskAny.isProcessingMessage).toBe(false)
1735+
1736+
// Mock recursivelyMakeClineRequests to track processing
1737+
let processingDuringRequest = false
1738+
vi.spyOn(taskAny, "recursivelyMakeClineRequests").mockImplementation(async () => {
1739+
processingDuringRequest = taskAny.isProcessingMessage
1740+
return true
1741+
})
1742+
1743+
// Mock other required methods
1744+
vi.spyOn(task, "say").mockResolvedValue(undefined)
1745+
vi.spyOn(taskAny, "addToApiConversationHistory").mockReturnValue(undefined)
1746+
vi.spyOn(taskAny, "saveApiConversationHistory").mockResolvedValue(undefined)
1747+
1748+
// Start processing a message
1749+
await taskAny.initiateTaskLoop([{ type: "text", text: "test message" }])
1750+
1751+
// Verify processing state was set during request
1752+
expect(processingDuringRequest).toBe(true)
1753+
1754+
// Verify processing state is reset after completion
1755+
expect(taskAny.isProcessingMessage).toBe(false)
1756+
})
1757+
1758+
it("should send queue size updates when queueing multiple messages", async () => {
1759+
// Access private properties via any cast
1760+
const taskAny = task as any
1761+
1762+
// Set processing state
1763+
taskAny.isProcessingMessage = true
1764+
1765+
// Spy on postMessageToWebview
1766+
const postMessageSpy = vi.spyOn(mockProvider, "postMessageToWebview")
1767+
1768+
// Queue multiple messages
1769+
await task.handleWebviewAskResponse("messageResponse", "message 1")
1770+
await task.handleWebviewAskResponse("messageResponse", "message 2")
1771+
await task.handleWebviewAskResponse("messageResponse", "message 3")
1772+
1773+
// Verify queue notifications were sent with correct sizes
1774+
expect(postMessageSpy).toHaveBeenNthCalledWith(1, {
1775+
type: "messageQueued",
1776+
queueSize: 1,
1777+
})
1778+
expect(postMessageSpy).toHaveBeenNthCalledWith(2, {
1779+
type: "messageQueued",
1780+
queueSize: 2,
1781+
})
1782+
expect(postMessageSpy).toHaveBeenNthCalledWith(3, {
1783+
type: "messageQueued",
1784+
queueSize: 3,
1785+
})
1786+
})
1787+
1788+
it("should handle rapid message submissions correctly", async () => {
1789+
// Access private properties via any cast
1790+
const taskAny = task as any
1791+
1792+
// Set processing state
1793+
taskAny.isProcessingMessage = true
1794+
1795+
// Simulate rapid message submissions
1796+
const promises = []
1797+
for (let i = 1; i <= 10; i++) {
1798+
promises.push(task.handleWebviewAskResponse("messageResponse", `rapid message ${i}`))
1799+
}
1800+
1801+
// Wait for all to complete
1802+
await Promise.all(promises)
1803+
1804+
// Verify all messages were queued
1805+
expect(taskAny.messageQueue).toHaveLength(10)
1806+
for (let i = 0; i < 10; i++) {
1807+
expect(taskAny.messageQueue[i].text).toBe(`rapid message ${i + 1}`)
1808+
}
1809+
})
1810+
1811+
it("should reset processing state when recursivelyMakeClineRequests completes", async () => {
1812+
// Access private properties via any cast
1813+
const taskAny = task as any
1814+
1815+
// Mock the recursive method to track state changes
1816+
vi.spyOn(taskAny, "recursivelyMakeClineRequests").mockResolvedValue(true)
1817+
1818+
// Mock other required methods
1819+
vi.spyOn(task, "say").mockResolvedValue(undefined)
1820+
vi.spyOn(taskAny, "addToApiConversationHistory").mockReturnValue(undefined)
1821+
vi.spyOn(taskAny, "saveApiConversationHistory").mockResolvedValue(undefined)
1822+
1823+
// Queue a message for after processing
1824+
taskAny.messageQueue = [{ text: "queued message" }]
1825+
1826+
// Mock processNextQueuedMessage
1827+
const processNextSpy = vi.spyOn(taskAny, "processNextQueuedMessage").mockResolvedValue(undefined)
1828+
1829+
// Start processing
1830+
await taskAny.initiateTaskLoop([{ type: "text", text: "initial message" }])
1831+
1832+
// Verify processing state was reset
1833+
expect(taskAny.isProcessingMessage).toBe(false)
1834+
1835+
// Verify next queued message was triggered
1836+
expect(processNextSpy).toHaveBeenCalled()
1837+
})
1838+
1839+
it("should handle undefined provider reference when sending queue notifications", async () => {
1840+
// Access private properties via any cast
1841+
const taskAny = task as any
1842+
1843+
// Set processing state
1844+
taskAny.isProcessingMessage = true
1845+
1846+
// Simulate weakref returning undefined
1847+
Object.defineProperty(task, "providerRef", {
1848+
value: { deref: () => undefined },
1849+
writable: false,
1850+
configurable: true,
1851+
})
1852+
1853+
// Try to queue a message - this should not throw
1854+
await task.handleWebviewAskResponse("messageResponse", "message without provider")
1855+
1856+
// Verify message was still queued
1857+
expect(taskAny.messageQueue).toHaveLength(1)
1858+
})
1859+
})
16161860
})
16171861
})

0 commit comments

Comments
 (0)