Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ futures-io = "0.3.19"
getrandom = { version = "0.3", default-features = false }
hdrhistogram = { version = "7.2", default-features = false }
hex-literal = "0.4"
indexmap = "2.9.0"
lru-slab = "0.1.2"
lazy_static = "1"
log = "0.4"
Expand Down
7 changes: 4 additions & 3 deletions fuzz/fuzz_targets/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ fuzz_target!(|input: (StreamParams, Vec<Operation>)| {
Streams::new(&mut state, &conn_state).accept(dir);
}
Operation::Finish(id) => {
let _ = SendStream::new(id, &mut state, &mut pending, &conn_state).finish();
let _ =
SendStream::new_for_fuzzing(id, &mut state, &mut pending, &conn_state).finish();
}
Operation::ReceivedStopSending(sid, err_code) => {
Streams::new(&mut state, &conn_state)
Expand All @@ -63,8 +64,8 @@ fuzz_target!(|input: (StreamParams, Vec<Operation>)| {
.received_reset(rs);
}
Operation::Reset(id) => {
let _ =
SendStream::new(id, &mut state, &mut pending, &conn_state).reset(0u32.into());
let _ = SendStream::new_for_fuzzing(id, &mut state, &mut pending, &conn_state)
.reset(0u32.into());
}
}
}
Expand Down
1 change: 1 addition & 0 deletions quinn-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ arbitrary = { workspace = true, optional = true }
aws-lc-rs = { workspace = true, optional = true }
bytes = { workspace = true }
fastbloom = { workspace = true, optional = true }
indexmap = { workspace = true }
lru-slab = { workspace = true }
rustc-hash = { workspace = true }
rand = { workspace = true }
Expand Down
10 changes: 5 additions & 5 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,12 +426,12 @@ impl Connection {
#[must_use]
pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
SendStream {
SendStream::new(
id,
state: &mut self.streams,
pending: &mut self.spaces[SpaceId::Data].pending,
conn_state: &self.state,
}
&mut self.streams,
&mut self.spaces[SpaceId::Data].pending,
&self.state,
)
}

/// Returns packets to transmit
Expand Down
114 changes: 63 additions & 51 deletions quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,15 @@ use thiserror::Error;
use tracing::trace;

use super::spaces::{Retransmits, ThinRetransmits};
use crate::{
Dir, StreamId, VarInt,
connection::streams::state::{get_or_insert_recv, get_or_insert_send},
frame,
};
use crate::{Dir, StreamId, VarInt, connection::streams::state::get_or_insert_recv, frame};

mod recv;
use recv::Recv;
pub use recv::{Chunks, ReadError, ReadableError};

mod send;
pub(crate) use send::{ByteSlice, BytesArray};
use send::{BytesSource, Send, SendState};
pub use send::{FinishError, WriteError, Written};
use send::{Send, SendState};

mod state;
#[allow(unreachable_pub)] // fuzzing only
Expand Down Expand Up @@ -195,6 +190,7 @@ impl RecvStream<'_> {
/// Access to streams
pub struct SendStream<'a> {
pub(super) id: StreamId,
pub(super) index: Option<usize>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when the swap_remove_entry call introduced in the previous commit invalidates this?

pub(super) state: &'a mut StreamsState,
pub(super) pending: &'a mut Retransmits,
pub(super) conn_state: &'a super::State,
Expand All @@ -203,14 +199,25 @@ pub struct SendStream<'a> {
#[allow(clippy::needless_lifetimes)] // Needed for cfg(fuzzing)
impl<'a> SendStream<'a> {
#[cfg(fuzzing)]
pub fn new(
pub fn new_for_fuzzing(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bikeshed: I'd prefer that this always be named new. We could achieve that by having two definitions, selected by #[cfg(fuzzing)] state and differing only in visibility, both of which delegate to a private new_inner or similar.

id: StreamId,
state: &'a mut StreamsState,
pending: &'a mut Retransmits,
conn_state: &'a super::State,
) -> Self {
Self::new(id, state, pending, conn_state)
}

pub(super) fn new(
id: StreamId,
state: &'a mut StreamsState,
pending: &'a mut Retransmits,
conn_state: &'a super::State,
) -> Self {
let index = state.send.get_index_of(&id);
Self {
id,
index,
state,
pending,
conn_state,
Expand All @@ -221,7 +228,9 @@ impl<'a> SendStream<'a> {
///
/// Returns the number of bytes successfully written.
pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
Ok(self.write_source(&mut ByteSlice::from_slice(data))?.bytes)
let prefix_len = self.write_limit()?.min(data.len());
self.write_unchecked(data[..prefix_len].to_vec().into());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change this to Ralith's suggestion from the previous PR before merging.

Ok(prefix_len)
}

/// Send data on the given stream
Expand All @@ -231,10 +240,27 @@ impl<'a> SendStream<'a> {
/// [`Written::chunks`] will not count this chunk as fully written. However
/// the chunk will be advanced and contain only non-written data after the call.
pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result<Written, WriteError> {
self.write_source(&mut BytesArray::from_chunks(data))
let limit = self.write_limit()?;
let mut written = Written::default();
for chunk in data {
let prefix = chunk.split_to(chunk.len().min(limit - written.bytes));
written.bytes += prefix.len();
self.write_unchecked(prefix);

if chunk.is_empty() {
written.chunks += 1;
}

debug_assert!(written.bytes <= limit);
if written.bytes == limit {
break;
}
}
Ok(written)
}

fn write_source<B: BytesSource>(&mut self, source: &mut B) -> Result<Written, WriteError> {
/// Get how many bytes could be written immediately, or mark as blocked if zero
fn write_limit(&mut self) -> Result<usize, WriteError> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bikeshed: this name sounds like it's writing a limit somewhere. Maybe max_write_len? That's still weird given that this can have side effects and is fallible, though...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or just write_limited()?

Copy link
Collaborator

@Ralith Ralith Jun 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That also seems confusing. This is serving the dual purpose of declaring intent to write and getting the current write limit, but is not actually writing anything. I wonder if we could split those two roles up without significant cost?

if self.conn_state.is_closed() {
trace!(%self.id, "write blocked; connection draining");
return Err(WriteError::Blocked);
Expand All @@ -244,12 +270,8 @@ impl<'a> SendStream<'a> {

let max_send_data = self.state.max_send_data(self.id);

let stream = self
.state
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(WriteError::ClosedStream)?;
let index = self.index.ok_or(WriteError::ClosedStream)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we raise this condition in new?

let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data));

if limit == 0 {
trace!(
Expand All @@ -263,24 +285,26 @@ impl<'a> SendStream<'a> {
return Err(WriteError::Blocked);
}

stream.write_limit(limit)
}

/// Write bytes under the assumption that `chunk.len() <= write_limit().unwrap()`
fn write_unchecked(&mut self, chunk: Bytes) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually need to be unchecked? Seems like it might be fairly cheap to detect incorrect use. Maybe a debug_assert, at least...

let stream = self.state.send[self.index.unwrap()].as_mut().unwrap();
let was_pending = stream.is_pending();
let written = stream.write(source, limit)?;
self.state.data_sent += written.bytes as u64;
self.state.unacked_data += written.bytes as u64;
trace!(stream = %self.id, "wrote {} bytes", written.bytes);
self.state.data_sent += chunk.len() as u64;
self.state.unacked_data += chunk.len() as u64;
trace!(stream = %self.id, "wrote {} bytes", chunk.len());
stream.pending.write(chunk);
if !was_pending {
self.state.pending.push_pending(self.id, stream.priority);
}
Ok(written)
}

/// Check if this stream was stopped, get the reason if it was
pub fn stopped(&self) -> Result<Option<VarInt>, ClosedStream> {
match self.state.send.get(&self.id).as_ref() {
Some(Some(s)) => Ok(s.stop_reason),
Some(None) => Ok(None),
None => Err(ClosedStream { _private: () }),
}
let index = self.index.ok_or(ClosedStream { _private: () })?;
Ok(self.state.send[index].as_ref().and_then(|s| s.stop_reason))
}

/// Finish a send stream, signalling that no more data will be sent.
Expand All @@ -290,12 +314,9 @@ impl<'a> SendStream<'a> {
/// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
pub fn finish(&mut self) -> Result<(), FinishError> {
let max_send_data = self.state.max_send_data(self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(FinishError::ClosedStream)?;

let index = self.index.ok_or(FinishError::ClosedStream)?;
let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data));

let was_pending = stream.is_pending();
stream.finish()?;
Expand All @@ -312,12 +333,8 @@ impl<'a> SendStream<'a> {
/// - when applied to a receive stream
pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
let max_send_data = self.state.max_send_data(self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(ClosedStream { _private: () })?;
let index = self.index.ok_or(ClosedStream { _private: () })?;
let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data));

if matches!(stream.state, SendState::ResetSent) {
// Redundant reset call
Expand All @@ -341,12 +358,8 @@ impl<'a> SendStream<'a> {
/// - when applied to a receive stream
pub fn set_priority(&mut self, priority: i32) -> Result<(), ClosedStream> {
let max_send_data = self.state.max_send_data(self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(ClosedStream { _private: () })?;
let index = self.index.ok_or(ClosedStream { _private: () })?;
let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data));

stream.priority = priority;
Ok(())
Expand All @@ -357,13 +370,12 @@ impl<'a> SendStream<'a> {
/// # Panics
/// - when applied to a receive stream
pub fn priority(&self) -> Result<i32, ClosedStream> {
let stream = self
.state
.send
.get(&self.id)
.ok_or(ClosedStream { _private: () })?;
let index = self.index.ok_or(ClosedStream { _private: () })?;

Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default())
Ok(self.state.send[index]
.as_ref()
.map(|s| s.priority)
.unwrap_or_default())
}
}

Expand Down
Loading