Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 239 additions & 1 deletion livekit/src/rtc_engine/peer_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ struct TransportInner {
pending_candidates: Vec<IceCandidate>,
renegotiate: bool,
restarting_ice: bool,
// Publish-side target bitrate (bps) for offer munging
max_send_bitrate_bps: Option<u64>,
}

pub struct PeerTransport {
Expand All @@ -55,6 +57,7 @@ impl PeerTransport {
pending_candidates: Vec::default(),
renegotiate: false,
restarting_ice: false,
max_send_bitrate_bps: None,
})),
}
}
Expand Down Expand Up @@ -127,8 +130,91 @@ impl PeerTransport {
Ok(answer)
}

pub async fn set_max_send_bitrate_bps(&self, bps: Option<u64>) {
let mut inner = self.inner.lock().await;
inner.max_send_bitrate_bps = bps;
}

fn compute_start_bitrate_kbps(ultimate_bps: Option<u64>) -> Option<u32> {
let ultimate_kbps = (ultimate_bps? / 1000) as u32;
if ultimate_kbps == 0 {
return None;
}
// JS / Flutter uses ~70% of ultimate; 100% is also reasonable per feedback.
let start_kbps = (ultimate_kbps as f64 * 0.7).round() as u32;

// Clamp: avoid silly low/high values
Some(start_kbps.clamp(300, ultimate_kbps))
}

fn munge_x_google_start_bitrate(sdp: &str, start_bitrate_kbps: u32) -> String {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use a proper sdp parser/rewriter? instead of string changes? https://crates.io/crates/sdp

in case we'd need to do more munging for other things

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ladvoc @cloudwebrtc , what do you think here ?

If this is the only place that we will need to parse sdp, I would prefer keeping this handmade parser rather than introducing a new dep

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JS munges SDP for several purposes, including enabling stereo publishing, and a few other things

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think brining in the SDP crate you have linked to do munging is ok. The crate is lightweight (minimal transitive dependencies) and it comes from the webrtc-rs project.

// Detect what line ending the original SDP uses
let uses_crlf = sdp.contains("\r\n");
let eol = if uses_crlf { "\r\n" } else { "\n" };

// Split preserving the intended line ending style
let lines: Vec<&str> =
if uses_crlf { sdp.split("\r\n").collect() } else { sdp.split('\n').collect() };

// 1) Find VP9/AV1 payload types
let mut target_pts: Vec<&str> = Vec::new();
for line in &lines {
let l = line.trim();
if let Some(rest) = l.strip_prefix("a=rtpmap:") {
let mut it = rest.split_whitespace();
let pt = it.next().unwrap_or("");
let codec = it.next().unwrap_or("");
if (codec.starts_with("VP9/90000") || codec.starts_with("AV1/90000"))
&& !pt.is_empty()
{
target_pts.push(pt);
}
}
}
if target_pts.is_empty() {
return sdp.to_string();
}

// 2) Rewrite fmtp lines (minimal mutation)
let mut out: Vec<String> = Vec::with_capacity(lines.len());
for line in lines {
let mut rewritten = line.to_string();

for pt in &target_pts {
let prefix = format!("a=fmtp:{pt} ");
if rewritten.starts_with(&prefix) {
// Replace if present; append if not present
if let Some(pos) = rewritten.find("x-google-start-bitrate=") {
// replace existing value up to next ';' or end
let after = &rewritten[pos..];
let end =
after.find(';').map(|i| pos + i).unwrap_or_else(|| rewritten.len());
rewritten.replace_range(
pos..end,
&format!("x-google-start-bitrate={start_bitrate_kbps}"),
);
} else {
rewritten
.push_str(&format!(";x-google-start-bitrate={start_bitrate_kbps}"));
}
break;
}
}

out.push(rewritten);
}

// Re-join using same EOL, and ensure trailing EOL (some parsers are picky)
let mut munged = out.join(eol);
if !munged.ends_with(eol) {
munged.push_str(eol);
}
munged
}

