Skip to content

Commit f1f7a64

Browse files
committed
Move message queue to the extension host
1 parent 2e59347 commit f1f7a64

File tree

12 files changed

+314
-195
lines changed

12 files changed

+314
-195
lines changed

packages/types/src/message.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -248,14 +248,9 @@ export type TokenUsage = z.infer<typeof tokenUsageSchema>
248248
* QueuedMessage
249249
*/
250250

251-
/**
252-
* Represents a message that is queued to be sent when sending is enabled
253-
*/
254251
export interface QueuedMessage {
255-
/** Unique identifier for the queued message */
252+
timestamp: number
256253
id: string
257-
/** The text content of the message */
258254
text: string
259-
/** Array of image data URLs attached to the message */
260-
images: string[]
255+
images?: string[]
261256
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import { EventEmitter } from "events"
2+
3+
import { v4 as uuidv4 } from "uuid"
4+
5+
import { QueuedMessage } from "@roo-code/types"
6+
7+
export interface MessageQueueState {
8+
messages: QueuedMessage[]
9+
isProcessing: boolean
10+
isPaused: boolean
11+
}
12+
13+
export interface QueueEvents {
14+
stateChanged: [messages: QueuedMessage[]]
15+
}
16+
17+
export class MessageQueueService extends EventEmitter<QueueEvents> {
18+
private _messages: QueuedMessage[]
19+
20+
constructor() {
21+
super()
22+
23+
this._messages = []
24+
}
25+
26+
private findMessage(id: string) {
27+
const index = this._messages.findIndex((msg) => msg.id === id)
28+
29+
if (index === -1) {
30+
return { index, message: undefined }
31+
}
32+
33+
return { index, message: this._messages[index] }
34+
}
35+
36+
public addMessage(text: string, images?: string[]): QueuedMessage | undefined {
37+
if (!text && !images?.length) {
38+
return undefined
39+
}
40+
41+
const message: QueuedMessage = {
42+
timestamp: Date.now(),
43+
id: uuidv4(),
44+
text,
45+
images,
46+
}
47+
48+
this._messages.push(message)
49+
this.emit("stateChanged", this._messages)
50+
51+
return message
52+
}
53+
54+
public removeMessage(id: string): boolean {
55+
const { index, message } = this.findMessage(id)
56+
57+
if (!message) {
58+
return false
59+
}
60+
61+
console.log("removeMessage", message)
62+
this._messages.splice(index, 1)
63+
this.emit("stateChanged", this._messages)
64+
return true
65+
}
66+
67+
public updateMessage(id: string, text: string, images?: string[]): boolean {
68+
const { message } = this.findMessage(id)
69+
70+
if (!message) {
71+
return false
72+
}
73+
74+
console.log("updateMessage", message)
75+
message.timestamp = Date.now()
76+
message.text = text
77+
message.images = images
78+
this.emit("stateChanged", this._messages)
79+
return true
80+
}
81+
82+
public dequeueMessage(): QueuedMessage | undefined {
83+
const message = this._messages.shift()
84+
this.emit("stateChanged", this._messages)
85+
return message
86+
}
87+
88+
public get messages(): QueuedMessage[] {
89+
return this._messages
90+
}
91+
92+
public isEmpty(): boolean {
93+
return this._messages.length === 0
94+
}
95+
96+
public dispose(): void {
97+
this._messages = []
98+
this.removeAllListeners()
99+
}
100+
}

src/core/task-persistence/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
export { readApiMessages, saveApiMessages } from "./apiMessages"
1+
export { type ApiMessage, readApiMessages, saveApiMessages } from "./apiMessages"
22
export { readTaskMessages, saveTaskMessages } from "./taskMessages"
33
export { taskMetadata } from "./taskMetadata"

src/core/task/Task.ts

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import pWaitFor from "p-wait-for"
1010
import { serializeError } from "serialize-error"
1111

1212
import {
13-
type RooCodeSettings,
1413
type TaskLike,
1514
type TaskMetadata,
1615
type TaskEvents,
@@ -42,6 +41,7 @@ import { CloudService, BridgeOrchestrator } from "@roo-code/cloud"
4241
// api
4342
import { ApiHandler, ApiHandlerCreateMessageMetadata, buildApiHandler } from "../../api"
4443
import { ApiStream } from "../../api/transform/stream"
44+
import { maybeRemoveImageBlocks } from "../../api/transform/image-cleaning"
4545

4646
// shared
4747
import { findLastIndex } from "../../shared/array"
@@ -79,6 +79,7 @@ import { SYSTEM_PROMPT } from "../prompts/system"
7979

8080
// core modules
8181
import { ToolRepetitionDetector } from "../tools/ToolRepetitionDetector"
82+
import { restoreTodoListForTask } from "../tools/updateTodoListTool"
8283
import { FileContextTracker } from "../context-tracking/FileContextTracker"
8384
import { RooIgnoreController } from "../ignore/RooIgnoreController"
8485
import { RooProtectedController } from "../protect/RooProtectedController"
@@ -88,7 +89,14 @@ import { truncateConversationIfNeeded } from "../sliding-window"
8889
import { ClineProvider } from "../webview/ClineProvider"
8990
import { MultiSearchReplaceDiffStrategy } from "../diff/strategies/multi-search-replace"
9091
import { MultiFileSearchReplaceDiffStrategy } from "../diff/strategies/multi-file-search-replace"
91-
import { readApiMessages, saveApiMessages, readTaskMessages, saveTaskMessages, taskMetadata } from "../task-persistence"
92+
import {
93+
type ApiMessage,
94+
readApiMessages,
95+
saveApiMessages,
96+
readTaskMessages,
97+
saveTaskMessages,
98+
taskMetadata,
99+
} from "../task-persistence"
92100
import { getEnvironmentDetails } from "../environment/getEnvironmentDetails"
93101
import { checkContextWindowExceededError } from "../context/context-management/context-error-handling"
94102
import {
@@ -100,12 +108,11 @@ import {
100108
checkpointDiff,
101109
} from "../checkpoints"
102110
import { processUserContentMentions } from "../mentions/processUserContentMentions"
103-
import { ApiMessage } from "../task-persistence/apiMessages"
104111
import { getMessagesSinceLastSummary, summarizeConversation } from "../condense"
105-
import { maybeRemoveImageBlocks } from "../../api/transform/image-cleaning"
106-
import { restoreTodoListForTask } from "../tools/updateTodoListTool"
107-
import { AutoApprovalHandler } from "./AutoApprovalHandler"
108112
import { Gpt5Metadata, ClineMessageWithMetadata } from "./types"
113+
import { MessageQueueService } from "../message-queue/MessageQueueService"
114+
115+
import { AutoApprovalHandler } from "./AutoApprovalHandler"
109116

110117
const MAX_EXPONENTIAL_BACKOFF_SECONDS = 600 // 10 minutes
111118
const DEFAULT_USAGE_COLLECTION_TIMEOUT_MS = 5000 // 5 seconds
@@ -259,6 +266,9 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
259266
// Task Bridge
260267
enableBridge: boolean
261268

269+
// Message Queue Service
270+
public readonly messageQueueService: MessageQueueService
271+
262272
// Streaming
263273
isWaitingForFirstChunk = false
264274
isStreaming = false
@@ -356,9 +366,12 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
356366
TelemetryService.instance.captureTaskCreated(this.taskId)
357367
}
358368

359-
// Initialize the assistant message parser
369+
// Initialize the assistant message parser.
360370
this.assistantMessageParser = new AssistantMessageParser()
361371

372+
this.messageQueueService = new MessageQueueService()
373+
this.messageQueueService.on("stateChanged", () => this.providerRef.deref()?.postStateToWebview())
374+
362375
// Only set up diff strategy if diff is enabled.
363376
if (this.diffEnabled) {
364377
// Default to old strategy, will be updated if experiment is enabled.
@@ -759,10 +772,14 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
759772
// The state is mutable if the message is complete and the task will
760773
// block (via the `pWaitFor`).
761774
const isBlocking = !(this.askResponse !== undefined || this.lastMessageTs !== askTs)
762-
const isStatusMutable = !partial && isBlocking
775+
const isMessageQueued = !this.messageQueueService.isEmpty()
776+
const isStatusMutable = !partial && isBlocking && !isMessageQueued
763777
let statusMutationTimeouts: NodeJS.Timeout[] = []
778+
let messageQueueTimeout: NodeJS.Timeout | undefined
764779

765780
if (isStatusMutable) {
781+
console.log(`Task#ask will block -> type: ${type}`)
782+
766783
if (isInteractiveAsk(type)) {
767784
statusMutationTimeouts.push(
768785
setTimeout(() => {
@@ -797,9 +814,19 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
797814
}, 1_000),
798815
)
799816
}
817+
} else if (isMessageQueued) {
818+
console.log("Task#ask will process message queue after a timeout")
819+
820+
messageQueueTimeout = setTimeout(() => {
821+
const message = this.messageQueueService.dequeueMessage()
822+
823+
if (message) {
824+
this.submitUserMessage(message.text, message.images)
825+
}
826+
}, 1_000)
800827
}
801828

802-
// Wait for askResponse to be set
829+
// Wait for askResponse to be set.
803830
await pWaitFor(() => this.askResponse !== undefined || this.lastMessageTs !== askTs, { interval: 100 })
804831

805832
if (this.lastMessageTs !== askTs) {
@@ -817,6 +844,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
817844
// Cancel the timeouts if they are still running.
818845
statusMutationTimeouts.forEach((timeout) => clearTimeout(timeout))
819846

847+
if (messageQueueTimeout) {
848+
clearTimeout(messageQueueTimeout)
849+
}
850+
820851
// Switch back to an active state.
821852
if (this.idleAsk || this.resumableAsk || this.interactiveAsk) {
822853
this.idleAsk = undefined
@@ -1406,8 +1437,8 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
14061437
newUserContent.push(...formatResponse.imageBlocks(responseImages))
14071438
}
14081439

1409-
// Ensure we have at least some content to send to the API
1410-
// If newUserContent is empty, add a minimal resumption message
1440+
// Ensure we have at least some content to send to the API.
1441+
// If newUserContent is empty, add a minimal resumption message.
14111442
if (newUserContent.length === 0) {
14121443
newUserContent.push({
14131444
type: "text",
@@ -1417,14 +1448,20 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
14171448

14181449
await this.overwriteApiConversationHistory(modifiedApiConversationHistory)
14191450

1420-
// Task resuming from history item
1421-
1451+
// Task resuming from history item.
14221452
await this.initiateTaskLoop(newUserContent)
14231453
}
14241454

14251455
public dispose(): void {
14261456
console.log(`[Task#dispose] disposing task ${this.taskId}.${this.instanceId}`)
14271457

1458+
// Dispose message queue.
1459+
try {
1460+
this.messageQueueService.dispose()
1461+
} catch (error) {
1462+
console.error("Error disposing message queue:", error)
1463+
}
1464+
14281465
// Remove all event listeners to prevent memory leaks.
14291466
try {
14301467
this.removeAllListeners()

0 commit comments

Comments
 (0)