-
-
Notifications
You must be signed in to change notification settings - Fork 487
proto: Remove write_source (pull in IndexMap to avoid borrowing issues)
#2260
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,20 +8,15 @@ use thiserror::Error; | |
| use tracing::trace; | ||
|
|
||
| use super::spaces::{Retransmits, ThinRetransmits}; | ||
| use crate::{ | ||
| Dir, StreamId, VarInt, | ||
| connection::streams::state::{get_or_insert_recv, get_or_insert_send}, | ||
| frame, | ||
| }; | ||
| use crate::{Dir, StreamId, VarInt, connection::streams::state::get_or_insert_recv, frame}; | ||
|
|
||
| mod recv; | ||
| use recv::Recv; | ||
| pub use recv::{Chunks, ReadError, ReadableError}; | ||
|
|
||
| mod send; | ||
| pub(crate) use send::{ByteSlice, BytesArray}; | ||
| use send::{BytesSource, Send, SendState}; | ||
| pub use send::{FinishError, WriteError, Written}; | ||
| use send::{Send, SendState}; | ||
|
|
||
| mod state; | ||
| #[allow(unreachable_pub)] // fuzzing only | ||
|
|
@@ -195,6 +190,7 @@ impl RecvStream<'_> { | |
| /// Access to streams | ||
| pub struct SendStream<'a> { | ||
| pub(super) id: StreamId, | ||
| pub(super) index: Option<usize>, | ||
| pub(super) state: &'a mut StreamsState, | ||
| pub(super) pending: &'a mut Retransmits, | ||
| pub(super) conn_state: &'a super::State, | ||
|
|
@@ -203,14 +199,25 @@ pub struct SendStream<'a> { | |
| #[allow(clippy::needless_lifetimes)] // Needed for cfg(fuzzing) | ||
| impl<'a> SendStream<'a> { | ||
| #[cfg(fuzzing)] | ||
| pub fn new( | ||
| pub fn new_for_fuzzing( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. bikeshed: I'd prefer that this always be named |
||
| id: StreamId, | ||
| state: &'a mut StreamsState, | ||
| pending: &'a mut Retransmits, | ||
| conn_state: &'a super::State, | ||
| ) -> Self { | ||
| Self::new(id, state, pending, conn_state) | ||
| } | ||
|
|
||
| pub(super) fn new( | ||
| id: StreamId, | ||
| state: &'a mut StreamsState, | ||
| pending: &'a mut Retransmits, | ||
| conn_state: &'a super::State, | ||
| ) -> Self { | ||
| let index = state.send.get_index_of(&id); | ||
| Self { | ||
| id, | ||
| index, | ||
| state, | ||
| pending, | ||
| conn_state, | ||
|
|
@@ -221,7 +228,9 @@ impl<'a> SendStream<'a> { | |
| /// | ||
| /// Returns the number of bytes successfully written. | ||
| pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> { | ||
| Ok(self.write_source(&mut ByteSlice::from_slice(data))?.bytes) | ||
| let prefix_len = self.write_limit()?.min(data.len()); | ||
| self.write_unchecked(data[..prefix_len].to_vec().into()); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll change this to Ralith's suggestion from the previous PR before merging. |
||
| Ok(prefix_len) | ||
| } | ||
|
|
||
| /// Send data on the given stream | ||
|
|
@@ -231,10 +240,27 @@ impl<'a> SendStream<'a> { | |
| /// [`Written::chunks`] will not count this chunk as fully written. However | ||
| /// the chunk will be advanced and contain only non-written data after the call. | ||
| pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result<Written, WriteError> { | ||
| self.write_source(&mut BytesArray::from_chunks(data)) | ||
| let limit = self.write_limit()?; | ||
| let mut written = Written::default(); | ||
| for chunk in data { | ||
| let prefix = chunk.split_to(chunk.len().min(limit - written.bytes)); | ||
| written.bytes += prefix.len(); | ||
| self.write_unchecked(prefix); | ||
|
|
||
| if chunk.is_empty() { | ||
| written.chunks += 1; | ||
| } | ||
|
|
||
| debug_assert!(written.bytes <= limit); | ||
| if written.bytes == limit { | ||
| break; | ||
| } | ||
| } | ||
| Ok(written) | ||
| } | ||
|
|
||
| fn write_source<B: BytesSource>(&mut self, source: &mut B) -> Result<Written, WriteError> { | ||
| /// Get how many bytes could be written immediately, or mark as blocked if zero | ||
| fn write_limit(&mut self) -> Result<usize, WriteError> { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. bikeshed: this name sounds like it's writing a limit somewhere. Maybe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That also seems confusing. This is serving the dual purpose of declaring intent to write and getting the current write limit, but is not actually writing anything. I wonder if we could split those two roles up without significant cost? |
||
| if self.conn_state.is_closed() { | ||
| trace!(%self.id, "write blocked; connection draining"); | ||
| return Err(WriteError::Blocked); | ||
|
|
@@ -244,12 +270,8 @@ impl<'a> SendStream<'a> { | |
|
|
||
| let max_send_data = self.state.max_send_data(self.id); | ||
|
|
||
| let stream = self | ||
| .state | ||
| .send | ||
| .get_mut(&self.id) | ||
| .map(get_or_insert_send(max_send_data)) | ||
| .ok_or(WriteError::ClosedStream)?; | ||
| let index = self.index.ok_or(WriteError::ClosedStream)?; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we raise this condition in |
||
| let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data)); | ||
|
|
||
| if limit == 0 { | ||
| trace!( | ||
|
|
@@ -263,24 +285,26 @@ impl<'a> SendStream<'a> { | |
| return Err(WriteError::Blocked); | ||
| } | ||
|
|
||
| stream.write_limit(limit) | ||
| } | ||
|
|
||
| /// Write bytes under the assumption that `write_limit().unwrap() <= chunk.len()` | ||
| fn write_unchecked(&mut self, chunk: Bytes) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this actually need to be unchecked? Seems like it might be fairly cheap to detect incorrect use. Maybe a |
||
| let stream = self.state.send[self.index.unwrap()].as_mut().unwrap(); | ||
| let was_pending = stream.is_pending(); | ||
| let written = stream.write(source, limit)?; | ||
| self.state.data_sent += written.bytes as u64; | ||
| self.state.unacked_data += written.bytes as u64; | ||
| trace!(stream = %self.id, "wrote {} bytes", written.bytes); | ||
| self.state.data_sent += chunk.len() as u64; | ||
| self.state.unacked_data += chunk.len() as u64; | ||
| trace!(stream = %self.id, "wrote {} bytes", chunk.len()); | ||
| stream.pending.write(chunk); | ||
| if !was_pending { | ||
| self.state.pending.push_pending(self.id, stream.priority); | ||
| } | ||
| Ok(written) | ||
| } | ||
|
|
||
| /// Check if this stream was stopped, get the reason if it was | ||
| pub fn stopped(&self) -> Result<Option<VarInt>, ClosedStream> { | ||
| match self.state.send.get(&self.id).as_ref() { | ||
| Some(Some(s)) => Ok(s.stop_reason), | ||
| Some(None) => Ok(None), | ||
| None => Err(ClosedStream { _private: () }), | ||
| } | ||
| let index = self.index.ok_or(ClosedStream { _private: () })?; | ||
| Ok(self.state.send[index].as_ref().and_then(|s| s.stop_reason)) | ||
| } | ||
|
|
||
| /// Finish a send stream, signalling that no more data will be sent. | ||
|
|
@@ -290,12 +314,9 @@ impl<'a> SendStream<'a> { | |
| /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished | ||
| pub fn finish(&mut self) -> Result<(), FinishError> { | ||
| let max_send_data = self.state.max_send_data(self.id); | ||
| let stream = self | ||
| .state | ||
| .send | ||
| .get_mut(&self.id) | ||
| .map(get_or_insert_send(max_send_data)) | ||
| .ok_or(FinishError::ClosedStream)?; | ||
|
|
||
| let index = self.index.ok_or(FinishError::ClosedStream)?; | ||
| let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data)); | ||
|
|
||
| let was_pending = stream.is_pending(); | ||
| stream.finish()?; | ||
|
|
@@ -312,12 +333,8 @@ impl<'a> SendStream<'a> { | |
| /// - when applied to a receive stream | ||
| pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> { | ||
| let max_send_data = self.state.max_send_data(self.id); | ||
| let stream = self | ||
| .state | ||
| .send | ||
| .get_mut(&self.id) | ||
| .map(get_or_insert_send(max_send_data)) | ||
| .ok_or(ClosedStream { _private: () })?; | ||
| let index = self.index.ok_or(ClosedStream { _private: () })?; | ||
| let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data)); | ||
|
|
||
| if matches!(stream.state, SendState::ResetSent) { | ||
| // Redundant reset call | ||
|
|
@@ -341,12 +358,8 @@ impl<'a> SendStream<'a> { | |
| /// - when applied to a receive stream | ||
| pub fn set_priority(&mut self, priority: i32) -> Result<(), ClosedStream> { | ||
| let max_send_data = self.state.max_send_data(self.id); | ||
| let stream = self | ||
| .state | ||
| .send | ||
| .get_mut(&self.id) | ||
| .map(get_or_insert_send(max_send_data)) | ||
| .ok_or(ClosedStream { _private: () })?; | ||
| let index = self.index.ok_or(ClosedStream { _private: () })?; | ||
| let stream = self.state.send[index].get_or_insert_with(|| Send::new(max_send_data)); | ||
|
|
||
| stream.priority = priority; | ||
| Ok(()) | ||
|
|
@@ -357,13 +370,12 @@ impl<'a> SendStream<'a> { | |
| /// # Panics | ||
| /// - when applied to a receive stream | ||
| pub fn priority(&self) -> Result<i32, ClosedStream> { | ||
| let stream = self | ||
| .state | ||
| .send | ||
| .get(&self.id) | ||
| .ok_or(ClosedStream { _private: () })?; | ||
| let index = self.index.ok_or(ClosedStream { _private: () })?; | ||
|
|
||
| Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default()) | ||
| Ok(self.state.send[index] | ||
| .as_ref() | ||
| .map(|s| s.priority) | ||
| .unwrap_or_default()) | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens when the
swap_remove_entrycall introduced in the previous commit invalidates this?