Skip to content

Commit b23a2f5

Browse files
boivieeshrubs
authored andcommitted
Store active streams as a vector
The number of active streams is expected to be very low. A standard Rust HashMap uses a cryptographically secure hashing algorithm to prevent HashDoS attacks, and the overhead of hashing will exceed the cost of simply just having them in a vector and doing a linear scan, which is very cache-friendly.
1 parent 555d4ff commit b23a2f5

File tree

1 file changed

+21
-12
lines changed

1 file changed

+21
-12
lines changed

src/tx/stream_scheduler.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
use crate::api::StreamId;
1616
use std::cmp::Ordering;
17-
use std::collections::HashMap;
1817

1918
#[derive(Debug, PartialEq)]
2019
enum SchedulingParameters {
@@ -96,7 +95,7 @@ pub struct StreamScheduler {
9695
max_payload_bytes: usize,
9796
current_stream: Option<StreamId>,
9897
system_virtual_time: u64,
99-
active_streams: HashMap<StreamId, ActiveStreamInfo>,
98+
active_streams: Vec<ActiveStreamInfo>,
10099
}
101100

102101
/// Keeps track of all active streams and decides which stream that the next data chunk can be sent
@@ -107,7 +106,7 @@ impl StreamScheduler {
107106
max_payload_bytes,
108107
current_stream: None,
109108
system_virtual_time: 0,
110-
active_streams: HashMap::new(),
109+
active_streams: Vec::new(),
111110
}
112111
}
113112

@@ -125,10 +124,15 @@ impl StreamScheduler {
125124
return;
126125
}
127126

128-
let active_stream = self
129-
.active_streams
130-
.entry(stream_id)
131-
.or_insert_with(|| ActiveStreamInfo::new(stream_id, self.system_virtual_time));
127+
let pos = self.active_streams.iter().position(|s| s.stream_id == stream_id);
128+
let active_stream = match pos {
129+
Some(idx) => &mut self.active_streams[idx],
130+
None => {
131+
self.active_streams
132+
.push(ActiveStreamInfo::new(stream_id, self.system_virtual_time));
133+
self.active_streams.last_mut().unwrap()
134+
}
135+
};
132136
active_stream.parameters = priority.map_or(SchedulingParameters::RoundRobin, |weight| {
133137
SchedulingParameters::WeightedFairQueuing { weight }
134138
});
@@ -145,8 +149,8 @@ impl StreamScheduler {
145149
pub fn peek(&self, max_size: usize) -> Option<(StreamId, usize)> {
146150
let active_stream = self
147151
.current_stream
148-
.and_then(|stream_id| self.active_streams.get(&stream_id))
149-
.or_else(|| self.active_streams.values().min())?;
152+
.and_then(|stream_id| self.active_streams.iter().find(|s| s.stream_id == stream_id))
153+
.or_else(|| self.active_streams.iter().min())?;
150154

151155
Some((active_stream.stream_id, active_stream.bytes_remaining.min(max_size)))
152156
}
@@ -156,8 +160,11 @@ impl StreamScheduler {
156160
/// This must be called after having called [`Self::peek`], which guarantees that `stream_id`
157161
/// and `bytes` are valid.
158162
pub fn accept(&mut self, stream_id: StreamId, bytes: usize) {
159-
let active_stream =
160-
self.active_streams.get_mut(&stream_id).expect("accept called on untracked stream_id");
163+
let active_stream = self
164+
.active_streams
165+
.iter_mut()
166+
.find(|s| s.stream_id == stream_id)
167+
.expect("accept called on untracked stream_id");
161168
self.current_stream = Some(stream_id);
162169

163170
self.system_virtual_time = active_stream.consume_bytes(bytes, self.max_payload_bytes);
@@ -177,7 +184,9 @@ impl StreamScheduler {
177184
}
178185

179186
fn remove_stream(&mut self, stream_id: StreamId) {
180-
self.active_streams.remove(&stream_id);
187+
if let Some(pos) = self.active_streams.iter().position(|s| s.stream_id == stream_id) {
188+
self.active_streams.swap_remove(pos);
189+
}
181190
if self.current_stream == Some(stream_id) {
182191
self.current_stream = None;
183192
}

0 commit comments

Comments
 (0)