Skip to content

Commit 9fd545b

Browse files
committed
refactor(proto): Remove BytesSource
This commit is able to achieve significant simplification of code relating to proto::SendStream::write_source. The BytesSource trait and its two implementations are removed entirely. The source parameter to that method is replaced with a callback which is given a limit, and an output variable Vec<Bytes> with which to fill with Bytes to write. Benefits of this approach include: - More relevant logic for write and write_chunks is local. - write no longer has to convert back and forth as much through an API that fundamentally assumes multiple chunks. - Send::write no longer has to care about Written.chunks. - Tests for BytesSource implementations are obselete and removed. Conceivably, this could facilitate exposing these APIs in the future.
1 parent aa6105c commit 9fd545b

File tree

3 files changed

+61
-221
lines changed

3 files changed

+61
-221
lines changed

quinn-proto/src/connection/send_buffer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ impl SendBuffer {
3333

3434
/// Append application data to the end of the stream
3535
pub(super) fn write(&mut self, data: Bytes) {
36+
if data.is_empty() {
37+
return;
38+
}
39+
3640
self.unacked_len += data.len();
3741
self.offset += data.len() as u64;
3842
self.unacked_segments.push_back(data);

quinn-proto/src/connection/streams/mod.rs

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@ use recv::Recv;
1919
pub use recv::{Chunks, ReadError, ReadableError};
2020

2121
mod send;
22-
pub(crate) use send::{ByteSlice, BytesArray};
23-
use send::{BytesSource, Send, SendState};
2422
pub use send::{FinishError, WriteError, Written};
23+
use send::{Send, SendState};
2524

2625
mod state;
2726
#[allow(unreachable_pub)] // fuzzing only
@@ -221,7 +220,11 @@ impl<'a> SendStream<'a> {
221220
///
222221
/// Returns the number of bytes successfully written.
223222
pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
224-
Ok(self.write_source(&mut ByteSlice::from_slice(data))?.bytes)
223+
self.write_source(|limit, chunks| {
224+
let prefix = &data[..limit.min(data.len())];
225+
chunks.push(prefix.to_vec().into());
226+
prefix.len()
227+
})
225228
}
226229

227230
/// Send data on the given stream
@@ -231,10 +234,38 @@ impl<'a> SendStream<'a> {
231234
/// [`Written::chunks`] will not count this chunk as fully written. However
232235
/// the chunk will be advanced and contain only non-written data after the call.
233236
pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result<Written, WriteError> {
234-
self.write_source(&mut BytesArray::from_chunks(data))
237+
self.write_source(|limit, chunks| {
238+
let mut written = Written::default();
239+
for chunk in data {
240+
let prefix = chunk.split_to(chunk.len().min(limit - written.bytes));
241+
written.bytes += prefix.len();
242+
chunks.push(prefix);
243+
244+
if chunk.is_empty() {
245+
written.chunks += 1;
246+
}
247+
248+
debug_assert!(written.bytes <= limit);
249+
if written.bytes == limit {
250+
break;
251+
}
252+
}
253+
written
254+
})
235255
}
236256

237-
fn write_source<B: BytesSource>(&mut self, source: &mut B) -> Result<Written, WriteError> {
257+
/// Send data on the given stream
258+
///
259+
/// The `source` callback is invoked with the number of bytes that can be written immediately,
260+
/// as well as an initially empty `&mut Vec<Bytes>` to which it can push bytes to write. If the
261+
/// callback pushes a total number of bytes less than or equal to the provided limit, it is
262+
/// guaranteed they will all be written. If it provides more bytes than this, it is guaranteed
263+
/// that a prefix of the provided cumulative bytes will be written equal in length to the
264+
/// provided limit.
265+
fn write_source<T>(
266+
&mut self,
267+
source: impl FnOnce(usize, &mut Vec<Bytes>) -> T,
268+
) -> Result<T, WriteError> {
238269
if self.conn_state.is_closed() {
239270
trace!(%self.id, "write blocked; connection draining");
240271
return Err(WriteError::Blocked);
@@ -264,14 +295,14 @@ impl<'a> SendStream<'a> {
264295
}
265296

266297
let was_pending = stream.is_pending();
267-
let written = stream.write(source, limit)?;
268-
self.state.data_sent += written.bytes as u64;
269-
self.state.unacked_data += written.bytes as u64;
270-
trace!(stream = %self.id, "wrote {} bytes", written.bytes);
298+
let (written, source_output) = stream.write(source, limit)?;
299+
self.state.data_sent += written as u64;
300+
self.state.unacked_data += written as u64;
301+
trace!(stream = %self.id, "wrote {} bytes", written);
271302
if !was_pending {
272303
self.state.pending.push_pending(self.id, stream.priority);
273304
}
274-
Ok(written)
305+
Ok(source_output)
275306
}
276307

277308
/// Check if this stream was stopped, get the reason if it was

quinn-proto/src/connection/streams/send.rs

Lines changed: 16 additions & 211 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ impl Send {
4949
}
5050
}
5151

52-
pub(super) fn write<S: BytesSource>(
52+
pub(super) fn write<T>(
5353
&mut self,
54-
source: &mut S,
54+
source: impl FnOnce(usize, &mut Vec<Bytes>) -> T,
5555
limit: u64,
56-
) -> Result<Written, WriteError> {
56+
) -> Result<(usize, T), WriteError> {
5757
if !self.is_writable() {
5858
return Err(WriteError::ClosedStream);
5959
}
@@ -64,23 +64,25 @@ impl Send {
6464
if budget == 0 {
6565
return Err(WriteError::Blocked);
6666
}
67-
let mut limit = limit.min(budget) as usize;
67+
let limit = limit.min(budget) as usize;
6868

69-
let mut result = Written::default();
70-
loop {
71-
let (chunk, chunks_consumed) = source.pop_chunk(limit);
72-
result.chunks += chunks_consumed;
73-
result.bytes += chunk.len();
69+
let mut chunks = Vec::new();
70+
let source_output = source(limit, &mut chunks);
7471

75-
if chunk.is_empty() {
72+
let mut written = 0;
73+
74+
for mut chunk in chunks.drain(..) {
75+
let prefix = chunk.split_to(chunk.len().min(limit - written));
76+
written += prefix.len();
77+
self.pending.write(prefix);
78+
79+
debug_assert!(written <= limit);
80+
if written == limit {
7681
break;
7782
}
78-
79-
limit -= chunk.len();
80-
self.pending.write(chunk);
8183
}
8284

83-
Ok(result)
85+
Ok((written, source_output))
8486
}
8587

8688
/// Update stream state due to a reset sent by the local application
@@ -143,106 +145,6 @@ impl Send {
143145
}
144146
}
145147

146-
/// A [`BytesSource`] implementation for `&'a mut [Bytes]`
147-
///
148-
/// The type allows to dequeue [`Bytes`] chunks from an array of chunks, up to
149-
/// a configured limit.
150-
pub(crate) struct BytesArray<'a> {
151-
/// The wrapped slice of `Bytes`
152-
chunks: &'a mut [Bytes],
153-
/// The amount of chunks consumed from this source
154-
consumed: usize,
155-
}
156-
157-
impl<'a> BytesArray<'a> {
158-
pub(crate) fn from_chunks(chunks: &'a mut [Bytes]) -> Self {
159-
Self {
160-
chunks,
161-
consumed: 0,
162-
}
163-
}
164-
}
165-
166-
impl BytesSource for BytesArray<'_> {
167-
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
168-
// The loop exists to skip empty chunks while still marking them as
169-
// consumed
170-
let mut chunks_consumed = 0;
171-
172-
while self.consumed < self.chunks.len() {
173-
let chunk = &mut self.chunks[self.consumed];
174-
175-
if chunk.len() <= limit {
176-
let chunk = std::mem::take(chunk);
177-
self.consumed += 1;
178-
chunks_consumed += 1;
179-
if chunk.is_empty() {
180-
continue;
181-
}
182-
return (chunk, chunks_consumed);
183-
} else if limit > 0 {
184-
let chunk = chunk.split_to(limit);
185-
return (chunk, chunks_consumed);
186-
} else {
187-
break;
188-
}
189-
}
190-
191-
(Bytes::new(), chunks_consumed)
192-
}
193-
}
194-
195-
/// A [`BytesSource`] implementation for `&[u8]`
196-
///
197-
/// The type allows to dequeue a single [`Bytes`] chunk, which will be lazily
198-
/// created from a reference. This allows to defer the allocation until it is
199-
/// known how much data needs to be copied.
200-
pub(crate) struct ByteSlice<'a> {
201-
/// The wrapped byte slice
202-
data: &'a [u8],
203-
}
204-
205-
impl<'a> ByteSlice<'a> {
206-
pub(crate) fn from_slice(data: &'a [u8]) -> Self {
207-
Self { data }
208-
}
209-
}
210-
211-
impl BytesSource for ByteSlice<'_> {
212-
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
213-
let limit = limit.min(self.data.len());
214-
if limit == 0 {
215-
return (Bytes::new(), 0);
216-
}
217-
218-
let chunk = Bytes::from(self.data[..limit].to_owned());
219-
self.data = &self.data[chunk.len()..];
220-
221-
let chunks_consumed = usize::from(self.data.is_empty());
222-
(chunk, chunks_consumed)
223-
}
224-
}
225-
226-
/// A source of one or more buffers which can be converted into `Bytes` buffers on demand
227-
///
228-
/// The purpose of this data type is to defer conversion as long as possible,
229-
/// so that no heap allocation is required in case no data is writable.
230-
pub(super) trait BytesSource {
231-
/// Returns the next chunk from the source of owned chunks.
232-
///
233-
/// This method will consume parts of the source.
234-
/// Calling it will yield `Bytes` elements up to the configured `limit`.
235-
///
236-
/// The method returns a tuple:
237-
/// - The first item is the yielded `Bytes` element. The element will be
238-
/// empty if the limit is zero or no more data is available.
239-
/// - The second item returns how many complete chunks inside the source had
240-
/// had been consumed. This can be less than 1, if a chunk inside the
241-
/// source had been truncated in order to adhere to the limit. It can also
242-
/// be more than 1, if zero-length chunks had been skipped.
243-
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize);
244-
}
245-
246148
/// Indicates how many bytes and chunks had been transferred in a write operation
247149
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
248150
pub struct Written {
@@ -303,100 +205,3 @@ pub enum FinishError {
303205
#[error("closed stream")]
304206
ClosedStream,
305207
}
306-
307-
#[cfg(test)]
308-
mod tests {
309-
use super::*;
310-
311-
#[test]
312-
fn bytes_array() {
313-
let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
314-
for limit in 0..full.len() {
315-
let mut chunks = [
316-
Bytes::from_static(b""),
317-
Bytes::from_static(b"Hello "),
318-
Bytes::from_static(b"Wo"),
319-
Bytes::from_static(b""),
320-
Bytes::from_static(b"r"),
321-
Bytes::from_static(b"ld"),
322-
Bytes::from_static(b""),
323-
Bytes::from_static(b" 12345678"),
324-
Bytes::from_static(b"9 ABCDE"),
325-
Bytes::from_static(b"F"),
326-
Bytes::from_static(b"GHJIJKLMNOPQRSTUVWXYZ"),
327-
];
328-
let num_chunks = chunks.len();
329-
let last_chunk_len = chunks[chunks.len() - 1].len();
330-
331-
let mut array = BytesArray::from_chunks(&mut chunks);
332-
333-
let mut buf = Vec::new();
334-
let mut chunks_popped = 0;
335-
let mut chunks_consumed = 0;
336-
let mut remaining = limit;
337-
loop {
338-
let (chunk, consumed) = array.pop_chunk(remaining);
339-
chunks_consumed += consumed;
340-
341-
if !chunk.is_empty() {
342-
buf.extend_from_slice(&chunk);
343-
remaining -= chunk.len();
344-
chunks_popped += 1;
345-
} else {
346-
break;
347-
}
348-
}
349-
350-
assert_eq!(&buf[..], &full[..limit]);
351-
352-
if limit == full.len() {
353-
// Full consumption of the last chunk
354-
assert_eq!(chunks_consumed, num_chunks);
355-
// Since there are empty chunks, we consume more than there are popped
356-
assert_eq!(chunks_consumed, chunks_popped + 3);
357-
} else if limit > full.len() - last_chunk_len {
358-
// Partial consumption of the last chunk
359-
assert_eq!(chunks_consumed, num_chunks - 1);
360-
assert_eq!(chunks_consumed, chunks_popped + 2);
361-
}
362-
}
363-
}
364-
365-
#[test]
366-
fn byte_slice() {
367-
let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
368-
for limit in 0..full.len() {
369-
let mut array = ByteSlice::from_slice(&full[..]);
370-
371-
let mut buf = Vec::new();
372-
let mut chunks_popped = 0;
373-
let mut chunks_consumed = 0;
374-
let mut remaining = limit;
375-
loop {
376-
let (chunk, consumed) = array.pop_chunk(remaining);
377-
chunks_consumed += consumed;
378-
379-
if !chunk.is_empty() {
380-
buf.extend_from_slice(&chunk);
381-
remaining -= chunk.len();
382-
chunks_popped += 1;
383-
} else {
384-
break;
385-
}
386-
}
387-
388-
assert_eq!(&buf[..], &full[..limit]);
389-
if limit != 0 {
390-
assert_eq!(chunks_popped, 1);
391-
} else {
392-
assert_eq!(chunks_popped, 0);
393-
}
394-
395-
if limit == full.len() {
396-
assert_eq!(chunks_consumed, 1);
397-
} else {
398-
assert_eq!(chunks_consumed, 0);
399-
}
400-
}
401-
}
402-
}

0 commit comments

Comments
 (0)