Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions livekit-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ __signal-client-async-compatible = [
"dep:futures-util",
"dep:isahc",
"dep:livekit-runtime",
"dep:base64"
]


Expand Down
229 changes: 197 additions & 32 deletions livekit-api/src/signal_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};

use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, Engine};
use http::StatusCode;
use livekit_protocol as proto;
use livekit_runtime::{interval, sleep, Instant, JoinHandle};
use parking_lot::Mutex;
use prost::Message;
use thiserror::Error;
use tokio::sync::{mpsc, Mutex as AsyncMutex, RwLock as AsyncRwLock};

Expand Down Expand Up @@ -90,6 +92,8 @@ pub struct SignalOptions {
pub auto_subscribe: bool,
pub adaptive_stream: bool,
pub sdk_options: SignalSdkOptions,
/// Enable single peer connection mode
pub single_peer_connection: bool,
}

impl Default for SignalOptions {
Expand All @@ -98,6 +102,7 @@ impl Default for SignalOptions {
auto_subscribe: true,
adaptive_stream: false,
sdk_options: SignalSdkOptions::default(),
single_peer_connection: true,
}
}
}
Expand All @@ -119,6 +124,8 @@ struct SignalInner {
options: SignalOptions,
join_response: proto::JoinResponse,
request_id: AtomicU32,
/// Tracks whether single PC mode is active (v1 path succeeded)
single_pc_mode_active: bool,
}

