From ca349a4dd5d21b2041506d8eea84f6d1b8b9d0d7 Mon Sep 17 00:00:00 2001 From: simon Date: Sun, 16 Nov 2025 20:09:47 +0100 Subject: [PATCH 1/5] _waitForGeneration hangs when speech task is canceled before play out --- .gitignore | 1 - agents/src/index.ts | 3 +- agents/src/telemetry/index.ts | 10 -- agents/src/telemetry/trace_types.ts | 88 --------- agents/src/telemetry/traces.ts | 266 ---------------------------- agents/src/telemetry/utils.ts | 61 ------- agents/src/voice/agent_activity.ts | 9 +- agents/src/voice/speech_handle.ts | 3 + 8 files changed, 12 insertions(+), 429 deletions(-) delete mode 100644 agents/src/telemetry/index.ts delete mode 100644 agents/src/telemetry/trace_types.ts delete mode 100644 agents/src/telemetry/traces.ts delete mode 100644 agents/src/telemetry/utils.ts diff --git a/.gitignore b/.gitignore index c44818c95..f3b6301d0 100644 --- a/.gitignore +++ b/.gitignore @@ -192,5 +192,4 @@ agents-js.code-workspace examples/src/test_*.ts # Ignore all markdown files except root README -*.md !README.md \ No newline at end of file diff --git a/agents/src/index.ts b/agents/src/index.ts index a92d4bf3c..bc599b735 100644 --- a/agents/src/index.ts +++ b/agents/src/index.ts @@ -16,7 +16,6 @@ import * as llm from './llm/index.js'; import * as metrics from './metrics/index.js'; import * as stream from './stream/index.js'; import * as stt from './stt/index.js'; -import * as telemetry from './telemetry/index.js'; import * as tokenize from './tokenize/index.js'; import * as tts from './tts/index.js'; import * as voice from './voice/index.js'; @@ -35,4 +34,4 @@ export * from './vad.js'; export * from './version.js'; export * from './worker.js'; -export { cli, inference, ipc, llm, metrics, stream, stt, telemetry, tokenize, tts, voice }; +export { cli, inference, ipc, llm, metrics, stream, stt, tokenize, tts, voice }; diff --git a/agents/src/telemetry/index.ts b/agents/src/telemetry/index.ts deleted file mode 100644 index 0bc6d1c3b..000000000 --- a/agents/src/telemetry/index.ts +++ /dev/null @@ -1,10 +0,0 @@ -// SPDX-FileCopyrightText: 2025 LiveKit, Inc. -// -// SPDX-License-Identifier: Apache-2.0 - -// TODO(brian): PR4 - Add logging integration exports -// TODO(brian): PR5 - Add uploadSessionReport export - -export * as traceTypes from './trace_types.js'; -export { setTracerProvider, setupCloudTracer, tracer, type StartSpanOptions } from './traces.js'; -export { recordException, recordRealtimeMetrics } from './utils.js'; diff --git a/agents/src/telemetry/trace_types.ts b/agents/src/telemetry/trace_types.ts deleted file mode 100644 index db76f7bc1..000000000 --- a/agents/src/telemetry/trace_types.ts +++ /dev/null @@ -1,88 +0,0 @@ -// SPDX-FileCopyrightText: 2025 LiveKit, Inc. -// -// SPDX-License-Identifier: Apache-2.0 - -// LiveKit custom attributes -export const ATTR_SPEECH_ID = 'lk.speech_id'; -export const ATTR_AGENT_LABEL = 'lk.agent_label'; -export const ATTR_START_TIME = 'lk.start_time'; -export const ATTR_END_TIME = 'lk.end_time'; -export const ATTR_RETRY_COUNT = 'lk.retry_count'; - -export const ATTR_PARTICIPANT_ID = 'lk.participant_id'; -export const ATTR_PARTICIPANT_IDENTITY = 'lk.participant_identity'; -export const ATTR_PARTICIPANT_KIND = 'lk.participant_kind'; - -// session start -export const ATTR_JOB_ID = 'lk.job_id'; -export const ATTR_AGENT_NAME = 'lk.agent_name'; -export const ATTR_ROOM_NAME = 'lk.room_name'; -export const ATTR_SESSION_OPTIONS = 'lk.session_options'; - -// assistant turn -export const ATTR_USER_INPUT = 'lk.user_input'; -export const ATTR_INSTRUCTIONS = 'lk.instructions'; -export const ATTR_SPEECH_INTERRUPTED = 'lk.interrupted'; - -// llm node -export const ATTR_CHAT_CTX = 'lk.chat_ctx'; -export const ATTR_FUNCTION_TOOLS = 'lk.function_tools'; -export const ATTR_RESPONSE_TEXT = 'lk.response.text'; -export const ATTR_RESPONSE_FUNCTION_CALLS = 'lk.response.function_calls'; - -// function tool -export const ATTR_FUNCTION_TOOL_NAME = 'lk.function_tool.name'; -export const ATTR_FUNCTION_TOOL_ARGS = 'lk.function_tool.arguments'; -export const ATTR_FUNCTION_TOOL_IS_ERROR = 'lk.function_tool.is_error'; -export const ATTR_FUNCTION_TOOL_OUTPUT = 'lk.function_tool.output'; - -// tts node -export const ATTR_TTS_INPUT_TEXT = 'lk.input_text'; -export const ATTR_TTS_STREAMING = 'lk.tts.streaming'; -export const ATTR_TTS_LABEL = 'lk.tts.label'; - -// eou detection -export const ATTR_EOU_PROBABILITY = 'lk.eou.probability'; -export const ATTR_EOU_UNLIKELY_THRESHOLD = 'lk.eou.unlikely_threshold'; -export const ATTR_EOU_DELAY = 'lk.eou.endpointing_delay'; -export const ATTR_EOU_LANGUAGE = 'lk.eou.language'; -export const ATTR_USER_TRANSCRIPT = 'lk.user_transcript'; -export const ATTR_TRANSCRIPT_CONFIDENCE = 'lk.transcript_confidence'; -export const ATTR_TRANSCRIPTION_DELAY = 'lk.transcription_delay'; -export const ATTR_END_OF_TURN_DELAY = 'lk.end_of_turn_delay'; - -// metrics -export const ATTR_LLM_METRICS = 'lk.llm_metrics'; -export const ATTR_TTS_METRICS = 'lk.tts_metrics'; -export const ATTR_REALTIME_MODEL_METRICS = 'lk.realtime_model_metrics'; - -// OpenTelemetry GenAI attributes -// OpenTelemetry specification: https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/ -export const ATTR_GEN_AI_OPERATION_NAME = 'gen_ai.operation.name'; -export const ATTR_GEN_AI_REQUEST_MODEL = 'gen_ai.request.model'; -export const ATTR_GEN_AI_USAGE_INPUT_TOKENS = 'gen_ai.usage.input_tokens'; -export const ATTR_GEN_AI_USAGE_OUTPUT_TOKENS = 'gen_ai.usage.output_tokens'; - -// Unofficial OpenTelemetry GenAI attributes, recognized by LangFuse -// https://langfuse.com/integrations/native/opentelemetry#usage -// but not yet in the official OpenTelemetry specification. -export const ATTR_GEN_AI_USAGE_INPUT_TEXT_TOKENS = 'gen_ai.usage.input_text_tokens'; -export const ATTR_GEN_AI_USAGE_INPUT_AUDIO_TOKENS = 'gen_ai.usage.input_audio_tokens'; -export const ATTR_GEN_AI_USAGE_INPUT_CACHED_TOKENS = 'gen_ai.usage.input_cached_tokens'; -export const ATTR_GEN_AI_USAGE_OUTPUT_TEXT_TOKENS = 'gen_ai.usage.output_text_tokens'; -export const ATTR_GEN_AI_USAGE_OUTPUT_AUDIO_TOKENS = 'gen_ai.usage.output_audio_tokens'; - -// OpenTelemetry GenAI event names (for structured logging) -export const EVENT_GEN_AI_SYSTEM_MESSAGE = 'gen_ai.system.message'; -export const EVENT_GEN_AI_USER_MESSAGE = 'gen_ai.user.message'; -export const EVENT_GEN_AI_ASSISTANT_MESSAGE = 'gen_ai.assistant.message'; -export const EVENT_GEN_AI_TOOL_MESSAGE = 'gen_ai.tool.message'; -export const EVENT_GEN_AI_CHOICE = 'gen_ai.choice'; - -// Exception attributes -export const ATTR_EXCEPTION_TRACE = 'exception.stacktrace'; -export const ATTR_EXCEPTION_TYPE = 'exception.type'; -export const ATTR_EXCEPTION_MESSAGE = 'exception.message'; - -// Platform-specific attributes -export const ATTR_LANGFUSE_COMPLETION_START_TIME = 'langfuse.observation.completion_start_time'; diff --git a/agents/src/telemetry/traces.ts b/agents/src/telemetry/traces.ts deleted file mode 100644 index 1c5ba3408..000000000 --- a/agents/src/telemetry/traces.ts +++ /dev/null @@ -1,266 +0,0 @@ -// SPDX-FileCopyrightText: 2025 LiveKit, Inc. -// -// SPDX-License-Identifier: Apache-2.0 -import { - type Attributes, - type Context, - type Span, - type SpanOptions, - type Tracer, - type TracerProvider, - context as otelContext, - trace, -} from '@opentelemetry/api'; -import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http'; -import { CompressionAlgorithm } from '@opentelemetry/otlp-exporter-base'; -import { Resource } from '@opentelemetry/resources'; -import type { ReadableSpan, SpanProcessor } from '@opentelemetry/sdk-trace-base'; -import { BatchSpanProcessor, NodeTracerProvider } from '@opentelemetry/sdk-trace-node'; -import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions'; -import { AccessToken } from 'livekit-server-sdk'; - -export interface StartSpanOptions { - /** Name of the span */ - name: string; - /** Optional parent context to use for this span */ - context?: Context; - /** Attributes to set on the span when it starts */ - attributes?: Attributes; - /** Whether to end the span when the function exits (default: true) */ - endOnExit?: boolean; -} - -/** - * A dynamic tracer that allows the tracer provider to be changed at runtime. - */ -class DynamicTracer { - private tracerProvider: TracerProvider; - private tracer: Tracer; - private readonly instrumentingModuleName: string; - - constructor(instrumentingModuleName: string) { - this.instrumentingModuleName = instrumentingModuleName; - this.tracerProvider = trace.getTracerProvider(); - this.tracer = trace.getTracer(instrumentingModuleName); - } - - /** - * Set a new tracer provider. This updates the underlying tracer instance. - * @param provider - The new tracer provider to use - */ - setProvider(provider: TracerProvider): void { - this.tracerProvider = provider; - this.tracer = this.tracerProvider.getTracer(this.instrumentingModuleName); - } - - /** - * Get the underlying OpenTelemetry tracer. - * Use this to access the full Tracer API when needed. - */ - getTracer(): Tracer { - return this.tracer; - } - - /** - * Start a span manually (without making it active). - * You must call span.end() when done. - * - * @param options - Span configuration including name - * @returns The created span - */ - startSpan(options: StartSpanOptions): Span { - const ctx = options.context || otelContext.active(); - const span = this.tracer.startSpan( - options.name, - { - attributes: options.attributes, - }, - ctx, - ); - - return span; - } - - /** - * Start a new span and make it active in the current context. - * The span will automatically be ended when the provided function completes (unless endOnExit=false). - * - * @param fn - The function to execute within the span context - * @param options - Span configuration including name - * @returns The result of the provided function - */ - async startActiveSpan(fn: (span: Span) => Promise, options: StartSpanOptions): Promise { - const ctx = options.context || otelContext.active(); - const endOnExit = options.endOnExit === undefined ? true : options.endOnExit; // default true - const opts: SpanOptions = { attributes: options.attributes }; - - return new Promise((resolve, reject) => { - this.tracer.startActiveSpan(options.name, opts, ctx, async (span) => { - try { - const result = await fn(span); - resolve(result); - } catch (error) { - reject(error); - } finally { - if (endOnExit) { - span.end(); - } - } - }); - }); - } - - /** - * Synchronous version of startActiveSpan for non-async operations. - * - * @param fn - The function to execute within the span context - * @param options - Span configuration including name - * @returns The result of the provided function - */ - startActiveSpanSync(fn: (span: Span) => T, options: StartSpanOptions): T { - const ctx = options.context || otelContext.active(); - const endOnExit = options.endOnExit === undefined ? true : options.endOnExit; // default true - const opts: SpanOptions = { attributes: options.attributes }; - - return this.tracer.startActiveSpan(options.name, opts, ctx, (span) => { - try { - return fn(span); - } finally { - if (endOnExit) { - span.end(); - } - } - }); - } -} - -/** - * The global tracer instance used throughout the agents framework. - * This tracer can have its provider updated at runtime via setTracerProvider(). - */ -export const tracer = new DynamicTracer('livekit-agents'); - -class MetadataSpanProcessor implements SpanProcessor { - private metadata: Attributes; - - constructor(metadata: Attributes) { - this.metadata = metadata; - } - - onStart(span: Span, _parentContext: Context): void { - span.setAttributes(this.metadata); - } - - onEnd(_span: ReadableSpan): void {} - - shutdown(): Promise { - return Promise.resolve(); - } - - forceFlush(): Promise { - return Promise.resolve(); - } -} - -// TODO(brian): PR4 - Add MetadataLogProcessor for structured logging - -// TODO(brian): PR4 - Add ExtraDetailsProcessor for structured logging - -/** - * Set the tracer provider for the livekit-agents framework. - * This should be called before agent session start if using custom tracer providers. - * - * @param provider - The tracer provider to use (must be a NodeTracerProvider) - * @param options - Optional configuration with metadata property to inject into all spans - * - * @example - * ```typescript - * import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'; - * import { setTracerProvider } from '@livekit/agents/telemetry'; - * - * const provider = new NodeTracerProvider(); - * setTracerProvider(provider, { - * metadata: { room_id: 'room123', job_id: 'job456' } - * }); - * ``` - */ -export function setTracerProvider( - provider: NodeTracerProvider, - options?: { metadata?: Attributes }, -): void { - if (options?.metadata) { - provider.addSpanProcessor(new MetadataSpanProcessor(options.metadata)); - } - - tracer.setProvider(provider); -} - -/** - * Setup OpenTelemetry tracer for LiveKit Cloud observability. - * This configures OTLP exporters to send traces to LiveKit Cloud. - * - * @param options - Configuration for cloud tracer with roomId, jobId, and cloudHostname properties - * - * @internal - */ -export async function setupCloudTracer(options: { - roomId: string; - jobId: string; - cloudHostname: string; -}): Promise { - const { roomId, jobId, cloudHostname } = options; - - const apiKey = process.env.LIVEKIT_API_KEY; - const apiSecret = process.env.LIVEKIT_API_SECRET; - - if (!apiKey || !apiSecret) { - throw new Error('LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set for cloud tracing'); - } - - const token = new AccessToken(apiKey, apiSecret, { - identity: 'livekit-agents-telemetry', - ttl: '6h', - }); - token.addObservabilityGrant({ write: true }); - - try { - const jwt = await token.toJwt(); - - const headers = { - Authorization: `Bearer ${jwt}`, - }; - - const metadata: Attributes = { - room_id: roomId, - job_id: jobId, - }; - - const resource = new Resource({ - [ATTR_SERVICE_NAME]: 'livekit-agents', - room_id: roomId, - job_id: jobId, - }); - - // Configure OTLP exporter to send traces to LiveKit Cloud - const spanExporter = new OTLPTraceExporter({ - url: `https://${cloudHostname}/observability/traces/otlp/v0`, - headers, - compression: CompressionAlgorithm.GZIP, - }); - - const tracerProvider = new NodeTracerProvider({ - resource, - spanProcessors: [new MetadataSpanProcessor(metadata), new BatchSpanProcessor(spanExporter)], - }); - tracerProvider.register(); - - // Metadata processor is already configured in the constructor above - setTracerProvider(tracerProvider); - - // TODO(brian): PR4 - Add logger provider setup here for structured logging - // Similar to Python's setup: LoggerProvider, OTLPLogExporter, BatchLogRecordProcessor - } catch (error) { - console.error('Failed to setup cloud tracer:', error); - throw error; - } -} diff --git a/agents/src/telemetry/utils.ts b/agents/src/telemetry/utils.ts deleted file mode 100644 index 4eba4f043..000000000 --- a/agents/src/telemetry/utils.ts +++ /dev/null @@ -1,61 +0,0 @@ -// SPDX-FileCopyrightText: 2025 LiveKit, Inc. -// -// SPDX-License-Identifier: Apache-2.0 -import { type Span, SpanStatusCode, context as otelContext, trace } from '@opentelemetry/api'; -import type { RealtimeModelMetrics } from '../metrics/base.js'; -import * as traceTypes from './trace_types.js'; -import { tracer } from './traces.js'; - -export function recordException(span: Span, error: Error): void { - span.recordException(error); - span.setStatus({ - code: SpanStatusCode.ERROR, - message: error.message, - }); - - // Set exception attributes for better visibility - // (in case the exception event is not rendered by the backend) - span.setAttributes({ - [traceTypes.ATTR_EXCEPTION_TYPE]: error.constructor.name, - [traceTypes.ATTR_EXCEPTION_MESSAGE]: error.message, - [traceTypes.ATTR_EXCEPTION_TRACE]: error.stack || '', - }); -} - -export function recordRealtimeMetrics(span: Span, metrics: RealtimeModelMetrics): void { - const attrs: Record = { - [traceTypes.ATTR_GEN_AI_REQUEST_MODEL]: metrics.label || 'unknown', - [traceTypes.ATTR_REALTIME_MODEL_METRICS]: JSON.stringify(metrics), - [traceTypes.ATTR_GEN_AI_USAGE_INPUT_TOKENS]: metrics.inputTokens, - [traceTypes.ATTR_GEN_AI_USAGE_OUTPUT_TOKENS]: metrics.outputTokens, - [traceTypes.ATTR_GEN_AI_USAGE_INPUT_TEXT_TOKENS]: metrics.inputTokenDetails.textTokens, - [traceTypes.ATTR_GEN_AI_USAGE_INPUT_AUDIO_TOKENS]: metrics.inputTokenDetails.audioTokens, - [traceTypes.ATTR_GEN_AI_USAGE_INPUT_CACHED_TOKENS]: metrics.inputTokenDetails.cachedTokens, - [traceTypes.ATTR_GEN_AI_USAGE_OUTPUT_TEXT_TOKENS]: metrics.outputTokenDetails.textTokens, - [traceTypes.ATTR_GEN_AI_USAGE_OUTPUT_AUDIO_TOKENS]: metrics.outputTokenDetails.audioTokens, - }; - - // Add LangFuse-specific completion start time if TTFT is available - if (metrics.ttftMs !== undefined && metrics.ttftMs !== -1) { - const completionStartTime = metrics.timestamp + metrics.ttftMs; - // Convert to UTC ISO string for LangFuse compatibility - const completionStartTimeUtc = new Date(completionStartTime).toISOString(); - attrs[traceTypes.ATTR_LANGFUSE_COMPLETION_START_TIME] = completionStartTimeUtc; - } - - if (span.isRecording()) { - span.setAttributes(attrs); - } else { - const currentContext = otelContext.active(); - const spanContext = trace.setSpan(currentContext, span); - - // Create a dedicated child span for orphaned metrics - tracer.getTracer().startActiveSpan('realtime_metrics', {}, spanContext, (child) => { - try { - child.setAttributes(attrs); - } finally { - child.end(); - } - }); - } -} diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 7b4b53ce4..b4c01bf8f 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -848,7 +848,14 @@ export class AgentActivity implements RecognitionHooks { const speechHandle = heapItem[2]; this._currentSpeech = speechHandle; speechHandle._authorizeGeneration(); - await speechHandle._waitForGeneration(); + try { + await Promise.race([ + speechHandle._waitForGeneration(), + new Promise((_, reject) => setTimeout(() => reject('timeout after 30 seconds'), 30000)), + ]); + } catch (err) { + console.error('wait for generation failed', err, speechHandle); + } this._currentSpeech = undefined; } diff --git a/agents/src/voice/speech_handle.ts b/agents/src/voice/speech_handle.ts index 5e4a55e94..58ae71dba 100644 --- a/agents/src/voice/speech_handle.ts +++ b/agents/src/voice/speech_handle.ts @@ -165,6 +165,9 @@ export class SpeechHandle { /** @internal */ _authorizeGeneration(): void { + if (this.interruptFut.done) { + return; + } const fut = new Future(); this.generations.push(fut); this.authorizedEvent.set(); From a4e78cd8a994766d5c9393275c4589ea44a4177f Mon Sep 17 00:00:00 2001 From: simon Date: Mon, 17 Nov 2025 12:43:02 +0100 Subject: [PATCH 2/5] revert to main --- .gitignore | 1 + agents/src/index.ts | 3 +- agents/src/telemetry/index.ts | 10 ++ agents/src/telemetry/trace_types.ts | 88 +++++++++ agents/src/telemetry/traces.ts | 266 ++++++++++++++++++++++++++++ agents/src/telemetry/utils.ts | 61 +++++++ 6 files changed, 428 insertions(+), 1 deletion(-) create mode 100644 agents/src/telemetry/index.ts create mode 100644 agents/src/telemetry/trace_types.ts create mode 100644 agents/src/telemetry/traces.ts create mode 100644 agents/src/telemetry/utils.ts diff --git a/.gitignore b/.gitignore index f3b6301d0..c44818c95 100644 --- a/.gitignore +++ b/.gitignore @@ -192,4 +192,5 @@ agents-js.code-workspace examples/src/test_*.ts # Ignore all markdown files except root README +*.md !README.md \ No newline at end of file diff --git a/agents/src/index.ts b/agents/src/index.ts index bc599b735..a92d4bf3c 100644 --- a/agents/src/index.ts +++ b/agents/src/index.ts @@ -16,6 +16,7 @@ import * as llm from './llm/index.js'; import * as metrics from './metrics/index.js'; import * as stream from './stream/index.js'; import * as stt from './stt/index.js'; +import * as telemetry from './telemetry/index.js'; import * as tokenize from './tokenize/index.js'; import * as tts from './tts/index.js'; import * as voice from './voice/index.js'; @@ -34,4 +35,4 @@ export * from './vad.js'; export * from './version.js'; export * from './worker.js'; -export { cli, inference, ipc, llm, metrics, stream, stt, tokenize, tts, voice }; +export { cli, inference, ipc, llm, metrics, stream, stt, telemetry, tokenize, tts, voice }; diff --git a/agents/src/telemetry/index.ts b/agents/src/telemetry/index.ts new file mode 100644 index 000000000..0bc6d1c3b --- /dev/null +++ b/agents/src/telemetry/index.ts @@ -0,0 +1,10 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +// TODO(brian): PR4 - Add logging integration exports +// TODO(brian): PR5 - Add uploadSessionReport export + +export * as traceTypes from './trace_types.js'; +export { setTracerProvider, setupCloudTracer, tracer, type StartSpanOptions } from './traces.js'; +export { recordException, recordRealtimeMetrics } from './utils.js'; diff --git a/agents/src/telemetry/trace_types.ts b/agents/src/telemetry/trace_types.ts new file mode 100644 index 000000000..db76f7bc1 --- /dev/null +++ b/agents/src/telemetry/trace_types.ts @@ -0,0 +1,88 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +// LiveKit custom attributes +export const ATTR_SPEECH_ID = 'lk.speech_id'; +export const ATTR_AGENT_LABEL = 'lk.agent_label'; +export const ATTR_START_TIME = 'lk.start_time'; +export const ATTR_END_TIME = 'lk.end_time'; +export const ATTR_RETRY_COUNT = 'lk.retry_count'; + +export const ATTR_PARTICIPANT_ID = 'lk.participant_id'; +export const ATTR_PARTICIPANT_IDENTITY = 'lk.participant_identity'; +export const ATTR_PARTICIPANT_KIND = 'lk.participant_kind'; + +// session start +export const ATTR_JOB_ID = 'lk.job_id'; +export const ATTR_AGENT_NAME = 'lk.agent_name'; +export const ATTR_ROOM_NAME = 'lk.room_name'; +export const ATTR_SESSION_OPTIONS = 'lk.session_options'; + +// assistant turn +export const ATTR_USER_INPUT = 'lk.user_input'; +export const ATTR_INSTRUCTIONS = 'lk.instructions'; +export const ATTR_SPEECH_INTERRUPTED = 'lk.interrupted'; + +// llm node +export const ATTR_CHAT_CTX = 'lk.chat_ctx'; +export const ATTR_FUNCTION_TOOLS = 'lk.function_tools'; +export const ATTR_RESPONSE_TEXT = 'lk.response.text'; +export const ATTR_RESPONSE_FUNCTION_CALLS = 'lk.response.function_calls'; + +// function tool +export const ATTR_FUNCTION_TOOL_NAME = 'lk.function_tool.name'; +export const ATTR_FUNCTION_TOOL_ARGS = 'lk.function_tool.arguments'; +export const ATTR_FUNCTION_TOOL_IS_ERROR = 'lk.function_tool.is_error'; +export const ATTR_FUNCTION_TOOL_OUTPUT = 'lk.function_tool.output'; + +// tts node +export const ATTR_TTS_INPUT_TEXT = 'lk.input_text'; +export const ATTR_TTS_STREAMING = 'lk.tts.streaming'; +export const ATTR_TTS_LABEL = 'lk.tts.label'; + +// eou detection +export const ATTR_EOU_PROBABILITY = 'lk.eou.probability'; +export const ATTR_EOU_UNLIKELY_THRESHOLD = 'lk.eou.unlikely_threshold'; +export const ATTR_EOU_DELAY = 'lk.eou.endpointing_delay'; +export const ATTR_EOU_LANGUAGE = 'lk.eou.language'; +export const ATTR_USER_TRANSCRIPT = 'lk.user_transcript'; +export const ATTR_TRANSCRIPT_CONFIDENCE = 'lk.transcript_confidence'; +export const ATTR_TRANSCRIPTION_DELAY = 'lk.transcription_delay'; +export const ATTR_END_OF_TURN_DELAY = 'lk.end_of_turn_delay'; + +// metrics +export const ATTR_LLM_METRICS = 'lk.llm_metrics'; +export const ATTR_TTS_METRICS = 'lk.tts_metrics'; +export const ATTR_REALTIME_MODEL_METRICS = 'lk.realtime_model_metrics'; + +// OpenTelemetry GenAI attributes +// OpenTelemetry specification: https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/ +export const ATTR_GEN_AI_OPERATION_NAME = 'gen_ai.operation.name'; +export const ATTR_GEN_AI_REQUEST_MODEL = 'gen_ai.request.model'; +export const ATTR_GEN_AI_USAGE_INPUT_TOKENS = 'gen_ai.usage.input_tokens'; +export const ATTR_GEN_AI_USAGE_OUTPUT_TOKENS = 'gen_ai.usage.output_tokens'; + +// Unofficial OpenTelemetry GenAI attributes, recognized by LangFuse +// https://langfuse.com/integrations/native/opentelemetry#usage +// but not yet in the official OpenTelemetry specification. +export const ATTR_GEN_AI_USAGE_INPUT_TEXT_TOKENS = 'gen_ai.usage.input_text_tokens'; +export const ATTR_GEN_AI_USAGE_INPUT_AUDIO_TOKENS = 'gen_ai.usage.input_audio_tokens'; +export const ATTR_GEN_AI_USAGE_INPUT_CACHED_TOKENS = 'gen_ai.usage.input_cached_tokens'; +export const ATTR_GEN_AI_USAGE_OUTPUT_TEXT_TOKENS = 'gen_ai.usage.output_text_tokens'; +export const ATTR_GEN_AI_USAGE_OUTPUT_AUDIO_TOKENS = 'gen_ai.usage.output_audio_tokens'; + +// OpenTelemetry GenAI event names (for structured logging) +export const EVENT_GEN_AI_SYSTEM_MESSAGE = 'gen_ai.system.message'; +export const EVENT_GEN_AI_USER_MESSAGE = 'gen_ai.user.message'; +export const EVENT_GEN_AI_ASSISTANT_MESSAGE = 'gen_ai.assistant.message'; +export const EVENT_GEN_AI_TOOL_MESSAGE = 'gen_ai.tool.message'; +export const EVENT_GEN_AI_CHOICE = 'gen_ai.choice'; + +// Exception attributes +export const ATTR_EXCEPTION_TRACE = 'exception.stacktrace'; +export const ATTR_EXCEPTION_TYPE = 'exception.type'; +export const ATTR_EXCEPTION_MESSAGE = 'exception.message'; + +// Platform-specific attributes +export const ATTR_LANGFUSE_COMPLETION_START_TIME = 'langfuse.observation.completion_start_time'; diff --git a/agents/src/telemetry/traces.ts b/agents/src/telemetry/traces.ts new file mode 100644 index 000000000..1c5ba3408 --- /dev/null +++ b/agents/src/telemetry/traces.ts @@ -0,0 +1,266 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { + type Attributes, + type Context, + type Span, + type SpanOptions, + type Tracer, + type TracerProvider, + context as otelContext, + trace, +} from '@opentelemetry/api'; +import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http'; +import { CompressionAlgorithm } from '@opentelemetry/otlp-exporter-base'; +import { Resource } from '@opentelemetry/resources'; +import type { ReadableSpan, SpanProcessor } from '@opentelemetry/sdk-trace-base'; +import { BatchSpanProcessor, NodeTracerProvider } from '@opentelemetry/sdk-trace-node'; +import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions'; +import { AccessToken } from 'livekit-server-sdk'; + +export interface StartSpanOptions { + /** Name of the span */ + name: string; + /** Optional parent context to use for this span */ + context?: Context; + /** Attributes to set on the span when it starts */ + attributes?: Attributes; + /** Whether to end the span when the function exits (default: true) */ + endOnExit?: boolean; +} + +/** + * A dynamic tracer that allows the tracer provider to be changed at runtime. + */ +class DynamicTracer { + private tracerProvider: TracerProvider; + private tracer: Tracer; + private readonly instrumentingModuleName: string; + + constructor(instrumentingModuleName: string) { + this.instrumentingModuleName = instrumentingModuleName; + this.tracerProvider = trace.getTracerProvider(); + this.tracer = trace.getTracer(instrumentingModuleName); + } + + /** + * Set a new tracer provider. This updates the underlying tracer instance. + * @param provider - The new tracer provider to use + */ + setProvider(provider: TracerProvider): void { + this.tracerProvider = provider; + this.tracer = this.tracerProvider.getTracer(this.instrumentingModuleName); + } + + /** + * Get the underlying OpenTelemetry tracer. + * Use this to access the full Tracer API when needed. + */ + getTracer(): Tracer { + return this.tracer; + } + + /** + * Start a span manually (without making it active). + * You must call span.end() when done. + * + * @param options - Span configuration including name + * @returns The created span + */ + startSpan(options: StartSpanOptions): Span { + const ctx = options.context || otelContext.active(); + const span = this.tracer.startSpan( + options.name, + { + attributes: options.attributes, + }, + ctx, + ); + + return span; + } + + /** + * Start a new span and make it active in the current context. + * The span will automatically be ended when the provided function completes (unless endOnExit=false). + * + * @param fn - The function to execute within the span context + * @param options - Span configuration including name + * @returns The result of the provided function + */ + async startActiveSpan(fn: (span: Span) => Promise, options: StartSpanOptions): Promise { + const ctx = options.context || otelContext.active(); + const endOnExit = options.endOnExit === undefined ? true : options.endOnExit; // default true + const opts: SpanOptions = { attributes: options.attributes }; + + return new Promise((resolve, reject) => { + this.tracer.startActiveSpan(options.name, opts, ctx, async (span) => { + try { + const result = await fn(span); + resolve(result); + } catch (error) { + reject(error); + } finally { + if (endOnExit) { + span.end(); + } + } + }); + }); + } + + /** + * Synchronous version of startActiveSpan for non-async operations. + * + * @param fn - The function to execute within the span context + * @param options - Span configuration including name + * @returns The result of the provided function + */ + startActiveSpanSync(fn: (span: Span) => T, options: StartSpanOptions): T { + const ctx = options.context || otelContext.active(); + const endOnExit = options.endOnExit === undefined ? true : options.endOnExit; // default true + const opts: SpanOptions = { attributes: options.attributes }; + + return this.tracer.startActiveSpan(options.name, opts, ctx, (span) => { + try { + return fn(span); + } finally { + if (endOnExit) { + span.end(); + } + } + }); + } +} + +/** + * The global tracer instance used throughout the agents framework. + * This tracer can have its provider updated at runtime via setTracerProvider(). + */ +export const tracer = new DynamicTracer('livekit-agents'); + +class MetadataSpanProcessor implements SpanProcessor { + private metadata: Attributes; + + constructor(metadata: Attributes) { + this.metadata = metadata; + } + + onStart(span: Span, _parentContext: Context): void { + span.setAttributes(this.metadata); + } + + onEnd(_span: ReadableSpan): void {} + + shutdown(): Promise { + return Promise.resolve(); + } + + forceFlush(): Promise { + return Promise.resolve(); + } +} + +// TODO(brian): PR4 - Add MetadataLogProcessor for structured logging + +// TODO(brian): PR4 - Add ExtraDetailsProcessor for structured logging + +/** + * Set the tracer provider for the livekit-agents framework. + * This should be called before agent session start if using custom tracer providers. + * + * @param provider - The tracer provider to use (must be a NodeTracerProvider) + * @param options - Optional configuration with metadata property to inject into all spans + * + * @example + * ```typescript + * import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'; + * import { setTracerProvider } from '@livekit/agents/telemetry'; + * + * const provider = new NodeTracerProvider(); + * setTracerProvider(provider, { + * metadata: { room_id: 'room123', job_id: 'job456' } + * }); + * ``` + */ +export function setTracerProvider( + provider: NodeTracerProvider, + options?: { metadata?: Attributes }, +): void { + if (options?.metadata) { + provider.addSpanProcessor(new MetadataSpanProcessor(options.metadata)); + } + + tracer.setProvider(provider); +} + +/** + * Setup OpenTelemetry tracer for LiveKit Cloud observability. + * This configures OTLP exporters to send traces to LiveKit Cloud. + * + * @param options - Configuration for cloud tracer with roomId, jobId, and cloudHostname properties + * + * @internal + */ +export async function setupCloudTracer(options: { + roomId: string; + jobId: string; + cloudHostname: string; +}): Promise { + const { roomId, jobId, cloudHostname } = options; + + const apiKey = process.env.LIVEKIT_API_KEY; + const apiSecret = process.env.LIVEKIT_API_SECRET; + + if (!apiKey || !apiSecret) { + throw new Error('LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set for cloud tracing'); + } + + const token = new AccessToken(apiKey, apiSecret, { + identity: 'livekit-agents-telemetry', + ttl: '6h', + }); + token.addObservabilityGrant({ write: true }); + + try { + const jwt = await token.toJwt(); + + const headers = { + Authorization: `Bearer ${jwt}`, + }; + + const metadata: Attributes = { + room_id: roomId, + job_id: jobId, + }; + + const resource = new Resource({ + [ATTR_SERVICE_NAME]: 'livekit-agents', + room_id: roomId, + job_id: jobId, + }); + + // Configure OTLP exporter to send traces to LiveKit Cloud + const spanExporter = new OTLPTraceExporter({ + url: `https://${cloudHostname}/observability/traces/otlp/v0`, + headers, + compression: CompressionAlgorithm.GZIP, + }); + + const tracerProvider = new NodeTracerProvider({ + resource, + spanProcessors: [new MetadataSpanProcessor(metadata), new BatchSpanProcessor(spanExporter)], + }); + tracerProvider.register(); + + // Metadata processor is already configured in the constructor above + setTracerProvider(tracerProvider); + + // TODO(brian): PR4 - Add logger provider setup here for structured logging + // Similar to Python's setup: LoggerProvider, OTLPLogExporter, BatchLogRecordProcessor + } catch (error) { + console.error('Failed to setup cloud tracer:', error); + throw error; + } +} diff --git a/agents/src/telemetry/utils.ts b/agents/src/telemetry/utils.ts new file mode 100644 index 000000000..4eba4f043 --- /dev/null +++ b/agents/src/telemetry/utils.ts @@ -0,0 +1,61 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { type Span, SpanStatusCode, context as otelContext, trace } from '@opentelemetry/api'; +import type { RealtimeModelMetrics } from '../metrics/base.js'; +import * as traceTypes from './trace_types.js'; +import { tracer } from './traces.js'; + +export function recordException(span: Span, error: Error): void { + span.recordException(error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error.message, + }); + + // Set exception attributes for better visibility + // (in case the exception event is not rendered by the backend) + span.setAttributes({ + [traceTypes.ATTR_EXCEPTION_TYPE]: error.constructor.name, + [traceTypes.ATTR_EXCEPTION_MESSAGE]: error.message, + [traceTypes.ATTR_EXCEPTION_TRACE]: error.stack || '', + }); +} + +export function recordRealtimeMetrics(span: Span, metrics: RealtimeModelMetrics): void { + const attrs: Record = { + [traceTypes.ATTR_GEN_AI_REQUEST_MODEL]: metrics.label || 'unknown', + [traceTypes.ATTR_REALTIME_MODEL_METRICS]: JSON.stringify(metrics), + [traceTypes.ATTR_GEN_AI_USAGE_INPUT_TOKENS]: metrics.inputTokens, + [traceTypes.ATTR_GEN_AI_USAGE_OUTPUT_TOKENS]: metrics.outputTokens, + [traceTypes.ATTR_GEN_AI_USAGE_INPUT_TEXT_TOKENS]: metrics.inputTokenDetails.textTokens, + [traceTypes.ATTR_GEN_AI_USAGE_INPUT_AUDIO_TOKENS]: metrics.inputTokenDetails.audioTokens, + [traceTypes.ATTR_GEN_AI_USAGE_INPUT_CACHED_TOKENS]: metrics.inputTokenDetails.cachedTokens, + [traceTypes.ATTR_GEN_AI_USAGE_OUTPUT_TEXT_TOKENS]: metrics.outputTokenDetails.textTokens, + [traceTypes.ATTR_GEN_AI_USAGE_OUTPUT_AUDIO_TOKENS]: metrics.outputTokenDetails.audioTokens, + }; + + // Add LangFuse-specific completion start time if TTFT is available + if (metrics.ttftMs !== undefined && metrics.ttftMs !== -1) { + const completionStartTime = metrics.timestamp + metrics.ttftMs; + // Convert to UTC ISO string for LangFuse compatibility + const completionStartTimeUtc = new Date(completionStartTime).toISOString(); + attrs[traceTypes.ATTR_LANGFUSE_COMPLETION_START_TIME] = completionStartTimeUtc; + } + + if (span.isRecording()) { + span.setAttributes(attrs); + } else { + const currentContext = otelContext.active(); + const spanContext = trace.setSpan(currentContext, span); + + // Create a dedicated child span for orphaned metrics + tracer.getTracer().startActiveSpan('realtime_metrics', {}, spanContext, (child) => { + try { + child.setAttributes(attrs); + } finally { + child.end(); + } + }); + } +} From 71cc61e6bf6e634bd7ebc097a64f4d9d9c39596e Mon Sep 17 00:00:00 2001 From: simon Date: Mon, 17 Nov 2025 13:22:09 +0100 Subject: [PATCH 3/5] add test --- agents/src/voice/agent_activity.test.ts | 150 ++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 agents/src/voice/agent_activity.test.ts diff --git a/agents/src/voice/agent_activity.test.ts b/agents/src/voice/agent_activity.test.ts new file mode 100644 index 000000000..4d524ee78 --- /dev/null +++ b/agents/src/voice/agent_activity.test.ts @@ -0,0 +1,150 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { describe, expect, it } from 'vitest'; +import { initializeLogger } from '../log.js'; +import { Future, delay } from '../utils.js'; +import { Agent } from './agent.js'; +import { AgentActivity } from './agent_activity.js'; +import { AgentSession } from './agent_session.js'; +import { SpeechHandle } from './speech_handle.js'; + +// Initialize logger for tests +initializeLogger({ pretty: true, level: 'error' }); + +describe('AgentActivity - Issue #836: mainTask hanging when interrupting queued speech', () => { + describe('Real AgentActivity integration', () => { + it('should directly test mainTask with queue inspection', async () => { + // Create AgentActivity with access to private methods + const agent = new Agent({ instructions: 'Test agent' }); + const agentSession = new AgentSession({}); + const agentActivity = new AgentActivity(agent, agentSession); + + await agentActivity.start(); + + // Access private members through casting + const activity = agentActivity as any; + + // Create speeches + const speech1 = SpeechHandle.create(); + const speech2 = SpeechHandle.create(); + const speech3 = SpeechHandle.create(); + + // Interrupt speech2 + speech2.interrupt(); + + // Directly access and inspect the queue + expect(activity.speechQueue.size()).toBe(0); + + // Schedule speeches + activity.scheduleSpeech(speech1, 5); + activity.scheduleSpeech(speech2, 5); + activity.scheduleSpeech(speech3, 5); + + // Verify queue size + expect(activity.speechQueue.size()).toBe(3); + + // Mark generations done for non-interrupted speeches + setTimeout(() => { + if (!speech1.interrupted) speech1._markGenerationDone(); + }, 50); + setTimeout(() => { + if (!speech3.interrupted) speech3._markGenerationDone(); + }, 100); + + // Wait for mainTask to process + await delay(250); + + // After processing, queue should be empty + expect(activity.speechQueue.size()).toBe(0); + + // Verify current speech is cleared + expect(activity._currentSpeech).toBeUndefined(); + }); + + it('should test mainTask queue processing order with priorities', async () => { + // Test that mainTask respects priority ordering + const agent = new Agent({ instructions: 'Test agent' }); + const agentSession = new AgentSession({}); + const agentActivity = new AgentActivity(agent, agentSession); + + await agentActivity.start(); + + const activity = agentActivity as any; + + // Create speeches with different priorities + const lowPriority = SpeechHandle.create(); + const normalPriority = SpeechHandle.create(); + const highPriority = SpeechHandle.create(); + + // Interrupt all to make processing fast + lowPriority.interrupt(); + normalPriority.interrupt(); + highPriority.interrupt(); + + // Schedule in reverse priority order to test queue sorting + activity.scheduleSpeech(lowPriority, 0); // SPEECH_PRIORITY_LOW + activity.scheduleSpeech(normalPriority, 5); // SPEECH_PRIORITY_NORMAL + activity.scheduleSpeech(highPriority, 10); // SPEECH_PRIORITY_HIGH + + // Queue should have 3 items + expect(activity.speechQueue.size()).toBe(3); + + // Wait for mainTask to process + await delay(200); + + // Queue should be empty after processing + expect(activity.speechQueue.size()).toBe(0); + + // All speeches should be scheduled and interrupted + [lowPriority, normalPriority, highPriority].forEach((s) => { + expect(s.scheduled).toBe(true); + expect(s.interrupted).toBe(true); + }); + }); + + it('should verify mainTask does not hang with manual abort signal test', async () => { + // This test manually calls mainTask and tests abort behavior + const agent = new Agent({ instructions: 'Test agent' }); + const agentSession = new AgentSession({}); + const agentActivity = new AgentActivity(agent, agentSession); + + // Don't start() - we'll manually set up for mainTask testing + const activity = agentActivity as any; + + // Create an abort controller to stop mainTask + const abortController = new AbortController(); + + // Create interrupted speeches + const speech1 = SpeechHandle.create(); + const speech2 = SpeechHandle.create(); + speech1.interrupt(); + speech2.interrupt(); + + // Manually add to queue + activity.speechQueue.push([5, 1000, speech1]); + activity.speechQueue.push([5, 2000, speech2]); + activity.q_updated = new Future(); + activity.q_updated.resolve(); // Wake up mainTask + + // Call mainTask directly with timeout protection + const mainTaskPromise = activity.mainTask(abortController.signal); + + // Let mainTask process the interrupted speeches + await delay(100); + + // Abort the mainTask + abortController.abort(); + + // mainTask should exit cleanly without hanging + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('mainTask did not exit after abort')), 1000), + ); + + await expect(Promise.race([mainTaskPromise, timeoutPromise])).resolves.not.toThrow(); + + // Queue should be empty + expect(activity.speechQueue.size()).toBe(0); + }); + }); +}); From 11453e9a2fd601bb8af98b7e01435f9c48b65d16 Mon Sep 17 00:00:00 2001 From: simon Date: Wed, 19 Nov 2025 16:33:58 +0100 Subject: [PATCH 4/5] waitForGeneration racing --- agents/src/voice/speech_handle.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/agents/src/voice/speech_handle.ts b/agents/src/voice/speech_handle.ts index 58ae71dba..59103afcc 100644 --- a/agents/src/voice/speech_handle.ts +++ b/agents/src/voice/speech_handle.ts @@ -165,9 +165,6 @@ export class SpeechHandle { /** @internal */ _authorizeGeneration(): void { - if (this.interruptFut.done) { - return; - } const fut = new Future(); this.generations.push(fut); this.authorizedEvent.set(); @@ -194,7 +191,7 @@ export class SpeechHandle { if (!generation) { throw new Error(`Generation at index ${index} not found.`); } - return generation.await; + return Promise.race([generation.await, this.interruptFut.await]); } /** @internal */ From 40640b765e57add91767b13c44f13ef586c2c398 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 20 Nov 2025 17:17:22 +0100 Subject: [PATCH 5/5] revert timeout for _waitForGeneration --- agents/src/voice/agent_activity.ts | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index b4c01bf8f..7b4b53ce4 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -848,14 +848,7 @@ export class AgentActivity implements RecognitionHooks { const speechHandle = heapItem[2]; this._currentSpeech = speechHandle; speechHandle._authorizeGeneration(); - try { - await Promise.race([ - speechHandle._waitForGeneration(), - new Promise((_, reject) => setTimeout(() => reject('timeout after 30 seconds'), 30000)), - ]); - } catch (err) { - console.error('wait for generation failed', err, speechHandle); - } + await speechHandle._waitForGeneration(); this._currentSpeech = undefined; }