From 4cc2a368b2ab71d089cf57f987841dcabd31a092 Mon Sep 17 00:00:00 2001 From: Nikola Irinchev Date: Fri, 5 Sep 2025 03:20:55 +0300 Subject: [PATCH 1/7] fix don't wait for telemetry events --- src/common/connectionManager.ts | 2 +- src/server.ts | 2 +- src/telemetry/telemetry.ts | 101 ++++++++++++++++++------------- src/tools/tool.ts | 17 ++++-- src/transports/streamableHttp.ts | 6 +- tests/unit/telemetry.test.ts | 22 +++++-- 6 files changed, 93 insertions(+), 57 deletions(-) diff --git a/src/common/connectionManager.ts b/src/common/connectionManager.ts index ac7a9b5a..ba7f4cb4 100644 --- a/src/common/connectionManager.ts +++ b/src/common/connectionManager.ts @@ -82,7 +82,7 @@ export type TestConnectionManager = ConnectionManager & { export abstract class ConnectionManager { protected clientName: string; - protected readonly _events; + protected readonly _events: EventEmitter; readonly events: Pick, "on" | "off" | "once">; private state: AnyConnectionState; diff --git a/src/server.ts b/src/server.ts index 914a823b..d3cc9dbd 100644 --- a/src/server.ts +++ b/src/server.ts @@ -193,7 +193,7 @@ export class Server { } } - this.telemetry.emitEvents([event]).catch(() => {}); + this.telemetry.emitEvents([event]); } private registerTools(): void { diff --git a/src/telemetry/telemetry.ts b/src/telemetry/telemetry.ts index 339ba419..1fe4e449 100644 --- a/src/telemetry/telemetry.ts +++ b/src/telemetry/telemetry.ts @@ -7,16 +7,29 @@ import { MACHINE_METADATA } from "./constants.js"; import { EventCache } from "./eventCache.js"; import { detectContainerEnv } from "../helpers/container.js"; import type { DeviceId } from "../helpers/deviceId.js"; +import { EventEmitter } from "stream"; type EventResult = { success: boolean; error?: Error; }; +async function timeout(promise: Promise, ms: number): Promise { + await Promise.race([new Promise((resolve) => setTimeout(resolve, ms)), promise]); +} + +export interface TelemetryEvents { + "events-emitted": []; + "events-send-failed": []; + "events-skipped": []; +} + export class Telemetry { private isBufferingEvents: boolean = true; /** Resolves when the setup is complete or a timeout occurs */ public setupPromise: Promise<[string, boolean]> | undefined; + public readonly events: EventEmitter = new EventEmitter(); + private eventCache: EventCache; private deviceId: DeviceId; @@ -57,6 +70,12 @@ export class Telemetry { private async setup(): Promise { if (!this.isTelemetryEnabled()) { + this.session.logger.info({ + id: LogId.telemetryEmitFailure, + context: "telemetry", + message: "Telemetry is disabled.", + noRedaction: true, + }); return; } @@ -71,34 +90,22 @@ export class Telemetry { public async close(): Promise { this.isBufferingEvents = false; - await this.emitEvents(this.eventCache.getEvents()); + await timeout(this.emit([]), 5_000); } /** * Emits events through the telemetry pipeline * @param events - The events to emit */ - public async emitEvents(events: BaseEvent[]): Promise { - try { - if (!this.isTelemetryEnabled()) { - this.session.logger.info({ - id: LogId.telemetryEmitFailure, - context: "telemetry", - message: "Telemetry is disabled.", - noRedaction: true, - }); - return; - } - - await this.emit(events); - } catch { - this.session.logger.debug({ - id: LogId.telemetryEmitFailure, - context: "telemetry", - message: "Error emitting telemetry events.", - noRedaction: true, - }); + public emitEvents(events: BaseEvent[]): void { + if (!this.isTelemetryEnabled()) { + this.events.emit("events-skipped"); + return; } + + // Don't wait for events to be sent - we should not block regular server + // operations on telemetry + void this.emit(events); } /** @@ -144,32 +151,44 @@ export class Telemetry { return; } - const cachedEvents = this.eventCache.getEvents(); - const allEvents = [...cachedEvents, ...events]; + try { + const cachedEvents = this.eventCache.getEvents(); + const allEvents = [...cachedEvents, ...events]; - this.session.logger.debug({ - id: LogId.telemetryEmitStart, - context: "telemetry", - message: `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`, - }); + this.session.logger.debug({ + id: LogId.telemetryEmitStart, + context: "telemetry", + message: `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`, + }); + + const result = await this.sendEvents(this.session.apiClient, allEvents); + if (result.success) { + this.eventCache.clearEvents(); + this.session.logger.debug({ + id: LogId.telemetryEmitSuccess, + context: "telemetry", + message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents, null, 2)}`, + }); + this.events.emit("events-emitted"); + return; + } - const result = await this.sendEvents(this.session.apiClient, allEvents); - if (result.success) { - this.eventCache.clearEvents(); this.session.logger.debug({ - id: LogId.telemetryEmitSuccess, + id: LogId.telemetryEmitFailure, context: "telemetry", - message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents, null, 2)}`, + message: `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}`, }); - return; + this.eventCache.appendEvents(events); + this.events.emit("events-send-failed"); + } catch (error) { + this.session.logger.debug({ + id: LogId.telemetryEmitFailure, + context: "telemetry", + message: `Error emitting telemetry events: ${error instanceof Error ? error.message : String(error)}`, + noRedaction: true, + }); + this.events.emit("events-send-failed"); } - - this.session.logger.debug({ - id: LogId.telemetryEmitFailure, - context: "telemetry", - message: `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}`, - }); - this.eventCache.appendEvents(events); } /** diff --git a/src/tools/tool.ts b/src/tools/tool.ts index 9a13eada..0115feb0 100644 --- a/src/tools/tool.ts +++ b/src/tools/tool.ts @@ -82,7 +82,14 @@ export abstract class ToolBase { }); const result = await this.execute(...args); - await this.emitToolEvent(startTime, result, ...args).catch(() => {}); + this.emitToolEvent(startTime, result, ...args); + + this.session.logger.debug({ + id: LogId.toolExecute, + context: "tool", + message: `Executed tool ${this.name}`, + noRedaction: true, + }); return result; } catch (error: unknown) { this.session.logger.error({ @@ -91,7 +98,7 @@ export abstract class ToolBase { message: `Error executing ${this.name}: ${error as string}`, }); const toolResult = await this.handleError(error, args[0] as ToolArgs); - await this.emitToolEvent(startTime, toolResult, ...args).catch(() => {}); + this.emitToolEvent(startTime, toolResult, ...args); return toolResult; } }; @@ -200,11 +207,11 @@ export abstract class ToolBase { * @param result - Whether the command succeeded or failed * @param args - The arguments passed to the tool */ - private async emitToolEvent( + private emitToolEvent( startTime: number, result: CallToolResult, ...args: Parameters> - ): Promise { + ): void { if (!this.telemetry.isTelemetryEnabled()) { return; } @@ -230,7 +237,7 @@ export abstract class ToolBase { event.properties.project_id = metadata.projectId; } - await this.telemetry.emitEvents([event]); + this.telemetry.emitEvents([event]); } } diff --git a/src/transports/streamableHttp.ts b/src/transports/streamableHttp.ts index ad04ec73..b3f8f9ad 100644 --- a/src/transports/streamableHttp.ts +++ b/src/transports/streamableHttp.ts @@ -124,7 +124,7 @@ export class StreamableHttpRunner extends TransportRunnerBase { // eslint-disable-next-line @typescript-eslint/no-misused-promises keepAliveLoop = setInterval(async () => { try { - this.logger.debug({ + server.session.logger.debug({ id: LogId.streamableHttpTransportKeepAlive, context: "streamableHttpTransport", message: "Sending ping", @@ -138,7 +138,7 @@ export class StreamableHttpRunner extends TransportRunnerBase { } catch (err) { try { failedPings++; - this.logger.warning({ + server.session.logger.warning({ id: LogId.streamableHttpTransportKeepAliveFailure, context: "streamableHttpTransport", message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`, @@ -162,7 +162,7 @@ export class StreamableHttpRunner extends TransportRunnerBase { this.logger.error({ id: LogId.streamableHttpTransportSessionCloseFailure, context: "streamableHttpTransport", - message: `Error closing session: ${error instanceof Error ? error.message : String(error)}`, + message: `Error closing session ${sessionId}: ${error instanceof Error ? error.message : String(error)}`, }); } }, diff --git a/tests/unit/telemetry.test.ts b/tests/unit/telemetry.test.ts index 51341d56..78db6b7c 100644 --- a/tests/unit/telemetry.test.ts +++ b/tests/unit/telemetry.test.ts @@ -61,6 +61,16 @@ describe("Telemetry", () => { }; } + function emitEventsForTest(events: BaseEvent[]): Promise { + return new Promise((resolve) => { + telemetry.events.once("events-emitted", resolve); + telemetry.events.once("events-send-failed", resolve); + telemetry.events.once("events-skipped", resolve); + + telemetry.emitEvents(events); + }); + } + // Helper function to verify mock calls to reduce duplication function verifyMockCalls({ sendEventsCalls = 0, @@ -145,7 +155,7 @@ describe("Telemetry", () => { await telemetry.setupPromise; - await telemetry.emitEvents([testEvent]); + await emitEventsForTest([testEvent]); verifyMockCalls({ sendEventsCalls: 1, @@ -161,7 +171,7 @@ describe("Telemetry", () => { await telemetry.setupPromise; - await telemetry.emitEvents([testEvent]); + await emitEventsForTest([testEvent]); verifyMockCalls({ sendEventsCalls: 1, @@ -186,7 +196,7 @@ describe("Telemetry", () => { await telemetry.setupPromise; - await telemetry.emitEvents([newEvent]); + await emitEventsForTest([newEvent]); verifyMockCalls({ sendEventsCalls: 1, @@ -223,7 +233,7 @@ describe("Telemetry", () => { const commonProps = telemetry.getCommonProperties(); expect(commonProps.hosting_mode).toBe("vscode-extension"); - await telemetry.emitEvents([createTestEvent()]); + await emitEventsForTest([createTestEvent()]); const calls = mockApiClient.sendEvents.mock.calls; expect(calls).toHaveLength(1); @@ -305,7 +315,7 @@ describe("Telemetry", () => { it("should not send events", async () => { const testEvent = createTestEvent(); - await telemetry.emitEvents([testEvent]); + await emitEventsForTest([testEvent]); verifyMockCalls(); }); @@ -330,7 +340,7 @@ describe("Telemetry", () => { it("should not send events", async () => { const testEvent = createTestEvent(); - await telemetry.emitEvents([testEvent]); + await emitEventsForTest([testEvent]); verifyMockCalls(); }); From 724add974d88a5cf57d4cd6d7b755f2f913b1dc5 Mon Sep 17 00:00:00 2001 From: Nikola Irinchev Date: Fri, 5 Sep 2025 10:10:15 +0300 Subject: [PATCH 2/7] fix tests --- src/telemetry/telemetry.ts | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/telemetry/telemetry.ts b/src/telemetry/telemetry.ts index 1fe4e449..b7aced6a 100644 --- a/src/telemetry/telemetry.ts +++ b/src/telemetry/telemetry.ts @@ -7,17 +7,13 @@ import { MACHINE_METADATA } from "./constants.js"; import { EventCache } from "./eventCache.js"; import { detectContainerEnv } from "../helpers/container.js"; import type { DeviceId } from "../helpers/deviceId.js"; -import { EventEmitter } from "stream"; +import { EventEmitter } from "events"; type EventResult = { success: boolean; error?: Error; }; -async function timeout(promise: Promise, ms: number): Promise { - await Promise.race([new Promise((resolve) => setTimeout(resolve, ms)), promise]); -} - export interface TelemetryEvents { "events-emitted": []; "events-send-failed": []; @@ -90,7 +86,9 @@ export class Telemetry { public async close(): Promise { this.isBufferingEvents = false; - await timeout(this.emit([]), 5_000); + + // Wait up to 5 seconds for events to be sent before closing, but don't throw if it times out + await Promise.race([new Promise((resolve) => setTimeout(resolve, 5000)), this.emit([])]); } /** From 30194a584d4db3dba43497c573ca75bdfa7f87df Mon Sep 17 00:00:00 2001 From: Nikola Irinchev Date: Fri, 5 Sep 2025 11:43:02 +0300 Subject: [PATCH 3/7] add logs, fix a race condition with event sending --- src/common/logger.ts | 1 + src/telemetry/eventCache.ts | 22 +++++++++++++++------- src/telemetry/telemetry.ts | 27 +++++++++++++++++++++++---- 3 files changed, 39 insertions(+), 11 deletions(-) diff --git a/src/common/logger.ts b/src/common/logger.ts index bba35071..5414fc93 100644 --- a/src/common/logger.ts +++ b/src/common/logger.ts @@ -35,6 +35,7 @@ export const LogId = { telemetryMetadataError: mongoLogId(1_002_005), deviceIdResolutionError: mongoLogId(1_002_006), deviceIdTimeout: mongoLogId(1_002_007), + telemetryClose: mongoLogId(1_002_008), toolExecute: mongoLogId(1_003_001), toolExecuteFailure: mongoLogId(1_003_002), diff --git a/src/telemetry/eventCache.ts b/src/telemetry/eventCache.ts index c7554766..c6411bf5 100644 --- a/src/telemetry/eventCache.ts +++ b/src/telemetry/eventCache.ts @@ -34,11 +34,18 @@ export class EventCache { } /** - * Gets a copy of the currently cached events + * Gets the number of currently cached events + */ + public get size(): number { + return this.cache.size; + } + + /** + * Gets a copy of the currently cached events along with their ids * @returns Array of cached BaseEvent objects */ - public getEvents(): BaseEvent[] { - return Array.from(this.cache.values()); + public getEvents(): { id: number; event: BaseEvent }[] { + return Array.from(this.cache.entries()).map(([id, event]) => ({ id, event })); } /** @@ -53,10 +60,11 @@ export class EventCache { } /** - * Clears all cached events + * Removes cached events by their ids */ - public clearEvents(): void { - this.cache.clear(); - this.nextId = 0; + public removeEvents(ids: number[]): void { + for (const id of ids) { + this.cache.delete(id); + } } } diff --git a/src/telemetry/telemetry.ts b/src/telemetry/telemetry.ts index b7aced6a..57356c1e 100644 --- a/src/telemetry/telemetry.ts +++ b/src/telemetry/telemetry.ts @@ -87,8 +87,27 @@ export class Telemetry { public async close(): Promise { this.isBufferingEvents = false; + this.session.logger.debug({ + id: LogId.telemetryClose, + message: `Closing telemetry and flushing ${this.eventCache.size} events`, + context: "telemetry", + }); + // Wait up to 5 seconds for events to be sent before closing, but don't throw if it times out - await Promise.race([new Promise((resolve) => setTimeout(resolve, 5000)), this.emit([])]); + const flushTimeout = 5000; + await Promise.race([ + new Promise((resolve) => + setTimeout(() => { + this.session.logger.debug({ + id: LogId.telemetryClose, + message: `Failed to flush remaining events within ${flushTimeout}ms timeout`, + context: "telemetry", + }); + resolve(); + }, flushTimeout) + ), + this.emit([]), + ]); } /** @@ -151,7 +170,7 @@ export class Telemetry { try { const cachedEvents = this.eventCache.getEvents(); - const allEvents = [...cachedEvents, ...events]; + const allEvents = [...cachedEvents.map((e) => e.event), ...events]; this.session.logger.debug({ id: LogId.telemetryEmitStart, @@ -161,11 +180,11 @@ export class Telemetry { const result = await this.sendEvents(this.session.apiClient, allEvents); if (result.success) { - this.eventCache.clearEvents(); + this.eventCache.removeEvents(cachedEvents.map((e) => e.id)); this.session.logger.debug({ id: LogId.telemetryEmitSuccess, context: "telemetry", - message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents, null, 2)}`, + message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents)}`, }); this.events.emit("events-emitted"); return; From a14e5341ff0cb090030dae0a631acf9b2a13a8d3 Mon Sep 17 00:00:00 2001 From: Bianca Lisle Date: Fri, 5 Sep 2025 13:44:55 +0100 Subject: [PATCH 4/7] fix tests --- tests/unit/telemetry.test.ts | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/unit/telemetry.test.ts b/tests/unit/telemetry.test.ts index 78db6b7c..9db8ea95 100644 --- a/tests/unit/telemetry.test.ts +++ b/tests/unit/telemetry.test.ts @@ -24,8 +24,8 @@ describe("Telemetry", () => { hasCredentials: MockedFunction<() => boolean>; }; let mockEventCache: { - getEvents: MockedFunction<() => BaseEvent[]>; - clearEvents: MockedFunction<() => Promise>; + getEvents: MockedFunction<() => { id: number; event: BaseEvent }[]>; + removeEvents: MockedFunction<(ids: number[]) => Promise>; appendEvents: MockedFunction<(events: BaseEvent[]) => Promise>; }; let session: Session; @@ -74,23 +74,23 @@ describe("Telemetry", () => { // Helper function to verify mock calls to reduce duplication function verifyMockCalls({ sendEventsCalls = 0, - clearEventsCalls = 0, + removeEventsCalls = 0, appendEventsCalls = 0, sendEventsCalledWith = undefined, appendEventsCalledWith = undefined, }: { sendEventsCalls?: number; - clearEventsCalls?: number; + removeEventsCalls?: number; appendEventsCalls?: number; sendEventsCalledWith?: BaseEvent[] | undefined; appendEventsCalledWith?: BaseEvent[] | undefined; } = {}): void { const { calls: sendEvents } = mockApiClient.sendEvents.mock; - const { calls: clearEvents } = mockEventCache.clearEvents.mock; + const { calls: removeEvents } = mockEventCache.removeEvents.mock; const { calls: appendEvents } = mockEventCache.appendEvents.mock; expect(sendEvents.length).toBe(sendEventsCalls); - expect(clearEvents.length).toBe(clearEventsCalls); + expect(removeEvents.length).toBe(removeEventsCalls); expect(appendEvents.length).toBe(appendEventsCalls); if (sendEventsCalledWith) { @@ -123,7 +123,7 @@ describe("Telemetry", () => { // Setup mocked EventCache mockEventCache = new MockEventCache() as unknown as typeof mockEventCache; mockEventCache.getEvents = vi.fn().mockReturnValue([]); - mockEventCache.clearEvents = vi.fn().mockResolvedValue(undefined); + mockEventCache.removeEvents = vi.fn().mockResolvedValue(undefined); mockEventCache.appendEvents = vi.fn().mockResolvedValue(undefined); MockEventCache.getInstance = vi.fn().mockReturnValue(mockEventCache as unknown as EventCache); @@ -159,7 +159,7 @@ describe("Telemetry", () => { verifyMockCalls({ sendEventsCalls: 1, - clearEventsCalls: 1, + removeEventsCalls: 1, sendEventsCalledWith: [testEvent], }); }); @@ -192,7 +192,7 @@ describe("Telemetry", () => { }); // Set up mock to return cached events - mockEventCache.getEvents.mockReturnValueOnce([cachedEvent]); + mockEventCache.getEvents.mockReturnValueOnce([{ id: 0, event: cachedEvent }]); await telemetry.setupPromise; @@ -200,7 +200,7 @@ describe("Telemetry", () => { verifyMockCalls({ sendEventsCalls: 1, - clearEventsCalls: 1, + removeEventsCalls: 1, sendEventsCalledWith: [cachedEvent, newEvent], }); }); From dd80fc1bc011a6956ae7d0b9c673b435d2995e64 Mon Sep 17 00:00:00 2001 From: Bianca Lisle Date: Fri, 5 Sep 2025 14:11:14 +0100 Subject: [PATCH 5/7] address comment: add unref --- src/telemetry/telemetry.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/telemetry/telemetry.ts b/src/telemetry/telemetry.ts index 57356c1e..567ae067 100644 --- a/src/telemetry/telemetry.ts +++ b/src/telemetry/telemetry.ts @@ -96,16 +96,17 @@ export class Telemetry { // Wait up to 5 seconds for events to be sent before closing, but don't throw if it times out const flushTimeout = 5000; await Promise.race([ - new Promise((resolve) => - setTimeout(() => { + new Promise((resolve) => { + const timeout = setTimeout(() => { this.session.logger.debug({ id: LogId.telemetryClose, message: `Failed to flush remaining events within ${flushTimeout}ms timeout`, context: "telemetry", }); resolve(); - }, flushTimeout) - ), + }, flushTimeout); + timeout.unref(); + }), this.emit([]), ]); } From e9c3c7aba2b368a816760ad9419f1e5cfd4b85c9 Mon Sep 17 00:00:00 2001 From: Nikola Irinchev Date: Fri, 5 Sep 2025 18:20:06 +0300 Subject: [PATCH 6/7] Only unref the timeout after the promise has completed --- src/telemetry/telemetry.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/telemetry/telemetry.ts b/src/telemetry/telemetry.ts index 567ae067..92a159c7 100644 --- a/src/telemetry/telemetry.ts +++ b/src/telemetry/telemetry.ts @@ -94,21 +94,23 @@ export class Telemetry { }); // Wait up to 5 seconds for events to be sent before closing, but don't throw if it times out - const flushTimeout = 5000; + const flushMaxWaitTime = 5000; + let flushTimeout: NodeJS.Timeout | undefined; await Promise.race([ new Promise((resolve) => { - const timeout = setTimeout(() => { + flushTimeout = setTimeout(() => { this.session.logger.debug({ id: LogId.telemetryClose, - message: `Failed to flush remaining events within ${flushTimeout}ms timeout`, + message: `Failed to flush remaining events within ${flushMaxWaitTime}ms timeout`, context: "telemetry", }); resolve(); - }, flushTimeout); - timeout.unref(); + }, flushMaxWaitTime); }), this.emit([]), ]); + + flushTimeout?.unref(); } /** From 544cd62ab476235343ac694f605f1b55e9a137ea Mon Sep 17 00:00:00 2001 From: Nikola Irinchev Date: Fri, 5 Sep 2025 18:21:48 +0300 Subject: [PATCH 7/7] clear the timeout after the promise completes --- src/telemetry/telemetry.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/telemetry/telemetry.ts b/src/telemetry/telemetry.ts index 92a159c7..bdba51a5 100644 --- a/src/telemetry/telemetry.ts +++ b/src/telemetry/telemetry.ts @@ -106,11 +106,12 @@ export class Telemetry { }); resolve(); }, flushMaxWaitTime); + flushTimeout.unref(); }), this.emit([]), ]); - flushTimeout?.unref(); + clearTimeout(flushTimeout); } /**