pub struct SignalClient {
Expand Down Expand Up @@ -235,6 +242,12 @@ impl SignalClient {
pub fn next_request_id(&self) -> u32 {
self.inner.next_request_id().clone()
}

/// Returns whether single peer connection mode is active.
/// This is determined by whether the /rtc/v1 path was used successfully.
pub fn is_single_pc_mode_active(&self) -> bool {
self.inner.is_single_pc_mode_active()
}
}

impl SignalInner {
Expand All @@ -247,20 +260,56 @@ impl SignalInner {
proto::JoinResponse,
mpsc::UnboundedReceiver<Box<proto::signal_response::Message>>,
)> {
let lk_url = get_livekit_url(url, &options)?;

// Try v1 path first if single_peer_connection is enabled
let use_v1_path = options.single_peer_connection;
// For initial connection: reconnect=false, reconnect_reason=None, participant_sid=""
let lk_url = get_livekit_url(url, &options, use_v1_path, false, None, "")?;
// Try to connect to the SignalClient
let (stream, mut events) = match SignalStream::connect(lk_url.clone(), token).await {
Ok(stream) => stream,
Err(err) => {
if let SignalError::TokenFormat = err {
return Err(err);
let (stream, mut events, single_pc_mode_active) =
match SignalStream::connect(lk_url.clone(), token).await {
Ok((new_stream, stream_events)) => {
log::debug!(
"signal connection successful: path={}, single_pc_mode={}",
if use_v1_path { "v1" } else { "v0" },
use_v1_path
);
(new_stream, stream_events, use_v1_path)
}
// Connection failed, try to retrieve more information
Self::validate(lk_url).await?;
return Err(err);
}
};
Err(err) => {
log::warn!(
"signal connection failed on {} path: {:?}",
if use_v1_path { "v1" } else { "v0" },
err
);

if let SignalError::TokenFormat = err {
return Err(err);
}

// If using v1 path and it failed, always try fallback to v0 path.
// The v1 endpoint might not be available on older servers, and errors
// can manifest as various HTTP status codes (404, 401, 403) or connection errors.
if use_v1_path {
let lk_url_v0 = get_livekit_url(url, &options, false, false, None, "")?;
log::warn!("v1 path failed, falling back to v0 path");
match SignalStream::connect(lk_url_v0.clone(), token).await {
Ok((new_stream, stream_events)) => (new_stream, stream_events, false),
Err(err) => {
log::error!("v0 fallback also failed: {:?}", err);
if let SignalError::TokenFormat = err {
return Err(err);
}
Self::validate(lk_url_v0).await?;
return Err(err);
}
}
} else {
// Connection failed on v0 path, try to retrieve more information
Self::validate(lk_url).await?;
return Err(err);
}
}
};

let join_response = get_join_response(&mut events).await?;

Expand All @@ -274,6 +323,7 @@ impl SignalInner {
url: url.to_string(),
join_response: join_response.clone(),
request_id: AtomicU32::new(1),
single_pc_mode_active,
});

Ok((inner, join_response, events))
Expand All @@ -297,6 +347,11 @@ impl SignalInner {
Ok(())
}

/// Returns whether single peer connection mode is active
pub fn is_single_pc_mode_active(&self) -> bool {
self.single_pc_mode_active
}

/// Restart is called when trying to resume the room (RtcSession resume)
pub async fn restart(
self: &Arc<Self>,
Expand All @@ -315,8 +370,13 @@ impl SignalInner {
let sid = &self.join_response.participant.as_ref().unwrap().sid;
let token = self.token.lock().clone();

let mut lk_url = get_livekit_url(&self.url, &self.options).unwrap();
lk_url.query_pairs_mut().append_pair("reconnect", "1").append_pair("sid", sid);
// Use the same path that succeeded during initial connection
// For reconnects: reconnect=true, participant_sid=sid
// For v1 path: reconnect and sid are encoded in the join_request protobuf
// For v0 path: reconnect and sid are added as separate query parameters
let lk_url =
get_livekit_url(&self.url, &self.options, self.single_pc_mode_active, true, None, sid)
.unwrap();

let (new_stream, mut events) = SignalStream::connect(lk_url, &token).await?;
let reconnect_response = get_reconnect_response(&mut events).await?;
Expand Down Expand Up @@ -459,7 +519,78 @@ fn is_queuable(signal: &proto::signal_request::Message) -> bool {
)
}

fn get_livekit_url(url: &str, options: &SignalOptions) -> SignalResult<url::Url> {
/// Create the base64-encoded WrappedJoinRequest parameter required for v1 path
///
/// Parameters:
/// - options: SignalOptions containing auto_subscribe, adaptive_stream, etc.
/// - reconnect: true if this is a reconnection attempt
/// - participant_sid: the participant SID (only used during reconnection)
fn create_join_request_param(
options: &SignalOptions,
reconnect: bool,
reconnect_reason: Option<i32>,
participant_sid: &str,
) -> String {
let connection_settings = proto::ConnectionSettings {
auto_subscribe: options.auto_subscribe,
adaptive_stream: options.adaptive_stream,
..Default::default()
};

let client_info = proto::ClientInfo {
Copy link
Contributor

@ladvoc ladvoc Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: Spreading ..Default::default() would be cleaner here:

let client_info = proto::ClientInfo {
    sdk: proto::client_info::Sdk::Rust as i32,
    version: options.sdk_options.sdk_version.clone().unwrap_or_default(),
    protocol: PROTOCOL_VERSION as i32,
    os: std::env::consts::OS.to_string(),
    client_protocol: 1, // Indicates support for RPC compression
    ..Default::default()
};

Same for the other protobuf types below where we set an empty string or Option::None.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

sdk: proto::client_info::Sdk::Rust as i32,
version: options.sdk_options.sdk_version.clone().unwrap_or_default(),
protocol: PROTOCOL_VERSION as i32,
os: std::env::consts::OS.to_string(),
..Default::default()
};

let mut join_request = proto::JoinRequest {
client_info: Some(client_info),
connection_settings: Some(connection_settings),
reconnect,
..Default::default()
};

// Only set participant_sid if non-empty (for reconnects)
if !participant_sid.is_empty() {
join_request.participant_sid = participant_sid.to_string();
}

// Only set reconnect_reason if provided
if let Some(reason) = reconnect_reason {
join_request.reconnect_reason = reason;
}

// Serialize JoinRequest to bytes
let join_request_bytes = join_request.encode_to_vec();

// Create WrappedJoinRequest (JS doesn't explicitly set compression, so default is NONE)
let wrapped_join_request =
proto::WrappedJoinRequest { join_request: join_request_bytes, ..Default::default() };

// Serialize WrappedJoinRequest to bytes and base64 encode
let wrapped_bytes = wrapped_join_request.encode_to_vec();
BASE64_STANDARD.encode(&wrapped_bytes)
}

/// Build the LiveKit WebSocket URL for connection
///
/// Parameters:
/// - url: the base server URL
/// - options: SignalOptions
/// - use_v1_path: if true, use /rtc/v1 (single PC mode), otherwise /rtc (dual PC mode)
/// - reconnect: true if this is a reconnection attempt
/// - reconnect_reason: reason for reconnection (only used when reconnect=true)
/// - participant_sid: the participant SID (only used during reconnection)
fn get_livekit_url(
url: &str,
options: &SignalOptions,
use_v1_path: bool,
reconnect: bool,
reconnect_reason: Option<i32>,
participant_sid: &str,
) -> SignalResult<url::Url> {
let mut lk_url = url::Url::parse(url).map_err(|err| SignalError::UrlParse(err.to_string()))?;

if !lk_url.has_host() {
Expand All @@ -477,26 +608,46 @@ fn get_livekit_url(url: &str, options: &SignalOptions) -> SignalResult<url::Url>

if let Ok(mut segs) = lk_url.path_segments_mut() {
segs.push("rtc");
if use_v1_path {
segs.push("v1");
}
}

lk_url
.query_pairs_mut()
.append_pair("sdk", options.sdk_options.sdk.as_str())
.append_pair("protocol", PROTOCOL_VERSION.to_string().as_str())
.append_pair("auto_subscribe", if options.auto_subscribe { "1" } else { "0" })
.append_pair("adaptive_stream", if options.adaptive_stream { "1" } else { "0" });
if use_v1_path {
// For v1 path (single PC mode): only join_request param
// All other info (sdk, protocol, auto_subscribe, etc.) is inside the JoinRequest protobuf
let join_request_param =
create_join_request_param(options, reconnect, reconnect_reason, participant_sid);
lk_url.query_pairs_mut().append_pair("join_request", &join_request_param);
} else {
// For v0 path (dual PC mode): use URL query parameters
lk_url
.query_pairs_mut()
.append_pair("sdk", options.sdk_options.sdk.as_str())
.append_pair("protocol", PROTOCOL_VERSION.to_string().as_str())
.append_pair("auto_subscribe", if options.auto_subscribe { "1" } else { "0" })
.append_pair("adaptive_stream", if options.adaptive_stream { "1" } else { "0" });

if let Some(sdk_version) = &options.sdk_options.sdk_version {
lk_url.query_pairs_mut().append_pair("version", sdk_version.as_str());
}

if let Some(sdk_version) = &options.sdk_options.sdk_version {
lk_url.query_pairs_mut().append_pair("version", sdk_version.as_str());
// For reconnects in v0 path, add reconnect and sid as separate query parameters
if reconnect {
lk_url
.query_pairs_mut()
.append_pair("reconnect", "1")
.append_pair("sid", participant_sid);
}
}

Ok(lk_url)
}

/// Convert a WebSocket URL (with /rtc path) to the validate endpoint URL
/// Convert a WebSocket URL (with /rtc or /rtc/v1 path) to the validate endpoint URL
fn get_validate_url(mut ws_url: url::Url) -> url::Url {
ws_url.set_scheme(if ws_url.scheme() == "wss" { "https" } else { "http" }).unwrap();
// ws_url already has /rtc from get_livekit_url, so only append /validate
// ws_url already has /rtc or /rtc/v1 from get_livekit_url, so only append /validate
if let Ok(mut segs) = ws_url.path_segments_mut() {
segs.push("validate");
}
Expand Down Expand Up @@ -545,18 +696,32 @@ mod tests {
fn livekit_url_test() {
let io = SignalOptions::default();

assert!(get_livekit_url("localhost:7880", &io).is_err());
assert_eq!(get_livekit_url("https://localhost:7880", &io).unwrap().scheme(), "wss");
assert_eq!(get_livekit_url("http://localhost:7880", &io).unwrap().scheme(), "ws");
assert_eq!(get_livekit_url("wss://localhost:7880", &io).unwrap().scheme(), "wss");
assert_eq!(get_livekit_url("ws://localhost:7880", &io).unwrap().scheme(), "ws");
assert!(get_livekit_url("ftp://localhost:7880", &io).is_err());
assert!(get_livekit_url("localhost:7880", &io, false, false, None, "").is_err());
assert_eq!(
get_livekit_url("https://localhost:7880", &io, false, false, None, "")
.unwrap()
.scheme(),
"wss"
);
assert_eq!(
get_livekit_url("http://localhost:7880", &io, false, false, None, "").unwrap().scheme(),
"ws"
);
assert_eq!(
get_livekit_url("wss://localhost:7880", &io, false, false, None, "").unwrap().scheme(),
"wss"
);
assert_eq!(
get_livekit_url("ws://localhost:7880", &io, false, false, None, "").unwrap().scheme(),
"ws"
);
assert!(get_livekit_url("ftp://localhost:7880", &io, false, false, None, "").is_err());
}

#[test]
fn validate_url_test() {
let io = SignalOptions::default();
let lk_url = get_livekit_url("wss://localhost:7880", &io).unwrap();
let lk_url = get_livekit_url("wss://localhost:7880", &io, false, false, None, "").unwrap();
let validate_url = get_validate_url(lk_url);

// Should be /rtc/validate, not /rtc/rtc/validate
Expand Down
8 changes: 8 additions & 0 deletions livekit-ffi-node-bindings/src/proto/room_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2471,6 +2471,13 @@ export class RoomOptions extends Message<RoomOptions> {
*/
encryption?: E2eeOptions;

/**
* use single peer connection for both publish/subscribe (default: true)
*
* @generated from field: optional bool single_peer_connection = 8;
*/
singlePeerConnection?: boolean;

constructor(data?: PartialMessage<RoomOptions>) {
super();
proto2.util.initPartial(data, this);
Expand All @@ -2486,6 +2493,7 @@ export class RoomOptions extends Message<RoomOptions> {
{ no: 5, name: "rtc_config", kind: "message", T: RtcConfig, opt: true },
{ no: 6, name: "join_retries", kind: "scalar", T: 13 /* ScalarType.UINT32 */, opt: true },
{ no: 7, name: "encryption", kind: "message", T: E2eeOptions, opt: true },
{ no: 8, name: "single_peer_connection", kind: "scalar", T: 8 /* ScalarType.BOOL */, opt: true },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): RoomOptions {
Expand Down
1 change: 1 addition & 0 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ message RoomOptions {
optional RtcConfig rtc_config = 5; // allow to setup a custom RtcConfiguration
optional uint32 join_retries = 6;
optional E2eeOptions encryption = 7;
optional bool single_peer_connection = 8; // use single peer connection for both publish/subscribe (default: true)
}

//
Expand Down
2 changes: 2 additions & 0 deletions livekit-ffi/src/conversion/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ impl From<proto::RoomOptions> for RoomOptions {
options.join_retries = value.join_retries.unwrap_or(options.join_retries);
options.e2ee = e2ee;
options.encryption = encryption;
options.single_peer_connection =
value.single_peer_connection.unwrap_or(options.single_peer_connection);
options
}
}
Expand Down
1 change: 1 addition & 0 deletions livekit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ semver = "1.0"
libloading = { version = "0.8.6" }
bytes = { workspace = true }
bmrng = "0.5.2"
base64 = "0.22"

[dev-dependencies]
anyhow = { workspace = true }
Expand Down
Loading