Skip to content

Commit 5d78e29

Browse files
committed
proto: Preemptive SendStream hashmap lookup
Previously, all 5 primitive methods on SendStream (write_source, stopped finish, set_priority, and priority) performed a self.state.send[self.id] hashmap lookup. Now that self.state.send is an IndexMap, this commit replaces these individual lookups with a single key-to-index lookup upon construction, and saves the Option<usize> result to a new field. This removes certain tradeoffs between performance and borrowing complexity, and thereby facilitates subsequent refactor commits.
1 parent d676d4d commit 5d78e29

File tree

1 file changed

+20
-36
lines changed
  • quinn-proto/src/connection/streams

1 file changed

+20
-36
lines changed

quinn-proto/src/connection/streams/mod.rs

Lines changed: 20 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use tracing::trace;
1010
use super::spaces::{Retransmits, ThinRetransmits};
1111
use crate::{
1212
Dir, StreamId, VarInt,
13-
connection::streams::state::{get_or_insert_recv, get_or_insert_send},
13+
connection::streams::state::get_or_insert_recv,
1414
frame,
1515
};
1616

@@ -195,6 +195,7 @@ impl RecvStream<'_> {
195195
/// Access to streams
196196
pub struct SendStream<'a> {
197197
pub(super) id: StreamId,
198+
pub(super) index: Option<usize>,
198199
pub(super) state: &'a mut StreamsState,
199200
pub(super) pending: &'a mut Retransmits,
200201
pub(super) conn_state: &'a super::State,
@@ -218,8 +219,10 @@ impl<'a> SendStream<'a> {
218219
pending: &'a mut Retransmits,
219220
conn_state: &'a super::State,
220221
) -> Self {
222+
let index = state.send.get_index_of(&id);
221223
Self {
222224
id,
225+
index,
223226
state,
224227
pending,
225228
conn_state,
@@ -253,12 +256,8 @@ impl<'a> SendStream<'a> {
253256

254257
let max_send_data = self.state.max_send_data(self.id);
255258

256-
let stream = self
257-
.state
258-
.send
259-
.get_mut(&self.id)
260-
.map(get_or_insert_send(max_send_data))
261-
.ok_or(WriteError::ClosedStream)?;
259+
let index = self.index.ok_or(WriteError::ClosedStream)?;
260+
let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data));
262261

263262
if limit == 0 {
264263
trace!(
@@ -285,11 +284,8 @@ impl<'a> SendStream<'a> {
285284

286285
/// Check if this stream was stopped, get the reason if it was
287286
pub fn stopped(&self) -> Result<Option<VarInt>, ClosedStream> {
288-
match self.state.send.get(&self.id).as_ref() {
289-
Some(Some(s)) => Ok(s.stop_reason),
290-
Some(None) => Ok(None),
291-
None => Err(ClosedStream { _private: () }),
292-
}
287+
let index = self.index.ok_or(ClosedStream { _private: () })?;
288+
Ok(self.state.send[index].as_ref().and_then(|s| s.stop_reason))
293289
}
294290

295291
/// Finish a send stream, signalling that no more data will be sent.
@@ -299,12 +295,9 @@ impl<'a> SendStream<'a> {
299295
/// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
300296
pub fn finish(&mut self) -> Result<(), FinishError> {
301297
let max_send_data = self.state.max_send_data(self.id);
302-
let stream = self
303-
.state
304-
.send
305-
.get_mut(&self.id)
306-
.map(get_or_insert_send(max_send_data))
307-
.ok_or(FinishError::ClosedStream)?;
298+
299+
let index = self.index.ok_or(FinishError::ClosedStream)?;
300+
let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data));
308301

309302
let was_pending = stream.is_pending();
310303
stream.finish()?;
@@ -321,12 +314,8 @@ impl<'a> SendStream<'a> {
321314
/// - when applied to a receive stream
322315
pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
323316
let max_send_data = self.state.max_send_data(self.id);
324-
let stream = self
325-
.state
326-
.send
327-
.get_mut(&self.id)
328-
.map(get_or_insert_send(max_send_data))
329-
.ok_or(ClosedStream { _private: () })?;
317+
let index = self.index.ok_or(ClosedStream { _private: () })?;
318+
let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data));
330319

331320
if matches!(stream.state, SendState::ResetSent) {
332321
// Redundant reset call
@@ -350,12 +339,8 @@ impl<'a> SendStream<'a> {
350339
/// - when applied to a receive stream
351340
pub fn set_priority(&mut self, priority: i32) -> Result<(), ClosedStream> {
352341
let max_send_data = self.state.max_send_data(self.id);
353-
let stream = self
354-
.state
355-
.send
356-
.get_mut(&self.id)
357-
.map(get_or_insert_send(max_send_data))
358-
.ok_or(ClosedStream { _private: () })?;
342+
let index = self.index.ok_or(ClosedStream { _private: () })?;
343+
let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data));
359344

360345
stream.priority = priority;
361346
Ok(())
@@ -366,13 +351,12 @@ impl<'a> SendStream<'a> {
366351
/// # Panics
367352
/// - when applied to a receive stream
368353
pub fn priority(&self) -> Result<i32, ClosedStream> {
369-
let stream = self
370-
.state
371-
.send
372-
.get(&self.id)
373-
.ok_or(ClosedStream { _private: () })?;
354+
let index = self.index.ok_or(ClosedStream { _private: () })?;
374355

375-
Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default())
356+
Ok(self.state.send[index]
357+
.as_ref()
358+
.map(|s| s.priority)
359+
.unwrap_or_default())
376360
}
377361
}
378362

0 commit comments

Comments
 (0)