diff --git a/.changeset/modern-views-fry.md b/.changeset/modern-views-fry.md new file mode 100644 index 000000000..f38492e07 --- /dev/null +++ b/.changeset/modern-views-fry.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents': patch +--- + +Added SessionReport and onSessionEnd callback diff --git a/agents/src/ipc/job_proc_lazy_main.ts b/agents/src/ipc/job_proc_lazy_main.ts index 88cddc199..763128277 100644 --- a/agents/src/ipc/job_proc_lazy_main.ts +++ b/agents/src/ipc/job_proc_lazy_main.ts @@ -88,7 +88,7 @@ const startJob = ( const ctx = new JobContext(proc, info, room, onConnect, onShutdown, new InfClient()); - const task = new Promise(async () => { + const task = (async () => { const unconnectedTimeout = setTimeout(() => { if (!(connect || shutdown)) { logger.warn( @@ -109,6 +109,14 @@ const startJob = ( process.send!({ case: 'exiting', value: { reason: close[1] } }); }); + // Close the primary agent session if it exists + if (ctx._primaryAgentSession) { + await ctx._primaryAgentSession.close(); + } + + // Generate and save/upload session report + await ctx._onSessionEnd(); + await room.disconnect(); logger.debug('disconnected from room'); @@ -122,7 +130,7 @@ const startJob = ( process.send!({ case: 'done' }); joinFuture.resolve(); - }); + })(); return { ctx, task }; }; diff --git a/agents/src/job.ts b/agents/src/job.ts index 702368eb6..63c380dd8 100644 --- a/agents/src/job.ts +++ b/agents/src/job.ts @@ -14,6 +14,8 @@ import { AsyncLocalStorage } from 'node:async_hooks'; import type { Logger } from 'pino'; import type { InferenceExecutor } from './ipc/inference_executor.js'; import { log } from './log.js'; +import type { AgentSession } from './voice/agent_session.js'; +import { type SessionReport, createSessionReport } from './voice/report.js'; // AsyncLocalStorage for job context, similar to Python's contextvars const jobContextStorage = new AsyncLocalStorage(); @@ -97,6 +99,9 @@ export class JobContext { #logger: Logger; #inferenceExecutor: InferenceExecutor; + /** @internal */ + _primaryAgentSession?: AgentSession; + private connected: boolean = false; constructor( @@ -232,6 +237,45 @@ export class JobContext { this.connected = true; } + makeSessionReport(session?: AgentSession): SessionReport { + const targetSession = session || this._primaryAgentSession; + + if (!targetSession) { + throw new Error('Cannot prepare report, no AgentSession was found'); + } + + // TODO(brian): implement and check recorder io + + return createSessionReport({ + jobId: this.job.id, + roomId: this.job.room?.sid || '', + room: this.job.room?.name || '', + options: targetSession.options, + events: targetSession._recordedEvents, + enableUserDataTraining: true, + chatHistory: targetSession.history.copy(), + }); + } + + async _onSessionEnd(): Promise { + const session = this._primaryAgentSession; + if (!session) { + return; + } + + const report = this.makeSessionReport(session); + + // TODO(brian): Implement CLI/console + + // TODO(brian): Implement session report upload to LiveKit Cloud + + this.#logger.debug('Session ended, report generated', { + jobId: report.jobId, + roomId: report.roomId, + eventsCount: report.events.length, + }); + } + /** * Gracefully shuts down the job, and runs all shutdown promises. * diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 758d116c4..0c3438380 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -26,6 +26,7 @@ import type { Agent } from './agent.js'; import { AgentActivity } from './agent_activity.js'; import type { _TurnDetector } from './audio_recognition.js'; import { + type AgentEvent, AgentSessionEventTypes, type AgentState, type AgentStateChangedEvent, @@ -124,6 +125,9 @@ export class AgentSession< private closingTask: Promise | null = null; + /** @internal */ + _recordedEvents: AgentEvent[] = []; + constructor(opts: AgentSessionOptions) { super(); @@ -169,6 +173,15 @@ export class AgentSession< this.options = { ...defaultVoiceOptions, ...voiceOptions }; } + emit( + event: K, + ...args: Parameters + ): boolean { + const eventData = args[0] as AgentEvent; + this._recordedEvents.push(eventData); + return super.emit(event, ...args); + } + get input(): AgentInput { return this._input; } @@ -198,11 +211,13 @@ export class AgentSession< room, inputOptions, outputOptions, + record = true, }: { agent: Agent; room: Room; inputOptions?: Partial; outputOptions?: Partial; + record?: boolean; }): Promise { if (this.started) { return; @@ -242,6 +257,17 @@ export class AgentSession< this.logger.debug('Auto-connecting to room via job context'); tasks.push(ctx.connect()); } + + if (record) { + if (ctx._primaryAgentSession === undefined) { + ctx._primaryAgentSession = this; + } else { + throw new Error( + 'Only one `AgentSession` can be the primary at a time. If you want to ignore primary designation, use session.start(record=False).', + ); + } + } + // TODO(AJS-265): add shutdown callback to job context tasks.push(this.updateActivity(this.agent)); diff --git a/agents/src/voice/index.ts b/agents/src/voice/index.ts index 3507ef19a..decd22bbc 100644 --- a/agents/src/voice/index.ts +++ b/agents/src/voice/index.ts @@ -6,5 +6,6 @@ export { AgentSession, type AgentSessionOptions } from './agent_session.js'; export * from './avatar/index.js'; export * from './background_audio.js'; export * from './events.js'; +export * from './report.js'; export * from './room_io/index.js'; export { RunContext } from './run_context.js'; diff --git a/agents/src/voice/report.ts b/agents/src/voice/report.ts new file mode 100644 index 000000000..1580df29e --- /dev/null +++ b/agents/src/voice/report.ts @@ -0,0 +1,72 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { ChatContext } from '../llm/chat_context.js'; +import type { VoiceOptions } from './agent_session.js'; +import type { AgentEvent } from './events.js'; + +export interface SessionReport { + jobId: string; + roomId: string; + room: string; + options: VoiceOptions; + events: AgentEvent[]; + chatHistory: ChatContext; + enableUserDataTraining: boolean; + timestamp: number; +} + +export interface SessionReportOptions { + jobId: string; + roomId: string; + room: string; + options: VoiceOptions; + events: AgentEvent[]; + chatHistory: ChatContext; + enableUserDataTraining?: boolean; + timestamp?: number; +} + +export function createSessionReport(opts: SessionReportOptions): SessionReport { + return { + jobId: opts.jobId, + roomId: opts.roomId, + room: opts.room, + options: opts.options, + events: opts.events, + chatHistory: opts.chatHistory, + enableUserDataTraining: opts.enableUserDataTraining ?? false, + timestamp: opts.timestamp ?? Date.now(), + }; +} + +export function sessionReportToJSON(report: SessionReport): Record { + const events: Record[] = []; + + for (const event of report.events) { + if (event.type === 'metrics_collected') { + continue; // metrics are too noisy, Cloud is using the chat_history as the source of truth + } + + events.push({ ...event }); + } + + return { + job_id: report.jobId, + room_id: report.roomId, + room: report.room, + events, + options: { + allow_interruptions: report.options.allowInterruptions, + discard_audio_if_uninterruptible: report.options.discardAudioIfUninterruptible, + min_interruption_duration: report.options.minInterruptionDuration, + min_interruption_words: report.options.minInterruptionWords, + min_endpointing_delay: report.options.minEndpointingDelay, + max_endpointing_delay: report.options.maxEndpointingDelay, + max_tool_steps: report.options.maxToolSteps, + }, + chat_history: report.chatHistory.toJSON({ excludeTimestamp: false }), + enable_user_data_training: report.enableUserDataTraining, + timestamp: report.timestamp, + }; +}