diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index e29a9d389..7b9deec17 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -42,6 +42,11 @@ mod signal_stream; pub use region::RegionUrlProvider; +#[cfg(feature = "signal-client-tokio")] +pub use tokio_tungstenite::Connector; +#[cfg(not(feature = "signal-client-tokio"))] +pub enum Connector {} + pub type SignalEmitter = mpsc::UnboundedSender; pub type SignalEvents = mpsc::UnboundedReceiver; pub type SignalResult = Result; @@ -84,11 +89,12 @@ impl Default for SignalSdkOptions { } } -#[derive(Debug, Clone)] +#[derive(Clone)] #[non_exhaustive] pub struct SignalOptions { pub auto_subscribe: bool, pub adaptive_stream: bool, + pub connector: Option, pub sdk_options: SignalSdkOptions, } @@ -97,11 +103,23 @@ impl Default for SignalOptions { Self { auto_subscribe: true, adaptive_stream: false, + connector: None, sdk_options: SignalSdkOptions::default(), } } } +impl Debug for SignalOptions { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SignalClient") + .field("auto_subscribe", &self.auto_subscribe) + .field("adaptive_stream", &self.adaptive_stream) + .field("connector", &self.connector.is_some()) + .field("sdk_options", &self.sdk_options) + .finish() + } +} + pub enum SignalEvent { /// Received a message from the server Message(Box), @@ -250,17 +268,18 @@ impl SignalInner { let lk_url = get_livekit_url(url, &options)?; // Try to connect to the SignalClient - let (stream, mut events) = match SignalStream::connect(lk_url.clone(), token).await { - Ok(stream) => stream, - Err(err) => { - if let SignalError::TokenFormat = err { + let (stream, mut events) = + match SignalStream::connect(lk_url.clone(), token, options.connector.clone()).await { + Ok(stream) => stream, + Err(err) => { + if let SignalError::TokenFormat = err { + return Err(err); + } + // Connection failed, try to retrieve more informations + Self::validate(lk_url).await?; return Err(err); } - // Connection failed, try to retrieve more informations - Self::validate(lk_url).await?; - return Err(err); - } - }; + }; let join_response = get_join_response(&mut events).await?; @@ -322,7 +341,8 @@ impl SignalInner { let mut lk_url = get_livekit_url(&self.url, &self.options).unwrap(); lk_url.query_pairs_mut().append_pair("reconnect", "1").append_pair("sid", sid); - let (new_stream, mut events) = SignalStream::connect(lk_url, &token).await?; + let (new_stream, mut events) = + SignalStream::connect(lk_url, &token, self.options.connector.clone()).await?; let reconnect_response = get_reconnect_response(&mut events).await?; *stream = Some(new_stream); diff --git a/livekit-api/src/signal_client/signal_stream.rs b/livekit-api/src/signal_client/signal_stream.rs index 37742b7a9..a01714f6a 100644 --- a/livekit-api/src/signal_client/signal_stream.rs +++ b/livekit-api/src/signal_client/signal_stream.rs @@ -34,12 +34,12 @@ use tokio::{ #[cfg(feature = "signal-client-tokio")] use tokio_tungstenite::{ - connect_async, + client_async_with_config, connect_async_tls_with_config, tungstenite::client::IntoClientRequest, tungstenite::error::ProtocolError, tungstenite::http::{header::AUTHORIZATION, HeaderValue}, tungstenite::{Error as WsError, Message}, - MaybeTlsStream, WebSocketStream, + Connector, MaybeTlsStream, WebSocketStream, }; #[cfg(feature = "__signal-client-async-compatible")] @@ -88,6 +88,7 @@ impl SignalStream { pub async fn connect( url: url::Url, token: &str, + tls_connector: Option, ) -> SignalResult<(Self, mpsc::UnboundedReceiver>)> { log::info!("connecting to {}", url); let mut request = url.clone().into_client_request()?; @@ -288,17 +289,18 @@ impl SignalStream { }; // Now perform WebSocket handshake over the established connection - let (ws_stream, _) = - tokio_tungstenite::client_async_with_config(request, stream, None).await?; + let (ws_stream, _) = client_async_with_config(request, stream, None).await?; ws_stream } else { // No proxy specified, connect directly - let (ws_stream, _) = connect_async(request).await?; + let (ws_stream, _) = + connect_async_tls_with_config(request, None, false, tls_connector).await?; ws_stream } } else { // Non-tokio build or no proxy - connect directly - let (ws_stream, _) = connect_async(request).await?; + let (ws_stream, _) = + connect_async_tls_with_config(request, None, false, tls_connector).await?; ws_stream }; @@ -307,6 +309,9 @@ impl SignalStream { #[cfg(not(feature = "signal-client-tokio"))] let (ws_stream, _) = connect_async(request).await?; + #[cfg(not(feature = "signal-client-tokio"))] + let _ = tls_connector; + let (ws_writer, ws_reader) = ws_stream.split(); let (emitter, events) = mpsc::unbounded_channel(); diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 3ee94088d..090c16833 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -348,7 +348,7 @@ impl From for SignalSdkOptions { } } -#[derive(Debug, Clone)] +#[derive(Clone)] #[non_exhaustive] pub struct RoomOptions { pub auto_subscribe: bool, @@ -361,8 +361,24 @@ pub struct RoomOptions { pub rtc_config: RtcConfiguration, pub join_retries: u32, pub sdk_options: RoomSdkOptions, + pub signal_options: SignalOptions, + pub connector: Option, +} +impl Debug for RoomOptions { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RoomOptions") + .field("auto_subscribe", &self.auto_subscribe) + .field("adaptive_stream", &self.adaptive_stream) + .field("dynacast", &self.dynacast) + .field("e2ee", &self.e2ee) + .field("rtc_config", &self.rtc_config) + .field("join_retries", &self.join_retries) + .field("sdk_options", &self.sdk_options) + .field("signal_options", &self.signal_options) + // Exclude connector field as it's not Debug + .finish() + } } - impl Default for RoomOptions { fn default() -> Self { Self { @@ -381,6 +397,8 @@ impl Default for RoomOptions { }, join_retries: 3, sdk_options: RoomSdkOptions::default(), + signal_options: SignalOptions::default(), + connector: None, } } } @@ -474,6 +492,7 @@ impl Room { signal_options.sdk_options = options.sdk_options.clone().into(); signal_options.auto_subscribe = options.auto_subscribe; signal_options.adaptive_stream = options.adaptive_stream; + signal_options.connector = options.connector.clone(); let (rtc_engine, join_response, engine_events) = RtcEngine::connect( url, token, @@ -1120,7 +1139,7 @@ impl RoomSession { } async fn send_sync_state(self: &Arc) { - let auto_subscribe = self.options.auto_subscribe; + let auto_subscribe = self.options.signal_options.auto_subscribe; let session = self.rtc_engine.session(); if session.subscriber().peer_connection().current_local_description().is_none() { @@ -1596,7 +1615,7 @@ impl RoomSession { name, metadata, attributes, - self.options.auto_subscribe, + self.options.signal_options.auto_subscribe, ); participant.on_track_published({