Skip to content

Commit ec4c939

Browse files
committed
proto: Remove write_source
Prior to this commit, SendStream had several different write methods each of which had to: 1. Perform some shared logic to check how much can be written. 2. Do source-specific logic to iterate over a series of `Bytes` chunk to write 3. For each one of those, perform some shared logic to write the bytes. Prior to this commit, this this was achieved with the WriteSource trait. Previous attempts to simplify this and remove the WriteSource trait were complicated by the fact that it is desirable to cache a single StreamId hashmap lookup over the course of this whole process. However, that is no longer an obstacle due to previous commits. Thus, this commit basically splits the SendStream::write_source method into two methods (both of which are still private): 1. A `write_limit` method, which checks how many bytes can be written. 2. A `write_unchecked` method, which writes bytes under the assumption that the write limit is being expected, and does not itself check limits or error conditions. As such, the `write` and `write_chunks` methods are rewritten to call these methods directly, and the `WriteSource` trait system is removed entirely, achieving significant simplification.
1 parent 15365b9 commit ec4c939

File tree

2 files changed

+36
-235
lines changed

2 files changed

+36
-235
lines changed

quinn-proto/src/connection/streams/mod.rs

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,15 @@ use thiserror::Error;
88
use tracing::trace;
99

1010
use super::spaces::{Retransmits, ThinRetransmits};
11-
use crate::{
12-
Dir, StreamId, VarInt,
13-
connection::streams::state::get_or_insert_recv,
14-
frame,
15-
};
11+
use crate::{Dir, StreamId, VarInt, connection::streams::state::get_or_insert_recv, frame};
1612

1713
mod recv;
1814
use recv::Recv;
1915
pub use recv::{Chunks, ReadError, ReadableError};
2016

2117
mod send;
22-
pub(crate) use send::{ByteSlice, BytesArray};
23-
use send::{BytesSource, Send, SendState};
2418
pub use send::{FinishError, WriteError, Written};
19+
use send::{Send, SendState};
2520

2621
mod state;
2722
#[allow(unreachable_pub)] // fuzzing only
@@ -233,7 +228,9 @@ impl<'a> SendStream<'a> {
233228
///
234229
/// Returns the number of bytes successfully written.
235230
pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
236-
Ok(self.write_source(&mut ByteSlice::from_slice(data))?.bytes)
231+
let prefix_len = self.write_limit()?.min(data.len());
232+
self.write_unchecked(data[..prefix_len].to_vec().into());
233+
Ok(prefix_len)
237234
}
238235

