Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 20 additions & 36 deletions quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tracing::trace;
use super::spaces::{Retransmits, ThinRetransmits};
use crate::{
Dir, StreamId, VarInt,
connection::streams::state::{get_or_insert_recv, get_or_insert_send},
connection::streams::state::get_or_insert_recv,
frame,
};

Expand Down Expand Up @@ -195,6 +195,7 @@ impl RecvStream<'_> {
/// Access to streams
pub struct SendStream<'a> {
pub(super) id: StreamId,
pub(super) index: Option<usize>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when the swap_remove_entry call introduced in the previous commit invalidates this?

pub(super) state: &'a mut StreamsState,
pub(super) pending: &'a mut Retransmits,
pub(super) conn_state: &'a super::State,
Expand All @@ -218,8 +219,10 @@ impl<'a> SendStream<'a> {
pending: &'a mut Retransmits,
conn_state: &'a super::State,
) -> Self {
let index = state.send.get_index_of(&id);
Self {
id,
index,
state,
pending,
conn_state,
Expand Down Expand Up @@ -253,12 +256,8 @@ impl<'a> SendStream<'a> {

let max_send_data = self.state.max_send_data(self.id);

let stream = self
.state
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(WriteError::ClosedStream)?;
let index = self.index.ok_or(WriteError::ClosedStream)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we raise this condition in new?

let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data));

if limit == 0 {
trace!(
Expand All @@ -285,11 +284,8 @@ impl<'a> SendStream<'a> {

/// Check if this stream was stopped, get the reason if it was
pub fn stopped(&self) -> Result<Option<VarInt>, ClosedStream> {
match self.state.send.get(&self.id).as_ref() {
Some(Some(s)) => Ok(s.stop_reason),
Some(None) => Ok(None),
None => Err(ClosedStream { _private: () }),
}
let index = self.index.ok_or(ClosedStream { _private: () })?;
Ok(self.state.send[index].as_ref().and_then(|s| s.stop_reason))
}

/// Finish a send stream, signalling that no more data will be sent.
Expand All @@ -299,12 +295,9 @@ impl<'a> SendStream<'a> {
/// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
pub fn finish(&mut self) -> Result<(), FinishError> {
let max_send_data = self.state.max_send_data(self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(FinishError::ClosedStream)?;

let index = self.index.ok_or(FinishError::ClosedStream)?;
let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data));

let was_pending = stream.is_pending();
stream.finish()?;
Expand All @@ -321,12 +314,8 @@ impl<'a> SendStream<'a> {
/// - when applied to a receive stream
pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
let max_send_data = self.state.max_send_data(self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(ClosedStream { _private: () })?;
let index = self.index.ok_or(ClosedStream { _private: () })?;
let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data));

if matches!(stream.state, SendState::ResetSent) {
// Redundant reset call
Expand All @@ -350,12 +339,8 @@ impl<'a> SendStream<'a> {
/// - when applied to a receive stream
pub fn set_priority(&mut self, priority: i32) -> Result<(), ClosedStream> {
let max_send_data = self.state.max_send_data(self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(ClosedStream { _private: () })?;
let index = self.index.ok_or(ClosedStream { _private: () })?;
let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data));

stream.priority = priority;
Ok(())
Expand All @@ -366,13 +351,12 @@ impl<'a> SendStream<'a> {
/// # Panics
/// - when applied to a receive stream
pub fn priority(&self) -> Result<i32, ClosedStream> {
let stream = self
.state
.send
.get(&self.id)
.ok_or(ClosedStream { _private: () })?;
let index = self.index.ok_or(ClosedStream { _private: () })?;

Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default())
Ok(self.state.send[index]
.as_ref()
.map(|s| s.priority)
.unwrap_or_default())
}
}

Expand Down