Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/connectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export type TestConnectionManager = ConnectionManager & {

export abstract class ConnectionManager {
protected clientName: string;
protected readonly _events;
protected readonly _events: EventEmitter<ConnectionManagerEvents>;
readonly events: Pick<EventEmitter<ConnectionManagerEvents>, "on" | "off" | "once">;
private state: AnyConnectionState;

Expand Down
2 changes: 1 addition & 1 deletion src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ export class Server {
}
}

this.telemetry.emitEvents([event]).catch(() => {});
this.telemetry.emitEvents([event]);
}

private registerTools(): void {
Expand Down
101 changes: 60 additions & 41 deletions src/telemetry/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,29 @@ import { MACHINE_METADATA } from "./constants.js";
import { EventCache } from "./eventCache.js";
import { detectContainerEnv } from "../helpers/container.js";
import type { DeviceId } from "../helpers/deviceId.js";
import { EventEmitter } from "stream";

type EventResult = {
success: boolean;
error?: Error;
};

async function timeout(promise: Promise<unknown>, ms: number): Promise<void> {
await Promise.race([new Promise((resolve) => setTimeout(resolve, ms)), promise]);
Copy link

Copilot AI Sep 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout function doesn't actually enforce a timeout - it just waits for whichever promise resolves first. Consider using Promise.race with a rejection timeout to properly handle cases where the promise takes longer than the specified milliseconds.

Suggested change
async function timeout(promise: Promise<unknown>, ms: number): Promise<void> {
await Promise.race([new Promise((resolve) => setTimeout(resolve, ms)), promise]);
await Promise.race([
promise,
new Promise((_, reject) => setTimeout(() => reject(new Error(`Operation timed out after ${ms} ms`)), ms))
]);

Copilot uses AI. Check for mistakes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This suggestion is actually good.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really like it. The reason why I chose to resolve the promise is because I don't think close() should throw if we fail to emit the events on time. I'll add a comment explaining that and remove the function as it's just a 1-liner anyway.

Copy link
Collaborator

@kmruiz kmruiz Sep 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with this approach is that the timeout is only relevant for waiting, but the actual operation runs in the background (I don't see any usage of an AbortController here). At least the error would show in our logs even if we don't cancel the long-running op.

If you want to keep the resolution as is, I would at least print a log, so timeouts are not invisible. One of the reasons we are implementing this is because we think that telemetry is taking too much time at the end.

Copy link
Collaborator

@gagik gagik Sep 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you check that this timeout isn't going to lead to the process being held up?
adding .unref() https://nodejs.org/api/timers.html#timeoutunref just to be safe would be good to.

}

export interface TelemetryEvents {
"events-emitted": [];
"events-send-failed": [];
"events-skipped": [];
}

export class Telemetry {
private isBufferingEvents: boolean = true;
/** Resolves when the setup is complete or a timeout occurs */
public setupPromise: Promise<[string, boolean]> | undefined;
public readonly events: EventEmitter<TelemetryEvents> = new EventEmitter();

private eventCache: EventCache;
private deviceId: DeviceId;

Expand Down Expand Up @@ -57,6 +70,12 @@ export class Telemetry {

private async setup(): Promise<void> {
if (!this.isTelemetryEnabled()) {
this.session.logger.info({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: "Telemetry is disabled.",
noRedaction: true,
});
return;
}

Expand All @@ -71,34 +90,22 @@ export class Telemetry {

public async close(): Promise<void> {
this.isBufferingEvents = false;
await this.emitEvents(this.eventCache.getEvents());
await timeout(this.emit([]), 5_000);
Copy link

Copilot AI Sep 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout function is called with an empty array [] instead of the cached events. This should be await timeout(this.emit(this.eventCache.getEvents()), 5_000); to ensure cached events are sent during cleanup.

Suggested change
await timeout(this.emit([]), 5_000);
await timeout(this.emit(this.eventCache.getEvents()), 5_000);

Copilot uses AI. Check for mistakes.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wrong. We're already emitting the cached events in emit, so passing this.eventCache.getEvents() would have made it so they're emitted twice.

}

/**
* Emits events through the telemetry pipeline
* @param events - The events to emit
*/
public async emitEvents(events: BaseEvent[]): Promise<void> {
try {
if (!this.isTelemetryEnabled()) {
this.session.logger.info({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: "Telemetry is disabled.",
noRedaction: true,
});
return;
}

await this.emit(events);
} catch {
this.session.logger.debug({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: "Error emitting telemetry events.",
noRedaction: true,
});
public emitEvents(events: BaseEvent[]): void {
if (!this.isTelemetryEnabled()) {
this.events.emit("events-skipped");
return;
}

// Don't wait for events to be sent - we should not block regular server
// operations on telemetry
void this.emit(events);
}

/**
Expand Down Expand Up @@ -144,32 +151,44 @@ export class Telemetry {
return;
}

const cachedEvents = this.eventCache.getEvents();
const allEvents = [...cachedEvents, ...events];
try {
const cachedEvents = this.eventCache.getEvents();
const allEvents = [...cachedEvents, ...events];

this.session.logger.debug({
id: LogId.telemetryEmitStart,
context: "telemetry",
message: `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`,
});
this.session.logger.debug({
id: LogId.telemetryEmitStart,
context: "telemetry",
message: `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`,
});

const result = await this.sendEvents(this.session.apiClient, allEvents);
if (result.success) {
this.eventCache.clearEvents();
this.session.logger.debug({
id: LogId.telemetryEmitSuccess,
context: "telemetry",
message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents, null, 2)}`,
});
this.events.emit("events-emitted");
return;
}

const result = await this.sendEvents(this.session.apiClient, allEvents);
if (result.success) {
this.eventCache.clearEvents();
this.session.logger.debug({
id: LogId.telemetryEmitSuccess,
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents, null, 2)}`,
message: `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}`,
});
return;
this.eventCache.appendEvents(events);
this.events.emit("events-send-failed");
} catch (error) {
this.session.logger.debug({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: `Error emitting telemetry events: ${error instanceof Error ? error.message : String(error)}`,
noRedaction: true,
});
this.events.emit("events-send-failed");
}

this.session.logger.debug({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}`,
});
this.eventCache.appendEvents(events);
}

/**
Expand Down
17 changes: 12 additions & 5 deletions src/tools/tool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,14 @@ export abstract class ToolBase {
});

const result = await this.execute(...args);
await this.emitToolEvent(startTime, result, ...args).catch(() => {});
this.emitToolEvent(startTime, result, ...args);

this.session.logger.debug({
id: LogId.toolExecute,
context: "tool",
message: `Executed tool ${this.name}`,
noRedaction: true,
});
return result;
} catch (error: unknown) {
this.session.logger.error({
Expand All @@ -91,7 +98,7 @@ export abstract class ToolBase {
message: `Error executing ${this.name}: ${error as string}`,
});
const toolResult = await this.handleError(error, args[0] as ToolArgs<typeof this.argsShape>);
await this.emitToolEvent(startTime, toolResult, ...args).catch(() => {});
this.emitToolEvent(startTime, toolResult, ...args);
return toolResult;
}
};
Expand Down Expand Up @@ -200,11 +207,11 @@ export abstract class ToolBase {
* @param result - Whether the command succeeded or failed
* @param args - The arguments passed to the tool
*/
private async emitToolEvent(
private emitToolEvent(
startTime: number,
result: CallToolResult,
...args: Parameters<ToolCallback<typeof this.argsShape>>
): Promise<void> {
): void {
if (!this.telemetry.isTelemetryEnabled()) {
return;
}
Expand All @@ -230,7 +237,7 @@ export abstract class ToolBase {
event.properties.project_id = metadata.projectId;
}

await this.telemetry.emitEvents([event]);
this.telemetry.emitEvents([event]);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/transports/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
keepAliveLoop = setInterval(async () => {
try {
this.logger.debug({
server.session.logger.debug({
id: LogId.streamableHttpTransportKeepAlive,
context: "streamableHttpTransport",
message: "Sending ping",
Expand All @@ -138,7 +138,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
} catch (err) {
try {
failedPings++;
this.logger.warning({
server.session.logger.warning({
id: LogId.streamableHttpTransportKeepAliveFailure,
context: "streamableHttpTransport",
message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`,
Expand All @@ -162,7 +162,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
this.logger.error({
id: LogId.streamableHttpTransportSessionCloseFailure,
context: "streamableHttpTransport",
message: `Error closing session: ${error instanceof Error ? error.message : String(error)}`,
message: `Error closing session ${sessionId}: ${error instanceof Error ? error.message : String(error)}`,
});
}
},
Expand Down
22 changes: 16 additions & 6 deletions tests/unit/telemetry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ describe("Telemetry", () => {
};
}

function emitEventsForTest(events: BaseEvent[]): Promise<void> {
return new Promise((resolve) => {
telemetry.events.once("events-emitted", resolve);
telemetry.events.once("events-send-failed", resolve);
telemetry.events.once("events-skipped", resolve);

telemetry.emitEvents(events);
});
}

// Helper function to verify mock calls to reduce duplication
function verifyMockCalls({
sendEventsCalls = 0,
Expand Down Expand Up @@ -145,7 +155,7 @@ describe("Telemetry", () => {

await telemetry.setupPromise;

await telemetry.emitEvents([testEvent]);
await emitEventsForTest([testEvent]);

verifyMockCalls({
sendEventsCalls: 1,
Expand All @@ -161,7 +171,7 @@ describe("Telemetry", () => {

await telemetry.setupPromise;

await telemetry.emitEvents([testEvent]);
await emitEventsForTest([testEvent]);

verifyMockCalls({
sendEventsCalls: 1,
Expand All @@ -186,7 +196,7 @@ describe("Telemetry", () => {

await telemetry.setupPromise;

await telemetry.emitEvents([newEvent]);
await emitEventsForTest([newEvent]);

verifyMockCalls({
sendEventsCalls: 1,
Expand Down Expand Up @@ -223,7 +233,7 @@ describe("Telemetry", () => {
const commonProps = telemetry.getCommonProperties();
expect(commonProps.hosting_mode).toBe("vscode-extension");

await telemetry.emitEvents([createTestEvent()]);
await emitEventsForTest([createTestEvent()]);

const calls = mockApiClient.sendEvents.mock.calls;
expect(calls).toHaveLength(1);
Expand Down Expand Up @@ -305,7 +315,7 @@ describe("Telemetry", () => {
it("should not send events", async () => {
const testEvent = createTestEvent();

await telemetry.emitEvents([testEvent]);
await emitEventsForTest([testEvent]);

verifyMockCalls();
});
Expand All @@ -330,7 +340,7 @@ describe("Telemetry", () => {
it("should not send events", async () => {
const testEvent = createTestEvent();

await telemetry.emitEvents([testEvent]);
await emitEventsForTest([testEvent]);

verifyMockCalls();
});
Expand Down
Loading