Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/connectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export type TestConnectionManager = ConnectionManager & {

export abstract class ConnectionManager {
protected clientName: string;
protected readonly _events;
protected readonly _events: EventEmitter<ConnectionManagerEvents>;
readonly events: Pick<EventEmitter<ConnectionManagerEvents>, "on" | "off" | "once">;
private state: AnyConnectionState;

Expand Down
1 change: 1 addition & 0 deletions src/common/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export const LogId = {
telemetryMetadataError: mongoLogId(1_002_005),
deviceIdResolutionError: mongoLogId(1_002_006),
deviceIdTimeout: mongoLogId(1_002_007),
telemetryClose: mongoLogId(1_002_008),

toolExecute: mongoLogId(1_003_001),
toolExecuteFailure: mongoLogId(1_003_002),
Expand Down
2 changes: 1 addition & 1 deletion src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ export class Server {
}
}

this.telemetry.emitEvents([event]).catch(() => {});
this.telemetry.emitEvents([event]);
}

private registerTools(): void {
Expand Down
22 changes: 15 additions & 7 deletions src/telemetry/eventCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,18 @@ export class EventCache {
}

/**
* Gets a copy of the currently cached events
* Gets the number of currently cached events
*/
public get size(): number {
return this.cache.size;
}

/**
* Gets a copy of the currently cached events along with their ids
* @returns Array of cached BaseEvent objects
*/
public getEvents(): BaseEvent[] {
return Array.from(this.cache.values());
public getEvents(): { id: number; event: BaseEvent }[] {
return Array.from(this.cache.entries()).map(([id, event]) => ({ id, event }));
}

/**
Expand All @@ -53,10 +60,11 @@ export class EventCache {
}

/**
* Clears all cached events
* Removes cached events by their ids
*/
public clearEvents(): void {
this.cache.clear();
this.nextId = 0;
public removeEvents(ids: number[]): void {
for (const id of ids) {
this.cache.delete(id);
}
}
}
119 changes: 78 additions & 41 deletions src/telemetry/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,25 @@ import { MACHINE_METADATA } from "./constants.js";
import { EventCache } from "./eventCache.js";
import { detectContainerEnv } from "../helpers/container.js";
import type { DeviceId } from "../helpers/deviceId.js";
import { EventEmitter } from "events";

type EventResult = {
success: boolean;
error?: Error;
};

export interface TelemetryEvents {
"events-emitted": [];
"events-send-failed": [];
"events-skipped": [];
}

export class Telemetry {
private isBufferingEvents: boolean = true;
/** Resolves when the setup is complete or a timeout occurs */
public setupPromise: Promise<[string, boolean]> | undefined;
public readonly events: EventEmitter<TelemetryEvents> = new EventEmitter();

private eventCache: EventCache;
private deviceId: DeviceId;

Expand Down Expand Up @@ -57,6 +66,12 @@ export class Telemetry {

private async setup(): Promise<void> {
if (!this.isTelemetryEnabled()) {
this.session.logger.info({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: "Telemetry is disabled.",
noRedaction: true,
});
return;
}

Expand All @@ -71,34 +86,44 @@ export class Telemetry {

public async close(): Promise<void> {
this.isBufferingEvents = false;
await this.emitEvents(this.eventCache.getEvents());

this.session.logger.debug({
id: LogId.telemetryClose,
message: `Closing telemetry and flushing ${this.eventCache.size} events`,
context: "telemetry",
});

// Wait up to 5 seconds for events to be sent before closing, but don't throw if it times out
const flushTimeout = 5000;
await Promise.race([
new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
this.session.logger.debug({
id: LogId.telemetryClose,
message: `Failed to flush remaining events within ${flushTimeout}ms timeout`,
context: "telemetry",
});
resolve();
}, flushTimeout);
timeout.unref();
}),
this.emit([]),
]);
}

/**
* Emits events through the telemetry pipeline
* @param events - The events to emit
*/
public async emitEvents(events: BaseEvent[]): Promise<void> {
try {
if (!this.isTelemetryEnabled()) {
this.session.logger.info({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: "Telemetry is disabled.",
noRedaction: true,
});
return;
}

await this.emit(events);
} catch {
this.session.logger.debug({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: "Error emitting telemetry events.",
noRedaction: true,
});
public emitEvents(events: BaseEvent[]): void {
if (!this.isTelemetryEnabled()) {
this.events.emit("events-skipped");
return;
}

// Don't wait for events to be sent - we should not block regular server
// operations on telemetry
void this.emit(events);
}

/**
Expand Down Expand Up @@ -144,32 +169,44 @@ export class Telemetry {
return;
}

const cachedEvents = this.eventCache.getEvents();
const allEvents = [...cachedEvents, ...events];
try {
const cachedEvents = this.eventCache.getEvents();
const allEvents = [...cachedEvents.map((e) => e.event), ...events];

this.session.logger.debug({
id: LogId.telemetryEmitStart,
context: "telemetry",
message: `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`,
});
this.session.logger.debug({
id: LogId.telemetryEmitStart,
context: "telemetry",
message: `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`,
});

const result = await this.sendEvents(this.session.apiClient, allEvents);
if (result.success) {
this.eventCache.removeEvents(cachedEvents.map((e) => e.id));
this.session.logger.debug({
id: LogId.telemetryEmitSuccess,
context: "telemetry",
message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents)}`,
});
this.events.emit("events-emitted");
return;
}

const result = await this.sendEvents(this.session.apiClient, allEvents);
if (result.success) {
this.eventCache.clearEvents();
this.session.logger.debug({
id: LogId.telemetryEmitSuccess,
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents, null, 2)}`,
message: `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}`,
});
return;
this.eventCache.appendEvents(events);
this.events.emit("events-send-failed");
} catch (error) {
this.session.logger.debug({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: `Error emitting telemetry events: ${error instanceof Error ? error.message : String(error)}`,
noRedaction: true,
});
this.events.emit("events-send-failed");
}

this.session.logger.debug({
id: LogId.telemetryEmitFailure,
context: "telemetry",
message: `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}`,
});
this.eventCache.appendEvents(events);
}

/**
Expand Down
17 changes: 12 additions & 5 deletions src/tools/tool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,14 @@ export abstract class ToolBase {
});

const result = await this.execute(...args);
await this.emitToolEvent(startTime, result, ...args).catch(() => {});
this.emitToolEvent(startTime, result, ...args);

this.session.logger.debug({
id: LogId.toolExecute,
context: "tool",
message: `Executed tool ${this.name}`,
noRedaction: true,
});
return result;
} catch (error: unknown) {
this.session.logger.error({
Expand All @@ -91,7 +98,7 @@ export abstract class ToolBase {
message: `Error executing ${this.name}: ${error as string}`,
});
const toolResult = await this.handleError(error, args[0] as ToolArgs<typeof this.argsShape>);
await this.emitToolEvent(startTime, toolResult, ...args).catch(() => {});
this.emitToolEvent(startTime, toolResult, ...args);
return toolResult;
}
};
Expand Down Expand Up @@ -200,11 +207,11 @@ export abstract class ToolBase {
* @param result - Whether the command succeeded or failed
* @param args - The arguments passed to the tool
*/
private async emitToolEvent(
private emitToolEvent(
startTime: number,
result: CallToolResult,
...args: Parameters<ToolCallback<typeof this.argsShape>>
): Promise<void> {
): void {
if (!this.telemetry.isTelemetryEnabled()) {
return;
}
Expand All @@ -230,7 +237,7 @@ export abstract class ToolBase {
event.properties.project_id = metadata.projectId;
}

await this.telemetry.emitEvents([event]);
this.telemetry.emitEvents([event]);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/transports/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
keepAliveLoop = setInterval(async () => {
try {
this.logger.debug({
server.session.logger.debug({
id: LogId.streamableHttpTransportKeepAlive,
context: "streamableHttpTransport",
message: "Sending ping",
Expand All @@ -138,7 +138,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
} catch (err) {
try {
failedPings++;
this.logger.warning({
server.session.logger.warning({
id: LogId.streamableHttpTransportKeepAliveFailure,
context: "streamableHttpTransport",
message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`,
Expand All @@ -162,7 +162,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
this.logger.error({
id: LogId.streamableHttpTransportSessionCloseFailure,
context: "streamableHttpTransport",
message: `Error closing session: ${error instanceof Error ? error.message : String(error)}`,
message: `Error closing session ${sessionId}: ${error instanceof Error ? error.message : String(error)}`,
});
}
},
Expand Down
Loading
Loading