Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 34 additions & 15 deletions quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,15 @@ use thiserror::Error;
use tracing::trace;

use super::spaces::{Retransmits, ThinRetransmits};
use crate::{
Dir, StreamId, VarInt,
connection::streams::state::get_or_insert_recv,
frame,
};
use crate::{Dir, StreamId, VarInt, connection::streams::state::get_or_insert_recv, frame};

mod recv;
use recv::Recv;
pub use recv::{Chunks, ReadError, ReadableError};

mod send;
pub(crate) use send::{ByteSlice, BytesArray};
use send::{BytesSource, Send, SendState};
pub use send::{FinishError, WriteError, Written};
use send::{Send, SendState};

mod state;
#[allow(unreachable_pub)] // fuzzing only
Expand Down Expand Up @@ -233,7 +228,9 @@ impl<'a> SendStream<'a> {
///
/// Returns the number of bytes successfully written.
pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
Ok(self.write_source(&mut ByteSlice::from_slice(data))?.bytes)
let prefix_len = self.write_limit()?.min(data.len());
self.write_unchecked(data[..prefix_len].to_vec().into());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change this to Ralith's suggestion from the previous PR before merging.

Ok(prefix_len)
}

/// Send data on the given stream
Expand All @@ -243,10 +240,27 @@ impl<'a> SendStream<'a> {
/// [`Written::chunks`] will not count this chunk as fully written. However
/// the chunk will be advanced and contain only non-written data after the call.
pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result<Written, WriteError> {
self.write_source(&mut BytesArray::from_chunks(data))
let limit = self.write_limit()?;
let mut written = Written::default();
for chunk in data {
let prefix = chunk.split_to(chunk.len().min(limit - written.bytes));
written.bytes += prefix.len();
self.write_unchecked(prefix);

if chunk.is_empty() {
written.chunks += 1;
}

debug_assert!(written.bytes <= limit);
if written.bytes == limit {
break;
}
}
Ok(written)
}

fn write_source<B: BytesSource>(&mut self, source: &mut B) -> Result<Written, WriteError> {
/// Get how many bytes could be written immediately, or mark as blocked if zero
fn write_limit(&mut self) -> Result<usize, WriteError> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bikeshed: this name sounds like it's writing a limit somewhere. Maybe max_write_len? That's still weird given that this can have side effects and is fallible, though...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or just write_limited()?

Copy link
Collaborator

@Ralith Ralith Jun 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That also seems confusing. This is serving the dual purpose of declaring intent to write and getting the current write limit, but is not actually writing anything. I wonder if we could split those two roles up without significant cost?

if self.conn_state.is_closed() {
trace!(%self.id, "write blocked; connection draining");
return Err(WriteError::Blocked);
Expand All @@ -271,15 +285,20 @@ impl<'a> SendStream<'a> {
return Err(WriteError::Blocked);
}

stream.write_limit(limit)
}

/// Write bytes under the assumption that `write_limit().unwrap() <= chunk.len()`
fn write_unchecked(&mut self, chunk: Bytes) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually need to be unchecked? Seems like it might be fairly cheap to detect incorrect use. Maybe a debug_assert, at least...

let stream = self.state.send[self.index.unwrap()].as_mut().unwrap();
let was_pending = stream.is_pending();
let written = stream.write(source, limit)?;
self.state.data_sent += written.bytes as u64;
self.state.unacked_data += written.bytes as u64;
trace!(stream = %self.id, "wrote {} bytes", written.bytes);
self.state.data_sent += chunk.len() as u64;
self.state.unacked_data += chunk.len() as u64;
trace!(stream = %self.id, "wrote {} bytes", chunk.len());
stream.pending.write(chunk);
if !was_pending {
self.state.pending.push_pending(self.id, stream.priority);
}
Ok(written)
}

/// Check if this stream was stopped, get the reason if it was
Expand Down
222 changes: 2 additions & 220 deletions quinn-proto/src/connection/streams/send.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use bytes::Bytes;
use thiserror::Error;

use crate::{VarInt, connection::send_buffer::SendBuffer, frame};
Expand Down Expand Up @@ -49,11 +48,7 @@ impl Send {
}
}

