Skip to content

Commit 1d46bd1

Browse files
authored
Implement deferred task subscriptions (RooCodeInc#7517)
1 parent 1e4c46f commit 1d46bd1

File tree

8 files changed

+106
-56
lines changed

8 files changed

+106
-56
lines changed

packages/cloud/src/bridge/BridgeOrchestrator.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ export interface BridgeOrchestratorOptions {
3131
export class BridgeOrchestrator {
3232
private static instance: BridgeOrchestrator | null = null
3333

34+
private static pendingTask: TaskLike | null = null
35+
3436
// Core
3537
private readonly userId: string
3638
private readonly socketBridgeUrl: string
@@ -116,6 +118,22 @@ export class BridgeOrchestrator {
116118
}
117119
}
118120

121+
/**
122+
* @TODO: What if subtasks also get spawned? We'd probably want deferred
123+
* subscriptions for those too.
124+
*/
125+
public static async subscribeToTask(task: TaskLike): Promise<void> {
126+
const instance = BridgeOrchestrator.instance
127+
128+
if (instance && instance.socketTransport.isConnected()) {
129+
console.log(`[BridgeOrchestrator#subscribeToTask] Subscribing to task ${task.taskId}`)
130+
await instance.subscribeToTask(task)
131+
} else {
132+
console.log(`[BridgeOrchestrator#subscribeToTask] Deferring subscription for task ${task.taskId}`)
133+
BridgeOrchestrator.pendingTask = task
134+
}
135+
}
136+
119137
private constructor(options: BridgeOrchestratorOptions) {
120138
this.userId = options.userId
121139
this.socketBridgeUrl = options.socketBridgeUrl
@@ -180,12 +198,27 @@ export class BridgeOrchestrator {
180198
const socket = this.socketTransport.getSocket()
181199

182200
if (!socket) {
183-
console.error("[BridgeOrchestrator] Socket not available after connect")
201+
console.error("[BridgeOrchestrator#handleConnect] Socket not available")
184202
return
185203
}
186204

187205
await this.extensionChannel.onConnect(socket)
188206
await this.taskChannel.onConnect(socket)
207+
208+
if (BridgeOrchestrator.pendingTask) {
209+
console.log(
210+
`[BridgeOrchestrator#handleConnect] Subscribing to task ${BridgeOrchestrator.pendingTask.taskId}`,
211+
)
212+
213+
try {
214+
await this.subscribeToTask(BridgeOrchestrator.pendingTask)
215+
BridgeOrchestrator.pendingTask = null
216+
} catch (error) {
217+
console.error(
218+
`[BridgeOrchestrator#handleConnect] subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
219+
)
220+
}
221+
}
189222
}
190223

191224
private handleDisconnect() {
@@ -261,6 +294,7 @@ export class BridgeOrchestrator {
261294
await this.taskChannel.cleanup(this.socketTransport.getSocket())
262295
await this.socketTransport.disconnect()
263296
BridgeOrchestrator.instance = null
297+
BridgeOrchestrator.pendingTask = null
264298
}
265299

266300
public async reconnect(): Promise<void> {

packages/cloud/src/bridge/TaskChannel.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,25 +163,27 @@ export class TaskChannel extends BaseChannel<
163163
public async unsubscribeFromTask(taskId: string, _socket: Socket): Promise<void> {
164164
const task = this.subscribedTasks.get(taskId)
165165

166+
if (!task) {
167+
return
168+
}
169+
166170
await this.publish(TaskSocketEvents.LEAVE, { taskId }, (response: LeaveResponse) => {
167171
if (response.success) {
168-
console.log(`[TaskChannel#unsubscribeFromTask] unsubscribed from ${taskId}`, response)
172+
console.log(`[TaskChannel#unsubscribeFromTask] unsubscribed from ${taskId}`)
169173
} else {
170174
console.error(`[TaskChannel#unsubscribeFromTask] failed to unsubscribe from ${taskId}`)
171175
}
172176

173177
// If we failed to unsubscribe then something is probably wrong and
174178
// we should still discard this task from `subscribedTasks`.
175-
if (task) {
176-
this.removeTaskListeners(task)
177-
this.subscribedTasks.delete(taskId)
178-
}
179+
this.removeTaskListeners(task)
180+
this.subscribedTasks.delete(taskId)
179181
})
180182
}
181183

182184
private setupTaskListeners(task: TaskLike): void {
183185
if (this.taskListeners.has(task.taskId)) {
184-
console.warn("[TaskChannel] Listeners already exist for task, removing old listeners:", task.taskId)
186+
console.warn(`[TaskChannel] Listeners already exist for task, removing old listeners for ${task.taskId}`)
185187
this.removeTaskListeners(task)
186188
}
187189

packages/cloud/src/bridge/__tests__/TaskChannel.test.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,8 +299,7 @@ describe("TaskChannel", () => {
299299

300300
// Verify warning was logged
301301
expect(warnSpy).toHaveBeenCalledWith(
302-
"[TaskChannel] Listeners already exist for task, removing old listeners:",
303-
taskId,
302+
`[TaskChannel] Listeners already exist for task, removing old listeners for ${taskId}`,
304303
)
305304

306305
// Verify only one set of listeners exists

packages/types/npm/package.metadata.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@roo-code/types",
3-
"version": "1.64.0",
3+
"version": "1.65.0",
44
"description": "TypeScript type definitions for Roo Code.",
55
"publishConfig": {
66
"access": "public",

src/core/task/Task.ts

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,6 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
256256

257257
// Task Bridge
258258
enableBridge: boolean
259-
bridge: BridgeOrchestrator | null = null
260259

261260
// Streaming
262261
isWaitingForFirstChunk = false
@@ -1084,14 +1083,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
10841083
private async startTask(task?: string, images?: string[]): Promise<void> {
10851084
if (this.enableBridge) {
10861085
try {
1087-
this.bridge = this.bridge || BridgeOrchestrator.getInstance()
1088-
1089-
if (this.bridge) {
1090-
await this.bridge.subscribeToTask(this)
1091-
}
1086+
await BridgeOrchestrator.subscribeToTask(this)
10921087
} catch (error) {
10931088
console.error(
1094-
`[Task#startTask] subscribeToTask failed - ${error instanceof Error ? error.message : String(error)}`,
1089+
`[Task#startTask] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
10951090
)
10961091
}
10971092
}
@@ -1156,14 +1151,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
11561151
private async resumeTaskFromHistory() {
11571152
if (this.enableBridge) {
11581153
try {
1159-
this.bridge = this.bridge || BridgeOrchestrator.getInstance()
1160-
1161-
if (this.bridge) {
1162-
await this.bridge.subscribeToTask(this)
1163-
}
1154+
await BridgeOrchestrator.subscribeToTask(this)
11641155
} catch (error) {
11651156
console.error(
1166-
`[Task#resumeTaskFromHistory] subscribeToTask failed - ${error instanceof Error ? error.message : String(error)}`,
1157+
`[Task#resumeTaskFromHistory] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
11671158
)
11681159
}
11691160
}
@@ -1417,10 +1408,9 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
14171408
}
14181409

14191410
public dispose(): void {
1420-
// Disposing task
1421-
console.log(`[Task] disposing task ${this.taskId}.${this.instanceId}`)
1411+
console.log(`[Task#dispose] disposing task ${this.taskId}.${this.instanceId}`)
14221412

1423-
// Remove all event listeners to prevent memory leaks
1413+
// Remove all event listeners to prevent memory leaks.
14241414
try {
14251415
this.removeAllListeners()
14261416
} catch (error) {
@@ -1433,13 +1423,14 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
14331423
this.pauseInterval = undefined
14341424
}
14351425

1436-
// Unsubscribe from TaskBridge service.
1437-
if (this.bridge) {
1438-
this.bridge
1439-
.unsubscribeFromTask(this.taskId)
1440-
.catch((error: unknown) => console.error("Error unsubscribing from task bridge:", error))
1441-
1442-
this.bridge = null
1426+
if (this.enableBridge) {
1427+
BridgeOrchestrator.getInstance()
1428+
?.unsubscribeFromTask(this.taskId)
1429+
.catch((error) =>
1430+
console.error(
1431+
`[Task#dispose] BridgeOrchestrator#unsubscribeFromTask() failed: ${error instanceof Error ? error.message : String(error)}`,
1432+
),
1433+
)
14431434
}
14441435

14451436
// Release any terminals associated with this task.

src/core/task/__tests__/Task.dispose.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ describe("Task dispose method", () => {
134134

135135
// Verify dispose was called and logged
136136
expect(consoleLogSpy).toHaveBeenCalledWith(
137-
expect.stringContaining(`[Task] disposing task ${task.taskId}.${task.instanceId}`),
137+
expect.stringContaining(`[Task#dispose] disposing task ${task.taskId}.${task.instanceId}`),
138138
)
139139

140140
// Verify removeAllListeners was called first (before other cleanup)

src/core/webview/ClineProvider.ts

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ export class ClineProvider
165165

166166
this.marketplaceManager = new MarketplaceManager(this.context, this.customModesManager)
167167

168+
// Forward <most> task events to the provider.
169+
// We do something fairly similar for the IPC-based API.
168170
this.taskCreationCallback = (instance: Task) => {
169171
this.emit(RooCodeEventName.TaskCreated, instance)
170172

@@ -346,18 +348,18 @@ export class ClineProvider
346348
let task = this.clineStack.pop()
347349

348350
if (task) {
351+
task.emit(RooCodeEventName.TaskUnfocused)
352+
349353
try {
350354
// Abort the running task and set isAbandoned to true so
351355
// all running promises will exit as well.
352356
await task.abortTask(true)
353357
} catch (e) {
354358
this.log(
355-
`[removeClineFromStack] encountered error while aborting task ${task.taskId}.${task.instanceId}: ${e.message}`,
359+
`[ClineProvider#removeClineFromStack] abortTask() failed ${task.taskId}.${task.instanceId}: ${e.message}`,
356360
)
357361
}
358362

359-
task.emit(RooCodeEventName.TaskUnfocused)
360-
361363
// Remove event listeners before clearing the reference.
362364
const cleanupFunctions = this.taskEventListeners.get(task)
363365

@@ -405,12 +407,6 @@ export class ClineProvider
405407
await this.getCurrentTask()?.resumePausedTask(lastMessage)
406408
}
407409

408-
// Clear the current task without treating it as a subtask.
409-
// This is used when the user cancels a task that is not a subtask.
410-
async clearTask() {
411-
await this.removeClineFromStack()
412-
}
413-
414410
resumeTask(taskId: string): void {
415411
// Use the existing showTaskWithId method which handles both current and historical tasks
416412
this.showTaskWithId(taskId).catch((error) => {
@@ -1307,6 +1303,16 @@ export class ClineProvider
13071303
await this.createTaskWithHistoryItem({ ...historyItem, rootTask, parentTask })
13081304
}
13091305

1306+
// Clear the current task without treating it as a subtask.
1307+
// This is used when the user cancels a task that is not a subtask.
1308+
async clearTask() {
1309+
if (this.clineStack.length > 0) {
1310+
const task = this.clineStack[this.clineStack.length - 1]
1311+
console.log(`[clearTask] clearing task ${task.taskId}.${task.instanceId}`)
1312+
await this.removeClineFromStack()
1313+
}
1314+
}
1315+
13101316
async updateCustomInstructions(instructions?: string) {
13111317
// User may be clearing the field.
13121318
await this.updateGlobalState("customInstructions", instructions || undefined)
@@ -1585,6 +1591,7 @@ export class ClineProvider
15851591
})
15861592
} catch (error) {
15871593
console.error("Failed to fetch marketplace data:", error)
1594+
15881595
// Send empty data on error to prevent UI from hanging
15891596
this.postMessageToWebview({
15901597
type: "marketplaceData",
@@ -2213,24 +2220,23 @@ export class ClineProvider
22132220
if (bridge) {
22142221
const currentTask = this.getCurrentTask()
22152222

2216-
if (currentTask && !currentTask.bridge) {
2223+
if (currentTask && !currentTask.enableBridge) {
22172224
try {
2218-
currentTask.bridge = bridge
2219-
await currentTask.bridge.subscribeToTask(currentTask)
2225+
currentTask.enableBridge = true
2226+
await BridgeOrchestrator.subscribeToTask(currentTask)
22202227
} catch (error) {
2221-
const message = `[ClineProvider#remoteControlEnabled] subscribeToTask failed - ${error instanceof Error ? error.message : String(error)}`
2228+
const message = `[ClineProvider#remoteControlEnabled] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`
22222229
this.log(message)
22232230
console.error(message)
22242231
}
22252232
}
22262233
} else {
22272234
for (const task of this.clineStack) {
2228-
if (task.bridge) {
2235+
if (task.enableBridge) {
22292236
try {
2230-
await task.bridge.unsubscribeFromTask(task.taskId)
2231-
task.bridge = null
2237+
await BridgeOrchestrator.getInstance()?.unsubscribeFromTask(task.taskId)
22322238
} catch (error) {
2233-
const message = `[ClineProvider#remoteControlEnabled] unsubscribeFromTask failed - ${error instanceof Error ? error.message : String(error)}`
2239+
const message = `[ClineProvider#remoteControlEnabled] BridgeOrchestrator#unsubscribeFromTask() failed: ${error instanceof Error ? error.message : String(error)}`
22342240
this.log(message)
22352241
console.error(message)
22362242
}

src/extension/api.ts

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -253,11 +253,29 @@ export class API extends EventEmitter<RooCodeEvents> implements RooCodeAPI {
253253
this.taskMap.delete(task.taskId)
254254
})
255255

256-
// Optional:
257-
// RooCodeEventName.TaskFocused
258-
// RooCodeEventName.TaskUnfocused
259-
// RooCodeEventName.TaskActive
260-
// RooCodeEventName.TaskIdle
256+
task.on(RooCodeEventName.TaskFocused, () => {
257+
this.emit(RooCodeEventName.TaskFocused, task.taskId)
258+
})
259+
260+
task.on(RooCodeEventName.TaskUnfocused, () => {
261+
this.emit(RooCodeEventName.TaskUnfocused, task.taskId)
262+
})
263+
264+
task.on(RooCodeEventName.TaskActive, () => {
265+
this.emit(RooCodeEventName.TaskActive, task.taskId)
266+
})
267+
268+
task.on(RooCodeEventName.TaskInteractive, () => {
269+
this.emit(RooCodeEventName.TaskInteractive, task.taskId)
270+
})
271+
272+
task.on(RooCodeEventName.TaskResumable, () => {
273+
this.emit(RooCodeEventName.TaskResumable, task.taskId)
274+
})
275+
276+
task.on(RooCodeEventName.TaskIdle, () => {
277+
this.emit(RooCodeEventName.TaskIdle, task.taskId)
278+
})
261279

262280
// Subtask Lifecycle
263281

0 commit comments

Comments
 (0)