diff --git a/packages/cloud/src/CloudService.ts b/packages/cloud/src/CloudService.ts index ff33671a40..477c7ea6ab 100644 --- a/packages/cloud/src/CloudService.ts +++ b/packages/cloud/src/CloudService.ts @@ -18,6 +18,7 @@ import type { SettingsService } from "./SettingsService" import { CloudSettingsService } from "./CloudSettingsService" import { StaticSettingsService } from "./StaticSettingsService" import { TelemetryClient } from "./TelemetryClient" +import { QueuedTelemetryClient, type MultiInstanceConfig, type QueueStatus } from "./queue" import { ShareService, TaskNotFoundError } from "./ShareService" type AuthStateChangedPayload = CloudServiceEvents["auth-state-changed"][0] @@ -34,6 +35,7 @@ export class CloudService extends EventEmitter implements vs private settingsListener: (data: SettingsPayload) => void private settingsService: SettingsService | null = null private telemetryClient: TelemetryClient | null = null + private queuedTelemetryClient: QueuedTelemetryClient | null = null private shareService: ShareService | null = null private isInitialized = false private log: (...args: unknown[]) => void @@ -88,12 +90,34 @@ export class CloudService extends EventEmitter implements vs } this.telemetryClient = new TelemetryClient(this.authService, this.settingsService) + + // Configure multi-instance behavior + const multiInstanceConfig: MultiInstanceConfig = { + enabled: true, + lockDurationMs: 30000, // 30 seconds + lockCheckIntervalMs: 5000, // 5 seconds + lockAcquireTimeoutMs: 10000, // 10 seconds + mode: "compete", // All instances compete for the lock + } + + // Check for environment variable overrides + const multiInstanceMode = process.env.ROO_CODE_MULTI_INSTANCE_MODE + if (multiInstanceMode === "leader" || multiInstanceMode === "disabled") { + multiInstanceConfig.mode = multiInstanceMode + } + + this.queuedTelemetryClient = new QueuedTelemetryClient( + this.telemetryClient, + this.context, + false, // debug + multiInstanceConfig, + ) this.shareService = new ShareService(this.authService, this.settingsService, this.log) try { - TelemetryService.instance.register(this.telemetryClient) + TelemetryService.instance.register(this.queuedTelemetryClient) } catch (error) { - this.log("[CloudService] Failed to register TelemetryClient:", error) + this.log("[CloudService] Failed to register QueuedTelemetryClient:", error) } this.isInitialized = true @@ -191,9 +215,9 @@ export class CloudService extends EventEmitter implements vs // TelemetryClient - public captureEvent(event: TelemetryEvent): void { + public async captureEvent(event: TelemetryEvent): Promise { this.ensureInitialized() - this.telemetryClient!.capture(event) + await this.queuedTelemetryClient!.capture(event) } // ShareService @@ -224,7 +248,7 @@ export class CloudService extends EventEmitter implements vs // Lifecycle - public dispose(): void { + public async dispose(): Promise { if (this.authService) { this.authService.off("auth-state-changed", this.authStateListener) this.authService.off("user-info", this.authUserInfoListener) @@ -235,6 +259,9 @@ export class CloudService extends EventEmitter implements vs } this.settingsService.dispose() } + if (this.queuedTelemetryClient) { + await this.queuedTelemetryClient.shutdown() + } this.isInitialized = false } @@ -280,4 +307,64 @@ export class CloudService extends EventEmitter implements vs static isEnabled(): boolean { return !!this._instance?.isAuthenticated() } + + // Getters for queue integration + public getAuthService(): AuthService { + this.ensureInitialized() + return 
this.authService! + } + + public getSettingsService(): SettingsService { + this.ensureInitialized() + return this.settingsService! + } + + public getTelemetryClient(): TelemetryClient { + this.ensureInitialized() + return this.telemetryClient! + } + + public getQueuedTelemetryClient(): QueuedTelemetryClient { + this.ensureInitialized() + return this.queuedTelemetryClient! + } + + /** + * Process any queued telemetry events + * Returns the number of events successfully processed + */ + public async processQueuedEvents(): Promise { + this.ensureInitialized() + return this.queuedTelemetryClient!.processQueue() + } + + /** + * Get queue status including multi-instance information + */ + public async getQueueStatus(): Promise< + QueueStatus & { + instanceInfo?: { + instanceId: string + hostname: string + multiInstanceEnabled: boolean + multiInstanceMode: string + } + } + > { + this.ensureInitialized() + return this.queuedTelemetryClient!.getQueueStatus() + } + + /** + * Get multi-instance lock statistics + */ + public async getLockStats(): Promise<{ + hasLock: boolean + lockHolder?: string + lockAge?: number + isExpired?: boolean + }> { + this.ensureInitialized() + return this.queuedTelemetryClient!.getLockStats() + } } diff --git a/packages/cloud/src/TelemetryClient.ts b/packages/cloud/src/TelemetryClient.ts index e33843a30c..8cb65cf3b9 100644 --- a/packages/cloud/src/TelemetryClient.ts +++ b/packages/cloud/src/TelemetryClient.ts @@ -27,14 +27,14 @@ export class TelemetryClient extends BaseTelemetryClient { private async fetch(path: string, options: RequestInit) { if (!this.authService.isAuthenticated()) { - return + throw new Error("Not authenticated") } const token = this.authService.getSessionToken() if (!token) { console.error(`[TelemetryClient#fetch] Unauthorized: No session token available.`) - return + throw new Error("No session token available") } const response = await fetch(`${getRooCodeApiUrl()}/api/${path}`, { diff --git a/packages/cloud/src/__tests__/CloudService.test.ts b/packages/cloud/src/__tests__/CloudService.test.ts index fd3ae9b9c0..9d1d7d05cf 100644 --- a/packages/cloud/src/__tests__/CloudService.test.ts +++ b/packages/cloud/src/__tests__/CloudService.test.ts @@ -34,6 +34,33 @@ vi.mock("../ShareService") vi.mock("../TelemetryClient") +vi.mock("../queue", () => ({ + QueuedTelemetryClient: vi.fn().mockImplementation(() => ({ + capture: vi.fn().mockResolvedValue(undefined), + processQueue: vi.fn().mockResolvedValue(0), + getQueueStatus: vi.fn().mockResolvedValue({ + queueSize: 0, + oldestEventAge: undefined, + processingState: "idle", + lastProcessedAt: undefined, + lastError: undefined, + storageStats: { + sizeInBytes: 0, + sizeInMB: 0, + utilizationPercent: 0, + eventCount: 0, + }, + }), + getLockStats: vi.fn().mockResolvedValue({ + hasLock: false, + lockHolder: undefined, + lockAge: undefined, + isExpired: undefined, + }), + shutdown: vi.fn().mockResolvedValue(undefined), + })), +})) + describe("CloudService", () => { let mockContext: vscode.ExtensionContext let mockAuthService: { diff --git a/packages/cloud/src/index.ts b/packages/cloud/src/index.ts index 9770f349c6..5b30aee564 100644 --- a/packages/cloud/src/index.ts +++ b/packages/cloud/src/index.ts @@ -1,2 +1,4 @@ export * from "./CloudService" export * from "./Config" +export type { AuthService, AuthServiceEvents, AuthState } from "./auth" +export * from "./queue" diff --git a/packages/cloud/src/queue/CloudQueueProcessor.ts b/packages/cloud/src/queue/CloudQueueProcessor.ts new file mode 100644 index 
0000000000..d0c9f0632b --- /dev/null +++ b/packages/cloud/src/queue/CloudQueueProcessor.ts @@ -0,0 +1,60 @@ +import { QueueProcessor, QueuedTelemetryEvent } from "./types" +import { TelemetryClient } from "../TelemetryClient" + +/** + * Processes queued telemetry events by sending them to the cloud + */ +export class CloudQueueProcessor implements QueueProcessor { + constructor(private telemetryClient: TelemetryClient) {} + + async process(event: QueuedTelemetryEvent): Promise { + try { + // Use the telemetry client to send the event + await this.telemetryClient.capture(event.event) + // Only log errors, not successes + return true + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error) + console.error(`[CloudQueueProcessor] Failed to process event ${event.id}:`, errorMessage) + + // Store error for debugging + event.lastError = errorMessage + + // Determine if we should retry based on error type + if (this.isRetryableError(error)) { + return false + } + + // Non-retryable error, consider it "processed" to remove from queue + // Only log actual errors, not warnings about non-retryable errors + return true + } + } + + async isReady(): Promise { + // Always ready - no connection detection per requirements + return true + } + + private isRetryableError(error: unknown): boolean { + const errorMessage = error instanceof Error ? error.message : String(error) + + // Don't retry validation errors + if (errorMessage.includes("validation") || errorMessage.includes("invalid")) { + return false + } + + // Don't retry authentication errors (user needs to re-authenticate) + if (errorMessage.includes("401") || errorMessage.includes("403") || errorMessage.includes("Unauthorized")) { + return false + } + + // Don't retry if the event schema is invalid + if (errorMessage.includes("Invalid telemetry event")) { + return false + } + + // Retry network and server errors + return true + } +} diff --git a/packages/cloud/src/queue/GlobalStateQueueStorage.ts b/packages/cloud/src/queue/GlobalStateQueueStorage.ts new file mode 100644 index 0000000000..c80a917243 --- /dev/null +++ b/packages/cloud/src/queue/GlobalStateQueueStorage.ts @@ -0,0 +1,373 @@ +import * as vscode from "vscode" +import * as os from "os" +import { QueueStorage, QueuedTelemetryEvent, QueueLock, MultiInstanceConfig } from "./types" + +/** + * Implementation of QueueStorage using VS Code's globalState API + * Enforces a 1MB storage limit for the queue + * Supports multi-instance coordination with locking + */ +export class GlobalStateQueueStorage implements QueueStorage { + private static readonly STORAGE_KEY = "rooCode.telemetryQueue" + private static readonly LOCK_KEY = "rooCode.telemetryQueue.lock" + private static readonly DEFAULT_MAX_STORAGE_SIZE = 1048576 // 1MB in bytes + private static readonly DEFAULT_LOCK_DURATION_MS = 30000 // 30 seconds + private static readonly DEFAULT_LOCK_CHECK_INTERVAL_MS = 5000 // 5 seconds + private static readonly DEFAULT_LOCK_ACQUIRE_TIMEOUT_MS = 10000 // 10 seconds + + private readonly instanceId: string + private readonly hostname: string + private readonly multiInstanceConfig: Required + + constructor( + private context: vscode.ExtensionContext, + private maxStorageSize = GlobalStateQueueStorage.DEFAULT_MAX_STORAGE_SIZE, + multiInstanceConfig?: MultiInstanceConfig, + ) { + // Generate unique instance ID + this.instanceId = this.generateInstanceId() + this.hostname = os.hostname() + + // Set default multi-instance configuration + this.multiInstanceConfig = { + enabled: 
true, + lockDurationMs: GlobalStateQueueStorage.DEFAULT_LOCK_DURATION_MS, + lockCheckIntervalMs: GlobalStateQueueStorage.DEFAULT_LOCK_CHECK_INTERVAL_MS, + lockAcquireTimeoutMs: GlobalStateQueueStorage.DEFAULT_LOCK_ACQUIRE_TIMEOUT_MS, + mode: "compete", + ...multiInstanceConfig, + } + + // Only log errors, not initialization info + } + + /** + * Generate a unique instance ID + */ + private generateInstanceId(): string { + const timestamp = Date.now() + const random = Math.random().toString(36).substring(2, 9) + const pid = process.pid + return `inst_${pid}_${timestamp}_${random}` + } + + /** + * Acquire a lock for queue processing + */ + async acquireLock(): Promise { + if (!this.multiInstanceConfig.enabled || this.multiInstanceConfig.mode === "disabled") { + return true + } + + const startTime = Date.now() + const timeout = this.multiInstanceConfig.lockAcquireTimeoutMs + + while (Date.now() - startTime < timeout) { + try { + const currentLock = await this.getLock() + const now = Date.now() + + // Check if lock is expired or doesn't exist + if (!currentLock || currentLock.expiresAt < now) { + // Try to acquire the lock + const newLock: QueueLock = { + instanceId: this.instanceId, + acquiredAt: now, + expiresAt: now + this.multiInstanceConfig.lockDurationMs, + hostname: this.hostname, + } + + // Atomic compare-and-swap + const success = await this.compareAndSwapLock(currentLock, newLock) + if (success) { + // Only log errors, not successful lock acquisition + return true + } + } + + // Wait before retrying + await this.sleep(100 + Math.random() * 100) // 100-200ms with jitter + } catch (error) { + console.error("[QueueStorage] Error acquiring lock:", error) + } + } + + // Failed to acquire lock within timeout, return silently + return false + } + + /** + * Release the lock held by this instance + */ + async releaseLock(): Promise { + if (!this.multiInstanceConfig.enabled || this.multiInstanceConfig.mode === "disabled") { + return + } + + try { + const currentLock = await this.getLock() + if (currentLock && currentLock.instanceId === this.instanceId) { + await this.context.globalState.update(GlobalStateQueueStorage.LOCK_KEY, undefined) + // Only log errors, not successful lock release + } + } catch (error) { + console.error("[QueueStorage] Error releasing lock:", error) + } + } + + /** + * Check if this instance holds the lock + */ + async holdsLock(): Promise { + if (!this.multiInstanceConfig.enabled || this.multiInstanceConfig.mode === "disabled") { + return true + } + + const lock = await this.getLock() + return lock !== null && lock.instanceId === this.instanceId && lock.expiresAt > Date.now() + } + + /** + * Get the current lock + */ + private async getLock(): Promise { + try { + return this.context.globalState.get(GlobalStateQueueStorage.LOCK_KEY) || null + } catch (error) { + console.error("[QueueStorage] Error reading lock:", error) + return null + } + } + + /** + * Atomic compare-and-swap for lock + */ + private async compareAndSwapLock(expectedLock: QueueLock | null, newLock: QueueLock): Promise { + // VS Code's globalState doesn't provide true CAS, so we implement optimistic locking + // with a small race window. In practice, this is acceptable for our use case. 
+ const currentLock = await this.getLock() + + // Check if lock state matches our expectation + if (this.locksEqual(currentLock, expectedLock)) { + await this.context.globalState.update(GlobalStateQueueStorage.LOCK_KEY, newLock) + + // Verify the write succeeded + const verifyLock = await this.getLock() + return this.locksEqual(verifyLock, newLock) + } + + return false + } + + /** + * Compare two locks for equality + */ + private locksEqual(lock1: QueueLock | null, lock2: QueueLock | null): boolean { + if (lock1 === null && lock2 === null) return true + if (lock1 === null || lock2 === null) return false + return lock1.instanceId === lock2.instanceId && lock1.acquiredAt === lock2.acquiredAt + } + + /** + * Sleep for the specified milliseconds + */ + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) + } + + async add(event: QueuedTelemetryEvent): Promise { + // Use atomic operation with retry + await this.atomicOperation(async () => { + const events = await this.getAll() + events.push(event) + + // Check storage size before saving + const sizeInBytes = this.calculateSize(events) + if (sizeInBytes > this.maxStorageSize) { + // Remove oldest events until we're under the limit + let _removedCount = 0 + while (events.length > 0 && this.calculateSize(events) > this.maxStorageSize) { + events.shift() // Remove oldest event (FIFO) + _removedCount++ + } + + // Removed oldest events to stay under storage limit, no need to log + + // If even the single new event is too large, throw an error + if (events.length === 0) { + throw new Error("Event too large to store in queue") + } + } + + await this.save(events) + }) + } + + async remove(id: string): Promise { + return await this.atomicOperation(async () => { + const events = await this.getAll() + const initialLength = events.length + const filtered = events.filter((e) => e.id !== id) + + if (filtered.length < initialLength) { + await this.save(filtered) + return true + } + + return false + }) + } + + async update(event: QueuedTelemetryEvent): Promise { + return await this.atomicOperation(async () => { + const events = await this.getAll() + const index = events.findIndex((e) => e.id === event.id) + + if (index !== -1) { + events[index] = event + + // Check if update would exceed storage limit + const sizeInBytes = this.calculateSize(events) + if (sizeInBytes > this.maxStorageSize) { + console.error("[QueueStorage] Update would exceed storage limit, rejecting update") + return false + } + + await this.save(events) + return true + } + + return false + }) + } + + async getAll(): Promise { + try { + const data = this.context.globalState.get(GlobalStateQueueStorage.STORAGE_KEY) + // Ensure events are sorted by timestamp (FIFO) + return (data || []).sort((a, b) => a.timestamp - b.timestamp) + } catch (error) { + console.error("[QueueStorage] Failed to read queue:", error) + return [] + } + } + + async getCount(): Promise { + const events = await this.getAll() + return events.length + } + + async clear(): Promise { + await this.atomicOperation(async () => { + await this.context.globalState.update(GlobalStateQueueStorage.STORAGE_KEY, []) + }) + } + + async getSize(): Promise { + const events = await this.getAll() + return this.calculateSize(events) + } + + private calculateSize(events: QueuedTelemetryEvent[]): number { + const jsonString = JSON.stringify(events) + return new TextEncoder().encode(jsonString).length + } + + private async save(events: QueuedTelemetryEvent[]): Promise { + try { + await 
this.context.globalState.update(GlobalStateQueueStorage.STORAGE_KEY, events) + } catch (error) { + console.error("[QueueStorage] Failed to save queue:", error) + throw new Error(`Failed to save telemetry queue: ${error instanceof Error ? error.message : String(error)}`) + } + } + + /** + * Get storage statistics for monitoring + */ + async getStorageStats(): Promise<{ + sizeInBytes: number + sizeInMB: number + utilizationPercent: number + eventCount: number + oldestEventAge?: number + }> { + const events = await this.getAll() + const sizeInBytes = this.calculateSize(events) + const now = Date.now() + const oldestEvent = events[0] + + return { + sizeInBytes, + sizeInMB: sizeInBytes / 1024 / 1024, + utilizationPercent: (sizeInBytes / this.maxStorageSize) * 100, + eventCount: events.length, + oldestEventAge: oldestEvent ? now - oldestEvent.timestamp : undefined, + } + } + + /** + * Perform an atomic operation with retry logic + */ + private async atomicOperation(operation: () => Promise): Promise { + const maxRetries = 3 + const baseDelay = 100 + + for (let attempt = 0; attempt < maxRetries; attempt++) { + try { + return await operation() + } catch (error) { + if (attempt === maxRetries - 1) { + throw error + } + + // Exponential backoff with jitter + const delay = baseDelay * Math.pow(2, attempt) + Math.random() * 100 + // Operation failed, retrying silently + await this.sleep(delay) + } + } + + throw new Error("Atomic operation failed after all retries") + } + + /** + * Get lock statistics for monitoring + */ + async getLockStats(): Promise<{ + hasLock: boolean + lockHolder?: string + lockAge?: number + isExpired?: boolean + }> { + const lock = await this.getLock() + if (!lock) { + return { hasLock: false } + } + + const now = Date.now() + return { + hasLock: true, + lockHolder: `${lock.instanceId} (${lock.hostname || "unknown"})`, + lockAge: now - lock.acquiredAt, + isExpired: lock.expiresAt < now, + } + } + + /** + * Get instance information + */ + getInstanceInfo(): { instanceId: string; hostname: string } { + return { + instanceId: this.instanceId, + hostname: this.hostname, + } + } + + /** + * Get multi-instance configuration + */ + getMultiInstanceConfig(): Required { + return this.multiInstanceConfig + } +} diff --git a/packages/cloud/src/queue/QueuedTelemetryClient.ts b/packages/cloud/src/queue/QueuedTelemetryClient.ts new file mode 100644 index 0000000000..fbfce7ccac --- /dev/null +++ b/packages/cloud/src/queue/QueuedTelemetryClient.ts @@ -0,0 +1,201 @@ +import * as vscode from "vscode" +import { TelemetryEvent, TelemetryEventName, TelemetryPropertiesProvider } from "@roo-code/types" +import { BaseTelemetryClient } from "@roo-code/telemetry" +import { TelemetryEventQueue } from "./TelemetryEventQueue" +import { GlobalStateQueueStorage } from "./GlobalStateQueueStorage" +import { CloudQueueProcessor } from "./CloudQueueProcessor" +import { TelemetryClient } from "../TelemetryClient" +import type { QueueStatus, MultiInstanceConfig, QueueOptionsWithMultiInstance } from "./types" + +/** + * A telemetry client that queues events for reliable delivery + * Events are persisted to VS Code's globalState and processed in FIFO order + * Supports multi-instance coordination to prevent race conditions + */ +export class QueuedTelemetryClient extends BaseTelemetryClient { + private queue: TelemetryEventQueue + private storage: GlobalStateQueueStorage + private processingInterval?: NodeJS.Timeout + + constructor( + private cloudClient: TelemetryClient, + private context: vscode.ExtensionContext, 
+ debug = false, + multiInstanceConfig?: MultiInstanceConfig, + ) { + super( + { + type: "exclude", + events: [TelemetryEventName.TASK_CONVERSATION_MESSAGE], + }, + debug, + ) + + // Initialize queue components with multi-instance support + this.storage = new GlobalStateQueueStorage(context, undefined, multiInstanceConfig) + const processor = new CloudQueueProcessor(cloudClient) + + const queueOptions: QueueOptionsWithMultiInstance = { + processOnEnqueue: true, + multiInstance: multiInstanceConfig, + } + + this.queue = new TelemetryEventQueue(this.storage, processor, queueOptions) + + // Only log errors, not initialization info + + // Set up periodic processing if in leader mode + if (multiInstanceConfig?.mode === "leader") { + this.setupPeriodicProcessing() + } + } + + /** + * Set up periodic queue processing for leader mode + */ + private setupPeriodicProcessing(): void { + const config = this.storage.getMultiInstanceConfig() + const interval = config.lockCheckIntervalMs || 5000 + + this.processingInterval = setInterval(async () => { + try { + // Only process if we can acquire the lock (leader election) + await this.queue.processQueue() + // Only log errors, not successful processing + } catch (error) { + console.error("[QueuedTelemetryClient] Periodic processing error:", error) + } + }, interval) + } + + /** + * Capture a telemetry event by adding it to the queue + */ + public override async capture(event: TelemetryEvent): Promise { + if (!this.isTelemetryEnabled() || !this.isEventCapturable(event.event)) { + // Skip event silently + return + } + + // Add to queue instead of sending directly + await this.queue.enqueue(event) + } + + /** + * Manually trigger queue processing + * Returns the number of events successfully processed + */ + public async processQueue(): Promise { + return this.queue.processQueue() + } + + /** + * Get the current status of the queue + */ + public async getQueueStatus(): Promise< + QueueStatus & { + instanceInfo?: { + instanceId: string + hostname: string + multiInstanceEnabled: boolean + multiInstanceMode: string + } + } + > { + const status = await this.queue.getStatus() + const instanceInfo = this.storage.getInstanceInfo() + const config = this.storage.getMultiInstanceConfig() + + return { + ...status, + instanceInfo: { + ...instanceInfo, + multiInstanceEnabled: config.enabled, + multiInstanceMode: config.mode, + }, + } + } + + /** + * Clear all events from the queue + */ + public async clearQueue(): Promise { + await this.queue.clear() + } + + /** + * Update telemetry state + */ + public override updateTelemetryState(didUserOptIn: boolean): void { + this.cloudClient.updateTelemetryState(didUserOptIn) + this.telemetryEnabled = didUserOptIn + } + + /** + * Check if telemetry is enabled + */ + public override isTelemetryEnabled(): boolean { + return this.cloudClient.isTelemetryEnabled() + } + + /** + * Check if a specific event is capturable + */ + protected override isEventCapturable(eventName: TelemetryEventName): boolean { + // Use parent class logic for subscription filtering + return super.isEventCapturable(eventName) + } + + /** + * Set the telemetry provider + */ + public override setProvider(provider: TelemetryPropertiesProvider): void { + super.setProvider(provider) + this.cloudClient.setProvider(provider) + } + + /** + * Shutdown the client and process any remaining events + */ + public override async shutdown(): Promise { + // Stop periodic processing + if (this.processingInterval) { + clearInterval(this.processingInterval) + this.processingInterval 
= undefined + } + + // Process any remaining events before shutdown + try { + await this.queue.processQueue() + // Only log errors, not successful shutdown processing + } catch (error) { + console.error("[QueuedTelemetryClient] Failed to process queue during shutdown:", error) + } + + // Cleanup queue resources + await this.queue.shutdown() + + // Shutdown the underlying cloud client + await this.cloudClient.shutdown() + } + + /** + * Force process the queue (useful for testing or manual triggers) + */ + public async forceProcessQueue(): Promise { + // Force processing queue silently + return this.queue.processQueue() + } + + /** + * Get multi-instance lock statistics + */ + public async getLockStats(): Promise<{ + hasLock: boolean + lockHolder?: string + lockAge?: number + isExpired?: boolean + }> { + return this.storage.getLockStats() + } +} diff --git a/packages/cloud/src/queue/TelemetryEventQueue.ts b/packages/cloud/src/queue/TelemetryEventQueue.ts new file mode 100644 index 0000000000..66ddf31f7a --- /dev/null +++ b/packages/cloud/src/queue/TelemetryEventQueue.ts @@ -0,0 +1,288 @@ +import { TelemetryEvent } from "@roo-code/types" +import { QueueStorage, QueueProcessor, QueuedTelemetryEvent, QueueStatus, QueueOptionsWithMultiInstance } from "./types" +import { GlobalStateQueueStorage } from "./GlobalStateQueueStorage" + +/** + * Main telemetry event queue implementation + * Manages FIFO queue processing with retry logic and multi-instance coordination + */ +export class TelemetryEventQueue { + private isProcessing = false + private processingPromise?: Promise + private readonly options: Required + private lockReleaseTimer?: NodeJS.Timeout + + constructor( + private storage: QueueStorage, + private processor: QueueProcessor, + options: QueueOptionsWithMultiInstance = {}, + ) { + this.options = { + maxRetries: 3, + processOnEnqueue: true, + maxStorageSize: 1048576, // 1MB + multiInstance: { + enabled: true, + lockDurationMs: 30000, + lockCheckIntervalMs: 5000, + lockAcquireTimeoutMs: 10000, + mode: "compete", + ...options.multiInstance, + }, + ...options, + } + } + + /** + * Add an event to the queue and optionally trigger processing + */ + async enqueue(event: TelemetryEvent): Promise { + const queuedEvent: QueuedTelemetryEvent = { + id: this.generateEventId(), + timestamp: Date.now(), + event, + retryCount: 0, + } + + try { + await this.storage.add(queuedEvent) + } catch (error) { + // If storage fails due to size limit, log and continue + console.error("[TelemetryEventQueue] Failed to enqueue event:", error) + return + } + + if (this.options.processOnEnqueue) { + // Don't await, let it process in background + this.triggerProcessing().catch((error) => { + console.error("[TelemetryEventQueue] Background processing failed:", error) + }) + } + } + + /** + * Process all events in the queue + * Returns the number of successfully processed events + */ + async processQueue(): Promise { + // If already processing, return the existing promise + if (this.processingPromise) { + return this.processingPromise + } + + // Create a new processing promise + this.processingPromise = this.doProcessQueue().finally(() => { + this.processingPromise = undefined + }) + + return this.processingPromise + } + + /** + * Internal method to process the queue + */ + private async doProcessQueue(): Promise { + if (this.isProcessing) { + return 0 + } + + // Check if multi-instance coordination is enabled + const isMultiInstanceEnabled = + this.options.multiInstance?.enabled && this.options.multiInstance?.mode !== 
"disabled" + + // Try to acquire lock if multi-instance is enabled + let hasLock = false + if (isMultiInstanceEnabled && this.storage instanceof GlobalStateQueueStorage) { + hasLock = await this.storage.acquireLock() + if (!hasLock) { + // Only log errors, not debug info about lock acquisition + return 0 + } + + // Set up periodic lock renewal + this.startLockRenewal() + } else { + hasLock = true // No locking needed + } + + this.isProcessing = true + let processedCount = 0 + + try { + // Check if processor is ready + if (!(await this.processor.isReady())) { + // Only log errors, not debug info about processor readiness + return 0 + } + + const events = await this.storage.getAll() + + for (const event of events) { + // Check if we still hold the lock (for multi-instance) + if (isMultiInstanceEnabled && this.storage instanceof GlobalStateQueueStorage) { + if (!(await this.storage.holdsLock())) { + // Lost lock during processing, stopping silently + break + } + } + + // Skip events that have exceeded retry limit + if (event.retryCount >= this.options.maxRetries) { + // Event exceeded retry limit, removing silently + await this.storage.remove(event.id) + continue + } + + const success = await this.processor.process(event) + + if (success) { + await this.storage.remove(event.id) + processedCount++ + } else { + // Update retry count and timestamp + event.retryCount++ + event.lastAttemptTimestamp = Date.now() + await this.storage.update(event) + + // Stop processing on failure (no automatic retry) + break + } + } + + // Only log errors, not success info + } catch (error) { + console.error("[TelemetryEventQueue] Queue processing error:", error) + } finally { + this.isProcessing = false + this.stopLockRenewal() + + // Release lock if we acquired it + if (hasLock && isMultiInstanceEnabled && this.storage instanceof GlobalStateQueueStorage) { + await this.storage.releaseLock() + } + } + + return processedCount + } + + /** + * Start periodic lock renewal to prevent lock expiration during processing + */ + private startLockRenewal(): void { + if (this.lockReleaseTimer) { + clearInterval(this.lockReleaseTimer) + } + + const renewalInterval = Math.max((this.options.multiInstance?.lockDurationMs || 30000) / 3, 5000) + + this.lockReleaseTimer = setInterval(async () => { + if (this.storage instanceof GlobalStateQueueStorage) { + const hasLock = await this.storage.holdsLock() + if (hasLock) { + // Renew lock by re-acquiring it + await this.storage.acquireLock() + } else { + // Lost lock, stop renewal + this.stopLockRenewal() + } + } + }, renewalInterval) + } + + /** + * Stop lock renewal timer + */ + private stopLockRenewal(): void { + if (this.lockReleaseTimer) { + clearInterval(this.lockReleaseTimer) + this.lockReleaseTimer = undefined + } + } + + /** + * Get the current status of the queue + */ + async getStatus(): Promise { + const events = await this.storage.getAll() + const sizeInBytes = await this.storage.getSize() + + const failedEventCount = events.filter((e) => e.retryCount > 0).length + const oldestEvent = events.length > 0 ? 
events[0] : undefined + + const status: QueueStatus = { + count: events.length, + sizeInBytes, + isProcessing: this.isProcessing, + oldestEventTimestamp: oldestEvent?.timestamp, + failedEventCount, + } + + // Add lock information if available + if (this.storage instanceof GlobalStateQueueStorage) { + const lockStats = await this.storage.getLockStats() + const instanceInfo = this.storage.getInstanceInfo() + + // Add extended status info + ;( + status as QueueStatus & { + lockInfo?: { + hasLock: boolean + lockHolder?: string + lockAge?: number + isExpired?: boolean + currentInstance: string + multiInstanceMode: string + } + } + ).lockInfo = { + ...lockStats, + currentInstance: instanceInfo.instanceId, + multiInstanceMode: this.options.multiInstance?.mode || "disabled", + } + } + + return status + } + + /** + * Clear all events from the queue + */ + async clear(): Promise { + await this.storage.clear() + } + + /** + * Check if the queue is currently processing + */ + isCurrentlyProcessing(): boolean { + return this.isProcessing + } + + /** + * Trigger queue processing + */ + private async triggerProcessing(): Promise { + await this.processQueue() + } + + /** + * Generate a unique event ID + */ + private generateEventId(): string { + const timestamp = Date.now() + const random = Math.random().toString(36).substring(2, 9) + return `evt_${timestamp}_${random}` + } + + /** + * Cleanup resources when shutting down + */ + async shutdown(): Promise { + this.stopLockRenewal() + + // Release any held locks + if (this.storage instanceof GlobalStateQueueStorage) { + await this.storage.releaseLock() + } + } +} diff --git a/packages/cloud/src/queue/__tests__/GlobalStateQueueStorage.test.ts b/packages/cloud/src/queue/__tests__/GlobalStateQueueStorage.test.ts new file mode 100644 index 0000000000..7ba96cdf0c --- /dev/null +++ b/packages/cloud/src/queue/__tests__/GlobalStateQueueStorage.test.ts @@ -0,0 +1,583 @@ +import { describe, it, expect, beforeEach, vi } from "vitest" +import * as vscode from "vscode" +import { GlobalStateQueueStorage } from "../GlobalStateQueueStorage" +import { QueuedTelemetryEvent, MultiInstanceConfig } from "../types" +import { TelemetryEventName } from "@roo-code/types" + +// Mock VS Code extension context +const createMockContext = () => { + const storage = new Map() + + return { + globalState: { + get: vi.fn((key: string) => storage.get(key)), + update: vi.fn(async (key: string, value: unknown) => { + storage.set(key, value) + }), + keys: vi.fn(() => Array.from(storage.keys())), + setKeysForSync: vi.fn(), + }, + subscriptions: [], + extensionPath: "/test/path", + extensionUri: { + fsPath: "/test/path", + scheme: "file", + authority: "", + path: "/test/path", + query: "", + fragment: "", + }, + environmentVariableCollection: {} as vscode.EnvironmentVariableCollection, + storageUri: { + fsPath: "/test/storage", + scheme: "file", + authority: "", + path: "/test/storage", + query: "", + fragment: "", + }, + globalStorageUri: { + fsPath: "/test/global-storage", + scheme: "file", + authority: "", + path: "/test/global-storage", + query: "", + fragment: "", + }, + logUri: { fsPath: "/test/logs", scheme: "file", authority: "", path: "/test/logs", query: "", fragment: "" }, + extensionMode: 2, // ExtensionMode.Test + extension: {} as vscode.Extension, + asAbsolutePath: vi.fn((path: string) => path), + storagePath: "/test/storage", + globalStoragePath: "/test/global-storage", + logPath: "/test/logs", + workspaceState: {} as vscode.Memento, + secrets: {} as vscode.SecretStorage, + } as 
unknown as vscode.ExtensionContext +} + +describe("GlobalStateQueueStorage", () => { + let context: vscode.ExtensionContext + let storage: GlobalStateQueueStorage + + beforeEach(() => { + context = createMockContext() + storage = new GlobalStateQueueStorage(context) + }) + + describe("add", () => { + it("should add events to storage", async () => { + const event: QueuedTelemetryEvent = { + id: "evt_123_abc", + timestamp: Date.now(), + event: { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-123" }, + }, + retryCount: 0, + } + + await storage.add(event) + + const events = await storage.getAll() + expect(events).toHaveLength(1) + expect(events[0]).toEqual(event) + }) + + it("should maintain FIFO order", async () => { + const events: QueuedTelemetryEvent[] = [] + const baseTime = Date.now() + + for (let i = 0; i < 5; i++) { + events.push({ + id: `evt_${i}`, + timestamp: baseTime + i * 1000, + event: { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: `test-${i}` }, + }, + retryCount: 0, + }) + } + + // Add events in random order + await storage.add(events[2]) + await storage.add(events[0]) + await storage.add(events[4]) + await storage.add(events[1]) + await storage.add(events[3]) + + const storedEvents = await storage.getAll() + expect(storedEvents).toHaveLength(5) + + // Should be sorted by timestamp + for (let i = 0; i < 5; i++) { + expect(storedEvents[i].id).toBe(`evt_${i}`) + } + }) + + it("should enforce 1MB storage limit by removing oldest events", async () => { + // Create a large event (approximately 100KB) + const largeData = "x".repeat(100000) + const events: QueuedTelemetryEvent[] = [] + + // Create 15 events, each ~100KB (total ~1.5MB) + for (let i = 0; i < 15; i++) { + events.push({ + id: `evt_${i}`, + timestamp: Date.now() + i, + event: { + event: TelemetryEventName.TASK_CREATED, + properties: { + taskId: `test-${i}`, + data: largeData, + }, + }, + retryCount: 0, + }) + } + + // Add all events + for (const event of events) { + await storage.add(event) + } + + // Check that oldest events were removed to stay under 1MB + const storedEvents = await storage.getAll() + const size = await storage.getSize() + + expect(size).toBeLessThanOrEqual(1048576) // 1MB + expect(storedEvents.length).toBeLessThan(15) // Some events should have been removed + expect(storedEvents[0].id).not.toBe("evt_0") // Oldest events should be gone + }) + + it("should throw error if single event exceeds storage limit", async () => { + // Create an event larger than 1MB + const hugeData = "x".repeat(1100000) // ~1.1MB + const event: QueuedTelemetryEvent = { + id: "evt_huge", + timestamp: Date.now(), + event: { + event: TelemetryEventName.TASK_CREATED, + properties: { + taskId: "test-huge", + data: hugeData, + }, + }, + retryCount: 0, + } + + await expect(storage.add(event)).rejects.toThrow("Event too large to store in queue") + }) + }) + + describe("remove", () => { + it("should remove event by id", async () => { + const event1: QueuedTelemetryEvent = { + id: "evt_1", + timestamp: Date.now(), + event: { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-1" }, + }, + retryCount: 0, + } + + const event2: QueuedTelemetryEvent = { + id: "evt_2", + timestamp: Date.now() + 1000, + event: { + event: TelemetryEventName.TASK_COMPLETED, + properties: { taskId: "test-2" }, + }, + retryCount: 0, + } + + await storage.add(event1) + await storage.add(event2) + + const removed = await storage.remove("evt_1") + expect(removed).toBe(true) + + const events = await 
storage.getAll() + expect(events).toHaveLength(1) + expect(events[0].id).toBe("evt_2") + }) + + it("should return false if event not found", async () => { + const removed = await storage.remove("non-existent") + expect(removed).toBe(false) + }) + }) + + describe("update", () => { + it("should update existing event", async () => { + const event: QueuedTelemetryEvent = { + id: "evt_1", + timestamp: Date.now(), + event: { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-1" }, + }, + retryCount: 0, + } + + await storage.add(event) + + // Update the event + event.retryCount = 1 + event.lastAttemptTimestamp = Date.now() + event.lastError = "Network error" + + const updated = await storage.update(event) + expect(updated).toBe(true) + + const events = await storage.getAll() + expect(events[0].retryCount).toBe(1) + expect(events[0].lastAttemptTimestamp).toBeDefined() + expect(events[0].lastError).toBe("Network error") + }) + + it("should return false if event not found", async () => { + const event: QueuedTelemetryEvent = { + id: "non-existent", + timestamp: Date.now(), + event: { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-1" }, + }, + retryCount: 0, + } + + const updated = await storage.update(event) + expect(updated).toBe(false) + }) + + it("should reject update if it would exceed storage limit", async () => { + // Create a small event first + const event: QueuedTelemetryEvent = { + id: "evt_1", + timestamp: Date.now(), + event: { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-1" }, + }, + retryCount: 0, + } + + await storage.add(event) + + // Try to update with huge data + event.event.properties = { + taskId: "test-1", + data: "x".repeat(1100000), // ~1.1MB + } + + const updated = await storage.update(event) + expect(updated).toBe(false) + }) + }) + + describe("getStorageStats", () => { + it("should return correct storage statistics", async () => { + const baseTime = Date.now() - 60000 // 1 minute ago + + const events: QueuedTelemetryEvent[] = [ + { + id: "evt_1", + timestamp: baseTime, + event: { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-1" }, + }, + retryCount: 0, + }, + { + id: "evt_2", + timestamp: baseTime + 30000, + event: { + event: TelemetryEventName.TASK_COMPLETED, + properties: { taskId: "test-2" }, + }, + retryCount: 0, + }, + ] + + for (const event of events) { + await storage.add(event) + } + + const stats = await storage.getStorageStats() + + expect(stats.eventCount).toBe(2) + expect(stats.sizeInBytes).toBeGreaterThan(0) + expect(stats.sizeInMB).toBe(stats.sizeInBytes / 1024 / 1024) + expect(stats.utilizationPercent).toBe((stats.sizeInBytes / 1048576) * 100) + expect(stats.oldestEventAge).toBeGreaterThan(50000) // More than 50 seconds + }) + }) + + describe("clear", () => { + it("should remove all events", async () => { + const events: QueuedTelemetryEvent[] = [ + { + id: "evt_1", + timestamp: Date.now(), + event: { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-1" }, + }, + retryCount: 0, + }, + { + id: "evt_2", + timestamp: Date.now() + 1000, + event: { + event: TelemetryEventName.TASK_COMPLETED, + properties: { taskId: "test-2" }, + }, + retryCount: 0, + }, + ] + + for (const event of events) { + await storage.add(event) + } + + await storage.clear() + + const remainingEvents = await storage.getAll() + expect(remainingEvents).toHaveLength(0) + + const count = await storage.getCount() + expect(count).toBe(0) + + const size = await storage.getSize() + 
expect(size).toBe(2) // Empty array "[]" + }) + }) + + describe("multi-instance support", () => { + let storage1: GlobalStateQueueStorage + let storage2: GlobalStateQueueStorage + let multiInstanceConfig: MultiInstanceConfig + + beforeEach(() => { + multiInstanceConfig = { + enabled: true, + lockDurationMs: 1000, // 1 second for faster tests + lockCheckIntervalMs: 100, + lockAcquireTimeoutMs: 2000, + mode: "compete", + } + + // Create two storage instances sharing the same context (simulating two VS Code instances) + storage1 = new GlobalStateQueueStorage(context, undefined, multiInstanceConfig) + storage2 = new GlobalStateQueueStorage(context, undefined, multiInstanceConfig) + }) + + describe("lock acquisition", () => { + it("should allow only one instance to acquire lock", async () => { + // Set a shorter timeout for this test + const testConfig: MultiInstanceConfig = { + enabled: true, + lockDurationMs: 1000, + lockCheckIntervalMs: 50, + lockAcquireTimeoutMs: 200, // Short timeout + mode: "compete", + } + + const testStorage1 = new GlobalStateQueueStorage(context, undefined, testConfig) + const testStorage2 = new GlobalStateQueueStorage(context, undefined, testConfig) + + const lock1 = await testStorage1.acquireLock() + expect(lock1).toBe(true) + + const lock2 = await testStorage2.acquireLock() + expect(lock2).toBe(false) + + // Release lock from instance 1 + await testStorage1.releaseLock() + + // Now instance 2 should be able to acquire + const lock2Retry = await testStorage2.acquireLock() + expect(lock2Retry).toBe(true) + }) + + it("should respect lock expiration", async () => { + // Acquire lock with instance 1 + const lock1 = await storage1.acquireLock() + expect(lock1).toBe(true) + + // Wait for lock to expire + await new Promise((resolve) => setTimeout(resolve, 1100)) + + // Instance 2 should now be able to acquire + const lock2 = await storage2.acquireLock() + expect(lock2).toBe(true) + }) + + it("should handle concurrent lock attempts", async () => { + // Create a more realistic concurrent scenario + const testConfig: MultiInstanceConfig = { + enabled: true, + lockDurationMs: 1000, + lockCheckIntervalMs: 50, + lockAcquireTimeoutMs: 100, // Very short timeout to ensure one fails + mode: "compete", + } + + const testStorage1 = new GlobalStateQueueStorage(context, undefined, testConfig) + const testStorage2 = new GlobalStateQueueStorage(context, undefined, testConfig) + + // Acquire lock with first instance + const lock1 = await testStorage1.acquireLock() + expect(lock1).toBe(true) + + // Second instance should fail due to short timeout + const lock2 = await testStorage2.acquireLock() + expect(lock2).toBe(false) + }) + + it("should return true when multi-instance is disabled", async () => { + const disabledConfig: MultiInstanceConfig = { + enabled: false, + } + + const storage3 = new GlobalStateQueueStorage(context, undefined, disabledConfig) + const storage4 = new GlobalStateQueueStorage(context, undefined, disabledConfig) + + // Both should be able to "acquire" lock + const lock3 = await storage3.acquireLock() + const lock4 = await storage4.acquireLock() + + expect(lock3).toBe(true) + expect(lock4).toBe(true) + }) + }) + + describe("holdsLock", () => { + it("should correctly identify lock holder", async () => { + await storage1.acquireLock() + + expect(await storage1.holdsLock()).toBe(true) + expect(await storage2.holdsLock()).toBe(false) + }) + + it("should return false for expired lock", async () => { + await storage1.acquireLock() + expect(await 
storage1.holdsLock()).toBe(true) + + // Wait for lock to expire + await new Promise((resolve) => setTimeout(resolve, 1100)) + + expect(await storage1.holdsLock()).toBe(false) + }) + }) + + describe("atomic operations", () => { + it("should handle sequential adds from different instances", async () => { + const event1: QueuedTelemetryEvent = { + id: "evt_1", + timestamp: Date.now(), + event: { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-1" }, + }, + retryCount: 0, + } + + const event2: QueuedTelemetryEvent = { + id: "evt_2", + timestamp: Date.now() + 1, + event: { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-2" }, + }, + retryCount: 0, + } + + // Add events sequentially to avoid race conditions in the mock + await storage1.add(event1) + await storage2.add(event2) + + // Both events should be in storage + const events = await storage1.getAll() + expect(events).toHaveLength(2) + expect(events.map((e) => e.id).sort()).toEqual(["evt_1", "evt_2"]) + + // Verify both instances see the same data + const events2 = await storage2.getAll() + expect(events2).toHaveLength(2) + expect(events2.map((e) => e.id).sort()).toEqual(["evt_1", "evt_2"]) + }) + + it("should retry operations on failure", async () => { + // Mock a temporary failure + let failCount = 0 + const originalUpdate = context.globalState.update + context.globalState.update = vi.fn(async (key: string, value: unknown) => { + if (failCount < 2) { + failCount++ + throw new Error("Temporary failure") + } + return originalUpdate.call(context.globalState, key, value) + }) + + const event: QueuedTelemetryEvent = { + id: "evt_retry", + timestamp: Date.now(), + event: { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-retry" }, + }, + retryCount: 0, + } + + // Should succeed after retries + await storage1.add(event) + + const events = await storage1.getAll() + expect(events).toHaveLength(1) + expect(events[0].id).toBe("evt_retry") + }) + }) + + describe("getLockStats", () => { + it("should return correct lock statistics", async () => { + // No lock initially + let stats = await storage1.getLockStats() + expect(stats.hasLock).toBe(false) + + // Acquire lock + await storage1.acquireLock() + + stats = await storage1.getLockStats() + expect(stats.hasLock).toBe(true) + expect(stats.lockHolder).toContain(storage1.getInstanceInfo().instanceId) + expect(stats.lockAge).toBeGreaterThanOrEqual(0) + expect(stats.isExpired).toBe(false) + + // Wait for expiration + await new Promise((resolve) => setTimeout(resolve, 1100)) + + stats = await storage1.getLockStats() + expect(stats.hasLock).toBe(true) + expect(stats.isExpired).toBe(true) + }) + }) + + describe("instance identification", () => { + it("should generate unique instance IDs", () => { + const info1 = storage1.getInstanceInfo() + const info2 = storage2.getInstanceInfo() + + expect(info1.instanceId).toBeDefined() + expect(info2.instanceId).toBeDefined() + expect(info1.instanceId).not.toBe(info2.instanceId) + expect(info1.hostname).toBeDefined() + expect(info2.hostname).toBeDefined() + }) + }) + }) +}) diff --git a/packages/cloud/src/queue/__tests__/TelemetryEventQueue.test.ts b/packages/cloud/src/queue/__tests__/TelemetryEventQueue.test.ts new file mode 100644 index 0000000000..5ad2c37f55 --- /dev/null +++ b/packages/cloud/src/queue/__tests__/TelemetryEventQueue.test.ts @@ -0,0 +1,558 @@ +import { describe, it, expect, vi, beforeEach } from "vitest" +import * as vscode from "vscode" +import { TelemetryEvent, TelemetryEventName } 
from "@roo-code/types" +import { TelemetryEventQueue } from "../TelemetryEventQueue" +import { GlobalStateQueueStorage } from "../GlobalStateQueueStorage" +import { QueueStorage, QueueProcessor, QueuedTelemetryEvent, MultiInstanceConfig, QueueStatus } from "../types" + +// Mock implementations +class MockQueueStorage implements QueueStorage { + private events: QueuedTelemetryEvent[] = [] + private maxSize: number + + constructor(maxSize = 1048576) { + this.maxSize = maxSize + } + + async add(event: QueuedTelemetryEvent): Promise { + this.events.push(event) + const size = await this.getSize() + if (size > this.maxSize) { + throw new Error("Storage limit exceeded") + } + } + + async remove(id: string): Promise { + const index = this.events.findIndex((e) => e.id === id) + if (index !== -1) { + this.events.splice(index, 1) + return true + } + return false + } + + async update(event: QueuedTelemetryEvent): Promise { + const index = this.events.findIndex((e) => e.id === event.id) + if (index !== -1) { + this.events[index] = event + return true + } + return false + } + + async getAll(): Promise { + return [...this.events].sort((a, b) => a.timestamp - b.timestamp) + } + + async getCount(): Promise { + return this.events.length + } + + async clear(): Promise { + this.events = [] + } + + async getSize(): Promise { + const jsonString = JSON.stringify(this.events) + return new TextEncoder().encode(jsonString).length + } +} + +class MockQueueProcessor implements QueueProcessor { + public processResults: Map = new Map() + public isReadyResult = true + public processedEvents: QueuedTelemetryEvent[] = [] + + async process(event: QueuedTelemetryEvent): Promise { + this.processedEvents.push(event) + const result = this.processResults.get(event.id) ?? true + return result + } + + async isReady(): Promise { + return this.isReadyResult + } + + setProcessResult(eventId: string, result: boolean): void { + this.processResults.set(eventId, result) + } +} + +describe("TelemetryEventQueue", () => { + let storage: MockQueueStorage + let processor: MockQueueProcessor + let queue: TelemetryEventQueue + + beforeEach(() => { + storage = new MockQueueStorage() + processor = new MockQueueProcessor() + queue = new TelemetryEventQueue(storage, processor) + }) + + describe("enqueue", () => { + it("should add event to storage and trigger processing", async () => { + const event: TelemetryEvent = { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-123" }, + } + + await queue.enqueue(event) + + // Wait for background processing + await new Promise((resolve) => setTimeout(resolve, 100)) + + const storedEvents = await storage.getAll() + expect(storedEvents).toHaveLength(0) // Should be processed and removed + expect(processor.processedEvents).toHaveLength(1) + expect(processor.processedEvents[0].event).toEqual(event) + }) + + it("should handle storage errors gracefully", async () => { + // Create a storage that will throw an error + const errorStorage = new MockQueueStorage(1) // 1 byte limit + const errorQueue = new TelemetryEventQueue(errorStorage, processor) + + const event: TelemetryEvent = { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-123", largeData: "x".repeat(1000) }, + } + + // Should not throw + await expect(errorQueue.enqueue(event)).resolves.toBeUndefined() + }) + + it("should not process if processOnEnqueue is false", async () => { + queue = new TelemetryEventQueue(storage, processor, { processOnEnqueue: false }) + + const event: TelemetryEvent = { + event: 
TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-123" }, + } + + await queue.enqueue(event) + + // Wait to ensure no processing happens + await new Promise((resolve) => setTimeout(resolve, 100)) + + const storedEvents = await storage.getAll() + expect(storedEvents).toHaveLength(1) + expect(processor.processedEvents).toHaveLength(0) + }) + }) + + describe("processQueue", () => { + it("should process events in FIFO order", async () => { + queue = new TelemetryEventQueue(storage, processor, { processOnEnqueue: false }) + + const events: TelemetryEvent[] = [ + { event: TelemetryEventName.TASK_CREATED, properties: { taskId: "1" } }, + { event: TelemetryEventName.TASK_RESTARTED, properties: { taskId: "2" } }, + { event: TelemetryEventName.TASK_COMPLETED, properties: { taskId: "3" } }, + ] + + for (const event of events) { + await queue.enqueue(event) + } + + const processedCount = await queue.processQueue() + + expect(processedCount).toBe(3) + expect(processor.processedEvents).toHaveLength(3) + expect(processor.processedEvents[0].event.properties?.taskId).toBe("1") + expect(processor.processedEvents[1].event.properties?.taskId).toBe("2") + expect(processor.processedEvents[2].event.properties?.taskId).toBe("3") + }) + + it("should stop processing on failure", async () => { + queue = new TelemetryEventQueue(storage, processor, { processOnEnqueue: false }) + + const events: TelemetryEvent[] = [ + { event: TelemetryEventName.TASK_CREATED, properties: { taskId: "1" } }, + { event: TelemetryEventName.TASK_RESTARTED, properties: { taskId: "2" } }, + { event: TelemetryEventName.TASK_COMPLETED, properties: { taskId: "3" } }, + ] + + for (const event of events) { + await queue.enqueue(event) + } + + // Make the second event fail + const storedEvents = await storage.getAll() + processor.setProcessResult(storedEvents[1].id, false) + + const processedCount = await queue.processQueue() + + expect(processedCount).toBe(1) // Only first event processed + expect(processor.processedEvents).toHaveLength(2) // First succeeded, second failed + + const remainingEvents = await storage.getAll() + expect(remainingEvents).toHaveLength(2) // Second and third events remain + expect(remainingEvents[0].retryCount).toBe(1) // Second event has retry count + }) + + it("should remove events that exceed retry limit", async () => { + queue = new TelemetryEventQueue(storage, processor, { + processOnEnqueue: false, + maxRetries: 2, + }) + + const event: TelemetryEvent = { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-123" }, + } + + await queue.enqueue(event) + + const storedEvents = await storage.getAll() + + // Update event to have exceeded retry count + storedEvents[0].retryCount = 2 + await storage.update(storedEvents[0]) + + const processedCount = await queue.processQueue() + + expect(processedCount).toBe(0) + const remainingEvents = await storage.getAll() + expect(remainingEvents).toHaveLength(0) // Event removed due to retry limit + }) + + it("should handle concurrent processing requests", async () => { + queue = new TelemetryEventQueue(storage, processor, { processOnEnqueue: false }) + + const event: TelemetryEvent = { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-123" }, + } + + await queue.enqueue(event) + + // Start multiple concurrent processing requests + const results = await Promise.all([queue.processQueue(), queue.processQueue(), queue.processQueue()]) + + // All should return the same result (from the same promise) + expect(results[0]).toBe(results[1]) 
+ expect(results[1]).toBe(results[2]) + expect(results[0]).toBe(1) + expect(processor.processedEvents).toHaveLength(1) + }) + + it("should skip processing if processor is not ready", async () => { + processor.isReadyResult = false + queue = new TelemetryEventQueue(storage, processor, { processOnEnqueue: false }) + + const event: TelemetryEvent = { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-123" }, + } + + await queue.enqueue(event) + + const processedCount = await queue.processQueue() + + expect(processedCount).toBe(0) + expect(processor.processedEvents).toHaveLength(0) + + const remainingEvents = await storage.getAll() + expect(remainingEvents).toHaveLength(1) + }) + }) + + describe("getStatus", () => { + it("should return correct queue status", async () => { + queue = new TelemetryEventQueue(storage, processor, { processOnEnqueue: false }) + + const events: TelemetryEvent[] = [ + { event: TelemetryEventName.TASK_CREATED, properties: { taskId: "1" } }, + { event: TelemetryEventName.TASK_RESTARTED, properties: { taskId: "2" } }, + ] + + for (const event of events) { + await queue.enqueue(event) + } + + // Make one event have a retry + const storedEvents = await storage.getAll() + storedEvents[0].retryCount = 1 + await storage.update(storedEvents[0]) + + const status = await queue.getStatus() + + expect(status.count).toBe(2) + expect(status.sizeInBytes).toBeGreaterThan(0) + expect(status.isProcessing).toBe(false) + expect(status.oldestEventTimestamp).toBeDefined() + expect(status.failedEventCount).toBe(1) + }) + }) + + describe("clear", () => { + it("should clear all events from the queue", async () => { + queue = new TelemetryEventQueue(storage, processor, { processOnEnqueue: false }) + + const event: TelemetryEvent = { + event: TelemetryEventName.TASK_CREATED, + properties: { taskId: "test-123" }, + } + + await queue.enqueue(event) + await queue.clear() + + const remainingEvents = await storage.getAll() + expect(remainingEvents).toHaveLength(0) + }) + }) + + describe("multi-instance support", () => { + let mockContext: vscode.ExtensionContext + let storage1: GlobalStateQueueStorage + let storage2: GlobalStateQueueStorage + let queue1: TelemetryEventQueue + let queue2: TelemetryEventQueue + + beforeEach(() => { + // Create a mock context that simulates shared globalState + const sharedStorage = new Map() + mockContext = { + globalState: { + get: vi.fn((key: string) => sharedStorage.get(key)), + update: vi.fn(async (key: string, value: unknown) => { + sharedStorage.set(key, value) + }), + keys: vi.fn(() => Array.from(sharedStorage.keys())), + setKeysForSync: vi.fn(), + }, + } as unknown as vscode.ExtensionContext + + const multiInstanceConfig: MultiInstanceConfig = { + enabled: true, + lockDurationMs: 1000, + lockCheckIntervalMs: 100, + lockAcquireTimeoutMs: 2000, + mode: "compete", + } + + // Create two storage instances with multi-instance support + storage1 = new GlobalStateQueueStorage(mockContext, undefined, multiInstanceConfig) + storage2 = new GlobalStateQueueStorage(mockContext, undefined, multiInstanceConfig) + + // Create two queue instances + queue1 = new TelemetryEventQueue(storage1, processor, { + processOnEnqueue: false, + multiInstance: multiInstanceConfig, + }) + queue2 = new TelemetryEventQueue(storage2, processor, { + processOnEnqueue: false, + multiInstance: multiInstanceConfig, + }) + }) + + it("should prevent concurrent processing from multiple instances", async () => { + // Add events to the shared queue + const events: TelemetryEvent[] = [ 
+				{ event: TelemetryEventName.TASK_CREATED, properties: { taskId: "1" } },
+				{ event: TelemetryEventName.TASK_CREATED, properties: { taskId: "2" } },
+				{ event: TelemetryEventName.TASK_CREATED, properties: { taskId: "3" } },
+			]
+
+			for (const event of events) {
+				await queue1.enqueue(event)
+			}
+
+			// Reset processor to track which instance processed
+			processor.processedEvents = []
+
+			// Both instances try to process simultaneously
+			const [result1, result2] = await Promise.all([queue1.processQueue(), queue2.processQueue()])
+
+			// Only one instance should have processed
+			const totalProcessed = result1 + result2
+			expect(totalProcessed).toBe(3)
+			expect(processor.processedEvents).toHaveLength(3)
+
+			// One result should be 3, the other 0
+			expect([result1, result2].includes(3)).toBe(true)
+			expect([result1, result2].includes(0)).toBe(true)
+		})
+
+		it("should renew lock during long processing", async () => {
+			// Create a processor that takes time
+			let processStarted = false
+			let processCompleted = false
+			const slowProcessor = new MockQueueProcessor()
+			slowProcessor.process = async (_event: QueuedTelemetryEvent) => {
+				processStarted = true
+				// Simulate processing that takes longer than initial lock duration
+				await new Promise((resolve) => setTimeout(resolve, 800))
+				processCompleted = true
+				return true
+			}
+
+			const renewQueue = new TelemetryEventQueue(storage1, slowProcessor, {
+				processOnEnqueue: false,
+				multiInstance: {
+					enabled: true,
+					lockDurationMs: 600, // Lock expires in 600ms
+					lockCheckIntervalMs: 200, // Renew every 200ms
+					mode: "compete",
+				},
+			})
+
+			await renewQueue.enqueue({
+				event: TelemetryEventName.TASK_CREATED,
+				properties: { taskId: "slow" },
+			})
+
+			// Process the queue
+			const result = await renewQueue.processQueue()
+
+			// Processing should complete successfully due to lock renewal
+			expect(processStarted).toBe(true)
+			expect(processCompleted).toBe(true)
+			expect(result).toBe(1)
+		})
+
+		it("should stop processing if lock is stolen by another instance", async () => {
+			// Create a processor that allows us to control timing
+			let processingStarted = false
+			let shouldContinue = true
+			const controlledProcessor = new MockQueueProcessor()
+			controlledProcessor.process = async (_event: QueuedTelemetryEvent) => {
+				processingStarted = true
+				// Wait until we signal to continue
+				while (shouldContinue) {
+					await new Promise((resolve) => setTimeout(resolve, 100))
+				}
+				return true
+			}
+
+			const queue = new TelemetryEventQueue(storage1, controlledProcessor, {
+				processOnEnqueue: false,
+				multiInstance: {
+					enabled: true,
+					lockDurationMs: 500,
+					lockCheckIntervalMs: 100,
+					mode: "compete",
+				},
+			})
+
+			await queue.enqueue({
+				event: TelemetryEventName.TASK_CREATED,
+				properties: { taskId: "test" },
+			})
+
+			// Start processing
+			const processPromise = queue.processQueue()
+
+			// Wait for processing to start
+			await new Promise((resolve) => setTimeout(resolve, 200))
+			expect(processingStarted).toBe(true)
+
+			// Force expire the lock by waiting
+			await new Promise((resolve) => setTimeout(resolve, 600))
+
+			// Another instance steals the lock
+			const lockStolen = await storage2.acquireLock()
+			expect(lockStolen).toBe(true)
+
+			// Signal processor to continue
+			shouldContinue = false
+
+			// Wait for processing to complete
+			const result = await processPromise
+
+			// Should have processed the event before lock was checked again
+			expect(result).toBe(1)
+		})
+
+		it("should work without locking when multi-instance is disabled", async () => {
+			// Create queues without multi-instance support
+			const noLockQueue1 = new TelemetryEventQueue(storage, processor, {
+				processOnEnqueue: false,
+				multiInstance: { enabled: false },
+			})
+			const noLockQueue2 = new TelemetryEventQueue(storage, processor, {
+				processOnEnqueue: false,
+				multiInstance: { enabled: false },
+			})
+
+			await noLockQueue1.enqueue({
+				event: TelemetryEventName.TASK_CREATED,
+				properties: { taskId: "1" },
+			})
+
+			processor.processedEvents = []
+
+			// Both can process (though they'll compete for the same events)
+			const [result1, result2] = await Promise.all([noLockQueue1.processQueue(), noLockQueue2.processQueue()])
+
+			// At least one should have processed
+			expect(result1 + result2).toBeGreaterThan(0)
+		})
+
+		it("should include lock information in status", async () => {
+			// Acquire lock with queue1
+			await storage1.acquireLock()
+
+			const status = await queue1.getStatus()
+
+			expect(status).toBeDefined()
+
+			// Type assertion for extended status with lockInfo
+			const statusWithLock = status as QueueStatus & {
+				lockInfo?: {
+					hasLock: boolean
+					lockHolder?: string
+					lockAge?: number
+					isExpired?: boolean
+					currentInstance: string
+					multiInstanceMode: string
+				}
+			}
+
+			expect(statusWithLock.lockInfo).toBeDefined()
+			expect(statusWithLock.lockInfo?.hasLock).toBe(true)
+			expect(statusWithLock.lockInfo?.currentInstance).toBe(storage1.getInstanceInfo().instanceId)
+			expect(statusWithLock.lockInfo?.multiInstanceMode).toBe("compete")
+		})
+
+		it("should handle leader mode with periodic processing", async () => {
+			const leaderConfig: MultiInstanceConfig = {
+				enabled: true,
+				lockDurationMs: 1000,
+				lockCheckIntervalMs: 100,
+				lockAcquireTimeoutMs: 500,
+				mode: "leader",
+			}
+
+			// Create a queue in leader mode
+			const leaderQueue = new TelemetryEventQueue(storage1, processor, {
+				processOnEnqueue: false,
+				multiInstance: leaderConfig,
+			})
+
+			// Add an event
+			await leaderQueue.enqueue({
+				event: TelemetryEventName.TASK_CREATED,
+				properties: { taskId: "leader-test" },
+			})
+
+			// Only the instance that acquires the lock should process
+			const result = await leaderQueue.processQueue()
+
+			// Should process if lock was acquired
+			expect(result).toBeGreaterThanOrEqual(0)
+		})
+
+		it("should properly cleanup on shutdown", async () => {
+			// Acquire lock
+			await storage1.acquireLock()
+			expect(await storage1.holdsLock()).toBe(true)
+
+			// Shutdown queue
+			await queue1.shutdown()
+
+			// Lock should be released
+			expect(await storage1.holdsLock()).toBe(false)
+		})
+	})
+})
diff --git a/packages/cloud/src/queue/index.ts b/packages/cloud/src/queue/index.ts
new file mode 100644
index 0000000000..4296905f91
--- /dev/null
+++ b/packages/cloud/src/queue/index.ts
@@ -0,0 +1,14 @@
+export { TelemetryEventQueue } from "./TelemetryEventQueue"
+export { GlobalStateQueueStorage } from "./GlobalStateQueueStorage"
+export { CloudQueueProcessor } from "./CloudQueueProcessor"
+export { QueuedTelemetryClient } from "./QueuedTelemetryClient"
+export type {
+	QueuedTelemetryEvent,
+	QueueStorage,
+	QueueProcessor,
+	QueueStatus,
+	QueueOptions,
+	QueueLock,
+	MultiInstanceConfig,
+	QueueOptionsWithMultiInstance,
+} from "./types"
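A minimal usage sketch of how the exports above fit together (editorial illustration, not part of the diff). It assumes `TelemetryEventName` is exported from `@roo-code/types` alongside `TelemetryEvent`, that the constructor shapes match the tests above, and that `GlobalStateQueueStorage`'s second argument can be left `undefined` as in those tests; `LoggingProcessor` and `exampleUsage` are illustrative names only:

```ts
import type * as vscode from "vscode"
import { TelemetryEventName, type TelemetryEvent } from "@roo-code/types"
import {
	GlobalStateQueueStorage,
	TelemetryEventQueue,
	type MultiInstanceConfig,
	type QueuedTelemetryEvent,
	type QueueProcessor,
} from "./index"

// Trivial processor that only logs; a real processor would forward events to the cloud API.
class LoggingProcessor implements QueueProcessor {
	async isReady(): Promise<boolean> {
		return true
	}

	async process(queued: QueuedTelemetryEvent): Promise<boolean> {
		console.log("sending", queued.event, "retries so far:", queued.retryCount)
		return true // false should leave the event queued for retry (per maxRetries in QueueOptions)
	}
}

export async function exampleUsage(context: vscode.ExtensionContext): Promise<void> {
	const multiInstance: MultiInstanceConfig = { enabled: true, mode: "compete" }

	// Second argument mirrors the tests above, where it is passed as undefined.
	const storage = new GlobalStateQueueStorage(context, undefined, multiInstance)
	const queue = new TelemetryEventQueue(storage, new LoggingProcessor(), {
		processOnEnqueue: false,
		multiInstance,
	})

	const event: TelemetryEvent = { event: TelemetryEventName.TASK_CREATED, properties: { taskId: "example" } }
	await queue.enqueue(event)

	const processed = await queue.processQueue()
	console.log(`processed ${processed} event(s)`, await queue.getStatus())

	await queue.shutdown() // releases any multi-instance lock held by this instance
}
```

`processOnEnqueue: false` mirrors the tests, where processing is triggered explicitly through `processQueue()` rather than on every `enqueue()`.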
diff --git a/packages/cloud/src/queue/types.ts b/packages/cloud/src/queue/types.ts
new file mode 100644
index 0000000000..fd38e49eb4
--- /dev/null
+++ b/packages/cloud/src/queue/types.ts
@@ -0,0 +1,231 @@
+import { TelemetryEvent } from "@roo-code/types"
+
+/**
+ * Represents a telemetry event stored in the queue with metadata for processing
+ */
+export interface QueuedTelemetryEvent {
+	/**
+	 * Unique identifier for the queued event
+	 * Format: "evt__"
+	 */
+	id: string
+
+	/**
+	 * Unix timestamp when the event was added to the queue
+	 */
+	timestamp: number
+
+	/**
+	 * The actual telemetry event to be sent
+	 */
+	event: TelemetryEvent
+
+	/**
+	 * Number of times this event has failed to send
+	 */
+	retryCount: number
+
+	/**
+	 * Unix timestamp of the last processing attempt
+	 * undefined if never attempted
+	 */
+	lastAttemptTimestamp?: number
+
+	/**
+	 * Error message from the last failed attempt
+	 * undefined if no failures or not attempted
+	 */
+	lastError?: string
+}
+
+/**
+ * Interface for persisting the queue to storage
+ */
+export interface QueueStorage {
+	/**
+	 * Add a new event to the queue
+	 * @param event The event to add
+	 * @throws Error if storage fails
+	 */
+	add(event: QueuedTelemetryEvent): Promise<void>
+
+	/**
+	 * Remove an event from the queue by ID
+	 * @param id The event ID to remove
+	 * @returns true if removed, false if not found
+	 */
+	remove(id: string): Promise<boolean>
+
+	/**
+	 * Update an existing event in the queue
+	 * @param event The updated event
+	 * @returns true if updated, false if not found
+	 */
+	update(event: QueuedTelemetryEvent): Promise<boolean>
+
+	/**
+	 * Get all events in the queue, ordered by timestamp (FIFO)
+	 * @returns Array of queued events
+	 */
+	getAll(): Promise<QueuedTelemetryEvent[]>
+
+	/**
+	 * Get the number of events in the queue
+	 * @returns Count of queued events
+	 */
+	getCount(): Promise<number>
+
+	/**
+	 * Clear all events from the queue
+	 */
+	clear(): Promise<void>
+
+	/**
+	 * Get the total size of the queue in bytes
+	 * @returns Size in bytes
+	 */
+	getSize(): Promise<number>
+}
+
+/**
+ * Interface for processing events from the queue
+ */
+export interface QueueProcessor {
+	/**
+	 * Process a single queued event
+	 * @param event The event to process
+	 * @returns true if successfully processed, false otherwise
+	 */
+	process(event: QueuedTelemetryEvent): Promise<boolean>
+
+	/**
+	 * Check if the processor is ready to process events
+	 * @returns true if ready (e.g., authenticated, connected)
+	 */
+	isReady(): Promise<boolean>
+}
+
+/**
+ * Status information about the queue
+ */
+export interface QueueStatus {
+	/**
+	 * Number of events in the queue
+	 */
+	count: number
+
+	/**
+	 * Total size of the queue in bytes
+	 */
+	sizeInBytes: number
+
+	/**
+	 * Whether the queue is currently processing
+	 */
+	isProcessing: boolean
+
+	/**
+	 * Oldest event timestamp, undefined if queue is empty
+	 */
+	oldestEventTimestamp?: number
+
+	/**
+	 * Number of events with retry attempts
+	 */
+	failedEventCount: number
+}
+
+/**
+ * Configuration options for the queue
+ */
+export interface QueueOptions {
+	/**
+	 * Maximum number of retry attempts before giving up on an event
+	 * Default: 3
+	 */
+	maxRetries?: number
+
+	/**
+	 * Whether to automatically process the queue when new events are added
+	 * Default: true
+	 */
+	processOnEnqueue?: boolean
+
+	/**
+	 * Maximum storage size in bytes (1MB = 1048576 bytes)
+	 * Default: 1048576
+	 */
+	maxStorageSize?: number
+}
+
+/**
+ * Represents a lock for multi-instance queue processing
+ */
+export interface QueueLock {
+	/**
+	 * Unique identifier of the instance holding the lock
+	 */
+	instanceId: string
+
+	/**
+	 * Unix timestamp when the lock was acquired
+	 */
+	acquiredAt: number
+
+	/**
+	 * Unix timestamp when the lock expires
+	 */
+	expiresAt: number
+
+	/**
+	 * Optional hostname for debugging
+	 */
+	hostname?: string
+}
+
+/**
+ * Multi-instance behavior configuration
+ */
+export interface MultiInstanceConfig {
+	/**
+	 * Whether to enable multi-instance coordination
+	 * Default: true
+	 */
+	enabled?: boolean
+
+	/**
+	 * Lock duration in milliseconds
+	 * Default: 30000 (30 seconds)
+	 */
+	lockDurationMs?: number
+
+	/**
+	 * How often to check for expired locks in milliseconds
+	 * Default: 5000 (5 seconds)
+	 */
+	lockCheckIntervalMs?: number
+
+	/**
+	 * Maximum time to wait for acquiring a lock in milliseconds
+	 * Default: 10000 (10 seconds)
+	 */
+	lockAcquireTimeoutMs?: number
+
+	/**
+	 * Behavior when multiple instances are detected
+	 * - 'compete': All instances compete for the lock (default)
+	 * - 'leader': Only one instance processes (leader election)
+	 * - 'disabled': No coordination, all instances process independently
+	 */
+	mode?: "compete" | "leader" | "disabled"
+}
+
+/**
+ * Extended queue options with multi-instance support
+ */
+export interface QueueOptionsWithMultiInstance extends QueueOptions {
+	/**
+	 * Multi-instance configuration
+	 */
+	multiInstance?: MultiInstanceConfig
+}
diff --git a/packages/types/src/vscode.ts b/packages/types/src/vscode.ts
index 00f6bbbcba..de23a9077e 100644
--- a/packages/types/src/vscode.ts
+++ b/packages/types/src/vscode.ts
@@ -53,6 +53,10 @@ export const commandIds = [
 	"focusInput",
 	"acceptInput",
 	"focusPanel",
+
+	"telemetryQueueStatus",
+	"telemetryQueueProcess",
+	"telemetryQueueClear",
 ] as const
 
 export type CommandId = (typeof commandIds)[number]
diff --git a/src/activate/registerCommands.ts b/src/activate/registerCommands.ts
index bd925b0e90..bc5310e16f 100644
--- a/src/activate/registerCommands.ts
+++ b/src/activate/registerCommands.ts
@@ -3,6 +3,7 @@ import delay from "delay"
 
 import type { CommandId } from "@roo-code/types"
 import { TelemetryService } from "@roo-code/telemetry"
+import { CloudService, QueuedTelemetryClient } from "@roo-code/cloud"
 
 import { Package } from "../shared/package"
 import { getCommand } from "../utils/commands"
@@ -218,6 +219,68 @@ const getCommandsMap = ({ context, outputChannel, provider }: RegisterCommandOpt
 
 		visibleProvider.postMessageToWebview({ type: "acceptInput" })
 	},
+	telemetryQueueStatus: async () => {
+		const cloudService = CloudService.hasInstance() ? CloudService.instance : null
+		if (!cloudService) {
+			vscode.window.showWarningMessage("Cloud service not initialized")
+			return
+		}
+
+		// Get queue status
+		const clients = (TelemetryService.instance as any).clients || []
+		const queuedClient = clients.find((c: any) => c instanceof QueuedTelemetryClient) as
+			| QueuedTelemetryClient
+			| undefined
+
+		if (!queuedClient) {
+			vscode.window.showWarningMessage("Telemetry queue not initialized")
+			return
+		}
+
+		const status = await queuedClient.getQueueStatus()
+
+		vscode.window.showInformationMessage(
+			`Telemetry Queue: ${status.count} events, ` +
+				`${status.failedEventCount} failed, ` +
+				`${(status.sizeInBytes / 1024).toFixed(2)}KB`,
+		)
+	},
+	telemetryQueueProcess: async () => {
+		const clients = (TelemetryService.instance as any).clients || []
+		const queuedClient = clients.find((c: any) => c instanceof QueuedTelemetryClient) as
+			| QueuedTelemetryClient
+			| undefined
+
+		if (!queuedClient) {
+			vscode.window.showWarningMessage("Telemetry queue not initialized")
+			return
+		}
+
+		const processed = await queuedClient.processQueue()
+		vscode.window.showInformationMessage(`Processed ${processed} telemetry events`)
+	},
+	telemetryQueueClear: async () => {
+		const clients = (TelemetryService.instance as any).clients || []
+		const queuedClient = clients.find((c: any) => c instanceof QueuedTelemetryClient) as
+			| QueuedTelemetryClient
+			| undefined
+
+		if (!queuedClient) {
+			vscode.window.showWarningMessage("Telemetry queue not initialized")
+			return
+		}
+
+		const answer = await vscode.window.showWarningMessage(
+			"Are you sure you want to clear all queued telemetry events?",
+			"Yes",
+			"No",
+		)
+
+		if (answer === "Yes") {
+			await queuedClient.clearQueue()
+			vscode.window.showInformationMessage("Telemetry queue cleared")
+		}
+	},
 })
 
 export const openClineInNewTab = async ({ context, outputChannel }: Omit<RegisterCommandOptions, "provider">) => {
diff --git a/src/extension.ts b/src/extension.ts
index 60c61aada7..88f655521c 100644
--- a/src/extension.ts
+++ b/src/extension.ts
@@ -85,6 +85,19 @@ export async function activate(context: vscode.ExtensionContext) {
 	// Add to subscriptions for proper cleanup on deactivate
 	context.subscriptions.push(cloudService)
 
+	// Process any queued events from previous sessions
+	try {
+		const processedCount = await cloudService.processQueuedEvents()
+		if (processedCount > 0) {
+			outputChannel.appendLine(`[Telemetry] Processed ${processedCount} queued events from previous session`)
+		}
+	} catch (error) {
+		console.error("Failed to process queued events:", error)
+		outputChannel.appendLine(
+			`[Telemetry] Failed to process queued events: ${error instanceof Error ? error.message : String(error)}`,
+		)
+	}
+
 	// Initialize MDM service
 	const mdmService = await MdmService.createInstance(cloudLogger)