pub(super) fn write<S: BytesSource>(
&mut self,
source: &mut S,
limit: u64,
) -> Result<Written, WriteError> {
pub(super) fn write_limit(&self, limit: u64) -> Result<usize, WriteError> {
if !self.is_writable() {
return Err(WriteError::ClosedStream);
}
Expand All @@ -64,23 +59,7 @@ impl Send {
if budget == 0 {
return Err(WriteError::Blocked);
}
let mut limit = limit.min(budget) as usize;

let mut result = Written::default();
loop {
let (chunk, chunks_consumed) = source.pop_chunk(limit);
result.chunks += chunks_consumed;
result.bytes += chunk.len();

if chunk.is_empty() {
break;
}

limit -= chunk.len();
self.pending.write(chunk);
}

Ok(result)
Ok(limit.min(budget) as usize)
}

/// Update stream state due to a reset sent by the local application
Expand Down Expand Up @@ -143,106 +122,6 @@ impl Send {
}
}

/// A [`BytesSource`] implementation for `&'a mut [Bytes]`
///
/// Allows dequeuing [`Bytes`] chunks from an array of chunks, up to a
/// configured limit.
pub(crate) struct BytesArray<'a> {
    /// The wrapped slice of `Bytes`
    chunks: &'a mut [Bytes],
    /// The number of chunks fully consumed from this source so far
    consumed: usize,
}

impl<'a> BytesArray<'a> {
    /// Wraps `chunks` as a [`BytesSource`], starting with nothing consumed
    pub(crate) fn from_chunks(chunks: &'a mut [Bytes]) -> Self {
        Self {
            chunks,
            consumed: 0,
        }
    }
}

impl BytesSource for BytesArray<'_> {
    fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
        // How many complete chunks this call consumed, including any
        // zero-length chunks that were skipped over
        let mut fully_consumed = 0;

        loop {
            let next = match self.chunks.get_mut(self.consumed) {
                Some(chunk) => chunk,
                // Source exhausted
                None => break,
            };

            if next.len() > limit {
                if limit == 0 {
                    // Nothing may be emitted at all
                    break;
                }
                // The head chunk exceeds the budget: hand out a prefix and
                // leave the remainder in place; the chunk itself does not
                // count as consumed
                return (next.split_to(limit), fully_consumed);
            }

            // The whole chunk fits within the limit: move it out and advance
            let taken = std::mem::take(next);
            self.consumed += 1;
            fully_consumed += 1;
            if !taken.is_empty() {
                return (taken, fully_consumed);
            }
            // Zero-length chunk: mark it consumed and keep scanning
        }

        (Bytes::new(), fully_consumed)
    }
}

/// A [`BytesSource`] implementation for `&[u8]`
///
/// Allows dequeuing a single [`Bytes`] chunk, which is lazily created from
/// the wrapped reference. This defers the allocation until it is known how
/// much data needs to be copied.
pub(crate) struct ByteSlice<'a> {
    /// The wrapped byte slice
    data: &'a [u8],
}

impl<'a> ByteSlice<'a> {
    /// Wraps `data` as a [`BytesSource`] without copying it yet
    pub(crate) fn from_slice(data: &'a [u8]) -> Self {
        Self { data }
    }
}

impl BytesSource for ByteSlice<'_> {
    fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
        // Never copy more than what is available
        let take = self.data.len().min(limit);
        if take == 0 {
            return (Bytes::new(), 0);
        }

        // The allocation happens only here, and only for the bytes that are
        // actually handed out
        let (head, rest) = self.data.split_at(take);
        let chunk = Bytes::from(head.to_vec());
        self.data = rest;

        // The single underlying chunk counts as consumed once nothing is left
        let consumed = if rest.is_empty() { 1 } else { 0 };
        (chunk, consumed)
    }
}

