Skip to content

Commit ac1952f

Browse files
authored
Data channel reliability (#688)
* Create TTL map * Refactor data channel event * Avoid cloning keys * Create TxQueue * Implement reliable retry * Derive debug * Avoid unsafe unwrap * Obtain packet kind from RTC event * Refactor incoming data packet handling * Set participant info on outgoing data packets * Create helpers for E2E testing * Create E2E test for data channel reliability * Put E2E tests behind an internal feature * Enable E2E tests in CI
1 parent 68ea142 commit ac1952f

File tree

11 files changed

+808
-195
lines changed

11 files changed

+808
-195
lines changed

.github/workflows/tests.yml

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@ jobs:
3535
target: x86_64-pc-windows-msvc
3636
- os: macos-latest
3737
target: x86_64-apple-darwin
38+
e2e-testing: true
3839
- os: ubuntu-latest
3940
target: x86_64-unknown-linux-gnu
41+
e2e-testing: true
4042

4143
name: Test (${{ matrix.target }})
4244
runs-on: ${{ matrix.os }}
@@ -52,11 +54,29 @@ jobs:
5254
sudo apt update -y
5355
sudo apt install -y libssl-dev libx11-dev libgl1-mesa-dev libxext-dev libva-dev libdrm-dev libnvidia-decode-570 libnvidia-compute-570 nvidia-cuda-dev
5456
57+
- name: Install LiveKit server
58+
if: ${{ matrix.e2e-testing }}
59+
run: |
60+
if [[ "${{ matrix.os }}" == "ubuntu-latest" ]]; then
61+
curl -sSL https://get.livekit.io | bash
62+
elif [[ "${{ matrix.os }}" == "macos-latest" ]]; then
63+
brew install livekit
64+
fi
65+
5566
- uses: actions/checkout@v3
5667
with:
5768
submodules: true
5869

59-
- name: Test
70+
- name: Run LiveKit server
71+
if: ${{ matrix.e2e-testing }}
72+
run: livekit-server --dev &
73+
74+
- name: Test (no E2E)
75+
if: ${{ !matrix.e2e-testing }}
6076
run: cargo +nightly test --release --verbose --target ${{ matrix.target }} -- --nocapture
6177

78+
- name: Test (with E2E)
79+
if: ${{ matrix.e2e-testing }}
80+
run: cargo +nightly test --release --verbose --target ${{ matrix.target }} --features __lk-e2e-test -- --nocapture
81+
6282

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

livekit/Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ native-tls-vendored = ["livekit-api/native-tls-vendored"]
2222
rustls-tls-native-roots = ["livekit-api/rustls-tls-native-roots"]
2323
rustls-tls-webpki-roots = ["livekit-api/rustls-tls-webpki-roots"]
2424
__rustls-tls = ["livekit-api/__rustls-tls"]
25-
26-
# internal features (used by livekit-ffi)
27-
__lk-internal = []
25+
__lk-internal = [] # internal features (used by livekit-ffi)
26+
__lk-e2e-test = [] # end-to-end testing with a LiveKit server
2827

2928
[dependencies]
3029
livekit-runtime = { workspace = true, default-features = false }
@@ -45,3 +44,6 @@ semver = "1.0"
4544
libloading = { version = "0.8.6" }
4645
bytes = "1.10.1"
4746
bmrng = "0.5.2"
47+
48+
[dev-dependencies]
49+
anyhow = "1.0.99"

livekit/src/room/mod.rs

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -354,14 +354,6 @@ pub struct RoomOptions {
354354
pub rtc_config: RtcConfiguration,
355355
pub join_retries: u32,
356356
pub sdk_options: RoomSdkOptions,
357-
pub preregistration: Option<PreRegistration>,
358-
}
359-
360-
#[derive(Debug, Clone)]
361-
#[non_exhaustive]
362-
pub struct PreRegistration {
363-
text_stream_topics: Vec<String>,
364-
byte_stream_topics: Vec<String>,
365357
}
366358

367359
impl Default for RoomOptions {
@@ -381,7 +373,6 @@ impl Default for RoomOptions {
381373
},
382374
join_retries: 3,
383375
sdk_options: RoomSdkOptions::default(),
384-
preregistration: None,
385376
}
386377
}
387378
}
@@ -577,8 +568,6 @@ impl Room {
577568
let (incoming_stream_manager, open_rx) = IncomingStreamManager::new();
578569
let (outgoing_stream_manager, packet_rx) = OutgoingStreamManager::new();
579570

580-
let identity = local_participant.identity().clone();
581-
582571
let room_info = join_response.room.unwrap();
583572
let inner = Arc::new(RoomSession {
584573
sid_promise: Promise::new(),
@@ -671,7 +660,6 @@ impl Room {
671660
));
672661
let outgoing_stream_handle = livekit_runtime::spawn(outgoing_data_stream_task(
673662
packet_rx,
674-
identity,
675663
rtc_engine.clone(),
676664
close_rx.resubscribe(),
677665
));
@@ -1187,8 +1175,7 @@ impl RoomSession {
11871175
}),
11881176
publish_tracks: self.local_participant.published_tracks_info(),
11891177
data_channels: dcs,
1190-
// unimplemented, stubbed for now
1191-
datachannel_receive_states: Vec::new(),
1178+
datachannel_receive_states: session.data_channel_receive_states(),
11921179
};
11931180

