Skip to content

Commit a629172

Browse files
committed
feat: add persistent retry queue for failed telemetry events
- Implement TelemetryQueue class with FIFO queue structure - Add persistence using VSCode globalState - Integrate queue into TelemetryClient for automatic retry - Process pending events when new events are captured - Add comprehensive test coverage for queue functionality - Limit queue size to 1000 events and max 3 retries per event Fixes #4940
1 parent 69685c7 commit a629172

File tree

6 files changed

+1194
-57
lines changed

6 files changed

+1194
-57
lines changed

packages/cloud/src/CloudService.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements vs
8787
this.settingsService = cloudSettingsService
8888
}
8989

90-
this.telemetryClient = new TelemetryClient(this.authService, this.settingsService)
90+
this.telemetryClient = new TelemetryClient(this.context, this.authService, this.settingsService)
9191
this.shareService = new ShareService(this.authService, this.settingsService, this.log)
9292

9393
try {

packages/cloud/src/TelemetryClient.ts

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import * as vscode from "vscode"
12
import {
23
TelemetryEventName,
34
type TelemetryEvent,
@@ -9,9 +10,13 @@ import { BaseTelemetryClient } from "@roo-code/telemetry"
910
import { getRooCodeApiUrl } from "./Config"
1011
import type { AuthService } from "./auth"
1112
import type { SettingsService } from "./SettingsService"
13+
import { TelemetryQueue } from "./TelemetryQueue"
1214

1315
export class TelemetryClient extends BaseTelemetryClient {
16+
private queue: TelemetryQueue
17+
1418
constructor(
19+
private context: vscode.ExtensionContext,
1520
private authService: AuthService,
1621
private settingsService: SettingsService,
1722
debug = false,
@@ -23,18 +28,19 @@ export class TelemetryClient extends BaseTelemetryClient {
2328
},
2429
debug,
2530
)
31+
this.queue = new TelemetryQueue(context, debug)
2632
}
2733

28-
private async fetch(path: string, options: RequestInit) {
34+
private async fetch(path: string, options: RequestInit): Promise<Response | undefined> {
2935
if (!this.authService.isAuthenticated()) {
30-
return
36+
return undefined
3137
}
3238

3339
const token = this.authService.getSessionToken()
3440

3541
if (!token) {
3642
console.error(`[TelemetryClient#fetch] Unauthorized: No session token available.`)
37-
return
43+
return undefined
3844
}
3945

4046
const response = await fetch(`${getRooCodeApiUrl()}/api/${path}`, {
@@ -47,6 +53,8 @@ export class TelemetryClient extends BaseTelemetryClient {
4753
`[TelemetryClient#fetch] ${options.method} ${path} -> ${response.status} ${response.statusText}`,
4854
)
4955
}
56+
57+
return response
5058
}
5159

5260
public override async capture(event: TelemetryEvent) {
@@ -77,10 +85,80 @@ export class TelemetryClient extends BaseTelemetryClient {
7785
return
7886
}
7987

88+
// Add event to queue
89+
await this.queue.enqueue(result.data)
90+
91+
// Process queue asynchronously if not already processing
92+
if (!this.queue.isProcessingQueue()) {
93+
// Don't await - let it process in the background
94+
this.processQueue().catch((error) => {
95+
if (this.debug) {
96+
console.error(`[TelemetryClient#capture] Error processing queue:`, error)
97+
}
98+
})
99+
}
100+
}
101+
102+
/**
103+
* Processes the telemetry queue, sending events to the cloud service
104+
*/
105+
private async processQueue(): Promise<void> {
106+
if (!this.authService.isAuthenticated()) {
107+
if (this.debug) {
108+
console.info("[TelemetryClient#processQueue] Skipping: Not authenticated")
109+
}
110+
return
111+
}
112+
113+
this.queue.setProcessingState(true)
114+
80115
try {
81-
await this.fetch(`events`, { method: "POST", body: JSON.stringify(result.data) })
82-
} catch (error) {
83-
console.error(`[TelemetryClient#capture] Error sending telemetry event: ${error}`)
116+
while (true) {
117+
const queuedEvent = await this.queue.peek()
118+
if (!queuedEvent) {
119+
break // Queue is empty
120+
}
121+
122+
try {
123+
// Attempt to send the event
124+
const response = await this.fetch(`events`, {
125+
method: "POST",
126+
body: JSON.stringify(queuedEvent.event),
127+
})
128+
129+
// Check if response indicates success (fetch doesn't throw on HTTP errors)
130+
if (response === undefined || (response && response.ok !== false)) {
131+
// Success - remove from queue
132+
await this.queue.dequeue(queuedEvent.id)
133+
134+
if (this.debug) {
135+
console.info(`[TelemetryClient#processQueue] Successfully sent event ${queuedEvent.id}`)
136+
}
137+
} else {
138+
// HTTP error - mark as failed
139+
await this.queue.markFailed(queuedEvent.id)
140+
141+
if (this.debug) {
142+
console.error(`[TelemetryClient#processQueue] HTTP error for event ${queuedEvent.id}`)
143+
}
144+
145+
// Stop processing on error to avoid rapid retry loops
146+
break
147+
}
148+
} catch (error) {
149+
// Network or other error - mark as failed and move to end of queue
150+
await this.queue.markFailed(queuedEvent.id)
151+
152+
if (this.debug) {
153+
console.error(`[TelemetryClient#processQueue] Failed to send event ${queuedEvent.id}:`, error)
154+
}
155+
156+
// Stop processing on error to avoid rapid retry loops
157+
break
158+
}
159+
}
160+
} finally {
161+
this.queue.setProcessingState(false)
84162
}
85163
}
86164

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
import * as vscode from "vscode"
2+
import { randomUUID } from "crypto"
3+
import type { RooCodeTelemetryEvent } from "@roo-code/types"
4+
5+
export interface QueuedTelemetryEvent {
6+
id: string
7+
event: RooCodeTelemetryEvent
8+
timestamp: number
9+
retryCount: number
10+
}
11+
12+
export class TelemetryQueue {
13+
private static readonly QUEUE_KEY = "rooCode.telemetryQueue"
14+
private static readonly MAX_QUEUE_SIZE = 1000 // Prevent unbounded growth
15+
private static readonly MAX_RETRY_COUNT = 3 // Limit retries per event
16+
17+
private context: vscode.ExtensionContext
18+
private isProcessing = false
19+
private debug: boolean
20+
21+
constructor(context: vscode.ExtensionContext, debug = false) {
22+
this.context = context
23+
this.debug = debug
24+
}
25+
26+
/**
27+
* Adds a telemetry event to the queue
28+
*/
29+
public async enqueue(event: RooCodeTelemetryEvent): Promise<void> {
30+
const queue = await this.getQueue()
31+
32+
// Prevent unbounded growth
33+
if (queue.length >= TelemetryQueue.MAX_QUEUE_SIZE) {
34+
if (this.debug) {
35+
console.warn(
36+
`[TelemetryQueue] Queue is full (${TelemetryQueue.MAX_QUEUE_SIZE} items), dropping oldest event`,
37+
)
38+
}
39+
queue.shift() // Remove oldest event
40+
}
41+
42+
const queuedEvent: QueuedTelemetryEvent = {
43+
id: randomUUID(),
44+
event,
45+
timestamp: Date.now(),
46+
retryCount: 0,
47+
}
48+
49+
queue.push(queuedEvent)
50+
await this.saveQueue(queue)
51+
52+
if (this.debug) {
53+
console.info(
54+
`[TelemetryQueue] Enqueued event ${queuedEvent.id} (${event.type}), queue size: ${queue.length}`,
55+
)
56+
}
57+
}
58+
59+
/**
60+
* Retrieves the next event from the queue without removing it
61+
*/
62+
public async peek(): Promise<QueuedTelemetryEvent | null> {
63+
const queue = await this.getQueue()
64+
return queue.length > 0 ? queue[0] : null
65+
}
66+
67+
/**
68+
* Removes a successfully sent event from the queue
69+
*/
70+
public async dequeue(eventId: string): Promise<void> {
71+
const queue = await this.getQueue()
72+
const filteredQueue = queue.filter((e) => e.id !== eventId)
73+
74+
if (queue.length !== filteredQueue.length) {
75+
await this.saveQueue(filteredQueue)
76+
if (this.debug) {
77+
console.info(`[TelemetryQueue] Dequeued event ${eventId}, queue size: ${filteredQueue.length}`)
78+
}
79+
}
80+
}
81+
82+
/**
83+
* Increments retry count for a failed event and moves it to the end of the queue
84+
*/
85+
public async markFailed(eventId: string): Promise<void> {
86+
const queue = await this.getQueue()
87+
const eventIndex = queue.findIndex((e) => e.id === eventId)
88+
89+
if (eventIndex === -1) {
90+
return
91+
}
92+
93+
const event = queue[eventIndex]
94+
event.retryCount++
95+
96+
// Remove from current position
97+
queue.splice(eventIndex, 1)
98+
99+
// If max retries not exceeded, add back to end of queue
100+
if (event.retryCount < TelemetryQueue.MAX_RETRY_COUNT) {
101+
queue.push(event)
102+
if (this.debug) {
103+
console.info(
104+
`[TelemetryQueue] Marked event ${eventId} as failed (retry ${event.retryCount}/${TelemetryQueue.MAX_RETRY_COUNT})`,
105+
)
106+
}
107+
} else {
108+
if (this.debug) {
109+
console.warn(`[TelemetryQueue] Event ${eventId} exceeded max retries, removing from queue`)
110+
}
111+
}
112+
113+
await this.saveQueue(queue)
114+
}
115+
116+
/**
117+
* Gets the current queue size
118+
*/
119+
public async size(): Promise<number> {
120+
const queue = await this.getQueue()
121+
return queue.length
122+
}
123+
124+
/**
125+
* Checks if the queue is currently being processed
126+
*/
127+
public isProcessingQueue(): boolean {
128+
return this.isProcessing
129+
}
130+
131+
/**
132+
* Sets the processing state
133+
*/
134+
public setProcessingState(processing: boolean): void {
135+
this.isProcessing = processing
136+
}
137+
138+
/**
139+
* Clears all events from the queue
140+
*/
141+
public async clear(): Promise<void> {
142+
await this.saveQueue([])
143+
if (this.debug) {
144+
console.info("[TelemetryQueue] Queue cleared")
145+
}
146+
}
147+
148+
/**
149+
* Gets all queued events (for testing/debugging)
150+
*/
151+
public async getAll(): Promise<QueuedTelemetryEvent[]> {
152+
return await this.getQueue()
153+
}
154+
155+
private async getQueue(): Promise<QueuedTelemetryEvent[]> {
156+
try {
157+
const queue = this.context.globalState.get<QueuedTelemetryEvent[]>(TelemetryQueue.QUEUE_KEY)
158+
// Validate that we got an array
159+
if (Array.isArray(queue)) {
160+
return queue
161+
}
162+
// If we got corrupted data, try to reset to empty array
163+
if (queue !== undefined) {
164+
console.warn("[TelemetryQueue] Corrupted queue data detected, resetting to empty array")
165+
try {
166+
await this.context.globalState.update(TelemetryQueue.QUEUE_KEY, [])
167+
} catch (updateError) {
168+
// If update fails, just log and continue with empty array
169+
console.error("[TelemetryQueue] Failed to reset corrupted queue:", updateError)
170+
}
171+
}
172+
return []
173+
} catch (error) {
174+
console.error("[TelemetryQueue] Failed to get queue:", error)
175+
return []
176+
}
177+
}
178+
179+
private async saveQueue(queue: QueuedTelemetryEvent[]): Promise<void> {
180+
try {
181+
await this.context.globalState.update(TelemetryQueue.QUEUE_KEY, queue)
182+
} catch (error) {
183+
console.error("[TelemetryQueue] Failed to save queue:", error)
184+
}
185+
}
186+
}

0 commit comments

Comments
 (0)