/// A source of one or more buffers which can be converted into `Bytes` buffers on demand
///
/// The purpose of this trait is to defer the conversion for as long as
/// possible, so that no heap allocation is required when no data turns out to
/// be writable.
pub(super) trait BytesSource {
    /// Returns the next chunk from the source of owned chunks.
    ///
    /// This method consumes parts of the source.
    /// Each call yields a `Bytes` element of at most `limit` bytes.
    ///
    /// The method returns a tuple:
    /// - The first item is the yielded `Bytes` element. The element is empty
    ///   if the limit is zero or no more data is available.
    /// - The second item is the number of complete chunks consumed from the
    ///   source. This can be 0 if a chunk was truncated to respect the limit,
    ///   and more than 1 if zero-length chunks were skipped.
    fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize);
}

/// Indicates how many bytes and chunks had been transferred in a write operation
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct Written {
Expand Down Expand Up @@ -303,100 +182,3 @@ pub enum FinishError {
#[error("closed stream")]
ClosedStream,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Exhaustively exercises `BytesArray::pop_chunk` for every possible
    /// limit, checking both the byte content produced and the
    /// consumed-chunk accounting (including skipped empty chunks).
    #[test]
    fn bytes_array() {
        let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
        for limit in 0..full.len() {
            // Concatenation of all chunks equals `full`; the empty chunks
            // exercise the skip-and-count path
            let mut chunks = [
                Bytes::from_static(b""),
                Bytes::from_static(b"Hello "),
                Bytes::from_static(b"Wo"),
                Bytes::from_static(b""),
                Bytes::from_static(b"r"),
                Bytes::from_static(b"ld"),
                Bytes::from_static(b""),
                Bytes::from_static(b" 12345678"),
                Bytes::from_static(b"9 ABCDE"),
                Bytes::from_static(b"F"),
                Bytes::from_static(b"GHJIJKLMNOPQRSTUVWXYZ"),
            ];
            let num_chunks = chunks.len();
            let last_chunk_len = chunks[chunks.len() - 1].len();

            let mut array = BytesArray::from_chunks(&mut chunks);

            // Drain the source one popped chunk at a time until it yields an
            // empty chunk, tracking bytes and chunk counters
            let mut buf = Vec::new();
            let mut chunks_popped = 0;
            let mut chunks_consumed = 0;
            let mut remaining = limit;
            loop {
                let (chunk, consumed) = array.pop_chunk(remaining);
                chunks_consumed += consumed;

                if !chunk.is_empty() {
                    buf.extend_from_slice(&chunk);
                    remaining -= chunk.len();
                    chunks_popped += 1;
                } else {
                    break;
                }
            }

            // Exactly the first `limit` bytes must have been produced
            assert_eq!(&buf[..], &full[..limit]);

            if limit == full.len() {
                // Full consumption of the last chunk
                assert_eq!(chunks_consumed, num_chunks);
                // Since there are empty chunks, we consume more than there are popped
                assert_eq!(chunks_consumed, chunks_popped + 3);
            } else if limit > full.len() - last_chunk_len {
                // Partial consumption of the last chunk
                assert_eq!(chunks_consumed, num_chunks - 1);
                assert_eq!(chunks_consumed, chunks_popped + 2);
            }
        }
    }

    /// Exercises `ByteSlice::pop_chunk` for every possible limit: the single
    /// backing slice yields at most one non-empty chunk, and counts as
    /// consumed only when fully drained.
    #[test]
    fn byte_slice() {
        let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
        for limit in 0..full.len() {
            let mut array = ByteSlice::from_slice(&full[..]);

            let mut buf = Vec::new();
            let mut chunks_popped = 0;
            let mut chunks_consumed = 0;
            let mut remaining = limit;
            loop {
                let (chunk, consumed) = array.pop_chunk(remaining);
                chunks_consumed += consumed;

                if !chunk.is_empty() {
                    buf.extend_from_slice(&chunk);
                    remaining -= chunk.len();
                    chunks_popped += 1;
                } else {
                    break;
                }
            }

            assert_eq!(&buf[..], &full[..limit]);
            if limit != 0 {
                // A single allocation covers the entire request
                assert_eq!(chunks_popped, 1);
            } else {
                assert_eq!(chunks_popped, 0);
            }

            if limit == full.len() {
                // The backing slice is only "consumed" once fully drained
                assert_eq!(chunks_consumed, 1);
            } else {
                assert_eq!(chunks_consumed, 0);
            }
        }
    }
}