diff --git a/src/common/connectionManager.ts b/src/common/connectionManager.ts index ac7a9b5a..ba7f4cb4 100644 --- a/src/common/connectionManager.ts +++ b/src/common/connectionManager.ts @@ -82,7 +82,7 @@ export type TestConnectionManager = ConnectionManager & { export abstract class ConnectionManager { protected clientName: string; - protected readonly _events; + protected readonly _events: EventEmitter; readonly events: Pick, "on" | "off" | "once">; private state: AnyConnectionState; diff --git a/src/common/logger.ts b/src/common/logger.ts index bba35071..5414fc93 100644 --- a/src/common/logger.ts +++ b/src/common/logger.ts @@ -35,6 +35,7 @@ export const LogId = { telemetryMetadataError: mongoLogId(1_002_005), deviceIdResolutionError: mongoLogId(1_002_006), deviceIdTimeout: mongoLogId(1_002_007), + telemetryClose: mongoLogId(1_002_008), toolExecute: mongoLogId(1_003_001), toolExecuteFailure: mongoLogId(1_003_002), diff --git a/src/server.ts b/src/server.ts index 914a823b..d3cc9dbd 100644 --- a/src/server.ts +++ b/src/server.ts @@ -193,7 +193,7 @@ export class Server { } } - this.telemetry.emitEvents([event]).catch(() => {}); + this.telemetry.emitEvents([event]); } private registerTools(): void { diff --git a/src/telemetry/eventCache.ts b/src/telemetry/eventCache.ts index c7554766..c6411bf5 100644 --- a/src/telemetry/eventCache.ts +++ b/src/telemetry/eventCache.ts @@ -34,11 +34,18 @@ export class EventCache { } /** - * Gets a copy of the currently cached events + * Gets the number of currently cached events + */ + public get size(): number { + return this.cache.size; + } + + /** + * Gets a copy of the currently cached events along with their ids * @returns Array of cached BaseEvent objects */ - public getEvents(): BaseEvent[] { - return Array.from(this.cache.values()); + public getEvents(): { id: number; event: BaseEvent }[] { + return Array.from(this.cache.entries()).map(([id, event]) => ({ id, event })); } /** @@ -53,10 +60,11 @@ export class EventCache { } /** - * Clears all cached events + * Removes cached events by their ids */ - public clearEvents(): void { - this.cache.clear(); - this.nextId = 0; + public removeEvents(ids: number[]): void { + for (const id of ids) { + this.cache.delete(id); + } } } diff --git a/src/telemetry/telemetry.ts b/src/telemetry/telemetry.ts index 339ba419..bdba51a5 100644 --- a/src/telemetry/telemetry.ts +++ b/src/telemetry/telemetry.ts @@ -7,16 +7,25 @@ import { MACHINE_METADATA } from "./constants.js"; import { EventCache } from "./eventCache.js"; import { detectContainerEnv } from "../helpers/container.js"; import type { DeviceId } from "../helpers/deviceId.js"; +import { EventEmitter } from "events"; type EventResult = { success: boolean; error?: Error; }; +export interface TelemetryEvents { + "events-emitted": []; + "events-send-failed": []; + "events-skipped": []; +} + export class Telemetry { private isBufferingEvents: boolean = true; /** Resolves when the setup is complete or a timeout occurs */ public setupPromise: Promise<[string, boolean]> | undefined; + public readonly events: EventEmitter = new EventEmitter(); + private eventCache: EventCache; private deviceId: DeviceId; @@ -57,6 +66,12 @@ export class Telemetry { private async setup(): Promise { if (!this.isTelemetryEnabled()) { + this.session.logger.info({ + id: LogId.telemetryEmitFailure, + context: "telemetry", + message: "Telemetry is disabled.", + noRedaction: true, + }); return; } @@ -71,34 +86,47 @@ export class Telemetry { public async close(): Promise { this.isBufferingEvents = false; - await this.emitEvents(this.eventCache.getEvents()); + + this.session.logger.debug({ + id: LogId.telemetryClose, + message: `Closing telemetry and flushing ${this.eventCache.size} events`, + context: "telemetry", + }); + + // Wait up to 5 seconds for events to be sent before closing, but don't throw if it times out + const flushMaxWaitTime = 5000; + let flushTimeout: NodeJS.Timeout | undefined; + await Promise.race([ + new Promise((resolve) => { + flushTimeout = setTimeout(() => { + this.session.logger.debug({ + id: LogId.telemetryClose, + message: `Failed to flush remaining events within ${flushMaxWaitTime}ms timeout`, + context: "telemetry", + }); + resolve(); + }, flushMaxWaitTime); + flushTimeout.unref(); + }), + this.emit([]), + ]); + + clearTimeout(flushTimeout); } /** * Emits events through the telemetry pipeline * @param events - The events to emit */ - public async emitEvents(events: BaseEvent[]): Promise { - try { - if (!this.isTelemetryEnabled()) { - this.session.logger.info({ - id: LogId.telemetryEmitFailure, - context: "telemetry", - message: "Telemetry is disabled.", - noRedaction: true, - }); - return; - } - - await this.emit(events); - } catch { - this.session.logger.debug({ - id: LogId.telemetryEmitFailure, - context: "telemetry", - message: "Error emitting telemetry events.", - noRedaction: true, - }); + public emitEvents(events: BaseEvent[]): void { + if (!this.isTelemetryEnabled()) { + this.events.emit("events-skipped"); + return; } + + // Don't wait for events to be sent - we should not block regular server + // operations on telemetry + void this.emit(events); } /** @@ -144,32 +172,44 @@ export class Telemetry { return; } - const cachedEvents = this.eventCache.getEvents(); - const allEvents = [...cachedEvents, ...events]; + try { + const cachedEvents = this.eventCache.getEvents(); + const allEvents = [...cachedEvents.map((e) => e.event), ...events]; - this.session.logger.debug({ - id: LogId.telemetryEmitStart, - context: "telemetry", - message: `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`, - }); + this.session.logger.debug({ + id: LogId.telemetryEmitStart, + context: "telemetry", + message: `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`, + }); + + const result = await this.sendEvents(this.session.apiClient, allEvents); + if (result.success) { + this.eventCache.removeEvents(cachedEvents.map((e) => e.id)); + this.session.logger.debug({ + id: LogId.telemetryEmitSuccess, + context: "telemetry", + message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents)}`, + }); + this.events.emit("events-emitted"); + return; + } - const result = await this.sendEvents(this.session.apiClient, allEvents); - if (result.success) { - this.eventCache.clearEvents(); this.session.logger.debug({ - id: LogId.telemetryEmitSuccess, + id: LogId.telemetryEmitFailure, context: "telemetry", - message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents, null, 2)}`, + message: `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}`, }); - return; + this.eventCache.appendEvents(events); + this.events.emit("events-send-failed"); + } catch (error) { + this.session.logger.debug({ + id: LogId.telemetryEmitFailure, + context: "telemetry", + message: `Error emitting telemetry events: ${error instanceof Error ? error.message : String(error)}`, + noRedaction: true, + }); + this.events.emit("events-send-failed"); } - - this.session.logger.debug({ - id: LogId.telemetryEmitFailure, - context: "telemetry", - message: `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}`, - }); - this.eventCache.appendEvents(events); } /** diff --git a/src/tools/tool.ts b/src/tools/tool.ts index 9a13eada..0115feb0 100644 --- a/src/tools/tool.ts +++ b/src/tools/tool.ts @@ -82,7 +82,14 @@ export abstract class ToolBase { }); const result = await this.execute(...args); - await this.emitToolEvent(startTime, result, ...args).catch(() => {}); + this.emitToolEvent(startTime, result, ...args); + + this.session.logger.debug({ + id: LogId.toolExecute, + context: "tool", + message: `Executed tool ${this.name}`, + noRedaction: true, + }); return result; } catch (error: unknown) { this.session.logger.error({ @@ -91,7 +98,7 @@ export abstract class ToolBase { message: `Error executing ${this.name}: ${error as string}`, }); const toolResult = await this.handleError(error, args[0] as ToolArgs); - await this.emitToolEvent(startTime, toolResult, ...args).catch(() => {}); + this.emitToolEvent(startTime, toolResult, ...args); return toolResult; } }; @@ -200,11 +207,11 @@ export abstract class ToolBase { * @param result - Whether the command succeeded or failed * @param args - The arguments passed to the tool */ - private async emitToolEvent( + private emitToolEvent( startTime: number, result: CallToolResult, ...args: Parameters> - ): Promise { + ): void { if (!this.telemetry.isTelemetryEnabled()) { return; } @@ -230,7 +237,7 @@ export abstract class ToolBase { event.properties.project_id = metadata.projectId; } - await this.telemetry.emitEvents([event]); + this.telemetry.emitEvents([event]); } } diff --git a/src/transports/streamableHttp.ts b/src/transports/streamableHttp.ts index ad04ec73..b3f8f9ad 100644 --- a/src/transports/streamableHttp.ts +++ b/src/transports/streamableHttp.ts @@ -124,7 +124,7 @@ export class StreamableHttpRunner extends TransportRunnerBase { // eslint-disable-next-line @typescript-eslint/no-misused-promises keepAliveLoop = setInterval(async () => { try { - this.logger.debug({ + server.session.logger.debug({ id: LogId.streamableHttpTransportKeepAlive, context: "streamableHttpTransport", message: "Sending ping", @@ -138,7 +138,7 @@ export class StreamableHttpRunner extends TransportRunnerBase { } catch (err) { try { failedPings++; - this.logger.warning({ + server.session.logger.warning({ id: LogId.streamableHttpTransportKeepAliveFailure, context: "streamableHttpTransport", message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`, @@ -162,7 +162,7 @@ export class StreamableHttpRunner extends TransportRunnerBase { this.logger.error({ id: LogId.streamableHttpTransportSessionCloseFailure, context: "streamableHttpTransport", - message: `Error closing session: ${error instanceof Error ? error.message : String(error)}`, + message: `Error closing session ${sessionId}: ${error instanceof Error ? error.message : String(error)}`, }); } }, diff --git a/tests/unit/telemetry.test.ts b/tests/unit/telemetry.test.ts index 51341d56..9db8ea95 100644 --- a/tests/unit/telemetry.test.ts +++ b/tests/unit/telemetry.test.ts @@ -24,8 +24,8 @@ describe("Telemetry", () => { hasCredentials: MockedFunction<() => boolean>; }; let mockEventCache: { - getEvents: MockedFunction<() => BaseEvent[]>; - clearEvents: MockedFunction<() => Promise>; + getEvents: MockedFunction<() => { id: number; event: BaseEvent }[]>; + removeEvents: MockedFunction<(ids: number[]) => Promise>; appendEvents: MockedFunction<(events: BaseEvent[]) => Promise>; }; let session: Session; @@ -61,26 +61,36 @@ describe("Telemetry", () => { }; } + function emitEventsForTest(events: BaseEvent[]): Promise { + return new Promise((resolve) => { + telemetry.events.once("events-emitted", resolve); + telemetry.events.once("events-send-failed", resolve); + telemetry.events.once("events-skipped", resolve); + + telemetry.emitEvents(events); + }); + } + // Helper function to verify mock calls to reduce duplication function verifyMockCalls({ sendEventsCalls = 0, - clearEventsCalls = 0, + removeEventsCalls = 0, appendEventsCalls = 0, sendEventsCalledWith = undefined, appendEventsCalledWith = undefined, }: { sendEventsCalls?: number; - clearEventsCalls?: number; + removeEventsCalls?: number; appendEventsCalls?: number; sendEventsCalledWith?: BaseEvent[] | undefined; appendEventsCalledWith?: BaseEvent[] | undefined; } = {}): void { const { calls: sendEvents } = mockApiClient.sendEvents.mock; - const { calls: clearEvents } = mockEventCache.clearEvents.mock; + const { calls: removeEvents } = mockEventCache.removeEvents.mock; const { calls: appendEvents } = mockEventCache.appendEvents.mock; expect(sendEvents.length).toBe(sendEventsCalls); - expect(clearEvents.length).toBe(clearEventsCalls); + expect(removeEvents.length).toBe(removeEventsCalls); expect(appendEvents.length).toBe(appendEventsCalls); if (sendEventsCalledWith) { @@ -113,7 +123,7 @@ describe("Telemetry", () => { // Setup mocked EventCache mockEventCache = new MockEventCache() as unknown as typeof mockEventCache; mockEventCache.getEvents = vi.fn().mockReturnValue([]); - mockEventCache.clearEvents = vi.fn().mockResolvedValue(undefined); + mockEventCache.removeEvents = vi.fn().mockResolvedValue(undefined); mockEventCache.appendEvents = vi.fn().mockResolvedValue(undefined); MockEventCache.getInstance = vi.fn().mockReturnValue(mockEventCache as unknown as EventCache); @@ -145,11 +155,11 @@ describe("Telemetry", () => { await telemetry.setupPromise; - await telemetry.emitEvents([testEvent]); + await emitEventsForTest([testEvent]); verifyMockCalls({ sendEventsCalls: 1, - clearEventsCalls: 1, + removeEventsCalls: 1, sendEventsCalledWith: [testEvent], }); }); @@ -161,7 +171,7 @@ describe("Telemetry", () => { await telemetry.setupPromise; - await telemetry.emitEvents([testEvent]); + await emitEventsForTest([testEvent]); verifyMockCalls({ sendEventsCalls: 1, @@ -182,15 +192,15 @@ describe("Telemetry", () => { }); // Set up mock to return cached events - mockEventCache.getEvents.mockReturnValueOnce([cachedEvent]); + mockEventCache.getEvents.mockReturnValueOnce([{ id: 0, event: cachedEvent }]); await telemetry.setupPromise; - await telemetry.emitEvents([newEvent]); + await emitEventsForTest([newEvent]); verifyMockCalls({ sendEventsCalls: 1, - clearEventsCalls: 1, + removeEventsCalls: 1, sendEventsCalledWith: [cachedEvent, newEvent], }); }); @@ -223,7 +233,7 @@ describe("Telemetry", () => { const commonProps = telemetry.getCommonProperties(); expect(commonProps.hosting_mode).toBe("vscode-extension"); - await telemetry.emitEvents([createTestEvent()]); + await emitEventsForTest([createTestEvent()]); const calls = mockApiClient.sendEvents.mock.calls; expect(calls).toHaveLength(1); @@ -305,7 +315,7 @@ describe("Telemetry", () => { it("should not send events", async () => { const testEvent = createTestEvent(); - await telemetry.emitEvents([testEvent]); + await emitEventsForTest([testEvent]); verifyMockCalls(); }); @@ -330,7 +340,7 @@ describe("Telemetry", () => { it("should not send events", async () => { const testEvent = createTestEvent(); - await telemetry.emitEvents([testEvent]); + await emitEventsForTest([testEvent]); verifyMockCalls(); });