Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/modern-views-fry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

Added SessionReport and onSessionEnd callback
12 changes: 10 additions & 2 deletions agents/src/ipc/job_proc_lazy_main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ const startJob = (

const ctx = new JobContext(proc, info, room, onConnect, onShutdown, new InfClient());

const task = new Promise<void>(async () => {
const task = (async () => {
const unconnectedTimeout = setTimeout(() => {
if (!(connect || shutdown)) {
logger.warn(
Expand All @@ -109,6 +109,14 @@ const startJob = (
process.send!({ case: 'exiting', value: { reason: close[1] } });
});

// Close the primary agent session if it exists
if (ctx._primaryAgentSession) {
await ctx._primaryAgentSession.close();
}

// Generate and save/upload session report
await ctx._onSessionEnd();

await room.disconnect();
logger.debug('disconnected from room');

Expand All @@ -122,7 +130,7 @@ const startJob = (

process.send!({ case: 'done' });
joinFuture.resolve();
});
})();

return { ctx, task };
};
Expand Down
44 changes: 44 additions & 0 deletions agents/src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import { AsyncLocalStorage } from 'node:async_hooks';
import type { Logger } from 'pino';
import type { InferenceExecutor } from './ipc/inference_executor.js';
import { log } from './log.js';
import type { AgentSession } from './voice/agent_session.js';
import { type SessionReport, createSessionReport } from './voice/report.js';

// AsyncLocalStorage for job context, similar to Python's contextvars
const jobContextStorage = new AsyncLocalStorage<JobContext>();
Expand Down Expand Up @@ -97,6 +99,9 @@ export class JobContext {
#logger: Logger;
#inferenceExecutor: InferenceExecutor;

/** @internal */
_primaryAgentSession?: AgentSession;

private connected: boolean = false;

constructor(
Expand Down Expand Up @@ -232,6 +237,45 @@ export class JobContext {
this.connected = true;
}

makeSessionReport(session?: AgentSession): SessionReport {
const targetSession = session || this._primaryAgentSession;

if (!targetSession) {
throw new Error('Cannot prepare report, no AgentSession was found');
}

// TODO(brian): implement and check recorder io

return createSessionReport({
jobId: this.job.id,
roomId: this.job.room?.sid || '',
room: this.job.room?.name || '',
options: targetSession.options,
events: targetSession._recordedEvents,
enableUserDataTraining: true,
chatHistory: targetSession.history.copy(),
});
}

async _onSessionEnd(): Promise<void> {
const session = this._primaryAgentSession;
if (!session) {
return;
}

const report = this.makeSessionReport(session);

// TODO(brian): Implement CLI/console

// TODO(brian): Implement session report upload to LiveKit Cloud

this.#logger.debug('Session ended, report generated', {
jobId: report.jobId,
roomId: report.roomId,
eventsCount: report.events.length,
});
}

/**
* Gracefully shuts down the job, and runs all shutdown promises.
*
Expand Down
26 changes: 26 additions & 0 deletions agents/src/voice/agent_session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import type { Agent } from './agent.js';
import { AgentActivity } from './agent_activity.js';
import type { _TurnDetector } from './audio_recognition.js';
import {
type AgentEvent,
AgentSessionEventTypes,
type AgentState,
type AgentStateChangedEvent,
Expand Down Expand Up @@ -124,6 +125,9 @@ export class AgentSession<

private closingTask: Promise<void> | null = null;

/** @internal */
_recordedEvents: AgentEvent[] = [];

constructor(opts: AgentSessionOptions<UserData>) {
super();

Expand Down Expand Up @@ -169,6 +173,15 @@ export class AgentSession<
this.options = { ...defaultVoiceOptions, ...voiceOptions };
}

emit<K extends keyof AgentSessionCallbacks>(
event: K,
...args: Parameters<AgentSessionCallbacks[K]>
): boolean {
const eventData = args[0] as AgentEvent;
this._recordedEvents.push(eventData);
return super.emit(event, ...args);
}

get input(): AgentInput {
return this._input;
}
Expand Down Expand Up @@ -198,11 +211,13 @@ export class AgentSession<
room,
inputOptions,
outputOptions,
record = true,
}: {
agent: Agent;
room: Room;
inputOptions?: Partial<RoomInputOptions>;
outputOptions?: Partial<RoomOutputOptions>;
record?: boolean;
}): Promise<void> {
if (this.started) {
return;
Expand Down Expand Up @@ -242,6 +257,17 @@ export class AgentSession<
this.logger.debug('Auto-connecting to room via job context');
tasks.push(ctx.connect());
}

if (record) {
if (ctx._primaryAgentSession === undefined) {
ctx._primaryAgentSession = this;
} else {
throw new Error(
'Only one `AgentSession` can be the primary at a time. If you want to ignore primary designation, use session.start(record=False).',
);
}
}

// TODO(AJS-265): add shutdown callback to job context
tasks.push(this.updateActivity(this.agent));

Expand Down
1 change: 1 addition & 0 deletions agents/src/voice/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ export { AgentSession, type AgentSessionOptions } from './agent_session.js';
export * from './avatar/index.js';
export * from './background_audio.js';
export * from './events.js';
export * from './report.js';
export * from './room_io/index.js';
export { RunContext } from './run_context.js';
72 changes: 72 additions & 0 deletions agents/src/voice/report.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { ChatContext } from '../llm/chat_context.js';
import type { VoiceOptions } from './agent_session.js';
import type { AgentEvent } from './events.js';

export interface SessionReport {
jobId: string;
roomId: string;
room: string;
options: VoiceOptions;
events: AgentEvent[];
chatHistory: ChatContext;
enableUserDataTraining: boolean;
timestamp: number;
}

export interface SessionReportOptions {
jobId: string;
roomId: string;
room: string;
options: VoiceOptions;
events: AgentEvent[];
chatHistory: ChatContext;
enableUserDataTraining?: boolean;
timestamp?: number;
}

export function createSessionReport(opts: SessionReportOptions): SessionReport {
return {
jobId: opts.jobId,
roomId: opts.roomId,
room: opts.room,
options: opts.options,
events: opts.events,
chatHistory: opts.chatHistory,
enableUserDataTraining: opts.enableUserDataTraining ?? false,
timestamp: opts.timestamp ?? Date.now(),
};
}

export function sessionReportToJSON(report: SessionReport): Record<string, unknown> {
const events: Record<string, unknown>[] = [];

for (const event of report.events) {
if (event.type === 'metrics_collected') {
continue; // metrics are too noisy, Cloud is using the chat_history as the source of truth
}

events.push({ ...event });
}

return {
job_id: report.jobId,
room_id: report.roomId,
room: report.room,
events,
options: {
allow_interruptions: report.options.allowInterruptions,
discard_audio_if_uninterruptible: report.options.discardAudioIfUninterruptible,
min_interruption_duration: report.options.minInterruptionDuration,
min_interruption_words: report.options.minInterruptionWords,
min_endpointing_delay: report.options.minEndpointingDelay,
max_endpointing_delay: report.options.maxEndpointingDelay,
max_tool_steps: report.options.maxToolSteps,
},
chat_history: report.chatHistory.toJSON({ excludeTimestamp: false }),
enable_user_data_training: report.enableUserDataTraining,
timestamp: report.timestamp,
};
}