239236
/// Send data on the given stream
@@ -243,10 +240,27 @@ impl<'a> SendStream<'a> {
243240
/// [`Written::chunks`] will not count this chunk as fully written. However
244241
/// the chunk will be advanced and contain only non-written data after the call.
245242
pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result<Written, WriteError> {
246-
self.write_source(&mut BytesArray::from_chunks(data))
243+
let limit = self.write_limit()?;
244+
let mut written = Written::default();
245+
for chunk in data {
246+
let prefix = chunk.split_to(chunk.len().min(limit - written.bytes));
247+
written.bytes += prefix.len();
248+
self.write_unchecked(prefix);
249+
250+
if chunk.is_empty() {
251+
written.chunks += 1;
252+
}
253+
254+
debug_assert!(written.bytes <= limit);
255+
if written.bytes == limit {
256+
break;
257+
}
258+
}
259+
Ok(written)
247260
}
248261

249-
fn write_source<B: BytesSource>(&mut self, source: &mut B) -> Result<Written, WriteError> {
262+
/// Get how many bytes could be written immediately, or mark as blocked if zero
263+
fn write_limit(&mut self) -> Result<usize, WriteError> {
250264
if self.conn_state.is_closed() {
251265
trace!(%self.id, "write blocked; connection draining");
252266
return Err(WriteError::Blocked);
@@ -271,15 +285,20 @@ impl<'a> SendStream<'a> {
271285
return Err(WriteError::Blocked);
272286
}
273287

288+
stream.write_limit(limit)
289+
}
290+
291+
/// Write bytes under the assumption that `write_limit().unwrap() <= chunk.len()`
292+
fn write_unchecked(&mut self, chunk: Bytes) {
293+
let stream = self.state.send[self.index.unwrap()].as_mut().unwrap();
274294
let was_pending = stream.is_pending();
275-
let written = stream.write(source, limit)?;
276-
self.state.data_sent += written.bytes as u64;
277-
self.state.unacked_data += written.bytes as u64;
278-
trace!(stream = %self.id, "wrote {} bytes", written.bytes);
295+
self.state.data_sent += chunk.len() as u64;
296+
self.state.unacked_data += chunk.len() as u64;
297+
trace!(stream = %self.id, "wrote {} bytes", chunk.len());
298+
stream.pending.write(chunk);
279299
if !was_pending {
280300
self.state.pending.push_pending(self.id, stream.priority);
281301
}
282-
Ok(written)
283302
}
284303

285304
/// Check if this stream was stopped, get the reason if it was
Lines changed: 2 additions & 220 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use bytes::Bytes;
21
use thiserror::Error;
32

43
use crate::{VarInt, connection::send_buffer::SendBuffer, frame};
@@ -49,11 +48,7 @@ impl Send {
4948
}
5049
}
5150

52-
pub(super) fn write<S: BytesSource>(
53-
&mut self,
54-
source: &mut S,
55-
limit: u64,
56-
) -> Result<Written, WriteError> {
51+
pub(super) fn write_limit(&self, limit: u64) -> Result<usize, WriteError> {
5752
if !self.is_writable() {
5853
return Err(WriteError::ClosedStream);
5954
}
@@ -64,23 +59,7 @@ impl Send {
6459
if budget == 0 {
6560
return Err(WriteError::Blocked);
6661
}
67-
let mut limit = limit.min(budget) as usize;
68-
69-
let mut result = Written::default();
70-
loop {
71-
let (chunk, chunks_consumed) = source.pop_chunk(limit);
72-
result.chunks += chunks_consumed;
73-
result.bytes += chunk.len();
74-
75-
if chunk.is_empty() {
76-
break;
77-
}
78-
79-
limit -= chunk.len();
80-
self.pending.write(chunk);
81-
}
82-
83-
Ok(result)
62+
Ok(limit.min(budget) as usize)
8463
}
8564

8665
/// Update stream state due to a reset sent by the local application
@@ -143,106 +122,6 @@ impl Send {
143122
}
144123
}
145124

146-
/// A [`BytesSource`] implementation for `&'a mut [Bytes]`
147-
///
148-
/// The type allows to dequeue [`Bytes`] chunks from an array of chunks, up to
149-
/// a configured limit.
150-
pub(crate) struct BytesArray<'a> {
151-
/// The wrapped slice of `Bytes`
152-
chunks: &'a mut [Bytes],
153-
/// The amount of chunks consumed from this source
154-
consumed: usize,
155-
}
156-
157-
impl<'a> BytesArray<'a> {
158-
pub(crate) fn from_chunks(chunks: &'a mut [Bytes]) -> Self {
159-
Self {
160-
chunks,
161-
consumed: 0,
162-
}
163-
}
164-
}
165-
166-
impl BytesSource for BytesArray<'_> {
167-
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
168-
// The loop exists to skip empty chunks while still marking them as
169-
// consumed
170-
let mut chunks_consumed = 0;
171-
172-
while self.consumed < self.chunks.len() {
173-
let chunk = &mut self.chunks[self.consumed];
174-
175-
if chunk.len() <= limit {
176-
let chunk = std::mem::take(chunk);
177-
self.consumed += 1;
178-
chunks_consumed += 1;
179-
if chunk.is_empty() {
180-
continue;
181-
}
182-
return (chunk, chunks_consumed);
183-
} else if limit > 0 {
184-
let chunk = chunk.split_to(limit);
185-
return (chunk, chunks_consumed);
186-
} else {
187-
break;
188-
}
189-
}
190-
191-
(Bytes::new(), chunks_consumed)
192-
}
193-
}
194-
195-
/// A [`BytesSource`] implementation for `&[u8]`
196-
///
197-
/// The type allows to dequeue a single [`Bytes`] chunk, which will be lazily
198-
/// created from a reference. This allows to defer the allocation until it is
199-
/// known how much data needs to be copied.
200-
pub(crate) struct ByteSlice<'a> {
201-
/// The wrapped byte slice
202-
data: &'a [u8],
203-
}
204-
205-
impl<'a> ByteSlice<'a> {
206-
pub(crate) fn from_slice(data: &'a [u8]) -> Self {
207-
Self { data }
208-
}
209-
}
210-
211-
impl BytesSource for ByteSlice<'_> {
212-
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
213-
let limit = limit.min(self.data.len());
214-
if limit == 0 {
215-
return (Bytes::new(), 0);
216-
}
217-
218-
let chunk = Bytes::from(self.data[..limit].to_owned());
219-
self.data = &self.data[chunk.len()..];
220-
221-
let chunks_consumed = usize::from(self.data.is_empty());
222-
(chunk, chunks_consumed)
223-
}
224-
}
225-
226-
/// A source of one or more buffers which can be converted into `Bytes` buffers on demand
227-
///
228-
/// The purpose of this data type is to defer conversion as long as possible,
229-
/// so that no heap allocation is required in case no data is writable.
230-
pub(super) trait BytesSource {
231-
/// Returns the next chunk from the source of owned chunks.
232-
///
233-
/// This method will consume parts of the source.
234-
/// Calling it will yield `Bytes` elements up to the configured `limit`.
235-
///
236-
/// The method returns a tuple:
237-
/// - The first item is the yielded `Bytes` element. The element will be
238-
/// empty if the limit is zero or no more data is available.
239-
/// - The second item returns how many complete chunks inside the source had
240-
/// had been consumed. This can be less than 1, if a chunk inside the
241-
/// source had been truncated in order to adhere to the limit. It can also
242-
/// be more than 1, if zero-length chunks had been skipped.
243-
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize);
244-
}
245-
246125
/// Indicates how many bytes and chunks had been transferred in a write operation
247126
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
248127
pub struct Written {
@@ -303,100 +182,3 @@ pub enum FinishError {
303182
#[error("closed stream")]
304183
ClosedStream,
305184
}
306-
307-
#[cfg(test)]
308-
mod tests {
309-
use super::*;
310-
311-
#[test]
312-
fn bytes_array() {
313-
let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
314-
for limit in 0..full.len() {
315-
let mut chunks = [
316-
Bytes::from_static(b""),
317-
Bytes::from_static(b"Hello "),
318-
Bytes::from_static(b"Wo"),
319-
Bytes::from_static(b""),
320-
Bytes::from_static(b"r"),
321-
Bytes::from_static(b"ld"),
322-
Bytes::from_static(b""),
323-
Bytes::from_static(b" 12345678"),
324-
Bytes::from_static(b"9 ABCDE"),
325-
Bytes::from_static(b"F"),
326-
Bytes::from_static(b"GHJIJKLMNOPQRSTUVWXYZ"),
327-
];
328-
let num_chunks = chunks.len();
329-
let last_chunk_len = chunks[chunks.len() - 1].len();
330-
331-
let mut array = BytesArray::from_chunks(&mut chunks);
332-
333-
let mut buf = Vec::new();
334-
let mut chunks_popped = 0;
335-
let mut chunks_consumed = 0;
336-
let mut remaining = limit;
337-
loop {
338-
let (chunk, consumed) = array.pop_chunk(remaining);
339-
chunks_consumed += consumed;
340-
341-
if !chunk.is_empty() {
342-
buf.extend_from_slice(&chunk);
343-
remaining -= chunk.len();
344-
chunks_popped += 1;
345-
} else {
346-
break;
347-
}
348-
}
349-
350-
assert_eq!(&buf[..], &full[..limit]);
351-
352-
if limit == full.len() {
353-
// Full consumption of the last chunk
354-
assert_eq!(chunks_consumed, num_chunks);
355-
// Since there are empty chunks, we consume more than there are popped
356-
assert_eq!(chunks_consumed, chunks_popped + 3);
357-
} else if limit > full.len() - last_chunk_len {
358-
// Partial consumption of the last chunk
359-
assert_eq!(chunks_consumed, num_chunks - 1);
360-
assert_eq!(chunks_consumed, chunks_popped + 2);
361-
}
362-
}
363-
}
364-
365-
#[test]
366-
fn byte_slice() {
367-
let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
368-
for limit in 0..full.len() {
369-
let mut array = ByteSlice::from_slice(&full[..]);
370-
371-
let mut buf = Vec::new();
372-
let mut chunks_popped = 0;
373-
let mut chunks_consumed = 0;
374-
let mut remaining = limit;
375-
loop {
376-
let (chunk, consumed) = array.pop_chunk(remaining);
377-
chunks_consumed += consumed;
378-
379-
if !chunk.is_empty() {
380-
buf.extend_from_slice(&chunk);
381-
remaining -= chunk.len();
382-
chunks_popped += 1;
383-
} else {
384-
break;
385-
}
386-
}
387-
388-
assert_eq!(&buf[..], &full[..limit]);
389-
if limit != 0 {
390-
assert_eq!(chunks_popped, 1);
391-
} else {
392-
assert_eq!(chunks_popped, 0);
393-
}
394-
395-
if limit == full.len() {
396-
assert_eq!(chunks_consumed, 1);
397-
} else {
398-
assert_eq!(chunks_consumed, 0);
399-
}
400-
}
401-
}
402-
}

0 commit comments

Comments
 (0)