-
Notifications
You must be signed in to change notification settings - Fork 2.6k
feat: add persistent retry queue for failed telemetry events (#4940) #6567
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
f34c411
5e3838a
448a4f0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -19,6 +19,8 @@ import { CloudSettingsService } from "./CloudSettingsService" | |||||||||||||||||
| import { StaticSettingsService } from "./StaticSettingsService" | ||||||||||||||||||
| import { TelemetryClient } from "./TelemetryClient" | ||||||||||||||||||
| import { ShareService, TaskNotFoundError } from "./ShareService" | ||||||||||||||||||
| import { ConnectionMonitor } from "./ConnectionMonitor" | ||||||||||||||||||
| import { TelemetryQueueManager } from "./TelemetryQueueManager" | ||||||||||||||||||
|
|
||||||||||||||||||
| type AuthStateChangedPayload = CloudServiceEvents["auth-state-changed"][0] | ||||||||||||||||||
| type AuthUserInfoPayload = CloudServiceEvents["user-info"][0] | ||||||||||||||||||
|
|
@@ -35,6 +37,8 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements vs | |||||||||||||||||
| private settingsService: SettingsService | null = null | ||||||||||||||||||
| private telemetryClient: TelemetryClient | null = null | ||||||||||||||||||
| private shareService: ShareService | null = null | ||||||||||||||||||
| private connectionMonitor: ConnectionMonitor | null = null | ||||||||||||||||||
| private queueManager: TelemetryQueueManager | null = null | ||||||||||||||||||
| private isInitialized = false | ||||||||||||||||||
| private log: (...args: unknown[]) => void | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
@@ -87,9 +91,60 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements vs | |||||||||||||||||
| this.settingsService = cloudSettingsService | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| this.telemetryClient = new TelemetryClient(this.authService, this.settingsService) | ||||||||||||||||||
| this.telemetryClient = new TelemetryClient(this.authService, this.settingsService, false, this.log) | ||||||||||||||||||
| this.shareService = new ShareService(this.authService, this.settingsService, this.log) | ||||||||||||||||||
|
|
||||||||||||||||||
| // Initialize connection monitor and queue manager | ||||||||||||||||||
| this.connectionMonitor = new ConnectionMonitor() | ||||||||||||||||||
| this.queueManager = TelemetryQueueManager.getInstance() | ||||||||||||||||||
|
|
||||||||||||||||||
| // Check if telemetry queue is enabled | ||||||||||||||||||
| let isQueueEnabled = true | ||||||||||||||||||
| try { | ||||||||||||||||||
| const { ContextProxy } = await import("../../../src/core/config/ContextProxy") | ||||||||||||||||||
| isQueueEnabled = ContextProxy.instance.getValue("telemetryQueueEnabled") ?? true | ||||||||||||||||||
| } catch (_error) { | ||||||||||||||||||
| // Default to enabled if we can't access settings | ||||||||||||||||||
| this.log("[CloudService] Could not access telemetryQueueEnabled setting, defaulting to enabled") | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| if (isQueueEnabled) { | ||||||||||||||||||
| // Set up connection monitoring with debouncing | ||||||||||||||||||
| let connectionRestoredDebounceTimer: NodeJS.Timeout | null = null | ||||||||||||||||||
|
||||||||||||||||||
| const connectionRestoredDebounceDelay = 3000 // 3 seconds | ||||||||||||||||||
|
|
||||||||||||||||||
| this.connectionMonitor.onConnectionRestored(() => { | ||||||||||||||||||
| this.log("[CloudService] Connection restored, scheduling queue processing") | ||||||||||||||||||
|
|
||||||||||||||||||
| // Clear any existing timer | ||||||||||||||||||
| if (connectionRestoredDebounceTimer) { | ||||||||||||||||||
| clearTimeout(connectionRestoredDebounceTimer) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Schedule queue processing with debounce | ||||||||||||||||||
| connectionRestoredDebounceTimer = setTimeout(() => { | ||||||||||||||||||
| this.queueManager | ||||||||||||||||||
| ?.processQueue() | ||||||||||||||||||
| .then(() => { | ||||||||||||||||||
| this.log( | ||||||||||||||||||
| "[CloudService] Successfully processed queued events after connection restored", | ||||||||||||||||||
| ) | ||||||||||||||||||
| }) | ||||||||||||||||||
| .catch((error) => { | ||||||||||||||||||
| this.log("[CloudService] Error processing queue after connection restored:", error) | ||||||||||||||||||
| // Could implement retry logic here if needed in the future | ||||||||||||||||||
| }) | ||||||||||||||||||
| }, connectionRestoredDebounceDelay) | ||||||||||||||||||
| }) | ||||||||||||||||||
|
|
||||||||||||||||||
| // Start monitoring if authenticated | ||||||||||||||||||
| if (this.authService.isAuthenticated()) { | ||||||||||||||||||
| this.connectionMonitor.startMonitoring() | ||||||||||||||||||
| } | ||||||||||||||||||
| } else { | ||||||||||||||||||
| this.log("[CloudService] Telemetry queue is disabled") | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| try { | ||||||||||||||||||
| TelemetryService.instance.register(this.telemetryClient) | ||||||||||||||||||
| } catch (error) { | ||||||||||||||||||
|
|
@@ -222,6 +277,34 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements vs | |||||||||||||||||
| return this.shareService!.canShareTask() | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Connection Status | ||||||||||||||||||
|
|
||||||||||||||||||
| public isOnline(): boolean { | ||||||||||||||||||
| this.ensureInitialized() | ||||||||||||||||||
| return this.connectionMonitor?.getConnectionStatus() ?? true | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| public onConnectionRestored(callback: () => void): void { | ||||||||||||||||||
| this.ensureInitialized() | ||||||||||||||||||
| if (this.connectionMonitor) { | ||||||||||||||||||
| this.connectionMonitor.onConnectionRestored(callback) | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| public onConnectionLost(callback: () => void): void { | ||||||||||||||||||
| this.ensureInitialized() | ||||||||||||||||||
| if (this.connectionMonitor) { | ||||||||||||||||||
| this.connectionMonitor.onConnectionLost(callback) | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| public removeConnectionListener(event: "connection-restored" | "connection-lost", callback: () => void): void { | ||||||||||||||||||
| this.ensureInitialized() | ||||||||||||||||||
| if (this.connectionMonitor) { | ||||||||||||||||||
| this.connectionMonitor.removeListener(event, callback) | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Lifecycle | ||||||||||||||||||
|
|
||||||||||||||||||
| public dispose(): void { | ||||||||||||||||||
|
|
@@ -235,6 +318,9 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements vs | |||||||||||||||||
| } | ||||||||||||||||||
| this.settingsService.dispose() | ||||||||||||||||||
| } | ||||||||||||||||||
| if (this.connectionMonitor) { | ||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Critical: Memory leak - The Consider adding cleanup in the dispose method:
Suggested change
|
||||||||||||||||||
| this.connectionMonitor.dispose() | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| this.isInitialized = false | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,105 @@ | ||||||||||||
| import { EventEmitter } from "events" | ||||||||||||
| import { getRooCodeApiUrl } from "./Config" | ||||||||||||
|
|
||||||||||||
| export class ConnectionMonitor extends EventEmitter { | ||||||||||||
| private isOnline = true | ||||||||||||
| private checkInterval: NodeJS.Timeout | null = null | ||||||||||||
| private readonly healthCheckEndpoint = "/api/health" | ||||||||||||
| private readonly defaultCheckInterval = 30000 // 30 seconds | ||||||||||||
|
|
||||||||||||
| constructor() { | ||||||||||||
| super() | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * Check if the connection to the API is available | ||||||||||||
| */ | ||||||||||||
| public async checkConnection(): Promise<boolean> { | ||||||||||||
| try { | ||||||||||||
| const controller = new AbortController() | ||||||||||||
| const timeoutId = setTimeout(() => controller.abort(), 5000) // 5 second timeout | ||||||||||||
|
||||||||||||
| const timeoutId = setTimeout(() => controller.abort(), 5000) // 5 second timeout | |
| const timeoutId = setTimeout(() => controller.abort(), this.defaultTimeoutMs) // 5 second timeout |
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The timeout value is still hardcoded as 5000ms. Consider making this configurable by adding a class constant:
| const timeoutId = setTimeout(() => controller.abort(), 5000) // 5 second timeout | |
| private readonly defaultTimeoutMs = 5000 | |
| const timeoutId = setTimeout(() => controller.abort(), this.defaultTimeoutMs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The dynamic import for ContextProxy could fail. Consider adding more robust error handling: