diff --git a/packages/cloud/package.json b/packages/cloud/package.json index d67b5ae7eb..2737312192 100644 --- a/packages/cloud/package.json +++ b/packages/cloud/package.json @@ -13,13 +13,16 @@ "dependencies": { "@roo-code/telemetry": "workspace:^", "@roo-code/types": "workspace:^", + "ioredis": "^5.3.2", "zod": "^3.25.61" }, "devDependencies": { "@roo-code/config-eslint": "workspace:^", "@roo-code/config-typescript": "workspace:^", + "@types/ioredis-mock": "^8.2.6", "@types/node": "20.x", "@types/vscode": "^1.84.0", + "ioredis-mock": "^8.9.0", "vitest": "^3.2.3" } } diff --git a/packages/cloud/src/CloudAPI.ts b/packages/cloud/src/CloudAPI.ts new file mode 100644 index 0000000000..58c5507ad5 --- /dev/null +++ b/packages/cloud/src/CloudAPI.ts @@ -0,0 +1,141 @@ +import { + type ShareVisibility, + type ShareResponse, + shareResponseSchema, + type TaskBridgeRegisterResponse, + taskBridgeRegisterResponseSchema, +} from "@roo-code/types" + +import { getRooCodeApiUrl } from "./config" +import type { AuthService } from "./auth" +import { getUserAgent } from "./utils" +import { AuthenticationError, CloudAPIError, NetworkError, TaskNotFoundError } from "./errors" + +interface CloudAPIRequestOptions extends Omit { + timeout?: number + headers?: Record +} + +export class CloudAPI { + private authService: AuthService + private log: (...args: unknown[]) => void + private baseUrl: string + + constructor(authService: AuthService, log?: (...args: unknown[]) => void) { + this.authService = authService + this.log = log || console.log + this.baseUrl = getRooCodeApiUrl() + } + + private async request( + endpoint: string, + options: CloudAPIRequestOptions & { + parseResponse?: (data: unknown) => T + } = {}, + ): Promise { + const { timeout = 10000, parseResponse, headers = {}, ...fetchOptions } = options + + const sessionToken = this.authService.getSessionToken() + + if (!sessionToken) { + throw new AuthenticationError() + } + + const url = `${this.baseUrl}${endpoint}` + + const requestHeaders = { + "Content-Type": "application/json", + Authorization: `Bearer ${sessionToken}`, + "User-Agent": getUserAgent(), + ...headers, + } + + try { + const response = await fetch(url, { + ...fetchOptions, + headers: requestHeaders, + signal: AbortSignal.timeout(timeout), + }) + + if (!response.ok) { + await this.handleErrorResponse(response, endpoint) + } + + const data = await response.json() + + if (parseResponse) { + return parseResponse(data) + } + + return data as T + } catch (error) { + if (error instanceof TypeError && error.message.includes("fetch")) { + throw new NetworkError(`Network error while calling ${endpoint}`) + } + + if (error instanceof CloudAPIError) { + throw error + } + + if (error instanceof Error && error.name === "AbortError") { + throw new CloudAPIError(`Request to ${endpoint} timed out`, undefined, undefined) + } + + throw new CloudAPIError( + `Unexpected error while calling ${endpoint}: ${error instanceof Error ? error.message : String(error)}`, + ) + } + } + + private async handleErrorResponse(response: Response, endpoint: string): Promise { + let responseBody: unknown + + try { + responseBody = await response.json() + } catch { + responseBody = await response.text() + } + + switch (response.status) { + case 401: + throw new AuthenticationError() + case 404: + if (endpoint.includes("/share")) { + throw new TaskNotFoundError() + } + throw new CloudAPIError(`Resource not found: ${endpoint}`, 404, responseBody) + default: + throw new CloudAPIError( + `HTTP ${response.status}: ${response.statusText}`, + response.status, + responseBody, + ) + } + } + + async shareTask(taskId: string, visibility: ShareVisibility = "organization"): Promise { + this.log(`[CloudAPI] Sharing task ${taskId} with visibility: ${visibility}`) + + const response = await this.request("/api/extension/share", { + method: "POST", + body: JSON.stringify({ taskId, visibility }), + parseResponse: (data) => shareResponseSchema.parse(data), + }) + + this.log("[CloudAPI] Share response:", response) + return response + } + + async registerTaskBridge(taskId: string, bridgeUrl?: string): Promise { + this.log(`[CloudAPI] Registering task bridge for ${taskId}`, bridgeUrl ? `with URL: ${bridgeUrl}` : "") + + const response = await this.request(`/api/extension/tasks/${taskId}/register-bridge`, { + method: "POST", + body: JSON.stringify({ taskId, bridgeUrl }), + parseResponse: (data) => taskBridgeRegisterResponseSchema.parse(data), + }) + + this.log("[CloudAPI] Task bridge registration response:", response) + return response + } +} diff --git a/packages/cloud/src/CloudService.ts b/packages/cloud/src/CloudService.ts index 30d1545b23..48f637a0d1 100644 --- a/packages/cloud/src/CloudService.ts +++ b/packages/cloud/src/CloudService.ts @@ -7,17 +7,20 @@ import type { OrganizationSettings, ClineMessage, ShareVisibility, + TaskBridgeRegisterResponse, } from "@roo-code/types" import { TelemetryService } from "@roo-code/telemetry" import { CloudServiceCallbacks } from "./types" -import type { AuthService } from "./auth" -import { WebAuthService, StaticTokenAuthService } from "./auth" +import { type AuthService, WebAuthService, StaticTokenAuthService } from "./auth" +import { TaskNotFoundError } from "./errors" + import type { SettingsService } from "./SettingsService" import { CloudSettingsService } from "./CloudSettingsService" import { StaticSettingsService } from "./StaticSettingsService" import { TelemetryClient } from "./TelemetryClient" -import { ShareService, TaskNotFoundError } from "./ShareService" +import { CloudShareService } from "./CloudShareService" +import { CloudAPI } from "./CloudAPI" export class CloudService { private static _instance: CloudService | null = null @@ -28,7 +31,8 @@ export class CloudService { private authService: AuthService | null = null private settingsService: SettingsService | null = null private telemetryClient: TelemetryClient | null = null - private shareService: ShareService | null = null + private shareService: CloudShareService | null = null + private cloudAPI: CloudAPI | null = null private isInitialized = false private log: (...args: unknown[]) => void @@ -80,8 +84,9 @@ export class CloudService { this.settingsService = cloudSettingsService } + this.cloudAPI = new CloudAPI(this.authService, this.log) this.telemetryClient = new TelemetryClient(this.authService, this.settingsService) - this.shareService = new ShareService(this.authService, this.settingsService, this.log) + this.shareService = new CloudShareService(this.cloudAPI, this.settingsService, this.log) try { TelemetryService.instance.register(this.telemetryClient) @@ -202,7 +207,7 @@ export class CloudService { return await this.shareService!.shareTask(taskId, visibility) } catch (error) { if (error instanceof TaskNotFoundError && clineMessages) { - // Backfill messages and retry + // Backfill messages and retry. await this.telemetryClient!.backfillMessages(clineMessages, taskId) return await this.shareService!.shareTask(taskId, visibility) } @@ -215,6 +220,13 @@ export class CloudService { return this.shareService!.canShareTask() } + // Task Bridge + + public async registerTaskBridge(taskId: string, bridgeUrl?: string): Promise { + this.ensureInitialized() + return this.cloudAPI!.registerTaskBridge(taskId, bridgeUrl) + } + // Lifecycle public dispose(): void { @@ -225,6 +237,7 @@ export class CloudService { this.authService.off("logged-out", this.authListener) this.authService.off("user-info", this.authListener) } + if (this.settingsService) { this.settingsService.dispose() } diff --git a/packages/cloud/src/CloudSettingsService.ts b/packages/cloud/src/CloudSettingsService.ts index 6692d8141d..4d856f4df2 100644 --- a/packages/cloud/src/CloudSettingsService.ts +++ b/packages/cloud/src/CloudSettingsService.ts @@ -7,7 +7,7 @@ import { organizationSettingsSchema, } from "@roo-code/types" -import { getRooCodeApiUrl } from "./Config" +import { getRooCodeApiUrl } from "./config" import type { AuthService } from "./auth" import { RefreshTimer } from "./RefreshTimer" import type { SettingsService } from "./SettingsService" diff --git a/packages/cloud/src/CloudShareService.ts b/packages/cloud/src/CloudShareService.ts new file mode 100644 index 0000000000..91e0f6aa3f --- /dev/null +++ b/packages/cloud/src/CloudShareService.ts @@ -0,0 +1,43 @@ +import * as vscode from "vscode" + +import type { ShareResponse, ShareVisibility } from "@roo-code/types" + +import type { CloudAPI } from "./CloudAPI" +import type { SettingsService } from "./SettingsService" + +export class CloudShareService { + private cloudAPI: CloudAPI + private settingsService: SettingsService + private log: (...args: unknown[]) => void + + constructor(cloudAPI: CloudAPI, settingsService: SettingsService, log?: (...args: unknown[]) => void) { + this.cloudAPI = cloudAPI + this.settingsService = settingsService + this.log = log || console.log + } + + async shareTask(taskId: string, visibility: ShareVisibility = "organization"): Promise { + try { + const response = await this.cloudAPI.shareTask(taskId, visibility) + + if (response.success && response.shareUrl) { + // Copy to clipboard. + await vscode.env.clipboard.writeText(response.shareUrl) + } + + return response + } catch (error) { + this.log("[ShareService] Error sharing task:", error) + throw error + } + } + + async canShareTask(): Promise { + try { + return !!this.settingsService.getSettings()?.cloudSettings?.enableTaskSharing + } catch (error) { + this.log("[ShareService] Error checking if task can be shared:", error) + return false + } + } +} diff --git a/packages/cloud/src/ShareService.ts b/packages/cloud/src/ShareService.ts deleted file mode 100644 index 5dcc7cae3f..0000000000 --- a/packages/cloud/src/ShareService.ts +++ /dev/null @@ -1,88 +0,0 @@ -import * as vscode from "vscode" - -import { shareResponseSchema } from "@roo-code/types" -import { getRooCodeApiUrl } from "./Config" -import type { AuthService } from "./auth" -import type { SettingsService } from "./SettingsService" -import { getUserAgent } from "./utils" - -export type ShareVisibility = "organization" | "public" - -export class TaskNotFoundError extends Error { - constructor(taskId?: string) { - super(taskId ? `Task '${taskId}' not found` : "Task not found") - Object.setPrototypeOf(this, TaskNotFoundError.prototype) - } -} - -export class ShareService { - private authService: AuthService - private settingsService: SettingsService - private log: (...args: unknown[]) => void - - constructor(authService: AuthService, settingsService: SettingsService, log?: (...args: unknown[]) => void) { - this.authService = authService - this.settingsService = settingsService - this.log = log || console.log - } - - /** - * Share a task with specified visibility - * Returns the share response data - */ - async shareTask(taskId: string, visibility: ShareVisibility = "organization") { - try { - const sessionToken = this.authService.getSessionToken() - if (!sessionToken) { - throw new Error("Authentication required") - } - - const response = await fetch(`${getRooCodeApiUrl()}/api/extension/share`, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${sessionToken}`, - "User-Agent": getUserAgent(), - }, - body: JSON.stringify({ taskId, visibility }), - signal: AbortSignal.timeout(10000), - }) - - if (!response.ok) { - if (response.status === 404) { - throw new TaskNotFoundError(taskId) - } - throw new Error(`HTTP ${response.status}: ${response.statusText}`) - } - - const data = shareResponseSchema.parse(await response.json()) - this.log("[share] Share link created successfully:", data) - - if (data.success && data.shareUrl) { - // Copy to clipboard - await vscode.env.clipboard.writeText(data.shareUrl) - } - - return data - } catch (error) { - this.log("[share] Error sharing task:", error) - throw error - } - } - - /** - * Check if sharing is available - */ - async canShareTask(): Promise { - try { - if (!this.authService.isAuthenticated()) { - return false - } - - return !!this.settingsService.getSettings()?.cloudSettings?.enableTaskSharing - } catch (error) { - this.log("[share] Error checking if task can be shared:", error) - return false - } - } -} diff --git a/packages/cloud/src/StaticSettingsService.ts b/packages/cloud/src/StaticSettingsService.ts index 3aac37bda5..97e6cf7ea8 100644 --- a/packages/cloud/src/StaticSettingsService.ts +++ b/packages/cloud/src/StaticSettingsService.ts @@ -36,6 +36,6 @@ export class StaticSettingsService implements SettingsService { } public dispose(): void { - // No resources to clean up for static settings + // No resources to clean up for static settings. } } diff --git a/packages/cloud/src/TaskBridgeService.ts b/packages/cloud/src/TaskBridgeService.ts new file mode 100644 index 0000000000..ccc7851a5e --- /dev/null +++ b/packages/cloud/src/TaskBridgeService.ts @@ -0,0 +1,669 @@ +import Redis from "ioredis" +import { z } from "zod" + +import type { TaskLike, TaskEventHandlers } from "@roo-code/types" + +const NAMESPACE = "bridge" + +export interface TaskBridgeConfig { + url?: string + namespace?: string + reconnectOnError?: boolean + maxReconnectAttempts?: number + reconnectDelay?: number + connectionTimeout?: number + commandTimeout?: number +} + +const taskBridgeMessageTypes = ["message", "task_event"] as const + +type TaskBridgeMessageType = (typeof taskBridgeMessageTypes)[number] + +const taskBridgeMessagePayloadSchema = z.record(z.string(), z.unknown()) + +type TaskBridgeMessagePayload = z.infer + +const taskBridgeMessageSchema = z.object({ + taskId: z.string(), + type: z.enum(taskBridgeMessageTypes), + payload: taskBridgeMessagePayloadSchema, + timestamp: z.number(), +}) + +export type TaskBridgeMessage = z.infer + +const queuedMessageSchema = z.object({ + text: z.string(), + images: z.array(z.string()).optional(), + timestamp: z.number(), +}) + +export type QueuedMessage = z.infer + +interface InternalQueuedMessage { + id: string + taskId: string + message: QueuedMessage + timestamp: number + retryCount: number + maxRetries: number +} + +export class TaskBridgeService { + private static instance: TaskBridgeService | null = null + + private config: TaskBridgeConfig + private publisher: Redis | null = null + private subscriber: Redis | null = null + private isConnected: boolean = false + private reconnectAttempts: number = 0 + private reconnectTimeout: NodeJS.Timeout | null = null + private subscribedTasks: Map = new Map() + private taskEventHandlers: Record> = {} + + private messageQueues: Map = new Map() + private taskStatuses: Map = new Map() + private processingQueues: Set = new Set() + private queueProcessingTimeouts: Map = new Map() + private processingPromises: Map> = new Map() + + private readonly RETRY_DELAY_MS = 1000 + private readonly MAX_RETRY_DELAY_MS = 30000 + private readonly DEFAULT_MAX_RETRIES = 3 + + private constructor({ + url = "redis://localhost:6379", + namespace = NAMESPACE, + reconnectOnError = true, + maxReconnectAttempts = 10, + reconnectDelay = 5000, + connectionTimeout = 10000, + commandTimeout = 5000, + }: TaskBridgeConfig = {}) { + this.config = { + url, + namespace, + reconnectOnError, + maxReconnectAttempts, + reconnectDelay, + connectionTimeout, + commandTimeout, + } + + if (!this.config.url) { + throw new Error("[TaskBridgeService] Redis URL is required") + } + + if (!this.config.namespace || this.config.namespace.trim() === "") { + throw new Error("[TaskBridgeService] Namespace is required") + } + + if (this.config.maxReconnectAttempts! < 0) { + throw new Error("[TaskBridgeService] maxReconnectAttempts must be non-negative") + } + + if (this.config.reconnectDelay! < 0) { + throw new Error("[TaskBridgeService] reconnectDelay must be non-negative") + } + + if (this.config.connectionTimeout! < 0) { + throw new Error("[TaskBridgeService] connectionTimeout must be non-negative") + } + + if (this.config.commandTimeout! < 0) { + throw new Error("[TaskBridgeService] commandTimeout must be non-negative") + } + } + + public static getInstance(config?: TaskBridgeConfig) { + if (!TaskBridgeService.instance) { + TaskBridgeService.instance = new TaskBridgeService(config) + } else if (config) { + console.warn("[TaskBridgeService] Instance already exists. Configuration will be ignored.") + } + + return TaskBridgeService.instance + } + + public static resetInstance(): void { + if (TaskBridgeService.instance) { + TaskBridgeService.instance.disconnect().catch(() => {}) + TaskBridgeService.instance = null + } + } + + public async initialize() { + if (this.isConnected) { + return + } + + try { + this.publisher = new Redis(this.config.url!, { + retryStrategy: (times: number) => { + if (times > this.config.maxReconnectAttempts!) { + return null + } + + return Math.min(times * 50, 2000) + }, + enableOfflineQueue: false, + lazyConnect: true, + connectTimeout: this.config.connectionTimeout, + commandTimeout: this.config.commandTimeout, + }) + + this.subscriber = new Redis(this.config.url!, { + retryStrategy: (times: number) => { + if (times > this.config.maxReconnectAttempts!) { + return null + } + + return Math.min(times * 50, 2000) + }, + enableOfflineQueue: false, + lazyConnect: true, + connectTimeout: this.config.connectionTimeout, + commandTimeout: this.config.commandTimeout, + }) + + this.publisher.on("error", (error: Error) => this.handleConnectionError(error)) + this.publisher.on("close", () => this.handleConnectionError()) + + this.subscriber.on("error", (error: Error) => this.handleConnectionError(error)) + this.subscriber.on("close", () => this.handleConnectionError()) + + this.subscriber.on("message", (channel: string, buffer: string) => { + try { + const message = taskBridgeMessageSchema.parse(JSON.parse(buffer)) + const parts = channel.split(":") + const taskId = parts[parts.length - 2] + const task = this.subscribedTasks.get(taskId) + + if (!task) { + console.warn(`Received message for unsubscribed task: ${taskId}`) + return + } + + switch (message.type) { + case "message": + this.handleQueuedMessage(taskId, message.payload) + break + } + } catch (error) { + console.error("Error handling incoming message:", error) + } + }) + + await Promise.all([this.publisher.connect(), this.subscriber.connect()]) + await Promise.all([this.waitForConnection(this.publisher), this.waitForConnection(this.subscriber)]) + + this.isConnected = true + this.reconnectAttempts = 0 + + console.log(`[TaskBridgeService] connected -> ${this.config.url}`) + } catch (error) { + if (this.publisher) { + console.log(`[TaskBridgeService] disconnecting publisher`) + await this.publisher.quit().catch(() => {}) + this.publisher = null + } + + if (this.subscriber) { + console.log(`[TaskBridgeService] disconnecting subscriber`) + await this.subscriber.quit().catch(() => {}) + this.subscriber = null + } + + throw error + } + } + + public async subscribeToTask(task: TaskLike): Promise { + const channel = this.serverChannel(task.taskId) + console.log(`[TaskBridgeService] subscribeToTask -> ${channel}`) + + if (!this.isConnected || !this.subscriber) { + throw new Error("TaskBridgeService is not connected") + } + + await this.subscriber.subscribe(channel) + this.subscribedTasks.set(task.taskId, task) + this.setupTaskEventListeners(task) + } + + public async unsubscribeFromTask(taskId: string): Promise { + const channel = this.serverChannel(taskId) + console.log(`[TaskBridgeService] unsubscribeFromTask -> ${channel}`) + + if (!this.subscriber) { + return + } + + const task = this.subscribedTasks.get(taskId) + + if (task) { + this.removeTaskEventListeners(task) + this.subscribedTasks.delete(taskId) + } + + await this.subscriber.unsubscribe(channel) + } + + public async publish(taskId: string, type: TaskBridgeMessageType, payload: TaskBridgeMessagePayload) { + if (!this.isConnected || !this.publisher) { + throw new Error("TaskBridgeService is not connected") + } + + const data: TaskBridgeMessage = { + taskId, + type, + payload, + timestamp: Date.now(), + } + + await this.publisher.publish(this.clientChannel(taskId), JSON.stringify(data)) + } + + public async disconnect(): Promise { + console.log(`[TaskBridgeService] disconnecting`) + + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout) + this.reconnectTimeout = null + } + + for (const timeoutId of this.queueProcessingTimeouts.values()) { + clearTimeout(timeoutId) + } + + this.queueProcessingTimeouts.clear() + + // Wait for all processing to complete. + const processingPromises = Array.from(this.processingPromises.values()) + + if (processingPromises.length > 0) { + await Promise.allSettled(processingPromises) + } + + this.processingPromises.clear() + + // Unsubscribe from all tasks. + const unsubscribePromises = [] + + for (const taskId of this.subscribedTasks.keys()) { + unsubscribePromises.push(this.unsubscribeFromTask(taskId)) + } + + await Promise.allSettled(unsubscribePromises) + + // Remove event listeners before closing connections. + if (this.publisher) { + this.publisher.removeAllListeners() + await this.publisher.quit() + this.publisher = null + } + + if (this.subscriber) { + this.subscriber.removeAllListeners() + await this.subscriber.quit() + this.subscriber = null + } + + // Clear internal state. + this.messageQueues.clear() + this.taskStatuses.clear() + this.processingQueues.clear() + this.subscribedTasks.clear() + this.taskEventHandlers = {} + this.isConnected = false + TaskBridgeService.instance = null + } + + public get connected(): boolean { + return this.isConnected + } + + public get subscribedTaskCount(): number { + return this.subscribedTasks.size + } + + private waitForConnection(client: Redis): Promise { + return new Promise((resolve, reject) => { + if (client.status === "ready") { + resolve() + return + } + + const onReady = () => { + client.off("ready", onReady) + client.off("error", onError) + resolve() + } + + const onError = (error: Error) => { + client.off("ready", onReady) + client.off("error", onError) + reject(error) + } + + client.once("ready", onReady) + client.once("error", onError) + }) + } + + private handleConnectionError(error?: Error): void { + this.isConnected = false + + if (error) { + console.error("[TaskBridgeService] Connection error:", error.message) + } + + if (!this.config.reconnectOnError) { + console.warn("[TaskBridgeService] Reconnection disabled, service will remain disconnected") + return + } + + if (this.reconnectAttempts >= this.config.maxReconnectAttempts!) { + console.error(`[TaskBridgeService] Max reconnection attempts (${this.config.maxReconnectAttempts}) reached`) + return + } + + if (this.reconnectTimeout) { + return + } + + this.reconnectAttempts++ + + console.log( + `[TaskBridgeService] Scheduling reconnection attempt ${this.reconnectAttempts}/${this.config.maxReconnectAttempts}`, + ) + + this.reconnectTimeout = setTimeout(() => { + this.reconnectTimeout = null + this.initialize().catch((err) => { + console.error("[TaskBridgeService] Reconnection failed:", err) + this.handleConnectionError(err) + }) + }, this.config.reconnectDelay) + } + + private setupTaskEventListeners(task: TaskLike) { + const callbacks: Partial = { + // message: ({ action, message }) => + // this.publish(task.taskId, "task_event", { eventType: "message", data: { action, message } }), + taskStarted: () => { + console.log(`[TaskBridgeService#${task.taskId}] taskStarted`) + this.taskStatuses.set(task.taskId, false) + console.log(`[TaskBridgeService#${task.taskId}] busy`) + this.publish(task.taskId, "task_event", { eventType: "status", data: { status: "started" } }) + }, + taskUnpaused: () => { + console.log(`[TaskBridgeService#${task.taskId}] taskUnpaused`) + this.taskStatuses.set(task.taskId, false) + console.log(`[TaskBridgeService#${task.taskId}] busy`) + this.publish(task.taskId, "task_event", { eventType: "status", data: { status: "unpaused" } }) + }, + taskPaused: () => { + console.log(`[TaskBridgeService#${task.taskId}] taskPaused`) + this.taskStatuses.set(task.taskId, true) + console.log(`[TaskBridgeService#${task.taskId}] free`) + this.processQueueForTask(task.taskId) + this.publish(task.taskId, "task_event", { eventType: "status", data: { status: "paused" } }) + }, + taskAborted: () => { + console.log(`[TaskBridgeService#${task.taskId}] taskAborted`) + this.taskStatuses.set(task.taskId, true) + this.processQueueForTask(task.taskId) + console.log(`[TaskBridgeService#${task.taskId}] free`) + this.publish(task.taskId, "task_event", { eventType: "status", data: { status: "aborted" } }) + }, + taskCompleted: () => { + console.log(`[TaskBridgeService#${task.taskId}] taskCompleted`) + this.taskStatuses.set(task.taskId, true) + console.log(`[TaskBridgeService#${task.taskId}] free`) + this.processQueueForTask(task.taskId) + this.publish(task.taskId, "task_event", { eventType: "status", data: { status: "completed" } }) + }, + } + + this.taskEventHandlers[task.taskId] = callbacks + + const registerHandler = ( + eventName: K, + handler: TaskEventHandlers[K] | undefined, + ) => { + if (handler) { + task.on(eventName, handler) + } + } + + ;(Object.keys(callbacks) as Array).forEach((eventName) => { + registerHandler(eventName, callbacks[eventName]) + }) + } + + private removeTaskEventListeners(task: TaskLike): void { + const handlers = this.taskEventHandlers[task.taskId] + + if (!handlers) { + return + } + + const unregisterHandler = ( + eventName: K, + handler: TaskEventHandlers[K] | undefined, + ) => { + if (handler) { + task.off(eventName, handler) + } + } + + ;(Object.keys(handlers) as Array).forEach((eventName) => { + unregisterHandler(eventName, handlers[eventName]) + }) + + delete this.taskEventHandlers[task.taskId] + } + + private serverChannel(taskId: string): string { + return `${this.config.namespace}:${taskId}:server` + } + + private clientChannel(taskId: string): string { + return `${this.config.namespace}:${taskId}:client` + } + + private isTaskReady(taskId: string): boolean { + return this.taskStatuses.get(taskId) ?? false + } + + private handleQueuedMessage(taskId: string, payload: TaskBridgeMessagePayload) { + try { + console.log(`[TaskBridgeService#${taskId}] handleQueuedMessage`, payload) + const queuedMessage = queuedMessageSchema.parse(payload) + const isReady = this.isTaskReady(taskId) + console.log(`[TaskBridgeService#${taskId}] Task ready status: ${isReady}`) + + if (isReady) { + console.log(`[TaskBridgeService#${taskId}] Task is ready, delivering message immediately`) + this.deliverMessage(taskId, queuedMessage) + } else { + console.log(`[TaskBridgeService#${taskId}] Task is busy, enqueuing message`) + this.enqueueMessage(taskId, queuedMessage) + } + } catch (error) { + console.error(`[TaskBridgeService#${taskId}] Error handling queued message:`, error, payload) + } + } + + private enqueueMessage(taskId: string, message: QueuedMessage): void { + const queue = this.messageQueues.get(taskId) || [] + + queue.push({ + id: `${taskId}-${Date.now()}-${Math.random()}`, + taskId, + message, + timestamp: Date.now(), + retryCount: 0, + maxRetries: this.DEFAULT_MAX_RETRIES, + }) + + this.messageQueues.set(taskId, queue) + console.log(`Queued message for task ${taskId}. Queue size: ${queue.length}`) + } + + private async deliverMessage(taskId: string, message: QueuedMessage): Promise { + console.log(`[TaskBridgeService#${taskId}] deliverMessage: ${message.text}`) + const task = this.subscribedTasks.get(taskId) + + if (!task) { + console.warn(`Cannot deliver message: task ${taskId} not found`) + return false + } + + this.taskStatuses.set(taskId, false) + console.log(`[TaskBridgeService#${taskId}] busy`) + + try { + task.setMessageResponse(message.text, message.images) + return true + } catch (error) { + console.error(`Failed to deliver message to task ${taskId}:`, error) + return false + } + } + + private async processQueueForTask(taskId: string): Promise { + console.log(`[TaskBridgeService#${taskId}] processQueueForTask`) + + // Check if there's already a processing promise for this task. + const existingPromise = this.processingPromises.get(taskId) + + if (existingPromise) { + console.log(`[TaskBridgeService#${taskId}] waiting for existing processing to complete`) + + // Wait for the existing processing to complete. + await existingPromise + + // After existing processing completes, check if we need to process again. + // This handles the case where new messages arrived during processing. + if (this.messageQueues.has(taskId) && this.isTaskReady(taskId)) { + return this.processQueueForTask(taskId) + } + + return + } + + // Check if there's actually work to do. + const queue = this.messageQueues.get(taskId) + + if (!queue || queue.length === 0) { + console.log(`[TaskBridgeService#${taskId}] processQueueForTask - no queued message`) + return + } + + setTimeout(async () => { + // Create and store the processing promise. + const processingPromise = this._processQueue(taskId) + this.processingPromises.set(taskId, processingPromise) + + try { + await processingPromise + } finally { + // Clean up the promise reference. + this.processingPromises.delete(taskId) + } + }, 500) + } + + private async _processQueue(taskId: string): Promise { + const queue = this.messageQueues.get(taskId) + + if (!queue || queue.length === 0) { + console.log(`[TaskBridgeService#${taskId}] _processQueue - no queued messages`) + return + } + + this.processingQueues.add(taskId) + + try { + while (queue.length > 0 && this.isTaskReady(taskId)) { + const queuedMessage = queue[0] + console.log(`Processing queued message for task ${taskId}: ${JSON.stringify(queuedMessage)}`) + const success = await this.deliverMessage(taskId, queuedMessage.message) + + if (success) { + queue.shift() + } else { + queuedMessage.retryCount++ + + if (queuedMessage.retryCount >= queuedMessage.maxRetries) { + console.error(`Max retries reached for message in task ${taskId}. Removing from queue.`) + queue.shift() + } else { + const delay = Math.min( + this.RETRY_DELAY_MS * Math.pow(2, queuedMessage.retryCount - 1), + this.MAX_RETRY_DELAY_MS, + ) + + console.log(`Scheduling retry ${queuedMessage.retryCount} for task ${taskId} in ${delay}ms`) + + const timeoutId = setTimeout(() => { + this.queueProcessingTimeouts.delete(taskId) + this.processQueueForTask(taskId) + }, delay) + + this.queueProcessingTimeouts.set(taskId, timeoutId) + break + } + } + } + + if (queue.length === 0) { + this.messageQueues.delete(taskId) + } else { + this.messageQueues.set(taskId, queue) + } + } finally { + this.processingQueues.delete(taskId) + } + } + + public getStatus(): { + connected: boolean + reconnectAttempts: number + subscribedTasks: number + queuedMessages: number + processingTasks: number + } { + let totalQueuedMessages = 0 + + for (const queue of this.messageQueues.values()) { + totalQueuedMessages += queue.length + } + + return { + connected: this.isConnected, + reconnectAttempts: this.reconnectAttempts, + subscribedTasks: this.subscribedTasks.size, + queuedMessages: totalQueuedMessages, + processingTasks: this.processingQueues.size, + } + } + + /** + * Check if a specific task has queued messages + */ + public hasQueuedMessages(taskId: string): boolean { + const queue = this.messageQueues.get(taskId) + return queue ? queue.length > 0 : false + } + + /** + * Get the number of queued messages for a specific task + */ + public getQueuedMessageCount(taskId: string): number { + const queue = this.messageQueues.get(taskId) + return queue ? queue.length : 0 + } +} diff --git a/packages/cloud/src/TelemetryClient.ts b/packages/cloud/src/TelemetryClient.ts index e33843a30c..20d7445bff 100644 --- a/packages/cloud/src/TelemetryClient.ts +++ b/packages/cloud/src/TelemetryClient.ts @@ -6,10 +6,11 @@ import { } from "@roo-code/types" import { BaseTelemetryClient } from "@roo-code/telemetry" -import { getRooCodeApiUrl } from "./Config" -import type { AuthService } from "./auth" import type { SettingsService } from "./SettingsService" +import { getRooCodeApiUrl } from "./config" +import type { AuthService } from "./auth" + export class TelemetryClient extends BaseTelemetryClient { constructor( private authService: AuthService, diff --git a/packages/cloud/src/__tests__/CloudAPI.test.ts b/packages/cloud/src/__tests__/CloudAPI.test.ts new file mode 100644 index 0000000000..884bf227d7 --- /dev/null +++ b/packages/cloud/src/__tests__/CloudAPI.test.ts @@ -0,0 +1,359 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import type { MockedFunction } from "vitest" + +import { CloudAPIError, TaskNotFoundError, AuthenticationError, NetworkError } from "../errors" +import type { AuthService } from "../auth" + +import { CloudAPI } from "../CloudAPI" + +const mockFetch = vi.fn() +global.fetch = mockFetch as any + +vi.mock("../Config", () => ({ + getRooCodeApiUrl: () => "https://app.roocode.com", +})) + +vi.mock("../utils", () => ({ + getUserAgent: () => "Roo-Code 1.0.0", +})) + +describe("CloudAPI", () => { + let cloudAPI: CloudAPI + let mockAuthService: AuthService + let mockLog: MockedFunction<(...args: unknown[]) => void> + + beforeEach(() => { + vi.clearAllMocks() + mockFetch.mockClear() + + mockLog = vi.fn() + mockAuthService = { + getSessionToken: vi.fn(), + } as any + + cloudAPI = new CloudAPI(mockAuthService, mockLog) + }) + + describe("constructor", () => { + it("should initialize with auth service and logger", () => { + expect(cloudAPI).toBeDefined() + expect(mockLog).not.toHaveBeenCalled() + }) + + it("should use console.log when no logger provided", () => { + const apiWithoutLogger = new CloudAPI(mockAuthService) + expect(apiWithoutLogger).toBeDefined() + }) + }) + + describe("shareTask", () => { + it("should successfully share a task with organization visibility", async () => { + const mockResponse = { + success: true, + shareUrl: "https://app.roocode.com/share/abc123", + isNewShare: true, + } + + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + mockFetch.mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue(mockResponse), + }) + + const result = await cloudAPI.shareTask("task-123", "organization") + + expect(result).toEqual(mockResponse) + expect(mockFetch).toHaveBeenCalledWith("https://app.roocode.com/api/extension/share", { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: "Bearer session-token", + "User-Agent": "Roo-Code 1.0.0", + }, + body: JSON.stringify({ taskId: "task-123", visibility: "organization" }), + signal: expect.any(AbortSignal), + }) + expect(mockLog).toHaveBeenCalledWith("[CloudAPI] Sharing task task-123 with visibility: organization") + expect(mockLog).toHaveBeenCalledWith("[CloudAPI] Share response:", mockResponse) + }) + + it("should default to organization visibility when not specified", async () => { + const mockResponse = { success: true } + + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + mockFetch.mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue(mockResponse), + }) + + await cloudAPI.shareTask("task-123") + + expect(mockFetch).toHaveBeenCalledWith( + "https://app.roocode.com/api/extension/share", + expect.objectContaining({ + body: JSON.stringify({ taskId: "task-123", visibility: "organization" }), + }), + ) + }) + + it("should handle public visibility", async () => { + const mockResponse = { success: true } + + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + mockFetch.mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue(mockResponse), + }) + + await cloudAPI.shareTask("task-123", "public") + + expect(mockFetch).toHaveBeenCalledWith( + "https://app.roocode.com/api/extension/share", + expect.objectContaining({ + body: JSON.stringify({ taskId: "task-123", visibility: "public" }), + }), + ) + }) + + it("should throw AuthenticationError when no session token", async () => { + ;(mockAuthService.getSessionToken as any).mockReturnValue(undefined) + + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow(AuthenticationError) + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow("Authentication required") + }) + + it("should throw TaskNotFoundError for 404 responses", async () => { + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + mockFetch.mockResolvedValue({ + ok: false, + status: 404, + statusText: "Not Found", + json: vi.fn().mockResolvedValue({ error: "Task not found" }), + }) + + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow(TaskNotFoundError) + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow("Task not found") + }) + + it("should validate response schema", async () => { + const invalidResponse = { invalid: "data" } + + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + mockFetch.mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue(invalidResponse), + }) + + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow() + }) + }) + + describe("registerTaskBridge", () => { + it("should successfully register task bridge without URL", async () => { + const mockResponse = { + success: true, + bridgeId: "bridge-123", + } + + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + mockFetch.mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue(mockResponse), + }) + + const result = await cloudAPI.registerTaskBridge("task-123") + + expect(result).toEqual(mockResponse) + expect(mockFetch).toHaveBeenCalledWith("https://app.roocode.com/api/extension/task-bridge/register", { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: "Bearer session-token", + "User-Agent": "Roo-Code 1.0.0", + }, + body: JSON.stringify({ taskId: "task-123" }), + signal: expect.any(AbortSignal), + }) + expect(mockLog).toHaveBeenCalledWith("[CloudAPI] Registering task bridge for task-123", "") + expect(mockLog).toHaveBeenCalledWith("[CloudAPI] Task bridge registration response:", mockResponse) + }) + + it("should successfully register task bridge with URL", async () => { + const mockResponse = { + success: true, + bridgeId: "bridge-123", + } + + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + mockFetch.mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue(mockResponse), + }) + + const result = await cloudAPI.registerTaskBridge("task-123", "redis://localhost:6379") + + expect(result).toEqual(mockResponse) + expect(mockFetch).toHaveBeenCalledWith( + "https://app.roocode.com/api/extension/task-bridge/register", + expect.objectContaining({ + body: JSON.stringify({ taskId: "task-123", bridgeUrl: "redis://localhost:6379" }), + }), + ) + expect(mockLog).toHaveBeenCalledWith( + "[CloudAPI] Registering task bridge for task-123", + "with URL: redis://localhost:6379", + ) + }) + + it("should handle registration failure", async () => { + const mockResponse = { + success: false, + error: "Task already has a bridge", + } + + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + mockFetch.mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue(mockResponse), + }) + + const result = await cloudAPI.registerTaskBridge("task-123") + + expect(result).toEqual(mockResponse) + expect(result.success).toBe(false) + expect(result.error).toBe("Task already has a bridge") + }) + + it("should throw AuthenticationError when no session token", async () => { + ;(mockAuthService.getSessionToken as any).mockReturnValue(undefined) + + await expect(cloudAPI.registerTaskBridge("task-123")).rejects.toThrow(AuthenticationError) + }) + + it("should validate response schema", async () => { + const invalidResponse = { invalid: "data" } + + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + mockFetch.mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue(invalidResponse), + }) + + await expect(cloudAPI.registerTaskBridge("task-123")).rejects.toThrow() + }) + }) + + describe("error handling", () => { + it("should handle 401 authentication errors", async () => { + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + mockFetch.mockResolvedValue({ + ok: false, + status: 401, + statusText: "Unauthorized", + json: vi.fn().mockResolvedValue({ error: "Invalid token" }), + }) + + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow(AuthenticationError) + }) + + it("should handle generic HTTP errors", async () => { + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + mockFetch.mockResolvedValue({ + ok: false, + status: 500, + statusText: "Internal Server Error", + json: vi.fn().mockResolvedValue({ error: "Server error" }), + }) + + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow(CloudAPIError) + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow("HTTP 500: Internal Server Error") + }) + + it("should handle network errors", async () => { + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + mockFetch.mockRejectedValue(new TypeError("Failed to fetch")) + + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow(NetworkError) + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow( + "Network error while calling /api/extension/share", + ) + }) + + it("should handle timeout errors", async () => { + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + const timeoutError = new Error("AbortError") + timeoutError.name = "AbortError" + mockFetch.mockRejectedValue(timeoutError) + + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow(CloudAPIError) + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow("Request to /api/extension/share timed out") + }) + + it("should handle unexpected errors", async () => { + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + mockFetch.mockRejectedValue(new Error("Unexpected error")) + + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow(CloudAPIError) + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow( + "Unexpected error while calling /api/extension/share: Unexpected error", + ) + }) + + it("should handle non-JSON error responses", async () => { + ;(mockAuthService.getSessionToken as any).mockReturnValue("session-token") + mockFetch.mockResolvedValue({ + ok: false, + status: 500, + statusText: "Internal Server Error", + json: vi.fn().mockRejectedValue(new Error("Invalid JSON")), + text: vi.fn().mockResolvedValue("Plain text error"), + }) + + await expect(cloudAPI.shareTask("task-123")).rejects.toThrow(CloudAPIError) + }) + }) + + describe("custom error classes", () => { + it("should create CloudAPIError with correct properties", () => { + const error = new CloudAPIError("Test error", 500, { details: "test" }) + expect(error).toBeInstanceOf(Error) + expect(error.name).toBe("CloudAPIError") + expect(error.message).toBe("Test error") + expect(error.statusCode).toBe(500) + expect(error.responseBody).toEqual({ details: "test" }) + }) + + it("should create TaskNotFoundError with correct properties", () => { + const error = new TaskNotFoundError("task-123") + expect(error).toBeInstanceOf(CloudAPIError) + expect(error).toBeInstanceOf(Error) + expect(error.name).toBe("TaskNotFoundError") + expect(error.message).toBe("Task 'task-123' not found") + expect(error.statusCode).toBe(404) + }) + + it("should create TaskNotFoundError without taskId", () => { + const error = new TaskNotFoundError() + expect(error.message).toBe("Task not found") + }) + + it("should create AuthenticationError with correct properties", () => { + const error = new AuthenticationError("Custom auth error") + expect(error).toBeInstanceOf(CloudAPIError) + expect(error).toBeInstanceOf(Error) + expect(error.name).toBe("AuthenticationError") + expect(error.message).toBe("Custom auth error") + expect(error.statusCode).toBe(401) + }) + + it("should create NetworkError with correct properties", () => { + const error = new NetworkError("Network failed") + expect(error).toBeInstanceOf(CloudAPIError) + expect(error).toBeInstanceOf(Error) + expect(error.name).toBe("NetworkError") + expect(error.message).toBe("Network failed") + expect(error.statusCode).toBeUndefined() + }) + }) +}) diff --git a/packages/cloud/src/__tests__/CloudService.test.ts b/packages/cloud/src/__tests__/CloudService.test.ts index 1384b6de6b..7dbfc668b6 100644 --- a/packages/cloud/src/__tests__/CloudService.test.ts +++ b/packages/cloud/src/__tests__/CloudService.test.ts @@ -1,15 +1,17 @@ // npx vitest run src/__tests__/CloudService.test.ts import * as vscode from "vscode" + import type { ClineMessage } from "@roo-code/types" +import { TelemetryService } from "@roo-code/telemetry" import { CloudService } from "../CloudService" import { WebAuthService } from "../auth/WebAuthService" import { CloudSettingsService } from "../CloudSettingsService" -import { ShareService, TaskNotFoundError } from "../ShareService" +import { CloudShareService } from "../CloudShareService" import { TelemetryClient } from "../TelemetryClient" -import { TelemetryService } from "@roo-code/telemetry" import { CloudServiceCallbacks } from "../types" +import { TaskNotFoundError } from "../errors" vi.mock("vscode", () => ({ ExtensionContext: vi.fn(), @@ -31,7 +33,7 @@ vi.mock("../auth/WebAuthService") vi.mock("../CloudSettingsService") -vi.mock("../ShareService") +vi.mock("../CloudShareService") vi.mock("../TelemetryClient") @@ -151,7 +153,7 @@ describe("CloudService", () => { vi.mocked(WebAuthService).mockImplementation(() => mockAuthService as unknown as WebAuthService) vi.mocked(CloudSettingsService).mockImplementation(() => mockSettingsService as unknown as CloudSettingsService) - vi.mocked(ShareService).mockImplementation(() => mockShareService as unknown as ShareService) + vi.mocked(CloudShareService).mockImplementation(() => mockShareService as unknown as CloudShareService) vi.mocked(TelemetryClient).mockImplementation(() => mockTelemetryClient as unknown as TelemetryClient) vi.mocked(TelemetryService.hasInstance).mockReturnValue(true) diff --git a/packages/cloud/src/__tests__/ShareService.test.ts b/packages/cloud/src/__tests__/CloudShareService.test.ts similarity index 86% rename from packages/cloud/src/__tests__/ShareService.test.ts rename to packages/cloud/src/__tests__/CloudShareService.test.ts index dd5b669603..6fae1fbb9f 100644 --- a/packages/cloud/src/__tests__/ShareService.test.ts +++ b/packages/cloud/src/__tests__/CloudShareService.test.ts @@ -3,9 +3,11 @@ import type { MockedFunction } from "vitest" import * as vscode from "vscode" -import { ShareService, TaskNotFoundError } from "../ShareService" -import type { AuthService } from "../auth" +import { CloudAPI } from "../CloudAPI" +import { CloudShareService } from "../CloudShareService" import type { SettingsService } from "../SettingsService" +import type { AuthService } from "../auth" +import { CloudAPIError, TaskNotFoundError } from "../errors" // Mock fetch const mockFetch = vi.fn() @@ -44,10 +46,11 @@ vi.mock("../utils", () => ({ getUserAgent: () => "Roo-Code 1.0.0", })) -describe("ShareService", () => { - let shareService: ShareService +describe("CloudShareService", () => { + let shareService: CloudShareService let mockAuthService: AuthService let mockSettingsService: SettingsService + let mockCloudAPI: CloudAPI let mockLog: MockedFunction<(...args: unknown[]) => void> beforeEach(() => { @@ -65,7 +68,8 @@ describe("ShareService", () => { getSettings: vi.fn(), } as any - shareService = new ShareService(mockAuthService, mockSettingsService, mockLog) + mockCloudAPI = new CloudAPI(mockAuthService, mockLog) + shareService = new CloudShareService(mockCloudAPI, mockSettingsService, mockLog) }) describe("shareTask", () => { @@ -189,12 +193,12 @@ describe("ShareService", () => { ok: false, status: 404, statusText: "Not Found", + json: vi.fn().mockRejectedValue(new Error("Invalid JSON")), + text: vi.fn().mockResolvedValue("Not Found"), }) await expect(shareService.shareTask("task-123", "organization")).rejects.toThrow(TaskNotFoundError) - await expect(shareService.shareTask("task-123", "organization")).rejects.toThrow( - "Task 'task-123' not found", - ) + await expect(shareService.shareTask("task-123", "organization")).rejects.toThrow("Task not found") }) it("should throw generic Error for non-404 HTTP errors", async () => { @@ -203,12 +207,14 @@ describe("ShareService", () => { ok: false, status: 500, statusText: "Internal Server Error", + json: vi.fn().mockRejectedValue(new Error("Invalid JSON")), + text: vi.fn().mockResolvedValue("Internal Server Error"), }) + await expect(shareService.shareTask("task-123", "organization")).rejects.toThrow(CloudAPIError) await expect(shareService.shareTask("task-123", "organization")).rejects.toThrow( "HTTP 500: Internal Server Error", ) - await expect(shareService.shareTask("task-123", "organization")).rejects.not.toThrow(TaskNotFoundError) }) it("should create TaskNotFoundError with correct properties", async () => { @@ -217,6 +223,8 @@ describe("ShareService", () => { ok: false, status: 404, statusText: "Not Found", + json: vi.fn().mockRejectedValue(new Error("Invalid JSON")), + text: vi.fn().mockResolvedValue("Not Found"), }) try { @@ -225,7 +233,7 @@ describe("ShareService", () => { } catch (error) { expect(error).toBeInstanceOf(TaskNotFoundError) expect(error).toBeInstanceOf(Error) - expect((error as TaskNotFoundError).message).toBe("Task 'task-123' not found") + expect((error as TaskNotFoundError).message).toBe("Task not found") } }) }) @@ -277,8 +285,8 @@ describe("ShareService", () => { expect(result).toBe(false) }) - it("should return false when not authenticated", async () => { - ;(mockAuthService.isAuthenticated as any).mockReturnValue(false) + it("should return false when settings service returns undefined", async () => { + ;(mockSettingsService.getSettings as any).mockReturnValue(undefined) const result = await shareService.canShareTask() @@ -286,13 +294,17 @@ describe("ShareService", () => { }) it("should handle errors gracefully", async () => { - ;(mockAuthService.isAuthenticated as any).mockImplementation(() => { - throw new Error("Auth error") + ;(mockSettingsService.getSettings as any).mockImplementation(() => { + throw new Error("Settings error") }) const result = await shareService.canShareTask() expect(result).toBe(false) + expect(mockLog).toHaveBeenCalledWith( + "[ShareService] Error checking if task can be shared:", + expect.any(Error), + ) }) }) }) diff --git a/packages/cloud/src/__tests__/auth/WebAuthService.spec.ts b/packages/cloud/src/__tests__/auth/WebAuthService.spec.ts index 0e6681c20b..0500aa0a35 100644 --- a/packages/cloud/src/__tests__/auth/WebAuthService.spec.ts +++ b/packages/cloud/src/__tests__/auth/WebAuthService.spec.ts @@ -6,7 +6,7 @@ import * as vscode from "vscode" import { WebAuthService } from "../../auth/WebAuthService" import { RefreshTimer } from "../../RefreshTimer" -import * as Config from "../../Config" +import * as Config from "../../config" import * as utils from "../../utils" // Mock external dependencies diff --git a/packages/cloud/src/auth/AuthService.ts b/packages/cloud/src/auth/AuthService.ts index 11ed5161ed..f969314873 100644 --- a/packages/cloud/src/auth/AuthService.ts +++ b/packages/cloud/src/auth/AuthService.ts @@ -1,4 +1,5 @@ import EventEmitter from "events" + import type { CloudUserInfo } from "@roo-code/types" export interface AuthServiceEvents { diff --git a/packages/cloud/src/auth/StaticTokenAuthService.ts b/packages/cloud/src/auth/StaticTokenAuthService.ts index 11fc18d3fb..5a793efc86 100644 --- a/packages/cloud/src/auth/StaticTokenAuthService.ts +++ b/packages/cloud/src/auth/StaticTokenAuthService.ts @@ -1,6 +1,9 @@ import EventEmitter from "events" + import * as vscode from "vscode" + import type { CloudUserInfo } from "@roo-code/types" + import type { AuthService, AuthServiceEvents, AuthState } from "./AuthService" export class StaticTokenAuthService extends EventEmitter implements AuthService { diff --git a/packages/cloud/src/auth/WebAuthService.ts b/packages/cloud/src/auth/WebAuthService.ts index 82d3122426..890da4b836 100644 --- a/packages/cloud/src/auth/WebAuthService.ts +++ b/packages/cloud/src/auth/WebAuthService.ts @@ -6,11 +6,19 @@ import { z } from "zod" import type { CloudUserInfo, CloudOrganizationMembership } from "@roo-code/types" -import { getClerkBaseUrl, getRooCodeApiUrl, PRODUCTION_CLERK_BASE_URL } from "../Config" -import { RefreshTimer } from "../RefreshTimer" +import { getClerkBaseUrl, getRooCodeApiUrl, PRODUCTION_CLERK_BASE_URL } from "../config" import { getUserAgent } from "../utils" +import { InvalidClientTokenError } from "../errors" +import { RefreshTimer } from "../RefreshTimer" + import type { AuthService, AuthServiceEvents, AuthState } from "./AuthService" +const AUTH_STATE_KEY = "clerk-auth-state" + +/** + * AuthCredentials + */ + const authCredentialsSchema = z.object({ clientToken: z.string().min(1, "Client token cannot be empty"), sessionId: z.string().min(1, "Session ID cannot be empty"), @@ -19,7 +27,9 @@ const authCredentialsSchema = z.object({ type AuthCredentials = z.infer -const AUTH_STATE_KEY = "clerk-auth-state" +/** + * Clerk Schemas + */ const clerkSignInResponseSchema = z.object({ response: z.object({ @@ -69,13 +79,6 @@ const clerkOrganizationMembershipsSchema = z.object({ ), }) -class InvalidClientTokenError extends Error { - constructor() { - super("Invalid/Expired client token") - Object.setPrototypeOf(this, InvalidClientTokenError.prototype) - } -} - export class WebAuthService extends EventEmitter implements AuthService { private context: vscode.ExtensionContext private timer: RefreshTimer @@ -94,8 +97,9 @@ export class WebAuthService extends EventEmitter implements A this.context = context this.log = log || console.log - // Calculate auth credentials key based on Clerk base URL + // Calculate auth credentials key based on Clerk base URL. const clerkBaseUrl = getClerkBaseUrl() + if (clerkBaseUrl !== PRODUCTION_CLERK_BASE_URL) { this.authCredentialsKey = `clerk-auth-credentials-${clerkBaseUrl}` } else { diff --git a/packages/cloud/src/Config.ts b/packages/cloud/src/config.ts similarity index 81% rename from packages/cloud/src/Config.ts rename to packages/cloud/src/config.ts index 08b0cc7a18..e682d718ce 100644 --- a/packages/cloud/src/Config.ts +++ b/packages/cloud/src/config.ts @@ -1,7 +1,5 @@ -// Production constants export const PRODUCTION_CLERK_BASE_URL = "https://clerk.roocode.com" export const PRODUCTION_ROO_CODE_API_URL = "https://app.roocode.com" -// Functions with environment variable fallbacks export const getClerkBaseUrl = () => process.env.CLERK_BASE_URL || PRODUCTION_CLERK_BASE_URL export const getRooCodeApiUrl = () => process.env.ROO_CODE_API_URL || PRODUCTION_ROO_CODE_API_URL diff --git a/packages/cloud/src/errors.ts b/packages/cloud/src/errors.ts new file mode 100644 index 0000000000..7400f26b39 --- /dev/null +++ b/packages/cloud/src/errors.ts @@ -0,0 +1,42 @@ +export class CloudAPIError extends Error { + constructor( + message: string, + public statusCode?: number, + public responseBody?: unknown, + ) { + super(message) + this.name = "CloudAPIError" + Object.setPrototypeOf(this, CloudAPIError.prototype) + } +} + +export class TaskNotFoundError extends CloudAPIError { + constructor(taskId?: string) { + super(taskId ? `Task '${taskId}' not found` : "Task not found", 404) + this.name = "TaskNotFoundError" + Object.setPrototypeOf(this, TaskNotFoundError.prototype) + } +} + +export class AuthenticationError extends CloudAPIError { + constructor(message = "Authentication required") { + super(message, 401) + this.name = "AuthenticationError" + Object.setPrototypeOf(this, AuthenticationError.prototype) + } +} + +export class NetworkError extends CloudAPIError { + constructor(message = "Network error occurred") { + super(message) + this.name = "NetworkError" + Object.setPrototypeOf(this, NetworkError.prototype) + } +} + +export class InvalidClientTokenError extends Error { + constructor() { + super("Invalid/Expired client token") + Object.setPrototypeOf(this, InvalidClientTokenError.prototype) + } +} diff --git a/packages/cloud/src/index.ts b/packages/cloud/src/index.ts index 9770f349c6..4892a9a254 100644 --- a/packages/cloud/src/index.ts +++ b/packages/cloud/src/index.ts @@ -1,2 +1,5 @@ +export * from "./config" + +export * from "./CloudAPI" export * from "./CloudService" -export * from "./Config" +export * from "./TaskBridgeService" diff --git a/packages/cloud/src/utils.ts b/packages/cloud/src/utils.ts index cf87aa5e28..071fc09697 100644 --- a/packages/cloud/src/utils.ts +++ b/packages/cloud/src/utils.ts @@ -1,10 +1,5 @@ import * as vscode from "vscode" -/** - * Get the User-Agent string for API requests - * @param context Optional extension context for more accurate version detection - * @returns User-Agent string in format "Roo-Code {version}" - */ export function getUserAgent(context?: vscode.ExtensionContext): string { return `Roo-Code ${context?.extension?.packageJSON?.version || "unknown"}` } diff --git a/packages/types/src/cloud.ts b/packages/types/src/cloud.ts index 5ef90b6e5a..b9c25c8fb8 100644 --- a/packages/types/src/cloud.ts +++ b/packages/types/src/cloud.ts @@ -152,3 +152,14 @@ export const shareResponseSchema = z.object({ }) export type ShareResponse = z.infer + +/** + * Task Bridge Types + */ + +export const taskBridgeRegisterResponseSchema = z.object({ + success: z.boolean(), + error: z.string().optional(), +}) + +export type TaskBridgeRegisterResponse = z.infer diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index 44937da235..ed02278533 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -1,8 +1,6 @@ -export * from "./providers/index.js" - export * from "./api.js" -export * from "./codebase-index.js" export * from "./cloud.js" +export * from "./codebase-index.js" export * from "./experiment.js" export * from "./followup.js" export * from "./global-settings.js" @@ -15,9 +13,12 @@ export * from "./mode.js" export * from "./model.js" export * from "./provider-settings.js" export * from "./sharing.js" +export * from "./task.js" +export * from "./todo.js" export * from "./telemetry.js" export * from "./terminal.js" export * from "./tool.js" export * from "./type-fu.js" export * from "./vscode.js" -export * from "./todo.js" + +export * from "./providers/index.js" diff --git a/packages/types/src/task.ts b/packages/types/src/task.ts new file mode 100644 index 0000000000..d0c205df14 --- /dev/null +++ b/packages/types/src/task.ts @@ -0,0 +1,28 @@ +import { type ClineMessage, type TokenUsage } from "./message.js" +import { type ToolUsage, type ToolName } from "./tool.js" + +export type TaskEvents = { + message: [{ action: "created" | "updated"; message: ClineMessage }] + taskStarted: [] + taskModeSwitched: [taskId: string, mode: string] + taskPaused: [] + taskUnpaused: [] + taskAskResponded: [] + taskAborted: [] + taskSpawned: [taskId: string] + taskCompleted: [taskId: string, tokenUsage: TokenUsage, toolUsage: ToolUsage] + taskTokenUsageUpdated: [taskId: string, tokenUsage: TokenUsage] + taskToolFailed: [taskId: string, tool: ToolName, error: string] +} + +export type TaskEventHandlers = { + [K in keyof TaskEvents]: (...args: TaskEvents[K]) => void | Promise +} + +export interface TaskLike { + readonly taskId: string + + on(event: K, listener: (...args: TaskEvents[K]) => void | Promise): this + off(event: K, listener: (...args: TaskEvents[K]) => void | Promise): this + setMessageResponse(text: string, images?: string[]): void +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 311c51d0ba..ec305c9adb 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -361,6 +361,9 @@ importers: '@roo-code/types': specifier: workspace:^ version: link:../types + ioredis: + specifier: ^5.3.2 + version: 5.6.1 zod: specifier: ^3.25.61 version: 3.25.61 @@ -371,12 +374,18 @@ importers: '@roo-code/config-typescript': specifier: workspace:^ version: link:../config-typescript + '@types/ioredis-mock': + specifier: ^8.2.6 + version: 8.2.6(ioredis@5.6.1) '@types/node': specifier: 20.x version: 20.17.57 '@types/vscode': specifier: ^1.84.0 version: 1.100.0 + ioredis-mock: + specifier: ^8.9.0 + version: 8.9.0(@types/ioredis-mock@8.2.6(ioredis@5.6.1))(ioredis@5.6.1) vitest: specifier: ^3.2.3 version: 3.2.4(@types/debug@4.1.12)(@types/node@20.17.57)(@vitest/ui@3.2.4)(jiti@2.4.2)(jsdom@26.1.0)(lightningcss@1.30.1)(tsx@4.19.4)(yaml@2.8.0) @@ -1957,6 +1966,12 @@ packages: cpu: [x64] os: [win32] + '@ioredis/as-callback@3.0.0': + resolution: {integrity: sha512-Kqv1rZ3WbgOrS+hgzJ5xG5WQuhvzzSTRYvNeyPMLOAM78MHSnuKI20JeJGbpuAt//LCuP0vsexZcorqW7kWhJg==} + + '@ioredis/commands@1.3.0': + resolution: {integrity: sha512-M/T6Zewn7sDaBQEqIZ8Rb+i9y8qfGmq+5SDFSf9sA2lUZTmdDLVdOiQaeDp+Q4wElZ9HG1GAX5KhDaidp6LQsQ==} + '@isaacs/cliui@8.0.2': resolution: {integrity: sha512-O8jcjabXaleOG9DQ0+ARXWZBTfnP4WNAqzuiJK7ll44AmxGKv/J2M4TPjxjY3znBCfvBXFzucm1twdyFybFqEA==} engines: {node: '>=12'} @@ -3816,6 +3831,11 @@ packages: '@types/hast@3.0.4': resolution: {integrity: sha512-WPs+bbQw5aCj+x6laNGWLH3wviHtoCv/P3+otBhbOhJgG8qtpdAMlTCxLtsTWA7LH1Oh/bFCHsBn0TPS5m30EQ==} + '@types/ioredis-mock@8.2.6': + resolution: {integrity: sha512-5heqtZMvQ4nXARY0o8rc8cjkJjct2ScM12yCJ/h731S9He93a2cv+kAhwPCNwTKDfNH9gjRfLG4VpAEYJU0/gQ==} + peerDependencies: + ioredis: '>=5' + '@types/istanbul-lib-coverage@2.0.6': resolution: {integrity: sha512-2QF/t/auWm0lsy8XtKVPG19v3sSOQlJe/YHZgfjb/KBBHOGSV+J2q/S671rcq9uTBrLAXmZpqJiaQbMT+zNU1w==} @@ -5102,6 +5122,10 @@ packages: resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==} engines: {node: '>=0.4.0'} + denque@2.1.0: + resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==} + engines: {node: '>=0.10'} + depd@2.0.0: resolution: {integrity: sha512-g7nH6P6dyDioJogAAGprGpCtVImJhpPk/roCzdb3fIh61/s/nPsfR6onyMwkCAR/OlC3yBC0lESvUoQEAssIrw==} engines: {node: '>= 0.8'} @@ -5739,6 +5763,14 @@ packages: picomatch: optional: true + fengari-interop@0.1.3: + resolution: {integrity: sha512-EtZ+oTu3kEwVJnoymFPBVLIbQcCoy9uWCVnMA6h3M/RqHkUBsLYp29+RRHf9rKr6GwjubWREU1O7RretFIXjHw==} + peerDependencies: + fengari: ^0.1.0 + + fengari@0.1.4: + resolution: {integrity: sha512-6ujqUuiIYmcgkGz8MGAdERU57EIluGGPSUgGPTsco657EHa+srq0S3/YUl/r9kx1+D+d4rGfYObd+m8K22gB1g==} + fetch-blob@3.2.0: resolution: {integrity: sha512-7yAQpD2UMJzLi1Dqv7qFYnPbaPx7ZfFK6PiIxQ4PfkGPyNyl2Ugx+a/umUonmKqjhM4DnfbMvdX6otXq83soQQ==} engines: {node: ^12.20 || >= 14.13} @@ -6275,6 +6307,17 @@ packages: resolution: {integrity: sha512-5Hh7Y1wQbvY5ooGgPbDaL5iYLAPzMTUrjMulskHLH6wnv/A+1q5rgEaiuqEjB+oxGXIVZs1FF+R/KPN3ZSQYYg==} engines: {node: '>=12'} + ioredis-mock@8.9.0: + resolution: {integrity: sha512-yIglcCkI1lvhwJVoMsR51fotZVsPsSk07ecTCgRTRlicG0Vq3lke6aAaHklyjmRNRsdYAgswqC2A0bPtQK4LSw==} + engines: {node: '>=12.22'} + peerDependencies: + '@types/ioredis-mock': ^8 + ioredis: ^5 + + ioredis@5.6.1: + resolution: {integrity: sha512-UxC0Yv1Y4WRJiGQxQkP0hfdL0/5/6YvdfOOClRgJ0qppSarkhneSa6UvkMkms0AkdGimSH3Ikqm+6mkMmX7vGA==} + engines: {node: '>=12.22.0'} + ip-address@9.0.5: resolution: {integrity: sha512-zHtQzGojZXTwZTHQqra+ETKd4Sn3vgi7uBmlPoXVWZqYvuKmtI0l/VZTjqGmJY9x88GGOaZ9+G9ES8hC4T4X8g==} engines: {node: '>= 12'} @@ -6943,6 +6986,9 @@ packages: lodash.includes@4.3.0: resolution: {integrity: sha512-W3Bx6mdkRTGtlJISOvVD/lbqjTlPPUDTMnlXZFnVwi9NKJ6tiAk6LVdlhZMm17VZisqhKcgzpO5Wz91PCt5b0w==} + lodash.isarguments@3.1.0: + resolution: {integrity: sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==} + lodash.isboolean@3.0.3: resolution: {integrity: sha512-Bz5mupy2SVbPHURB98VAcw+aHh4vRV5IPNhILUCsOzRmsTmSQ17jIuqopAentWoehktxGd9e/hbIXq980/1QJg==} @@ -8248,6 +8294,10 @@ packages: resolution: {integrity: sha512-GDhwkLfywWL2s6vEjyhri+eXmfH6j1L7JE27WhqLeYzoh/A3DBaYGEj2H/HFZCn/kMfim73FXxEJTw06WtxQwg==} engines: {node: '>= 14.18.0'} + readline-sync@1.4.10: + resolution: {integrity: sha512-gNva8/6UAe8QYepIQH/jQ2qn91Qj0B9sYjMBBs3QOB8F2CXcKgLxQaJRP76sWVRQt+QU+8fAkCbCvjjMFu7Ycw==} + engines: {node: '>= 0.8.0'} + recharts-scale@0.4.5: resolution: {integrity: sha512-kivNFO+0OcUNu7jQquLXAxz1FIwZj8nrj+YkOKc5694NbjCvcT6aSZiIzNzd2Kul4o4rTto8QVR9lMNtxD4G1w==} @@ -8266,6 +8316,14 @@ packages: resolution: {integrity: sha512-6tDA8g98We0zd0GvVeMT9arEOnTw9qM03L9cJXaCjrip1OO764RDBLBfrB4cwzNGDj5OA5ioymC9GkizgWJDUg==} engines: {node: '>=8'} + redis-errors@1.2.0: + resolution: {integrity: sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==} + engines: {node: '>=4'} + + redis-parser@3.0.0: + resolution: {integrity: sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==} + engines: {node: '>=4'} + redis@5.5.5: resolution: {integrity: sha512-x7vpciikEY7nptGzQrE5I+/pvwFZJDadPk/uEoyGSg/pZ2m/CX2n5EhSgUh+S5T7Gz3uKM6YzWcXEu3ioAsdFQ==} engines: {node: '>= 18'} @@ -8679,6 +8737,9 @@ packages: stacktrace-js@2.0.2: resolution: {integrity: sha512-Je5vBeY4S1r/RnLydLl0TBTi3F2qdfWmYsGvtfZgEI+SCprPppaIhQf5nGcal4gI4cGpCV/duLcAzT1np6sQqg==} + standard-as-callback@2.1.0: + resolution: {integrity: sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==} + statuses@2.0.1: resolution: {integrity: sha512-RwNA9Z/7PrK06rYLIzFMlaF+l73iwpzsqRIFgbMLbTcLD6cOao82TaWefPXQvB2fOC4AjuYSEndS7N/mTCbkdQ==} engines: {node: '>= 0.8'} @@ -11061,6 +11122,10 @@ snapshots: '@img/sharp-win32-x64@0.33.5': optional: true + '@ioredis/as-callback@3.0.0': {} + + '@ioredis/commands@1.3.0': {} + '@isaacs/cliui@8.0.2': dependencies: string-width: 5.1.2 @@ -13075,6 +13140,10 @@ snapshots: dependencies: '@types/unist': 3.0.3 + '@types/ioredis-mock@8.2.6(ioredis@5.6.1)': + dependencies: + ioredis: 5.6.1 + '@types/istanbul-lib-coverage@2.0.6': {} '@types/istanbul-lib-report@3.0.3': @@ -14537,6 +14606,8 @@ snapshots: delayed-stream@1.0.0: {} + denque@2.1.0: {} + depd@2.0.0: {} dequal@2.0.3: {} @@ -15288,6 +15359,16 @@ snapshots: optionalDependencies: picomatch: 4.0.2 + fengari-interop@0.1.3(fengari@0.1.4): + dependencies: + fengari: 0.1.4 + + fengari@0.1.4: + dependencies: + readline-sync: 1.4.10 + sprintf-js: 1.1.3 + tmp: 0.0.33 + fetch-blob@3.2.0: dependencies: node-domexception: 1.0.0 @@ -15916,6 +15997,30 @@ snapshots: internmap@2.0.3: {} + ioredis-mock@8.9.0(@types/ioredis-mock@8.2.6(ioredis@5.6.1))(ioredis@5.6.1): + dependencies: + '@ioredis/as-callback': 3.0.0 + '@ioredis/commands': 1.3.0 + '@types/ioredis-mock': 8.2.6(ioredis@5.6.1) + fengari: 0.1.4 + fengari-interop: 0.1.3(fengari@0.1.4) + ioredis: 5.6.1 + semver: 7.7.2 + + ioredis@5.6.1: + dependencies: + '@ioredis/commands': 1.3.0 + cluster-key-slot: 1.1.2 + debug: 4.4.1(supports-color@8.1.1) + denque: 2.1.0 + lodash.defaults: 4.2.0 + lodash.isarguments: 3.1.0 + redis-errors: 1.2.0 + redis-parser: 3.0.0 + standard-as-callback: 2.1.0 + transitivePeerDependencies: + - supports-color + ip-address@9.0.5: dependencies: jsbn: 1.1.0 @@ -16584,6 +16689,8 @@ snapshots: lodash.includes@4.3.0: {} + lodash.isarguments@3.1.0: {} + lodash.isboolean@3.0.3: {} lodash.isequal@4.5.0: {} @@ -18228,6 +18335,8 @@ snapshots: readdirp@4.1.2: {} + readline-sync@1.4.10: {} + recharts-scale@0.4.5: dependencies: decimal.js-light: 2.5.1 @@ -18252,6 +18361,12 @@ snapshots: indent-string: 4.0.0 strip-indent: 3.0.0 + redis-errors@1.2.0: {} + + redis-parser@3.0.0: + dependencies: + redis-errors: 1.2.0 + redis@5.5.5: dependencies: '@redis/bloom': 5.5.5(@redis/client@5.5.5) @@ -18805,6 +18920,8 @@ snapshots: stack-generator: 2.0.10 stacktrace-gps: 3.1.2 + standard-as-callback@2.1.0: {} + statuses@2.0.1: {} std-env@3.9.0: {} diff --git a/src/core/task/Task.ts b/src/core/task/Task.ts index 38c67b5021..f6436a0ce9 100644 --- a/src/core/task/Task.ts +++ b/src/core/task/Task.ts @@ -10,6 +10,8 @@ import pWaitFor from "p-wait-for" import { serializeError } from "serialize-error" import { + type TaskLike, + type TaskEvents, type ProviderSettings, type TokenUsage, type ToolUsage, @@ -19,15 +21,15 @@ import { type ClineMessage, type ClineSay, type ToolProgressStatus, - DEFAULT_CONSECUTIVE_MISTAKE_LIMIT, type HistoryItem, TelemetryEventName, TodoItem, getApiProtocol, getModelId, + DEFAULT_CONSECUTIVE_MISTAKE_LIMIT, } from "@roo-code/types" import { TelemetryService } from "@roo-code/telemetry" -import { CloudService } from "@roo-code/cloud" +import { CloudService, TaskBridgeService } from "@roo-code/cloud" // api import { ApiHandler, ApiHandlerCreateMessageMetadata, buildApiHandler } from "../../api" @@ -95,24 +97,6 @@ import { restoreTodoListForTask } from "../tools/updateTodoListTool" const MAX_EXPONENTIAL_BACKOFF_SECONDS = 600 // 10 minutes -export type TaskEvents = { - message: [{ action: "created" | "updated"; message: ClineMessage }] - taskStarted: [] - taskModeSwitched: [taskId: string, mode: string] - taskPaused: [] - taskUnpaused: [] - taskAskResponded: [] - taskAborted: [] - taskSpawned: [taskId: string] - taskCompleted: [taskId: string, tokenUsage: TokenUsage, toolUsage: ToolUsage] - taskTokenUsageUpdated: [taskId: string, tokenUsage: TokenUsage] - taskToolFailed: [taskId: string, tool: ToolName, error: string] -} - -export type TaskEventHandlers = { - [K in keyof TaskEvents]: (...args: TaskEvents[K]) => void | Promise -} - export type TaskOptions = { provider: ClineProvider apiConfiguration: ProviderSettings @@ -131,7 +115,7 @@ export type TaskOptions = { onCreated?: (task: Task) => void } -export class Task extends EventEmitter { +export class Task extends EventEmitter implements TaskLike { todoList?: TodoItem[] readonly taskId: string readonly instanceId: string @@ -247,6 +231,9 @@ export class Task extends EventEmitter { checkpointService?: RepoPerTaskCheckpointService checkpointServiceInitializing = false + // Task Bridge + taskBridgeService?: TaskBridgeService + // Streaming isWaitingForFirstChunk = false isStreaming = false @@ -351,6 +338,12 @@ export class Task extends EventEmitter { this.toolRepetitionDetector = new ToolRepetitionDetector(this.consecutiveMistakeLimit) + // TODO: Figure out when to enable task bridge. + // eslint-disable-next-line no-constant-condition + if (true) { + this.taskBridgeService = TaskBridgeService.getInstance() + } + onCreated?.(this) if (startTask) { @@ -700,7 +693,9 @@ export class Task extends EventEmitter { await this.addToClineMessages({ ts: askTs, type: "ask", ask: type, text, isProtected }) } + console.log(`[Task#${this.taskId}] pWaitFor askResponse...`, type) await pWaitFor(() => this.askResponse !== undefined || this.lastMessageTs !== askTs, { interval: 100 }) + console.log(`[Task#${this.taskId}] pWaitFor askResponse done!`, this.askResponse) if (this.lastMessageTs !== askTs) { // Could happen if we send multiple asks in a row i.e. with @@ -717,7 +712,11 @@ export class Task extends EventEmitter { return result } - async handleWebviewAskResponse(askResponse: ClineAskResponse, text?: string, images?: string[]) { + public setMessageResponse(text: string, images?: string[]) { + this.handleWebviewAskResponse("messageResponse", text, images) + } + + handleWebviewAskResponse(askResponse: ClineAskResponse, text?: string, images?: string[]) { this.askResponse = askResponse this.askResponseText = text this.askResponseImages = images @@ -1007,7 +1006,7 @@ export class Task extends EventEmitter { const lastClineMessage = this.clineMessages .slice() .reverse() - .find((m) => !(m.ask === "resume_task" || m.ask === "resume_completed_task")) // could be multiple resume tasks + .find((m) => !(m.ask === "resume_task" || m.ask === "resume_completed_task")) // Could be multiple resume tasks. let askType: ClineAsk if (lastClineMessage?.ask === "completion_result") { @@ -1018,9 +1017,11 @@ export class Task extends EventEmitter { this.isInitialized = true - const { response, text, images } = await this.ask(askType) // calls poststatetowebview + const { response, text, images } = await this.ask(askType) // Calls `postStateToWebview`. + let responseText: string | undefined let responseImages: string[] | undefined + if (response === "messageResponse") { await this.say("user_feedback", text, images) responseText = text @@ -1210,6 +1211,13 @@ export class Task extends EventEmitter { this.pauseInterval = undefined } + // Unsubscribe from TaskBridge service. + if (this.taskBridgeService) { + this.taskBridgeService + .unsubscribeFromTask(this.taskId) + .catch((error) => console.error("Error unsubscribing from task bridge:", error)) + } + // Release any terminals associated with this task. try { // Release any terminals associated with this task. @@ -1304,6 +1312,11 @@ export class Task extends EventEmitter { // Kicks off the checkpoints initialization process in the background. getCheckpointService(this) + if (this.taskBridgeService) { + await this.taskBridgeService.initialize() + await this.taskBridgeService.subscribeToTask(this) + } + let nextUserContent = userContent let includeFileDetails = true @@ -1311,7 +1324,7 @@ export class Task extends EventEmitter { while (!this.abort) { const didEndLoop = await this.recursivelyMakeClineRequests(nextUserContent, includeFileDetails) - includeFileDetails = false // we only need file details the first time + includeFileDetails = false // We only need file details the first time. // The way this agentic loop works is that cline will be given a // task that he then calls tools to complete. Unless there's an @@ -1637,13 +1650,13 @@ export class Task extends EventEmitter { // If this.abort is already true, it means the user clicked cancel, so we should // treat this as "user_cancelled" rather than "streaming_failed" const cancelReason = this.abort ? "user_cancelled" : "streaming_failed" + const streamingFailedMessage = this.abort ? undefined : (error.message ?? JSON.stringify(serializeError(error), null, 2)) - // Now call abortTask after determining the cancel reason + // Now call abortTask after determining the cancel reason. await this.abortTask() - await abortStream(cancelReason, streamingFailedMessage) const history = await provider?.getTaskWithId(this.taskId) diff --git a/src/core/tools/attemptCompletionTool.ts b/src/core/tools/attemptCompletionTool.ts index ef7881854f..79f2bee075 100644 --- a/src/core/tools/attemptCompletionTool.ts +++ b/src/core/tools/attemptCompletionTool.ts @@ -41,11 +41,13 @@ export async function attemptCompletionTool( if (preventCompletionWithOpenTodos && hasIncompleteTodos) { cline.consecutiveMistakeCount++ cline.recordToolError("attempt_completion") + pushToolResult( formatResponse.toolError( "Cannot complete task while there are incomplete todos. Please finish all todos before attempting completion.", ), ) + return } @@ -72,7 +74,7 @@ export async function attemptCompletionTool( await cline.ask("command", removeClosingTag("command", command), block.partial).catch(() => {}) } } else { - // no command, still outputting partial result + // No command, still outputting partial result await cline.say("completion_result", removeClosingTag("result", result), undefined, block.partial) } return