Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/agent-session-docs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="docs" "Improve Session API docs
46 changes: 42 additions & 4 deletions lib/src/agent/chat/transcription_stream_receiver.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,48 @@ import 'message_receiver.dart';

/// Converts LiveKit transcription text streams into [ReceivedMessage]s.
///
/// Each stream corresponds to a single message (agent or user). The stream
/// yields textual updates which are aggregated until the message is finalized.
/// When a new message for the same participant arrives, previous partial
/// content is purged so that memory usage remains bounded.
/// This receiver is intended for agent-powered transcription streams produced by
/// the LiveKit Agents framework (text streams require `livekit-agents >= 1.0.0`).
///
/// The receiver listens on a text stream [topic] (default: `'lk.transcription'`)
/// and aggregates chunked updates into a single [ReceivedMessage] per transcript
/// segment.
///
/// ## Text stream semantics
///
/// - **Agent transcripts**: the agent emits a new text stream for each message.
/// The stream yields chunks that should be appended until the transcript is
/// finalized.
/// - **User transcripts**: the agent may resend the full transcription text for
/// a segment on each update (until finalized).
///
/// ## Message identity / diffing
///
/// The segment id (`lk.segment_id`) is stable across the lifetime of a transcript
/// segment. This receiver uses it as [ReceivedMessage.id], which makes it safe
/// to use for UI diffing (e.g. `ListView` keys). If `lk.segment_id` is missing,
/// the text stream id is used as a fallback.
///
/// When a new segment for the same participant arrives, older partial segments
/// are removed to keep memory usage bounded.
///
/// ## Example (agent transcript chunks)
///
/// Incoming chunks (same segment id):
/// ```text
/// { segment_id: "1", content: "Hello" }
/// { segment_id: "1", content: " world" }
/// { segment_id: "1", content: "!" }
/// ```
///
/// Output messages:
/// ```text
/// ReceivedMessage(id: "1", content: AgentTranscript("Hello"))
/// ReceivedMessage(id: "1", content: AgentTranscript("Hello world"))
/// ReceivedMessage(id: "1", content: AgentTranscript("Hello world!"))
/// ```
///
/// - SeeAlso: https://docs.livekit.io/agents/
class TranscriptionStreamReceiver implements MessageReceiver {
TranscriptionStreamReceiver({
required Room room,
Expand Down
6 changes: 6 additions & 0 deletions lib/src/agent/room_agent.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import 'constants.dart';

extension AgentRoom on Room {
/// All agent participants currently in the room.
///
/// - Note: This excludes participants that are publishing on behalf of
/// another participant (for example, "avatar worker" participants). Those
/// workers can be discovered by filtering [remoteParticipants] for
/// participants whose `lk.publish_on_behalf` attribute matches the agent's
/// identity.
Iterable<RemoteParticipant> get agentParticipants => remoteParticipants.values.where(
(participant) {
if (participant.kind != ParticipantKind.AGENT) {
Expand Down
8 changes: 8 additions & 0 deletions lib/src/agent/session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ import 'session_options.dart';
/// be sent with [sendText], and the message history can be inspected or restored
/// via [messages], [getMessageHistory], and [restoreMessageHistory].
///
/// Message transport is pluggable: you can provide custom [MessageSender] and
/// [MessageReceiver] implementations to integrate with different channels. By
/// default, [Session] uses:
/// - [TextMessageSender] (topic `'lk.chat'`) to send user text and emit loopback
/// messages for immediate UI updates.
/// - [TranscriptionStreamReceiver] (topic `'lk.transcription'`) to receive agent
/// and user transcripts as a message stream.
///
/// The session is designed to be observed from Flutter widgets (it extends
/// [ChangeNotifier] through [DisposableChangeNotifier]) in the same way that the
/// Swift implementation conforms to `ObservableObject`.
Expand Down
27 changes: 25 additions & 2 deletions lib/src/core/room_preconnect.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,31 @@ import '../preconnect/pre_connect_audio_buffer.dart';
import 'room.dart';

extension RoomPreConnect on Room {
/// Wrap an async operation while a pre-connect audio buffer records.
/// Stops and flushes on error.
/// Runs an async [operation] while a pre-connect audio buffer records.
///
/// This is primarily used for voice agent experiences to reduce perceived
/// latency: the microphone starts recording before [Room.connect] completes,
/// and the buffered audio is sent to the agent once it becomes active.
///
/// If [operation] throws, recording is stopped and the buffer is reset
/// (discarding any buffered audio).
///
/// Example:
/// ```dart
/// await room.withPreConnectAudio(
/// () async {
/// final creds = await tokenService.fetch();
/// await room.connect(creds.serverUrl, creds.participantToken);
/// return true;
/// },
/// timeout: const Duration(seconds: 20),
/// onError: (error) => logger.warning('Preconnect failed: $error'),
/// );
/// ```
///
/// - Note: Ensure microphone permissions are granted early in your app
/// lifecycle so pre-connect can start without additional prompts.
/// - SeeAlso: [PreConnectAudioBuffer]
Future<T> withPreConnectAudio<T>(
Future<T> Function() operation, {
Duration timeout = const Duration(seconds: 10),
Expand Down
14 changes: 14 additions & 0 deletions lib/src/json/agent_attributes.dart
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,32 @@ enum AgentState {

@JsonSerializable()
class TranscriptionAttributes {
/// Schema for transcription-related text stream attributes.
///
/// These attributes are attached to LiveKit text streams used for agent and
/// user transcriptions (typically on the `'lk.transcription'` topic).
///
/// - Note: Some agent implementations may encode `lk.transcription_final` as a
/// boolean or a string (`"true"`/`"false"`/`"1"`/`"0"`). This model accepts
/// both forms.
const TranscriptionAttributes({
this.lkSegmentId,
this.lkTranscribedTrackId,
this.lkTranscriptionFinal,
});

/// Stable identifier for the transcript segment (`lk.segment_id`).
///
/// A segment id remains stable for the lifetime of a transcript segment and
/// can be used to reconcile incremental updates in UIs.
@JsonKey(name: 'lk.segment_id')
final String? lkSegmentId;

/// Track id associated with the transcription (`lk.transcribed_track_id`).
@JsonKey(name: 'lk.transcribed_track_id')
final String? lkTranscribedTrackId;

/// Whether this segment is finalized (`lk.transcription_final`).
@JsonKey(name: 'lk.transcription_final', fromJson: _boolFromJson, toJson: _boolToJson)
final bool? lkTranscriptionFinal;

Expand Down
48 changes: 44 additions & 4 deletions lib/src/preconnect/pre_connect_audio_buffer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,17 @@ import '../types/participant_state.dart';

typedef PreConnectOnError = void Function(Object error);

/// Captures and buffers microphone audio before a room connection completes.
///
/// This is used by `Room.withPreConnectAudio` to reduce perceived latency for
/// voice agent experiences: the microphone can begin recording while the app is
/// still connecting and dispatching an agent, then the buffered audio is sent
/// once the agent becomes active.
///
/// Audio is buffered in memory (bounded by [defaultMaxSize]); if it overflows,
/// the oldest audio is dropped until the agent is ready.
class PreConnectAudioBuffer {
/// Topic used to send the buffered audio stream to agents.
static const String dataTopic = 'lk.agent.pre-connect-audio-buffer';

static const int defaultMaxSize = 10 * 1024 * 1024; // 10MB
Expand Down Expand Up @@ -72,16 +82,33 @@ class PreConnectAudioBuffer {
}) : _onError = onError,
_requestSampleRate = sampleRate;

// Getters
/// Whether pre-connect recording is currently active.
bool get isRecording => _isRecording;

/// Number of buffered bytes currently stored.
int get bufferedSize => _buffer.length;

/// The local audio track used for recording while pre-connecting.
LocalAudioTrack? get localTrack => _localTrack;

Future<LocalTrackPublishedEvent>? _localTrackPublishedEvent;

/// Future that completes when an agent is ready.
/// Completes when an agent becomes active and the buffer has been sent.
///
/// If the configured timeout elapses before an agent becomes active, this
/// future completes with an error.
Future<void> get agentReadyFuture => _agentReadyManager.future;

/// Starts capturing audio into an in-memory buffer.
///
/// [timeout] controls how long to wait for an agent to become active. When the
/// agent becomes active, buffered audio is sent automatically and
/// [agentReadyFuture] completes. If the timeout is reached first,
/// [agentReadyFuture] completes with an error and callers should [reset] the
/// buffer.
///
/// - Note: Ensure microphone permissions are granted before calling this, or
/// audio capture may fail depending on platform.
Future<void> startRecording({
Duration timeout = const Duration(seconds: 20),
}) async {
Expand Down Expand Up @@ -180,6 +207,11 @@ class PreConnectAudioBuffer {
));
}

/// Stops recording and releases native audio capture resources.
///
/// If [withError] is provided, [agentReadyFuture] completes with that error.
/// Otherwise, [agentReadyFuture] completes successfully (if not already
/// completed).
Future<void> stopRecording({Object? withError}) async {
if (!_isRecording) return;
_isRecording = false;
Expand Down Expand Up @@ -219,7 +251,8 @@ class PreConnectAudioBuffer {
logger.info('[Preconnect audio] stopped recording');
}

// Clean-up & reset for re-use
/// Stops recording and clears buffered audio and listeners so the instance
/// can be reused.
Future<void> reset() async {
await stopRecording();
_timeoutTimer?.cancel();
Expand All @@ -240,12 +273,19 @@ class PreConnectAudioBuffer {
logger.info('[Preconnect audio] reset');
}

// Dispose the audio buffer and clean up all resources.
/// Disposes this buffer (alias for [reset]).
Future<void> dispose() async {
await reset();
logger.info('[Preconnect audio] disposed');
}

/// Sends the currently buffered audio to one or more agent identities.
///
/// This is a one-shot operation; repeated calls are ignored after the buffer
/// has been sent.
///
/// The stream is written to [topic] (default: [dataTopic]) and includes
/// attributes that help the agent interpret the raw audio payload.
Future<void> sendAudioData({
required List<String> agents,
String topic = dataTopic,
Expand Down