
Commit a9243f8

Optimize reassembly streams
Replaces the BTreeMap-based fragment storage with a new, specialized "IntervalList" backed by a sorted Vec.

The previous implementation treated every received fragment as an individual node in a B-Tree. This approach had several drawbacks for high-throughput networking:

- With many fragments, there would be many short-lived allocations, and while the allocator is fast, it's still costly.
- Tree traversal involved pointer chasing across non-contiguous memory, causing CPU cache misses.
- Strictly in-order arrival (the 99% case) still required a tree search and rebalancing operations (logarithmic time complexity).

The new IntervalList stores *ranges* of contiguous data (Intervals) rather than individual points. When a fragment arrives, it is checked against existing intervals and merged if contiguous. For in-order delivery, the new fragment simply extends the last interval in the vector, which is an O(1) operation with zero allocation. Metadata is stored in a contiguous Vec, so checking for gaps or overlaps makes effective use of CPU caches. A message split into 100 sequential fragments now consumes a single Interval struct (approx. 24 bytes) rather than a large number of tree nodes.

This structure unifies the storage logic for both SCTP standards by abstracting the sort key (how the gaps are sorted in the vector):

- For traditional (RFC 9260) streams, (SSN, TSN) is the key. Unordered messages have SSN=0, so for them the TSN is the sorting key. For ordered messages, every fragment is first grouped by SSN and then sorted by TSN. As a message is always sent in full before the next message begins, all of its fragments are contiguous in the TSN number space.
- For interleaved (RFC 8260) streams, (MID, FSN) is the key.

Note that while inserting into the middle of a vector is theoretically costly, in this specific domain the length of the vector represents the number of *gaps* (packet loss events), not the total number of packets. In real-world traffic, a stream rarely experiences more than a handful of disjoint intervals at any given time, so the cost of shifting a few vector elements is negligible compared to the overhead of maintaining a tree structure.
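The interval-merging idea can be sketched as follows. This is a simplified illustration, not the commit's actual implementation: it uses plain `u64` sequence numbers instead of the generic keys and payloads the real `IntervalList` carries, and it ignores duplicates and overlaps. The fast path shows why strictly in-order arrival is O(1) with zero allocation, while out-of-order fragments fall back to a binary search over the (short) gap list.

```rust
// Sketch only: `Interval`/`IntervalList` here are illustrative stand-ins
// for the commit's structures, keyed by bare u64 sequence numbers.
#[derive(Debug, PartialEq)]
struct Interval {
    start: u64, // first sequence number covered
    end: u64,   // last sequence number covered (inclusive)
}

#[derive(Default)]
struct IntervalList {
    // Sorted, disjoint intervals. The length tracks the number of gaps,
    // not the number of received fragments.
    intervals: Vec<Interval>,
}

impl IntervalList {
    /// Records `seq`, merging with an adjacent interval when contiguous.
    fn add(&mut self, seq: u64) {
        // Fast path: in-order arrival extends the last interval in place.
        if let Some(last) = self.intervals.last_mut() {
            if seq == last.end + 1 {
                last.end = seq;
                return;
            }
        }
        // Slow path: binary-search for the insertion point, then extend a
        // neighbor if contiguous; shifting a few elements is cheap because
        // there is only one entry per disjoint run.
        let pos = self.intervals.partition_point(|i| i.end < seq);
        if pos < self.intervals.len() && self.intervals[pos].start == seq + 1 {
            self.intervals[pos].start = seq; // extend right neighbor down
        } else if pos > 0 && self.intervals[pos - 1].end + 1 == seq {
            self.intervals[pos - 1].end = seq; // extend left neighbor up
        } else {
            self.intervals.insert(pos, Interval { start: seq, end: seq });
        }
        // If the fragment closed the gap between two intervals, merge them.
        if pos > 0
            && pos < self.intervals.len()
            && self.intervals[pos - 1].end + 1 >= self.intervals[pos].start
        {
            let right = self.intervals.remove(pos);
            self.intervals[pos - 1].end = right.end;
        }
    }
}

fn main() {
    let mut list = IntervalList::default();
    for seq in [1u64, 2, 3, 5, 4] {
        // 4 arrives late, closing the gap
        list.add(seq);
    }
    // All five fragments collapse into a single interval.
    assert_eq!(list.intervals.len(), 1);
    assert_eq!(list.intervals[0], Interval { start: 1, end: 5 });
}
```

With in-order input, only the fast path runs: no search, no shift, no allocation after the first interval.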
1 parent 73273e2 commit a9243f8


7 files changed (+543 −222 lines changed)

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -13,6 +13,10 @@ and this project adheres to
 - Relaxed sequence number handling for stream reset requests.
 - Corrected outgoing requests to be compliant with RFC6525.
 
+### Changed
+
+- Optimized performance of message reassembly.
+
 ## 0.1.9 - 2025-02-16
 
 ### Fixed

src/rx/interleaved_reassembly_streams.rs

Lines changed: 76 additions & 99 deletions
@@ -13,79 +13,93 @@
 // limitations under the License.
 
 use crate::api::Message;
-use crate::api::PpId;
 use crate::api::StreamId;
 use crate::api::handover::HandoverOrderedStream;
 use crate::api::handover::HandoverReadiness;
 use crate::api::handover::HandoverUnorderedStream;
 use crate::api::handover::SocketHandoverState;
 use crate::packet::SkippedStream;
 use crate::packet::data::Data;
+use crate::rx::IntervalList;
+use crate::rx::ReassemblyKey;
 use crate::rx::reassembly_streams::ReassemblyStreams;
 use crate::types::Fsn;
 use crate::types::Mid;
 use crate::types::StreamKey;
 use crate::types::Tsn;
-use std::collections::BTreeMap;
 use std::collections::HashMap;
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub struct InterleavedKey {
+    pub mid: Mid, // Primary sort key
+    pub fsn: Fsn, // Secondary sort key
+}
+
+impl ReassemblyKey for InterleavedKey {
+    fn next(&self) -> Self {
+        InterleavedKey { mid: self.mid, fsn: self.fsn + 1 }
+    }
+}
+
 pub struct OrderedStream {
-    chunks_by_mid: BTreeMap<Mid, BTreeMap<Fsn, Data>>,
+    stream_id: StreamId,
+    intervals: IntervalList<InterleavedKey>,
     next_mid: Mid,
 }
 
 impl OrderedStream {
-    fn new(next_mid: Mid) -> Self {
-        Self { chunks_by_mid: BTreeMap::new(), next_mid }
+    fn new(stream_id: StreamId, next_mid: Mid) -> Self {
+        Self { stream_id, intervals: IntervalList::default(), next_mid }
     }
 
     fn try_assemble_next(&mut self, on_reassembled: &mut dyn FnMut(Message)) -> usize {
         let mut assembled_bytes = 0;
-        while let Some(chunks) = self.chunks_by_mid.get(&self.next_mid) {
-            if !is_complete_message(chunks) {
-                break;
-            }
 
-            let chunks = self.chunks_by_mid.remove(&self.next_mid).unwrap();
-            let (stream_id, ppid, payload) = extract_payload(chunks);
+        while let Some(interval) =
+            self.intervals.pop_front_if_complete_and(|i| i.start.mid == self.next_mid)
+        {
+            let stream_id = self.stream_id;
+            let ppid = interval.ppid;
+            let payload = interval.collect_payload();
+
             assembled_bytes += payload.len();
             on_reassembled(Message::new(stream_id, ppid, payload));
             self.next_mid += 1;
         }
+
         assembled_bytes
     }
 
     fn add(&mut self, data: Data, on_reassembled: &mut dyn FnMut(Message)) -> isize {
-        let mut queued_bytes = 0;
-        let mid = data.mid;
+        if data.mid < self.next_mid {
+            // Already delivered or skipped.
+            return 0;
+        }
 
-        if mid == self.next_mid && data.is_beginning && data.is_end {
-            // Fast path - reassemble directly.
-            on_reassembled(Message::new(data.stream_key.id(), data.ppid, data.payload));
+        if data.mid == self.next_mid && data.is_beginning && data.is_end {
+            // Fastpath for already assembled chunks.
+            on_reassembled(Message::new(self.stream_id, data.ppid, data.payload));
             self.next_mid += 1;
-
-            // Check if this unblocked subsequent messages
-            queued_bytes -= self.try_assemble_next(on_reassembled) as isize;
-            return queued_bytes;
+            let assembled = self.try_assemble_next(on_reassembled);
+            return -(assembled as isize);
        }
 
-        queued_bytes += data.payload.len() as isize;
-        self.chunks_by_mid.entry(mid).or_default().insert(data.fsn, data);
-
-        if mid == self.next_mid {
-            queued_bytes -= self.try_assemble_next(on_reassembled) as isize;
-        }
-        queued_bytes
+        let key = InterleavedKey { mid: data.mid, fsn: data.fsn };
+        let queued_bytes = data.payload.len() as isize;
+        self.intervals.add(key, data);
+        let assembled = self.try_assemble_next(on_reassembled);
+        queued_bytes - (assembled as isize)
     }
 }
 
 pub struct UnorderedStream {
-    chunks_by_mid: BTreeMap<Mid, BTreeMap<Fsn, Data>>,
+    stream_id: StreamId,
+    intervals: IntervalList<InterleavedKey>,
 }
 
 impl UnorderedStream {
-    fn new() -> Self {
-        Self { chunks_by_mid: BTreeMap::new() }
+    fn new(stream_id: StreamId) -> Self {
+        Self { stream_id, intervals: IntervalList::default() }
    }
 
     fn add(&mut self, data: Data, on_reassembled: &mut dyn FnMut(Message)) -> isize {
@@ -95,50 +109,24 @@ impl UnorderedStream {
             return 0;
         }
 
-        let mid = data.mid;
-        let mut queued_bytes = data.payload.len() as isize;
-        let chunks = self.chunks_by_mid.entry(mid).or_default();
-        chunks.insert(data.fsn, data);
+        let key = InterleavedKey { mid: data.mid, fsn: data.fsn };
+        let queued_bytes = data.payload.len() as isize;
+        let idx = self.intervals.add(key, data);
+
+        if let Some(interval) = self.intervals.pop_if_complete(idx) {
+            let stream_id = self.stream_id;
+            let ppid = interval.ppid;
+            let payload = interval.collect_payload();
+            let total_payload_len = payload.len();
 
-        if is_complete_message(chunks) {
-            let chunks = self.chunks_by_mid.remove(&mid).unwrap();
-            let (stream_id, ppid, payload) = extract_payload(chunks);
-            queued_bytes -= payload.len() as isize;
             on_reassembled(Message::new(stream_id, ppid, payload));
+            queued_bytes - (total_payload_len as isize)
+        } else {
+            queued_bytes
         }
-
-        queued_bytes
-    }
-}
-
-fn is_complete_message(chunks: &BTreeMap<Fsn, Data>) -> bool {
-    if let (Some((first_fsn, first_data)), Some((last_fsn, last_data))) =
-        (chunks.first_key_value(), chunks.last_key_value())
-    {
-        first_data.is_beginning
-            && last_data.is_end
-            && first_fsn.distance_to(*last_fsn) == (chunks.len() as u32 - 1)
-    } else {
-        false
     }
 }
 
-fn extract_payload(chunks: BTreeMap<Fsn, Data>) -> (StreamId, PpId, Vec<u8>) {
-    let first_data = chunks.values().next().expect("Chunks should not be empty");
-    let stream_id = first_data.stream_key.id();
-    let ppid = first_data.ppid;
-
-    // Calculate total size to pre-allocate
-    let total_len: usize = chunks.values().map(|d| d.payload.len()).sum();
-    let mut payload = Vec::with_capacity(total_len);
-
-    for (_, mut data) in chunks {
-        payload.append(&mut data.payload);
-    }
-
-    (stream_id, ppid, payload)
-}
-
 pub struct InterleavedReassemblyStreams {
     ordered: HashMap<StreamId, OrderedStream>,
     unordered: HashMap<StreamId, UnorderedStream>,
@@ -156,12 +144,12 @@ impl ReassemblyStreams for InterleavedReassemblyStreams {
             StreamKey::Ordered(stream_id) => self
                 .ordered
                 .entry(stream_id)
-                .or_insert_with(|| OrderedStream::new(Mid(0)))
+                .or_insert_with(|| OrderedStream::new(stream_id, Mid(0)))
                 .add(data, on_reassembled),
             StreamKey::Unordered(stream_id) => self
                 .unordered
                 .entry(stream_id)
-                .or_insert_with(UnorderedStream::new)
+                .or_insert_with(|| UnorderedStream::new(stream_id))
                 .add(data, on_reassembled),
         }
     }
@@ -180,18 +168,10 @@ impl ReassemblyStreams for InterleavedReassemblyStreams {
                 let stream = self
                     .ordered
                     .entry(*stream_id)
-                    .or_insert_with(|| OrderedStream::new(Mid(0)));
-
-                stream.chunks_by_mid.retain(|cur_mid, chunks| {
-                    if cur_mid <= mid {
-                        released_bytes += chunks
-                            .iter()
-                            .fold(0, |acc, (_, data)| acc + data.payload.len());
-                        false
-                    } else {
-                        true
-                    }
-                });
+                    .or_insert_with(|| OrderedStream::new(*stream_id, Mid(0)));
+
+                released_bytes +=
+                    stream.intervals.retain(|interval| interval.start.mid > *mid);
 
                 if stream.next_mid <= *mid {
                     stream.next_mid = *mid + 1;
@@ -201,19 +181,13 @@ impl ReassemblyStreams for InterleavedReassemblyStreams {
                 released_bytes += stream.try_assemble_next(on_reassembled);
             }
             StreamKey::Unordered(stream_id) => {
-                let stream =
-                    self.unordered.entry(*stream_id).or_insert_with(UnorderedStream::new);
-
-                stream.chunks_by_mid.retain(|cur_mid, chunks| {
-                    if cur_mid <= mid {
-                        released_bytes += chunks
-                            .iter()
-                            .fold(0, |acc, (_, data)| acc + data.payload.len());
-                        false
-                    } else {
-                        true
-                    }
-                });
+                let stream = self
+                    .unordered
+                    .entry(*stream_id)
+                    .or_insert_with(|| UnorderedStream::new(*stream_id));
+
+                released_bytes +=
+                    stream.intervals.retain(|interval| interval.start.mid > *mid);
             }
         }
     }
@@ -237,8 +211,8 @@ impl ReassemblyStreams for InterleavedReassemblyStreams {
     }
 
     fn get_handover_readiness(&self) -> HandoverReadiness {
-        let has_ordered_chunks = self.ordered.values().any(|s| !s.chunks_by_mid.is_empty());
-        let has_unordered_chunks = self.unordered.values().any(|s| !s.chunks_by_mid.is_empty());
+        let has_ordered_chunks = self.ordered.values().any(|s| !s.intervals.is_empty());
+        let has_unordered_chunks = self.unordered.values().any(|s| !s.intervals.is_empty());
 
         HandoverReadiness::STREAM_HAS_UNASSEMBLED_CHUNKS
             & (has_ordered_chunks | has_unordered_chunks)
@@ -261,10 +235,13 @@ impl ReassemblyStreams for InterleavedReassemblyStreams {
 
     fn restore_from_state(&mut self, state: &SocketHandoverState) {
         for stream in &state.rx.ordered_streams {
-            self.ordered.insert(StreamId(stream.id), OrderedStream::new(Mid(stream.next_ssn)));
+            self.ordered.insert(
+                StreamId(stream.id),
+                OrderedStream::new(StreamId(stream.id), Mid(stream.next_ssn)),
+            );
         }
         for stream in &state.rx.unordered_streams {
-            self.unordered.insert(StreamId(stream.id), UnorderedStream::new());
+            self.unordered.insert(StreamId(stream.id), UnorderedStream::new(StreamId(stream.id)));
         }
     }
 }
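The diff's `InterleavedKey` relies on `#[derive(PartialOrd, Ord)]`, which compares struct fields top to bottom, to obtain the (MID, FSN) lexicographic ordering, and on a `next()` method to decide when two fragments are contiguous. A minimal standalone sketch of that idea, using plain `u32` fields in place of the crate's `Mid`/`Fsn` newtypes (an assumption for illustration only):

```rust
// `Key` is a stand-in for the diff's InterleavedKey; the real type wraps
// the crate's Mid and Fsn newtypes rather than bare u32s.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Key {
    mid: u32, // primary sort key: message identifier
    fsn: u32, // secondary sort key: fragment sequence number
}

impl Key {
    // Mirrors the ReassemblyKey::next idea: the key that a following
    // fragment must have to be contiguous with this one.
    fn next(self) -> Key {
        Key { mid: self.mid, fsn: self.fsn + 1 }
    }
}

fn main() {
    // Derived Ord compares `mid` first, so every fragment of MID 1 sorts
    // before any fragment of MID 2, regardless of FSN.
    let a = Key { mid: 1, fsn: 9 };
    let b = Key { mid: 2, fsn: 0 };
    assert!(a < b);

    // Contiguity: fragment (1, 10) directly follows fragment (1, 9) ...
    assert_eq!(a.next(), Key { mid: 1, fsn: 10 });
    // ... while the last fragment of one message never "continues" into
    // the next message.
    assert!(Key { mid: 1, fsn: 0 }.next() != Key { mid: 2, fsn: 0 });
}
```

Because the key ordering is the only stream-flavor-specific piece, the same interval storage can serve both the (SSN, TSN) and (MID, FSN) layouts described in the commit message.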