11941181
log::debug!("sending sync state {:?}", sync_state);
@@ -1737,15 +1724,12 @@ async fn incoming_data_stream_task(
17371724
/// Receives packets from the outgoing stream manager and send them.
17381725
async fn outgoing_data_stream_task(
17391726
mut packet_rx: UnboundedRequestReceiver<proto::DataPacket, Result<(), EngineError>>,
1740-
participant_identity: ParticipantIdentity,
17411727
engine: Arc<RtcEngine>,
17421728
mut close_rx: broadcast::Receiver<()>,
17431729
) {
17441730
loop {
17451731
tokio::select! {
1746-
Ok((mut packet, responder)) = packet_rx.recv() => {
1747-
// Set packet's participant identity field
1748-
packet.participant_identity = participant_identity.0.clone();
1732+
Ok((packet, responder)) = packet_rx.recv() => {
17491733
let result = engine.publish_data(packet, DataPacketKind::Reliable).await;
17501734
let _ = responder.respond(result);
17511735
},

livekit/src/room/utils/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::collections::HashMap;
22

33
pub mod take_cell;
4+
pub(crate) mod ttl_map;
5+
pub(crate) mod tx_queue;
46
pub mod utf8_chunk;
57

68
pub fn calculate_changed_attributes(

livekit/src/room/utils/ttl_map.rs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright 2025 LiveKit, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::{
16+
collections::HashMap,
17+
fmt::Debug,
18+
hash::Hash,
19+
time::{Duration, SystemTime},
20+
};
21+
22+
/// Time to live (TTL) map
23+
///
24+
/// Elements older than the TTL duration are automatically removed.
25+
///
26+
#[derive(Debug)]
27+
pub struct TtlMap<K, V> {
28+
inner: HashMap<K, Entry<V>>,
29+
last_cleanup: SystemTime,
30+
ttl: Duration,
31+
}
32+
33+
#[derive(Debug)]
34+
struct Entry<V> {
35+
value: V,
36+
expires_at: SystemTime,
37+
}
38+
39+
impl<K, V> TtlMap<K, V> {
40+
/// Creates an empty `TtlMap`.
41+
pub fn new(ttl: Duration) -> Self {
42+
Self { inner: HashMap::new(), last_cleanup: SystemTime::now(), ttl }
43+
}
44+
45+
/// Returns the number of elements in the map.
46+
pub fn len(&mut self) -> usize {
47+
self.cleanup();
48+
self.inner.len()
49+
}
50+
51+
/// An iterator visiting all key-value pairs in arbitrary order.
52+
/// The iterator element type is `(&'a K, &'a V)`.
53+
pub fn iter(&mut self) -> impl Iterator<Item = (&K, &V)> {
54+
self.cleanup();
55+
self.inner.iter().map(|(key, entry)| (key, &entry.value))
56+
}
57+
58+
/// Removes expired elements.
59+
fn cleanup(&mut self) {
60+
let now = SystemTime::now();
61+
self.inner.retain(|_, entry| entry.expires_at >= now);
62+
self.last_cleanup = now;
63+
}
64+
}
65+
66+
impl<K, V> TtlMap<K, V>
67+
where
68+
K: Eq + Hash + Clone,
69+
{
70+
/// Returns a reference to the value corresponding to the key.
71+
pub fn get(&mut self, k: &K) -> Option<&V> {
72+
let expires_at = self.inner.get(k).map(|entry| entry.expires_at)?;
73+
let now = SystemTime::now();
74+
if expires_at < now {
75+
_ = self.inner.remove(k);
76+
return None;
77+
}
78+
Some(&self.inner.get(k).unwrap().value)
79+
}
80+
81+
/// Sets the value for the given key.
82+
pub fn set(&mut self, k: &K, v: Option<V>) {
83+
let now = SystemTime::now();
84+
let Ok(elapsed) = now.duration_since(self.last_cleanup) else {
85+
log::error!("System clock anomaly detected");
86+
return;
87+
};
88+
let half_ttl = self.ttl.div_f64(2.0);
89+
if elapsed > half_ttl {
90+
self.cleanup();
91+
}
92+
93+
let Some(value) = v else {
94+
_ = self.inner.remove(&k);
95+
return;
96+
};
97+
let expires_at = now + self.ttl;
98+
let entry = Entry { value, expires_at };
99+
self.inner.insert(k.clone(), entry);
100+
}
101+
}
102+
103+
#[cfg(test)]
104+
mod tests {
105+
use super::*;
106+
use std::collections::HashSet;
107+
use tokio::time::sleep;
108+
109+
const SHORT_TTL: Duration = Duration::from_millis(100);
110+
111+
#[tokio::test]
112+
async fn test_expiration() {
113+
let mut map = TtlMap::<char, u8>::new(SHORT_TTL);
114+
map.set(&'a', Some(1));
115+
map.set(&'b', Some(2));
116+
map.set(&'c', Some(3));
117+
118+
assert_eq!(map.len(), 3);
119+
assert!(map.get(&'a').is_some());
120+
assert!(map.get(&'b').is_some());
121+
assert!(map.get(&'c').is_some());
122+
123+
sleep(SHORT_TTL).await;
124+
125+
assert_eq!(map.len(), 0);
126+
assert!(map.get(&'a').is_none());
127+
assert!(map.get(&'b').is_none());
128+
assert!(map.get(&'c').is_none());
129+
}
130+
131+
#[test]
132+
fn test_iter() {
133+
let mut map = TtlMap::<char, u8>::new(SHORT_TTL);
134+
map.set(&'a', Some(1));
135+
map.set(&'b', Some(2));
136+
map.set(&'c', Some(3));
137+
138+
let elements: HashSet<_> = map.iter().map(|(k, v)| (*k, *v)).collect();
139+
assert!(elements.contains(&('a', 1)));
140+
assert!(elements.contains(&('b', 2)));
141+
assert!(elements.contains(&('c', 3)));
142+
}
143+
}

0 commit comments

Comments
 (0)