diff --git a/livekit/src/rtc_engine/peer_transport.rs b/livekit/src/rtc_engine/peer_transport.rs index be301b505..8b37ab0be 100644 --- a/livekit/src/rtc_engine/peer_transport.rs +++ b/livekit/src/rtc_engine/peer_transport.rs @@ -30,6 +30,8 @@ struct TransportInner { pending_candidates: Vec, renegotiate: bool, restarting_ice: bool, + // Publish-side target bitrate (bps) for offer munging + max_send_bitrate_bps: Option, } pub struct PeerTransport { @@ -55,6 +57,7 @@ impl PeerTransport { pending_candidates: Vec::default(), renegotiate: false, restarting_ice: false, + max_send_bitrate_bps: None, })), } } @@ -127,8 +130,91 @@ impl PeerTransport { Ok(answer) } + pub async fn set_max_send_bitrate_bps(&self, bps: Option) { + let mut inner = self.inner.lock().await; + inner.max_send_bitrate_bps = bps; + } + + fn compute_start_bitrate_kbps(ultimate_bps: Option) -> Option { + let ultimate_kbps = (ultimate_bps? / 1000) as u32; + if ultimate_kbps == 0 { + return None; + } + // JS / Flutter uses ~70% of ultimate; 100% is also reasonable per feedback. + let start_kbps = (ultimate_kbps as f64 * 0.7).round() as u32; + + // Clamp: avoid silly low/high values + Some(start_kbps.clamp(300, ultimate_kbps)) + } + + fn munge_x_google_start_bitrate(sdp: &str, start_bitrate_kbps: u32) -> String { + // Detect what line ending the original SDP uses + let uses_crlf = sdp.contains("\r\n"); + let eol = if uses_crlf { "\r\n" } else { "\n" }; + + // Split preserving the intended line ending style + let lines: Vec<&str> = + if uses_crlf { sdp.split("\r\n").collect() } else { sdp.split('\n').collect() }; + + // 1) Find VP9/AV1 payload types + let mut target_pts: Vec<&str> = Vec::new(); + for line in &lines { + let l = line.trim(); + if let Some(rest) = l.strip_prefix("a=rtpmap:") { + let mut it = rest.split_whitespace(); + let pt = it.next().unwrap_or(""); + let codec = it.next().unwrap_or(""); + if (codec.starts_with("VP9/90000") || codec.starts_with("AV1/90000")) + && !pt.is_empty() + { + target_pts.push(pt); + } + } + } + if target_pts.is_empty() { + return sdp.to_string(); + } + + // 2) Rewrite fmtp lines (minimal mutation) + let mut out: Vec = Vec::with_capacity(lines.len()); + for line in lines { + let mut rewritten = line.to_string(); + + for pt in &target_pts { + let prefix = format!("a=fmtp:{pt} "); + if rewritten.starts_with(&prefix) { + // Replace if present; append if not present + if let Some(pos) = rewritten.find("x-google-start-bitrate=") { + // replace existing value up to next ';' or end + let after = &rewritten[pos..]; + let end = + after.find(';').map(|i| pos + i).unwrap_or_else(|| rewritten.len()); + rewritten.replace_range( + pos..end, + &format!("x-google-start-bitrate={start_bitrate_kbps}"), + ); + } else { + rewritten + .push_str(&format!(";x-google-start-bitrate={start_bitrate_kbps}")); + } + break; + } + } + + out.push(rewritten); + } + + // Re-join using same EOL, and ensure trailing EOL (some parsers are picky) + let mut munged = out.join(eol); + if !munged.ends_with(eol) { + munged.push_str(eol); + } + munged + } + pub async fn create_and_send_offer(&self, options: OfferOptions) -> EngineResult<()> { let mut inner = self.inner.lock().await; + log::info!("Applying x-google-start-bitrate"); if options.ice_restart { inner.restarting_ice = true; @@ -151,7 +237,34 @@ impl PeerTransport { return Ok(()); } - let offer = self.peer_connection.create_offer(options).await?; + let mut offer = self.peer_connection.create_offer(options).await?; + let sdp = offer.to_string(); + let is_vp9 = sdp.contains(" VP9/90000"); + let is_av1 = sdp.contains(" AV1/90000"); + log::info!("SDP codecs present: VP9={}, AV1={}", is_vp9, is_av1); + if is_vp9 || is_av1 { + if let Some(start_kbps) = Self::compute_start_bitrate_kbps(inner.max_send_bitrate_bps) { + log::info!( + "Applying x-google-start-bitrate={} kbps (ultimate_bps={:?})", + start_kbps, + inner.max_send_bitrate_bps + ); + + let munged = Self::munge_x_google_start_bitrate(&sdp, start_kbps); + if munged != sdp { + log::info!("SDP munged successfully (VP9/AV1)"); + match SessionDescription::parse(&munged, offer.sdp_type()) { + Ok(parsed) => offer = parsed, + Err(e) => log::warn!( + "Failed to parse munged SDP, falling back to original offer: {e}" + ), + } + } else { + log::debug!("SDP munging produced no changes"); + } + } + } + self.peer_connection.set_local_description(offer.clone()).await?; if let Some(handler) = self.on_offer_handler.lock().as_mut() { @@ -161,3 +274,128 @@ impl PeerTransport { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::PeerTransport; + + #[test] + fn no_vp9_or_av1_is_noop() { + let sdp = "v=0\n\ +o=- 0 0 IN IP4 127.0.0.1\n\ +s=-\n\ +t=0 0\n\ +m=video 9 UDP/TLS/RTP/SAVPF 96\n\ +a=rtpmap:96 VP8/90000\n\ +a=fmtp:96 some=param\n"; + let out = PeerTransport::munge_x_google_start_bitrate(sdp, 3200); + assert_eq!(out, sdp, "should not change SDP if no VP9/AV1 present"); + } + + #[test] + fn vp9_with_fmtp_appends_start_bitrate_and_preserves_lf_and_trailing_eol() { + // LF-only SDP, ends with \n already + let sdp = "v=0\n\ +o=- 0 0 IN IP4 127.0.0.1\n\ +s=-\n\ +t=0 0\n\ +m=video 9 UDP/TLS/RTP/SAVPF 98\n\ +a=rtpmap:98 VP9/90000\n\ +a=fmtp:98 profile-id=0\n"; + let out = PeerTransport::munge_x_google_start_bitrate(sdp, 3200); + + assert!(out.contains("a=fmtp:98 profile-id=0;x-google-start-bitrate=3200\n")); + assert!(!out.contains("\r\n"), "should preserve LF-only line endings"); + assert!(out.ends_with('\n'), "should end with a trailing LF"); + } + + #[test] + fn av1_with_fmtp_replaces_existing_start_bitrate_value() { + let sdp = "v=0\n\ +o=- 0 0 IN IP4 127.0.0.1\n\ +s=-\n\ +t=0 0\n\ +m=video 9 UDP/TLS/RTP/SAVPF 104\n\ +a=rtpmap:104 AV1/90000\n\ +a=fmtp:104 x-google-start-bitrate=1000;foo=bar\n"; + let out = PeerTransport::munge_x_google_start_bitrate(sdp, 2500); + assert!( + out.contains("a=fmtp:104 x-google-start-bitrate=2500;foo=bar\n"), + "should replace existing x-google-start-bitrate value and keep other params" + ); + assert!(!out.contains("x-google-start-bitrate=1000"), "old bitrate value should be gone"); + } + + #[test] + fn vp9_without_fmtp_line_is_noop() { + // VP9 rtpmap exists, but no fmtp: function intentionally does not insert a new fmtp line. + let sdp = "v=0\n\ +o=- 0 0 IN IP4 127.0.0.1\n\ +s=-\n\ +t=0 0\n\ +m=video 9 UDP/TLS/RTP/SAVPF 98\n\ +a=rtpmap:98 VP9/90000\n"; + let out = PeerTransport::munge_x_google_start_bitrate(sdp, 3200); + assert_eq!( + out, sdp, + "should not modify SDP if there is no fmtp line for the VP9/AV1 payload type" + ); + } + + #[test] + fn preserves_crlf_and_adds_trailing_crlf_if_missing() { + // CRLF SDP without trailing CRLF at the end (common edge) + let sdp = "v=0\r\n\ +o=- 0 0 IN IP4 127.0.0.1\r\n\ +s=-\r\n\ +t=0 0\r\n\ +m=video 9 UDP/TLS/RTP/SAVPF 98\r\n\ +a=rtpmap:98 VP9/90000\r\n\ +a=fmtp:98 profile-id=0"; // <- no final \r\n + let out = PeerTransport::munge_x_google_start_bitrate(sdp, 3200); + assert!(out.contains("a=fmtp:98 profile-id=0;x-google-start-bitrate=3200\r\n")); + assert!(out.contains("\r\n"), "should keep CRLF line endings"); + assert!(out.ends_with("\r\n"), "should ensure trailing CRLF"); + assert!(!out.contains("\n") || out.contains("\r\n"), "should not introduce lone LF"); + } + + #[test] + fn multiple_pts_vp9_and_av1_only_mutate_matching_fmtp_lines() { + let sdp = "v=0\n\ +o=- 0 0 IN IP4 127.0.0.1\n\ +s=-\n\ +t=0 0\n\ +m=video 9 UDP/TLS/RTP/SAVPF 96 98 104\n\ +a=rtpmap:96 VP8/90000\n\ +a=rtpmap:98 VP9/90000\n\ +a=rtpmap:104 AV1/90000\n\ +a=fmtp:96 foo=bar\n\ +a=fmtp:98 profile-id=0\n\ +a=fmtp:104 x-google-start-bitrate=1111;baz=qux\n"; + let out = PeerTransport::munge_x_google_start_bitrate(sdp, 2222); + // VP8 fmtp should be unchanged + assert!(out.contains("a=fmtp:96 foo=bar\n")); + // VP9 fmtp should get appended + assert!(out.contains("a=fmtp:98 profile-id=0;x-google-start-bitrate=2222\n")); + // AV1 fmtp should get replaced + assert!(out.contains("a=fmtp:104 x-google-start-bitrate=2222;baz=qux\n")); + assert!(!out.contains("a=fmtp:104 x-google-start-bitrate=1111")); + } + + #[test] + fn does_not_duplicate_start_bitrate_when_already_present_no_semicolon_following() { + // Existing x-google-start-bitrate at end of line (no trailing ';') + let sdp = "v=0\n\ +o=- 0 0 IN IP4 127.0.0.1\n\ +s=-\n\ +t=0 0\n\ +m=video 9 UDP/TLS/RTP/SAVPF 98\n\ +a=rtpmap:98 VP9/90000\n\ +a=fmtp:98 profile-id=0;x-google-start-bitrate=1000\n"; + let out = PeerTransport::munge_x_google_start_bitrate(sdp, 3000); + assert!(out.contains("a=fmtp:98 profile-id=0;x-google-start-bitrate=3000\n")); + assert!(!out.contains("x-google-start-bitrate=1000")); + // ensure only one occurrence + assert_eq!(out.matches("x-google-start-bitrate=").count(), 1); + } +} diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index bf2874888..f111fdbf4 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -1333,6 +1333,16 @@ impl SessionInner { options: TrackPublishOptions, encodings: Vec, ) -> EngineResult { + // If video track, derive "ultimate" bitrate from encodings and stash it for offer munging. + // Must be done before encodings is moved into RtpTransceiverInit. + if track.kind() == TrackKind::Video { + let ultimate_bps: Option = { + let sum: u64 = encodings.iter().filter_map(|e| e.max_bitrate).sum(); + (sum > 0).then_some(sum) + }; + self.publisher_pc.set_max_send_bitrate_bps(ultimate_bps).await; + } + let init = RtpTransceiverInit { direction: RtpTransceiverDirection::SendOnly, stream_ids: Default::default(),