Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cloud/src/CloudService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements vs
this.settingsService = cloudSettingsService
}

this.telemetryClient = new TelemetryClient(this.authService, this.settingsService)
this.telemetryClient = new TelemetryClient(this.context, this.authService, this.settingsService)
this.shareService = new ShareService(this.authService, this.settingsService, this.log)

try {
Expand Down
90 changes: 84 additions & 6 deletions packages/cloud/src/TelemetryClient.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as vscode from "vscode"
import {
TelemetryEventName,
type TelemetryEvent,
Expand All @@ -9,9 +10,13 @@ import { BaseTelemetryClient } from "@roo-code/telemetry"
import { getRooCodeApiUrl } from "./Config"
import type { AuthService } from "./auth"
import type { SettingsService } from "./SettingsService"
import { TelemetryQueue } from "./TelemetryQueue"

export class TelemetryClient extends BaseTelemetryClient {
private queue: TelemetryQueue

constructor(
private context: vscode.ExtensionContext,
private authService: AuthService,
private settingsService: SettingsService,
debug = false,
Expand All @@ -23,18 +28,19 @@ export class TelemetryClient extends BaseTelemetryClient {
},
debug,
)
this.queue = new TelemetryQueue(context, debug)
}

private async fetch(path: string, options: RequestInit) {
private async fetch(path: string, options: RequestInit): Promise<Response | undefined> {
if (!this.authService.isAuthenticated()) {
return
return undefined
}

const token = this.authService.getSessionToken()

if (!token) {
console.error(`[TelemetryClient#fetch] Unauthorized: No session token available.`)
return
return undefined
}

const response = await fetch(`${getRooCodeApiUrl()}/api/${path}`, {
Expand All @@ -47,6 +53,8 @@ export class TelemetryClient extends BaseTelemetryClient {
`[TelemetryClient#fetch] ${options.method} ${path} -> ${response.status} ${response.statusText}`,
)
}

return response
}

public override async capture(event: TelemetryEvent) {
Expand Down Expand Up @@ -77,10 +85,80 @@ export class TelemetryClient extends BaseTelemetryClient {
return
}

// Add event to queue
await this.queue.enqueue(result.data)

// Process queue asynchronously if not already processing
if (!this.queue.isProcessingQueue()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intentional? There's a potential race condition here between checking isProcessingQueue() and actually setting the processing state in processQueue(). Multiple simultaneous captures could trigger multiple queue processors.

Could we consider moving the processing state flag setting immediately after the check, or using a more atomic approach?

// Don't await - let it process in the background
this.processQueue().catch((error) => {
if (this.debug) {
console.error(`[TelemetryClient#capture] Error processing queue:`, error)
}
})
}
}

/**
* Processes the telemetry queue, sending events to the cloud service
*/
private async processQueue(): Promise<void> {
if (!this.authService.isAuthenticated()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice the queue doesn't automatically process when authentication state changes. If a user was offline and queued events, then comes online and authenticates, they'd need to trigger a new event before the queue processes. Should we consider adding a listener for auth state changes to process the queue?

if (this.debug) {
console.info("[TelemetryClient#processQueue] Skipping: Not authenticated")
}
return
}

this.queue.setProcessingState(true)

try {
await this.fetch(`events`, { method: "POST", body: JSON.stringify(result.data) })
} catch (error) {
console.error(`[TelemetryClient#capture] Error sending telemetry event: ${error}`)
while (true) {
const queuedEvent = await this.queue.peek()
if (!queuedEvent) {
break // Queue is empty
}

try {
// Attempt to send the event
const response = await this.fetch(`events`, {
method: "POST",
body: JSON.stringify(queuedEvent.event),
})

// Check if response indicates success (fetch doesn't throw on HTTP errors)
if (response === undefined || (response && response.ok !== false)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition seems a bit convoluted. Would this be clearer?

Suggested change
if (response === undefined || (response && response.ok !== false)) {
// Check if response indicates success
if (response && response.ok) {

The current logic treats undefined as success, which might be confusing for future maintainers.

// Success - remove from queue
await this.queue.dequeue(queuedEvent.id)

if (this.debug) {
console.info(`[TelemetryClient#processQueue] Successfully sent event ${queuedEvent.id}`)
}
} else {
// HTTP error - mark as failed
await this.queue.markFailed(queuedEvent.id)

if (this.debug) {
console.error(`[TelemetryClient#processQueue] HTTP error for event ${queuedEvent.id}`)
}

// Stop processing on error to avoid rapid retry loops
break
}
} catch (error) {
// Network or other error - mark as failed and move to end of queue
await this.queue.markFailed(queuedEvent.id)

if (this.debug) {
console.error(`[TelemetryClient#processQueue] Failed to send event ${queuedEvent.id}:`, error)
}

// Stop processing on error to avoid rapid retry loops
break
}
}
} finally {
this.queue.setProcessingState(false)
}
}

Expand Down
186 changes: 186 additions & 0 deletions packages/cloud/src/TelemetryQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import * as vscode from "vscode"
import { randomUUID } from "crypto"
import type { RooCodeTelemetryEvent } from "@roo-code/types"

export interface QueuedTelemetryEvent {
id: string
event: RooCodeTelemetryEvent
timestamp: number
retryCount: number
}

export class TelemetryQueue {
private static readonly QUEUE_KEY = "rooCode.telemetryQueue"
private static readonly MAX_QUEUE_SIZE = 1000 // Prevent unbounded growth
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we consider making these limits configurable? Different deployment scenarios might benefit from different queue sizes and retry counts. Perhaps through extension settings or environment variables?

private static readonly MAX_RETRY_COUNT = 3 // Limit retries per event

private context: vscode.ExtensionContext
private isProcessing = false
private debug: boolean

constructor(context: vscode.ExtensionContext, debug = false) {
this.context = context
this.debug = debug
}

/**
* Adds a telemetry event to the queue
*/
public async enqueue(event: RooCodeTelemetryEvent): Promise<void> {
const queue = await this.getQueue()

// Prevent unbounded growth
if (queue.length >= TelemetryQueue.MAX_QUEUE_SIZE) {
if (this.debug) {
console.warn(
`[TelemetryQueue] Queue is full (${TelemetryQueue.MAX_QUEUE_SIZE} items), dropping oldest event`,
)
}
queue.shift() // Remove oldest event
}

const queuedEvent: QueuedTelemetryEvent = {
id: randomUUID(),
event,
timestamp: Date.now(),
retryCount: 0,
}

queue.push(queuedEvent)
await this.saveQueue(queue)

if (this.debug) {
console.info(
`[TelemetryQueue] Enqueued event ${queuedEvent.id} (${event.type}), queue size: ${queue.length}`,
)
}
}

/**
* Retrieves the next event from the queue without removing it
*/
public async peek(): Promise<QueuedTelemetryEvent | null> {
const queue = await this.getQueue()
return queue.length > 0 ? queue[0] : null
}

/**
* Removes a successfully sent event from the queue
*/
public async dequeue(eventId: string): Promise<void> {
const queue = await this.getQueue()
const filteredQueue = queue.filter((e) => e.id !== eventId)

if (queue.length !== filteredQueue.length) {
await this.saveQueue(filteredQueue)
if (this.debug) {
console.info(`[TelemetryQueue] Dequeued event ${eventId}, queue size: ${filteredQueue.length}`)
}
}
}

/**
* Increments retry count for a failed event and moves it to the end of the queue
*/
public async markFailed(eventId: string): Promise<void> {
const queue = await this.getQueue()
const eventIndex = queue.findIndex((e) => e.id === eventId)

if (eventIndex === -1) {
return
}

const event = queue[eventIndex]
event.retryCount++

// Remove from current position
queue.splice(eventIndex, 1)

// If max retries not exceeded, add back to end of queue
if (event.retryCount < TelemetryQueue.MAX_RETRY_COUNT) {
queue.push(event)
if (this.debug) {
console.info(
`[TelemetryQueue] Marked event ${eventId} as failed (retry ${event.retryCount}/${TelemetryQueue.MAX_RETRY_COUNT})`,
)
}
} else {
if (this.debug) {
console.warn(`[TelemetryQueue] Event ${eventId} exceeded max retries, removing from queue`)
}
}

await this.saveQueue(queue)
}

/**
* Gets the current queue size
*/
public async size(): Promise<number> {
const queue = await this.getQueue()
return queue.length
}

/**
* Checks if the queue is currently being processed
*/
public isProcessingQueue(): boolean {
return this.isProcessing
}

/**
* Sets the processing state
*/
public setProcessingState(processing: boolean): void {
this.isProcessing = processing
}

/**
* Clears all events from the queue
*/
public async clear(): Promise<void> {
await this.saveQueue([])
if (this.debug) {
console.info("[TelemetryQueue] Queue cleared")
}
}

/**
* Gets all queued events (for testing/debugging)
*/
public async getAll(): Promise<QueuedTelemetryEvent[]> {
return await this.getQueue()
}

private async getQueue(): Promise<QueuedTelemetryEvent[]> {
try {
const queue = this.context.globalState.get<QueuedTelemetryEvent[]>(TelemetryQueue.QUEUE_KEY)
// Validate that we got an array
if (Array.isArray(queue)) {
return queue
}
// If we got corrupted data, try to reset to empty array
if (queue !== undefined) {
console.warn("[TelemetryQueue] Corrupted queue data detected, resetting to empty array")
try {
await this.context.globalState.update(TelemetryQueue.QUEUE_KEY, [])
} catch (updateError) {
// If update fails, just log and continue with empty array
console.error("[TelemetryQueue] Failed to reset corrupted queue:", updateError)
}
}
return []
} catch (error) {
console.error("[TelemetryQueue] Failed to get queue:", error)
return []
}
}

private async saveQueue(queue: QueuedTelemetryEvent[]): Promise<void> {
try {
await this.context.globalState.update(TelemetryQueue.QUEUE_KEY, queue)
} catch (error) {
console.error("[TelemetryQueue] Failed to save queue:", error)
}
}
}
Loading
Loading