-
Notifications
You must be signed in to change notification settings - Fork 2.6k
feat: add persistent retry queue for failed telemetry events #6572
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -1,3 +1,4 @@ | ||||||||
| import * as vscode from "vscode" | ||||||||
| import { | ||||||||
| TelemetryEventName, | ||||||||
| type TelemetryEvent, | ||||||||
|
|
@@ -9,9 +10,13 @@ import { BaseTelemetryClient } from "@roo-code/telemetry" | |||||||
| import { getRooCodeApiUrl } from "./Config" | ||||||||
| import type { AuthService } from "./auth" | ||||||||
| import type { SettingsService } from "./SettingsService" | ||||||||
| import { TelemetryQueue } from "./TelemetryQueue" | ||||||||
|
|
||||||||
| export class TelemetryClient extends BaseTelemetryClient { | ||||||||
| private queue: TelemetryQueue | ||||||||
|
|
||||||||
| constructor( | ||||||||
| private context: vscode.ExtensionContext, | ||||||||
| private authService: AuthService, | ||||||||
| private settingsService: SettingsService, | ||||||||
| debug = false, | ||||||||
|
|
@@ -23,18 +28,19 @@ export class TelemetryClient extends BaseTelemetryClient { | |||||||
| }, | ||||||||
| debug, | ||||||||
| ) | ||||||||
| this.queue = new TelemetryQueue(context, debug) | ||||||||
| } | ||||||||
|
|
||||||||
| private async fetch(path: string, options: RequestInit) { | ||||||||
| private async fetch(path: string, options: RequestInit): Promise<Response | undefined> { | ||||||||
| if (!this.authService.isAuthenticated()) { | ||||||||
| return | ||||||||
| return undefined | ||||||||
| } | ||||||||
|
|
||||||||
| const token = this.authService.getSessionToken() | ||||||||
|
|
||||||||
| if (!token) { | ||||||||
| console.error(`[TelemetryClient#fetch] Unauthorized: No session token available.`) | ||||||||
| return | ||||||||
| return undefined | ||||||||
| } | ||||||||
|
|
||||||||
| const response = await fetch(`${getRooCodeApiUrl()}/api/${path}`, { | ||||||||
|
|
@@ -47,6 +53,8 @@ export class TelemetryClient extends BaseTelemetryClient { | |||||||
| `[TelemetryClient#fetch] ${options.method} ${path} -> ${response.status} ${response.statusText}`, | ||||||||
| ) | ||||||||
| } | ||||||||
|
|
||||||||
| return response | ||||||||
| } | ||||||||
|
|
||||||||
| public override async capture(event: TelemetryEvent) { | ||||||||
|
|
@@ -77,10 +85,80 @@ export class TelemetryClient extends BaseTelemetryClient { | |||||||
| return | ||||||||
| } | ||||||||
|
|
||||||||
| // Add event to queue | ||||||||
| await this.queue.enqueue(result.data) | ||||||||
|
|
||||||||
| // Process queue asynchronously if not already processing | ||||||||
| if (!this.queue.isProcessingQueue()) { | ||||||||
| // Don't await - let it process in the background | ||||||||
| this.processQueue().catch((error) => { | ||||||||
| if (this.debug) { | ||||||||
| console.error(`[TelemetryClient#capture] Error processing queue:`, error) | ||||||||
| } | ||||||||
| }) | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| /** | ||||||||
| * Processes the telemetry queue, sending events to the cloud service | ||||||||
| */ | ||||||||
| private async processQueue(): Promise<void> { | ||||||||
| if (!this.authService.isAuthenticated()) { | ||||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I notice the queue doesn't automatically process when authentication state changes. If a user was offline and queued events, then comes online and authenticates, they'd need to trigger a new event before the queue processes. Should we consider adding a listener for auth state changes to process the queue? |
||||||||
| if (this.debug) { | ||||||||
| console.info("[TelemetryClient#processQueue] Skipping: Not authenticated") | ||||||||
| } | ||||||||
| return | ||||||||
| } | ||||||||
|
|
||||||||
| this.queue.setProcessingState(true) | ||||||||
|
|
||||||||
| try { | ||||||||
| await this.fetch(`events`, { method: "POST", body: JSON.stringify(result.data) }) | ||||||||
| } catch (error) { | ||||||||
| console.error(`[TelemetryClient#capture] Error sending telemetry event: ${error}`) | ||||||||
| while (true) { | ||||||||
| const queuedEvent = await this.queue.peek() | ||||||||
| if (!queuedEvent) { | ||||||||
| break // Queue is empty | ||||||||
| } | ||||||||
|
|
||||||||
| try { | ||||||||
| // Attempt to send the event | ||||||||
| const response = await this.fetch(`events`, { | ||||||||
| method: "POST", | ||||||||
| body: JSON.stringify(queuedEvent.event), | ||||||||
| }) | ||||||||
|
|
||||||||
| // Check if response indicates success (fetch doesn't throw on HTTP errors) | ||||||||
| if (response === undefined || (response && response.ok !== false)) { | ||||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This condition seems a bit convoluted. Would this be clearer?
Suggested change
The current logic treats |
||||||||
| // Success - remove from queue | ||||||||
| await this.queue.dequeue(queuedEvent.id) | ||||||||
|
|
||||||||
| if (this.debug) { | ||||||||
| console.info(`[TelemetryClient#processQueue] Successfully sent event ${queuedEvent.id}`) | ||||||||
| } | ||||||||
| } else { | ||||||||
| // HTTP error - mark as failed | ||||||||
| await this.queue.markFailed(queuedEvent.id) | ||||||||
|
|
||||||||
| if (this.debug) { | ||||||||
| console.error(`[TelemetryClient#processQueue] HTTP error for event ${queuedEvent.id}`) | ||||||||
| } | ||||||||
|
|
||||||||
| // Stop processing on error to avoid rapid retry loops | ||||||||
| break | ||||||||
| } | ||||||||
| } catch (error) { | ||||||||
| // Network or other error - mark as failed and move to end of queue | ||||||||
| await this.queue.markFailed(queuedEvent.id) | ||||||||
|
|
||||||||
| if (this.debug) { | ||||||||
| console.error(`[TelemetryClient#processQueue] Failed to send event ${queuedEvent.id}:`, error) | ||||||||
| } | ||||||||
|
|
||||||||
| // Stop processing on error to avoid rapid retry loops | ||||||||
| break | ||||||||
| } | ||||||||
| } | ||||||||
| } finally { | ||||||||
| this.queue.setProcessingState(false) | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
|
|
||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,186 @@ | ||
| import * as vscode from "vscode" | ||
| import { randomUUID } from "crypto" | ||
| import type { RooCodeTelemetryEvent } from "@roo-code/types" | ||
|
|
||
/**
 * A telemetry event wrapped with the bookkeeping metadata the queue
 * needs to persist, deliver, and retry it.
 */
export interface QueuedTelemetryEvent {
	// Unique id used to dequeue / mark this entry after a send attempt.
	id: string
	// The original telemetry payload to deliver.
	event: RooCodeTelemetryEvent
	// Epoch milliseconds when the event was enqueued.
	timestamp: number
	// Number of failed delivery attempts so far.
	retryCount: number
}
| export class TelemetryQueue { | ||
| private static readonly QUEUE_KEY = "rooCode.telemetryQueue" | ||
| private static readonly MAX_QUEUE_SIZE = 1000 // Prevent unbounded growth | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we consider making these limits configurable? Different deployment scenarios might benefit from different queue sizes and retry counts. Perhaps through extension settings or environment variables? |
||
| private static readonly MAX_RETRY_COUNT = 3 // Limit retries per event | ||
|
|
||
| private context: vscode.ExtensionContext | ||
| private isProcessing = false | ||
| private debug: boolean | ||
|
|
||
| constructor(context: vscode.ExtensionContext, debug = false) { | ||
| this.context = context | ||
| this.debug = debug | ||
| } | ||
|
|
||
| /** | ||
| * Adds a telemetry event to the queue | ||
| */ | ||
| public async enqueue(event: RooCodeTelemetryEvent): Promise<void> { | ||
| const queue = await this.getQueue() | ||
|
|
||
| // Prevent unbounded growth | ||
| if (queue.length >= TelemetryQueue.MAX_QUEUE_SIZE) { | ||
| if (this.debug) { | ||
| console.warn( | ||
| `[TelemetryQueue] Queue is full (${TelemetryQueue.MAX_QUEUE_SIZE} items), dropping oldest event`, | ||
| ) | ||
| } | ||
| queue.shift() // Remove oldest event | ||
| } | ||
|
|
||
| const queuedEvent: QueuedTelemetryEvent = { | ||
| id: randomUUID(), | ||
| event, | ||
| timestamp: Date.now(), | ||
| retryCount: 0, | ||
| } | ||
|
|
||
| queue.push(queuedEvent) | ||
| await this.saveQueue(queue) | ||
|
|
||
| if (this.debug) { | ||
| console.info( | ||
| `[TelemetryQueue] Enqueued event ${queuedEvent.id} (${event.type}), queue size: ${queue.length}`, | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Retrieves the next event from the queue without removing it | ||
| */ | ||
| public async peek(): Promise<QueuedTelemetryEvent | null> { | ||
| const queue = await this.getQueue() | ||
| return queue.length > 0 ? queue[0] : null | ||
| } | ||
|
|
||
| /** | ||
| * Removes a successfully sent event from the queue | ||
| */ | ||
| public async dequeue(eventId: string): Promise<void> { | ||
| const queue = await this.getQueue() | ||
| const filteredQueue = queue.filter((e) => e.id !== eventId) | ||
|
|
||
| if (queue.length !== filteredQueue.length) { | ||
| await this.saveQueue(filteredQueue) | ||
| if (this.debug) { | ||
| console.info(`[TelemetryQueue] Dequeued event ${eventId}, queue size: ${filteredQueue.length}`) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Increments retry count for a failed event and moves it to the end of the queue | ||
| */ | ||
| public async markFailed(eventId: string): Promise<void> { | ||
| const queue = await this.getQueue() | ||
| const eventIndex = queue.findIndex((e) => e.id === eventId) | ||
|
|
||
| if (eventIndex === -1) { | ||
| return | ||
| } | ||
|
|
||
| const event = queue[eventIndex] | ||
| event.retryCount++ | ||
|
|
||
| // Remove from current position | ||
| queue.splice(eventIndex, 1) | ||
|
|
||
| // If max retries not exceeded, add back to end of queue | ||
| if (event.retryCount < TelemetryQueue.MAX_RETRY_COUNT) { | ||
| queue.push(event) | ||
| if (this.debug) { | ||
| console.info( | ||
| `[TelemetryQueue] Marked event ${eventId} as failed (retry ${event.retryCount}/${TelemetryQueue.MAX_RETRY_COUNT})`, | ||
| ) | ||
| } | ||
| } else { | ||
| if (this.debug) { | ||
| console.warn(`[TelemetryQueue] Event ${eventId} exceeded max retries, removing from queue`) | ||
| } | ||
| } | ||
|
|
||
| await this.saveQueue(queue) | ||
| } | ||
|
|
||
| /** | ||
| * Gets the current queue size | ||
| */ | ||
| public async size(): Promise<number> { | ||
| const queue = await this.getQueue() | ||
| return queue.length | ||
| } | ||
|
|
||
| /** | ||
| * Checks if the queue is currently being processed | ||
| */ | ||
| public isProcessingQueue(): boolean { | ||
| return this.isProcessing | ||
| } | ||
|
|
||
| /** | ||
| * Sets the processing state | ||
| */ | ||
| public setProcessingState(processing: boolean): void { | ||
| this.isProcessing = processing | ||
| } | ||
|
|
||
| /** | ||
| * Clears all events from the queue | ||
| */ | ||
| public async clear(): Promise<void> { | ||
| await this.saveQueue([]) | ||
| if (this.debug) { | ||
| console.info("[TelemetryQueue] Queue cleared") | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Gets all queued events (for testing/debugging) | ||
| */ | ||
| public async getAll(): Promise<QueuedTelemetryEvent[]> { | ||
| return await this.getQueue() | ||
| } | ||
|
|
||
| private async getQueue(): Promise<QueuedTelemetryEvent[]> { | ||
| try { | ||
| const queue = this.context.globalState.get<QueuedTelemetryEvent[]>(TelemetryQueue.QUEUE_KEY) | ||
| // Validate that we got an array | ||
| if (Array.isArray(queue)) { | ||
| return queue | ||
| } | ||
| // If we got corrupted data, try to reset to empty array | ||
| if (queue !== undefined) { | ||
| console.warn("[TelemetryQueue] Corrupted queue data detected, resetting to empty array") | ||
| try { | ||
| await this.context.globalState.update(TelemetryQueue.QUEUE_KEY, []) | ||
| } catch (updateError) { | ||
| // If update fails, just log and continue with empty array | ||
| console.error("[TelemetryQueue] Failed to reset corrupted queue:", updateError) | ||
| } | ||
| } | ||
| return [] | ||
| } catch (error) { | ||
| console.error("[TelemetryQueue] Failed to get queue:", error) | ||
| return [] | ||
| } | ||
| } | ||
|
|
||
| private async saveQueue(queue: QueuedTelemetryEvent[]): Promise<void> { | ||
| try { | ||
| await this.context.globalState.update(TelemetryQueue.QUEUE_KEY, queue) | ||
| } catch (error) { | ||
| console.error("[TelemetryQueue] Failed to save queue:", error) | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this intentional? There's a potential race condition here between checking
`isProcessingQueue()` and actually setting the processing state in `processQueue()`. Multiple simultaneous captures could trigger multiple queue processors. Could we consider moving the processing-state flag setting immediately after the check, or using a more atomic approach?