pub async fn create_and_send_offer(&self, options: OfferOptions) -> EngineResult<()> {
let mut inner = self.inner.lock().await;
log::info!("Applying x-google-start-bitrate");

if options.ice_restart {
inner.restarting_ice = true;
Expand All @@ -151,7 +237,34 @@ impl PeerTransport {
return Ok(());
}

let offer = self.peer_connection.create_offer(options).await?;
let mut offer = self.peer_connection.create_offer(options).await?;
let sdp = offer.to_string();
let is_vp9 = sdp.contains(" VP9/90000");
let is_av1 = sdp.contains(" AV1/90000");
log::info!("SDP codecs present: VP9={}, AV1={}", is_vp9, is_av1);
if is_vp9 || is_av1 {
if let Some(start_kbps) = Self::compute_start_bitrate_kbps(inner.max_send_bitrate_bps) {
log::info!(
"Applying x-google-start-bitrate={} kbps (ultimate_bps={:?})",
start_kbps,
inner.max_send_bitrate_bps
);

let munged = Self::munge_x_google_start_bitrate(&sdp, start_kbps);
if munged != sdp {
log::info!("SDP munged successfully (VP9/AV1)");
match SessionDescription::parse(&munged, offer.sdp_type()) {
Ok(parsed) => offer = parsed,
Err(e) => log::warn!(
"Failed to parse munged SDP, falling back to original offer: {e}"
),
}
} else {
log::debug!("SDP munging produced no changes");
}
}
}

self.peer_connection.set_local_description(offer.clone()).await?;

if let Some(handler) = self.on_offer_handler.lock().as_mut() {
Expand All @@ -161,3 +274,128 @@ impl PeerTransport {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::PeerTransport;

#[test]
fn no_vp9_or_av1_is_noop() {
let sdp = "v=0\n\
o=- 0 0 IN IP4 127.0.0.1\n\
s=-\n\
t=0 0\n\
m=video 9 UDP/TLS/RTP/SAVPF 96\n\
a=rtpmap:96 VP8/90000\n\
a=fmtp:96 some=param\n";
let out = PeerTransport::munge_x_google_start_bitrate(sdp, 3200);
assert_eq!(out, sdp, "should not change SDP if no VP9/AV1 present");
}

#[test]
fn vp9_with_fmtp_appends_start_bitrate_and_preserves_lf_and_trailing_eol() {
// LF-only SDP, ends with \n already
let sdp = "v=0\n\
o=- 0 0 IN IP4 127.0.0.1\n\
s=-\n\
t=0 0\n\
m=video 9 UDP/TLS/RTP/SAVPF 98\n\
a=rtpmap:98 VP9/90000\n\
a=fmtp:98 profile-id=0\n";
let out = PeerTransport::munge_x_google_start_bitrate(sdp, 3200);

assert!(out.contains("a=fmtp:98 profile-id=0;x-google-start-bitrate=3200\n"));
assert!(!out.contains("\r\n"), "should preserve LF-only line endings");
assert!(out.ends_with('\n'), "should end with a trailing LF");
}

#[test]
fn av1_with_fmtp_replaces_existing_start_bitrate_value() {
let sdp = "v=0\n\
o=- 0 0 IN IP4 127.0.0.1\n\
s=-\n\
t=0 0\n\
m=video 9 UDP/TLS/RTP/SAVPF 104\n\
a=rtpmap:104 AV1/90000\n\
a=fmtp:104 x-google-start-bitrate=1000;foo=bar\n";
let out = PeerTransport::munge_x_google_start_bitrate(sdp, 2500);
assert!(
out.contains("a=fmtp:104 x-google-start-bitrate=2500;foo=bar\n"),
"should replace existing x-google-start-bitrate value and keep other params"
);
assert!(!out.contains("x-google-start-bitrate=1000"), "old bitrate value should be gone");
}

#[test]
fn vp9_without_fmtp_line_is_noop() {
// VP9 rtpmap exists, but no fmtp: function intentionally does not insert a new fmtp line.
let sdp = "v=0\n\
o=- 0 0 IN IP4 127.0.0.1\n\
s=-\n\
t=0 0\n\
m=video 9 UDP/TLS/RTP/SAVPF 98\n\
a=rtpmap:98 VP9/90000\n";
let out = PeerTransport::munge_x_google_start_bitrate(sdp, 3200);
assert_eq!(
out, sdp,
"should not modify SDP if there is no fmtp line for the VP9/AV1 payload type"
);
}

#[test]
fn preserves_crlf_and_adds_trailing_crlf_if_missing() {
// CRLF SDP without trailing CRLF at the end (common edge)
let sdp = "v=0\r\n\
o=- 0 0 IN IP4 127.0.0.1\r\n\
s=-\r\n\
t=0 0\r\n\
m=video 9 UDP/TLS/RTP/SAVPF 98\r\n\
a=rtpmap:98 VP9/90000\r\n\
a=fmtp:98 profile-id=0"; // <- no final \r\n
let out = PeerTransport::munge_x_google_start_bitrate(sdp, 3200);
assert!(out.contains("a=fmtp:98 profile-id=0;x-google-start-bitrate=3200\r\n"));
assert!(out.contains("\r\n"), "should keep CRLF line endings");
assert!(out.ends_with("\r\n"), "should ensure trailing CRLF");
assert!(!out.contains("\n") || out.contains("\r\n"), "should not introduce lone LF");
}

#[test]
fn multiple_pts_vp9_and_av1_only_mutate_matching_fmtp_lines() {
let sdp = "v=0\n\
o=- 0 0 IN IP4 127.0.0.1\n\
s=-\n\
t=0 0\n\
m=video 9 UDP/TLS/RTP/SAVPF 96 98 104\n\
a=rtpmap:96 VP8/90000\n\
a=rtpmap:98 VP9/90000\n\
a=rtpmap:104 AV1/90000\n\
a=fmtp:96 foo=bar\n\
a=fmtp:98 profile-id=0\n\
a=fmtp:104 x-google-start-bitrate=1111;baz=qux\n";
let out = PeerTransport::munge_x_google_start_bitrate(sdp, 2222);
// VP8 fmtp should be unchanged
assert!(out.contains("a=fmtp:96 foo=bar\n"));
// VP9 fmtp should get appended
assert!(out.contains("a=fmtp:98 profile-id=0;x-google-start-bitrate=2222\n"));
// AV1 fmtp should get replaced
assert!(out.contains("a=fmtp:104 x-google-start-bitrate=2222;baz=qux\n"));
assert!(!out.contains("a=fmtp:104 x-google-start-bitrate=1111"));
}

#[test]
fn does_not_duplicate_start_bitrate_when_already_present_no_semicolon_following() {
// Existing x-google-start-bitrate at end of line (no trailing ';')
let sdp = "v=0\n\
o=- 0 0 IN IP4 127.0.0.1\n\
s=-\n\
t=0 0\n\
m=video 9 UDP/TLS/RTP/SAVPF 98\n\
a=rtpmap:98 VP9/90000\n\
a=fmtp:98 profile-id=0;x-google-start-bitrate=1000\n";
let out = PeerTransport::munge_x_google_start_bitrate(sdp, 3000);
assert!(out.contains("a=fmtp:98 profile-id=0;x-google-start-bitrate=3000\n"));
assert!(!out.contains("x-google-start-bitrate=1000"));
// ensure only one occurrence
assert_eq!(out.matches("x-google-start-bitrate=").count(), 1);
}
}
10 changes: 10 additions & 0 deletions livekit/src/rtc_engine/rtc_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,16 @@ impl SessionInner {
options: TrackPublishOptions,
encodings: Vec<RtpEncodingParameters>,
) -> EngineResult<RtpTransceiver> {
// If video track, derive "ultimate" bitrate from encodings and stash it for offer munging.
// Must be done before encodings is moved into RtpTransceiverInit.
if track.kind() == TrackKind::Video {
let ultimate_bps: Option<u64> = {
let sum: u64 = encodings.iter().filter_map(|e| e.max_bitrate).sum();
(sum > 0).then_some(sum)
};
self.publisher_pc.set_max_send_bitrate_bps(ultimate_bps).await;
}

let init = RtpTransceiverInit {
direction: RtpTransceiverDirection::SendOnly,
stream_ids: Default::default(),
Expand Down