Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/connectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export type TestConnectionManager = ConnectionManager & {

export abstract class ConnectionManager {
protected clientName: string;
protected readonly _events;
protected readonly _events: EventEmitter<ConnectionManagerEvents>;
readonly events: Pick<EventEmitter<ConnectionManagerEvents>, "on" | "off" | "once">;
private state: AnyConnectionState;

Expand Down
1 change: 1 addition & 0 deletions src/common/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export const LogId = {
telemetryMetadataError: mongoLogId(1_002_005),
deviceIdResolutionError: mongoLogId(1_002_006),
deviceIdTimeout: mongoLogId(1_002_007),
telemetryClose: mongoLogId(1_002_008),

toolExecute: mongoLogId(1_003_001),
toolExecuteFailure: mongoLogId(1_003_002),
Expand Down
2 changes: 1 addition & 1 deletion src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ export class Server {
}
}

this.telemetry.emitEvents([event]).catch(() => {});
this.telemetry.emitEvents([event]);
}

private registerTools(): void {
Expand Down
22 changes: 15 additions & 7 deletions src/telemetry/eventCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,18 @@ export class EventCache {
}

/**
* Gets a copy of the currently cached events
* Gets the number of currently cached events
*/
public get size(): number {
return this.cache.size;
}

/**
* Gets a copy of the currently cached events along with their ids
* @returns Array of cached BaseEvent objects
*/
public getEvents(): BaseEvent[] {
return Array.from(this.cache.values());
public getEvents(): { id: number; event: BaseEvent }[] {
return Array.from(this.cache.entries()).map(([id, event]) => ({ id, event }));
}

/**
Expand All @@ -53,10 +60,11 @@ export class EventCache {
}

/**
* Clears all cached events
* Removes cached events by their ids
*/
public clearEvents(): void {
this.cache.clear();
this.nextId = 0;
public removeEvents(ids: number[]): void {
for (const id of ids) {
this.cache.delete(id);
}
}
}
122 changes: 81 additions & 41 deletions src/telemetry/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,25 @@ import { MACHINE_METADATA } from "./constants.js";
import { EventCache } from "./eventCache.js";
import { detectContainerEnv } from "../helpers/container.js";
import type { DeviceId } from "../helpers/deviceId.js";
import { EventEmitter } from "events";

type EventResult = {
success: boolean;
error?: Error;
};

export interface TelemetryEvents {
"events-emitted": [];
"events-send-failed": [];
"events-skipped": [];
}

export class Telemetry {
private isBufferingEvents: boolean = true;
/** Resolves when the setup is complete or a timeout occurs */
public setupPromise: Promise<[string, boolean]> | undefined;
public readonly events: EventEmitter<TelemetryEvents> = new EventEmitter();

private eventCache: EventCache;
private deviceId: DeviceId;

Expand Down Expand Up @@ -57,6 +66,12 @@ export class Telemetry {

private async setup(): Promise<void> {
if (!this.isTelemetryEnabled()) {
this.session.logger.info({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: "Telemetry is disabled.",
noRedaction: true,
});
return;
}

Expand All @@ -71,34 +86,47 @@ export class Telemetry {

public async close(): Promise<void> {
this.isBufferingEvents = false;
await this.emitEvents(this.eventCache.getEvents());

this.session.logger.debug({
id: LogId.telemetryClose,
message: `Closing telemetry and flushing ${this.eventCache.size} events`,
context: "telemetry",
});

// Wait up to 5 seconds for events to be sent before closing, but don't throw if it times out
const flushMaxWaitTime = 5000;
let flushTimeout: NodeJS.Timeout | undefined;
await Promise.race([
new Promise<void>((resolve) => {
flushTimeout = setTimeout(() => {
this.session.logger.debug({
id: LogId.telemetryClose,
message: `Failed to flush remaining events within ${flushMaxWaitTime}ms timeout`,
context: "telemetry",
});
resolve();
}, flushMaxWaitTime);
flushTimeout.unref();
}),
this.emit([]),
]);

clearTimeout(flushTimeout);
}

/**
* Emits events through the telemetry pipeline
* @param events - The events to emit
*/
public async emitEvents(events: BaseEvent[]): Promise<void> {
try {
if (!this.isTelemetryEnabled()) {
this.session.logger.info({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: "Telemetry is disabled.",
noRedaction: true,
});
return;
}

await this.emit(events);
} catch {
this.session.logger.debug({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: "Error emitting telemetry events.",
noRedaction: true,
});
public emitEvents(events: BaseEvent[]): void {
if (!this.isTelemetryEnabled()) {
this.events.emit("events-skipped");
return;
}

// Don't wait for events to be sent - we should not block regular server
// operations on telemetry
void this.emit(events);
}

/**
Expand Down Expand Up @@ -144,32 +172,44 @@ export class Telemetry {
return;
}

const cachedEvents = this.eventCache.getEvents();
const allEvents = [...cachedEvents, ...events];
try {
const cachedEvents = this.eventCache.getEvents();
const allEvents = [...cachedEvents.map((e) => e.event), ...events];

this.session.logger.debug({
id: LogId.telemetryEmitStart,
context: "telemetry",
message: `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`,
});
this.session.logger.debug({
id: LogId.telemetryEmitStart,
context: "telemetry",
message: `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`,
});

const result = await this.sendEvents(this.session.apiClient, allEvents);
if (result.success) {
this.eventCache.removeEvents(cachedEvents.map((e) => e.id));
this.session.logger.debug({
id: LogId.telemetryEmitSuccess,
context: "telemetry",
message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents)}`,
});
this.events.emit("events-emitted");
return;
}

const result = await this.sendEvents(this.session.apiClient, allEvents);
if (result.success) {
this.eventCache.clearEvents();
this.session.logger.debug({
id: LogId.telemetryEmitSuccess,
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents, null, 2)}`,
message: `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}`,
});
return;
this.eventCache.appendEvents(events);
this.events.emit("events-send-failed");
} catch (error) {
this.session.logger.debug({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: `Error emitting telemetry events: ${error instanceof Error ? error.message : String(error)}`,
noRedaction: true,
});
this.events.emit("events-send-failed");
}

this.session.logger.debug({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}`,
});
this.eventCache.appendEvents(events);
}

/**
Expand Down
17 changes: 12 additions & 5 deletions src/tools/tool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,14 @@ export abstract class ToolBase {
});

const result = await this.execute(...args);
await this.emitToolEvent(startTime, result, ...args).catch(() => {});
this.emitToolEvent(startTime, result, ...args);

this.session.logger.debug({
id: LogId.toolExecute,
context: "tool",
message: `Executed tool ${this.name}`,
noRedaction: true,
});
return result;
} catch (error: unknown) {
this.session.logger.error({
Expand All @@ -91,7 +98,7 @@ export abstract class ToolBase {
message: `Error executing ${this.name}: ${error as string}`,
});
const toolResult = await this.handleError(error, args[0] as ToolArgs<typeof this.argsShape>);
await this.emitToolEvent(startTime, toolResult, ...args).catch(() => {});
this.emitToolEvent(startTime, toolResult, ...args);
return toolResult;
}
};
Expand Down Expand Up @@ -200,11 +207,11 @@ export abstract class ToolBase {
* @param result - Whether the command succeeded or failed
* @param args - The arguments passed to the tool
*/
private async emitToolEvent(
private emitToolEvent(
startTime: number,
result: CallToolResult,
...args: Parameters<ToolCallback<typeof this.argsShape>>
): Promise<void> {
): void {
if (!this.telemetry.isTelemetryEnabled()) {
return;
}
Expand All @@ -230,7 +237,7 @@ export abstract class ToolBase {
event.properties.project_id = metadata.projectId;
}

await this.telemetry.emitEvents([event]);
this.telemetry.emitEvents([event]);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/transports/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
keepAliveLoop = setInterval(async () => {
try {
this.logger.debug({
server.session.logger.debug({
id: LogId.streamableHttpTransportKeepAlive,
context: "streamableHttpTransport",
message: "Sending ping",
Expand All @@ -138,7 +138,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
} catch (err) {
try {
failedPings++;
this.logger.warning({
server.session.logger.warning({
id: LogId.streamableHttpTransportKeepAliveFailure,
context: "streamableHttpTransport",
message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`,
Expand All @@ -162,7 +162,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
this.logger.error({
id: LogId.streamableHttpTransportSessionCloseFailure,
context: "streamableHttpTransport",
message: `Error closing session: ${error instanceof Error ? error.message : String(error)}`,
message: `Error closing session ${sessionId}: ${error instanceof Error ? error.message : String(error)}`,
});
}
},
Expand Down
Loading
Loading