-
Notifications
You must be signed in to change notification settings - Fork 2.6k
feat: implement persistent telemetry event queue with multi-instance support #6576
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
871ed87
afe1cce
1ffa1fa
951feff
b1806e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ import type { SettingsService } from "./SettingsService" | |
| import { CloudSettingsService } from "./CloudSettingsService" | ||
| import { StaticSettingsService } from "./StaticSettingsService" | ||
| import { TelemetryClient } from "./TelemetryClient" | ||
| import { QueuedTelemetryClient, type MultiInstanceConfig, type QueueStatus } from "./queue" | ||
| import { ShareService, TaskNotFoundError } from "./ShareService" | ||
|
|
||
| type AuthStateChangedPayload = CloudServiceEvents["auth-state-changed"][0] | ||
|
|
@@ -34,6 +35,7 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements vs | |
| private settingsListener: (data: SettingsPayload) => void | ||
| private settingsService: SettingsService | null = null | ||
| private telemetryClient: TelemetryClient | null = null | ||
| private queuedTelemetryClient: QueuedTelemetryClient | null = null | ||
| private shareService: ShareService | null = null | ||
| private isInitialized = false | ||
| private log: (...args: unknown[]) => void | ||
|
|
@@ -88,12 +90,34 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements vs | |
| } | ||
|
|
||
| this.telemetryClient = new TelemetryClient(this.authService, this.settingsService) | ||
|
|
||
| // Configure multi-instance behavior | ||
| const multiInstanceConfig: MultiInstanceConfig = { | ||
| enabled: true, | ||
| lockDurationMs: 30000, // 30 seconds | ||
| lockCheckIntervalMs: 5000, // 5 seconds | ||
| lockAcquireTimeoutMs: 10000, // 10 seconds | ||
| mode: "compete", // All instances compete for the lock | ||
| } | ||
|
|
||
| // Check for environment variable overrides | ||
| const multiInstanceMode = process.env.ROO_CODE_MULTI_INSTANCE_MODE | ||
| if (multiInstanceMode === "leader" || multiInstanceMode === "disabled") { | ||
| multiInstanceConfig.mode = multiInstanceMode | ||
| } | ||
|
|
||
| this.queuedTelemetryClient = new QueuedTelemetryClient( | ||
| this.telemetryClient, | ||
| this.context, | ||
| false, // debug | ||
| multiInstanceConfig, | ||
| ) | ||
| this.shareService = new ShareService(this.authService, this.settingsService, this.log) | ||
|
|
||
| try { | ||
| TelemetryService.instance.register(this.telemetryClient) | ||
| TelemetryService.instance.register(this.queuedTelemetryClient) | ||
| } catch (error) { | ||
| this.log("[CloudService] Failed to register TelemetryClient:", error) | ||
| this.log("[CloudService] Failed to register QueuedTelemetryClient:", error) | ||
| } | ||
|
|
||
| this.isInitialized = true | ||
|
|
@@ -191,9 +215,9 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements vs | |
|
|
||
| // TelemetryClient | ||
|
|
||
| public captureEvent(event: TelemetryEvent): void { | ||
| public async captureEvent(event: TelemetryEvent): Promise<void> { | ||
| this.ensureInitialized() | ||
| this.telemetryClient!.capture(event) | ||
| await this.queuedTelemetryClient!.capture(event) | ||
| } | ||
|
|
||
| // ShareService | ||
|
|
@@ -224,7 +248,7 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements vs | |
|
|
||
| // Lifecycle | ||
|
|
||
| public dispose(): void { | ||
| public async dispose(): Promise<void> { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. VS Code's Disposable interface expects synchronous disposal, but this is now async. Consider handling the async cleanup differently, perhaps with a separate shutdown method that's called before dispose. |
||
| if (this.authService) { | ||
| this.authService.off("auth-state-changed", this.authStateListener) | ||
| this.authService.off("user-info", this.authUserInfoListener) | ||
|
|
@@ -235,6 +259,9 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements vs | |
| } | ||
| this.settingsService.dispose() | ||
| } | ||
| if (this.queuedTelemetryClient) { | ||
| await this.queuedTelemetryClient.shutdown() | ||
| } | ||
|
|
||
| this.isInitialized = false | ||
| } | ||
|
|
@@ -280,4 +307,64 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements vs | |
| static isEnabled(): boolean { | ||
| return !!this._instance?.isAuthenticated() | ||
| } | ||
|
|
||
| // Getters for queue integration | ||
| public getAuthService(): AuthService { | ||
| this.ensureInitialized() | ||
| return this.authService! | ||
| } | ||
|
|
||
| public getSettingsService(): SettingsService { | ||
| this.ensureInitialized() | ||
| return this.settingsService! | ||
| } | ||
|
|
||
| public getTelemetryClient(): TelemetryClient { | ||
| this.ensureInitialized() | ||
| return this.telemetryClient! | ||
| } | ||
|
|
||
| public getQueuedTelemetryClient(): QueuedTelemetryClient { | ||
| this.ensureInitialized() | ||
| return this.queuedTelemetryClient! | ||
| } | ||
|
|
||
| /** | ||
| * Process any queued telemetry events | ||
| * Returns the number of events successfully processed | ||
| */ | ||
| public async processQueuedEvents(): Promise<number> { | ||
| this.ensureInitialized() | ||
| return this.queuedTelemetryClient!.processQueue() | ||
| } | ||
|
|
||
| /** | ||
| * Get queue status including multi-instance information | ||
| */ | ||
| public async getQueueStatus(): Promise< | ||
| QueueStatus & { | ||
| instanceInfo?: { | ||
| instanceId: string | ||
| hostname: string | ||
| multiInstanceEnabled: boolean | ||
| multiInstanceMode: string | ||
| } | ||
| } | ||
| > { | ||
| this.ensureInitialized() | ||
| return this.queuedTelemetryClient!.getQueueStatus() | ||
| } | ||
|
|
||
| /** | ||
| * Get multi-instance lock statistics | ||
| */ | ||
| public async getLockStats(): Promise<{ | ||
| hasLock: boolean | ||
| lockHolder?: string | ||
| lockAge?: number | ||
| isExpired?: boolean | ||
| }> { | ||
| this.ensureInitialized() | ||
| return this.queuedTelemetryClient!.getLockStats() | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,2 +1,4 @@ | ||
| export * from "./CloudService" | ||
| export * from "./Config" | ||
| export type { AuthService, AuthServiceEvents, AuthState } from "./auth" | ||
| export * from "./queue" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| import { QueueProcessor, QueuedTelemetryEvent } from "./types" | ||
| import { TelemetryClient } from "../TelemetryClient" | ||
|
|
||
| /** | ||
| * Processes queued telemetry events by sending them to the cloud | ||
| */ | ||
| export class CloudQueueProcessor implements QueueProcessor { | ||
| constructor(private telemetryClient: TelemetryClient) {} | ||
|
|
||
| async process(event: QueuedTelemetryEvent): Promise<boolean> { | ||
| try { | ||
| // Use the telemetry client to send the event | ||
| await this.telemetryClient.capture(event.event) | ||
| // Only log errors, not successes | ||
| return true | ||
| } catch (error) { | ||
| const errorMessage = error instanceof Error ? error.message : String(error) | ||
| console.error(`[CloudQueueProcessor] Failed to process event ${event.id}:`, errorMessage) | ||
|
|
||
| // Store error for debugging | ||
| event.lastError = errorMessage | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mutating the event parameter by setting could cause memory issues if errors accumulate. Consider returning an updated event object instead: |
||
|
|
||
| // Determine if we should retry based on error type | ||
| if (this.isRetryableError(error)) { | ||
| return false | ||
| } | ||
|
|
||
| // Non-retryable error, consider it "processed" to remove from queue | ||
| // Only log actual errors, not warnings about non-retryable errors | ||
| return true | ||
| } | ||
| } | ||
|
|
||
| async isReady(): Promise<boolean> { | ||
| // Always ready - no connection detection per requirements | ||
| return true | ||
| } | ||
|
|
||
| private isRetryableError(error: unknown): boolean { | ||
| const errorMessage = error instanceof Error ? error.message : String(error) | ||
|
|
||
| // Don't retry validation errors | ||
| if (errorMessage.includes("validation") || errorMessage.includes("invalid")) { | ||
| return false | ||
| } | ||
|
|
||
| // Don't retry authentication errors (user needs to re-authenticate) | ||
| if (errorMessage.includes("401") || errorMessage.includes("403") || errorMessage.includes("Unauthorized")) { | ||
| return false | ||
| } | ||
|
|
||
| // Don't retry if the event schema is invalid | ||
| if (errorMessage.includes("Invalid telemetry event")) { | ||
| return false | ||
| } | ||
|
|
||
| // Retry network and server errors | ||
| return true | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider validating the multiInstanceConfig values (e.g., ensuring durations are positive, mode is valid) before using them.