feat: driver high-water marks #161
```diff
@@ -1,5 +1,4 @@
 use std::{
-    collections::VecDeque,
     pin::Pin,
     sync::Arc,
     task::{Context, Poll, ready},
```
```diff
@@ -14,11 +13,8 @@ use tokio::{
     time::Interval,
 };
 
-use super::{ReqError, ReqOptions};
-use crate::{
-    SendCommand,
-    req::{SocketState, conn_manager::ConnManager},
-};
+use super::{ReqError, ReqOptions, SendCommand};
+use crate::req::{SocketState, conn_manager::ConnManager};
 
 use msg_common::span::{EnterSpan as _, SpanExt as _, WithSpan};
 use msg_transport::{Address, Transport};
```
```diff
@@ -42,8 +38,8 @@ pub(crate) struct ReqDriver<T: Transport<A>, A: Address> {
     pub(crate) conn_manager: ConnManager<T, A>,
     /// The timer for the write buffer linger.
    pub(crate) linger_timer: Option<Interval>,
-    /// The outgoing message queue.
-    pub(crate) egress_queue: VecDeque<WithSpan<reqrep::Message>>,
+    /// The single pending outgoing message waiting to be sent.
+    pub(crate) pending_egress: Option<WithSpan<reqrep::Message>>,
     /// The currently pending requests waiting for a response.
     pub(crate) pending_requests: FxHashMap<u32, WithSpan<PendingRequest>>,
     /// Interval for checking for request timeouts.
```
```diff
@@ -135,7 +131,7 @@ where
         let msg = message.inner.into_wire(self.id_counter);
         let msg_id = msg.id();
         self.id_counter = self.id_counter.wrapping_add(1);
-        self.egress_queue.push_back(msg.with_span(span.clone()));
+        self.pending_egress = Some(msg.with_span(span.clone()));
         self.pending_requests
             .insert(msg_id, PendingRequest { start, sender: response }.with_span(span));
     }
```
```diff
@@ -215,12 +211,10 @@ where
                 Poll::Pending => {}
             }
 
-            // NOTE: We try to drain the egress queue first (the `continue`), writing everything to
-            // the `Framed` internal buffer. When all messages are written, we move on to flushing
-            // the connection in the block below. We DO NOT rely on the `Framed` internal
-            // backpressure boundary, because we do not call `poll_ready`.
-            if let Some(msg) = this.egress_queue.pop_front().enter() {
-                // Generate the new message
+            // Try to send the pending egress message if we have one.
+            // We only hold a single pending message here; the channel serves as the actual queue.
+            // This pattern ensures we respect backpressure and don't accumulate unbounded messages.
+            if let Some(msg) = this.pending_egress.take().enter() {
                 let size = msg.size();
                 tracing::debug!("Sending msg {}", msg.id());
                 // Write the message to the buffer.
```
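The new comment in this hunk captures the core idea: hold at most one message outside the channel and let the channel (presumably bounded) do the actual queueing. Below is a minimal, self-contained model of that pattern, not code from this PR; the `Wire` and `Driver` types, their methods, and the buffer capacity are illustrative stand-ins. The loop flushes the parked message first and only pulls a new command when the slot is empty, so a full wire stalls the pipeline instead of growing an unbounded queue.

```rust
use std::collections::VecDeque;

/// A toy "wire" that can only buffer a few frames before it is full,
/// standing in for the framed transport in the driver.
struct Wire {
    buffer: VecDeque<String>,
    capacity: usize,
}

impl Wire {
    /// Try to enqueue a frame; hand the frame back if the buffer is full.
    fn try_send(&mut self, frame: String) -> Result<(), String> {
        if self.buffer.len() >= self.capacity {
            Err(frame) // caller must keep the frame and retry later
        } else {
            self.buffer.push_back(frame);
            Ok(())
        }
    }
}

/// Single-slot egress: at most one message is held outside the channel.
struct Driver {
    pending_egress: Option<String>,
    wire: Wire,
}

impl Driver {
    /// One iteration of the driver loop. Returns `true` if progress was made.
    fn tick(&mut self, from_socket: &mut VecDeque<String>) -> bool {
        // Flush the pending slot first.
        if let Some(msg) = self.pending_egress.take() {
            if let Err(msg) = self.wire.try_send(msg) {
                // Wire is full: park the message again and stop pulling new work.
                self.pending_egress = Some(msg);
                return false;
            }
            return true;
        }
        // Only pull a new command when the slot is empty.
        if let Some(cmd) = from_socket.pop_front() {
            self.pending_egress = Some(cmd);
            return true;
        }
        false
    }
}

fn main() {
    let mut driver =
        Driver { pending_egress: None, wire: Wire { buffer: VecDeque::new(), capacity: 2 } };
    let mut from_socket: VecDeque<String> = (0..5).map(|i| format!("req-{i}")).collect();

    // Run the loop until neither the wire nor the channel can make progress.
    while driver.tick(&mut from_socket) {}

    // Two frames fit on the wire, one is parked in the slot, two stay queued.
    assert_eq!(driver.wire.buffer.len(), 2);
    assert!(driver.pending_egress.is_some());
    assert_eq!(from_socket.len(), 2);
}
```

In the real driver the same roles are played by `pending_egress`, the `from_socket` channel, and the framed connection.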
```diff
@@ -236,7 +230,7 @@ where
                     }
                 }
 
-                // We might be able to write more queued messages to the buffer.
+                // Continue to potentially send more or flush
                continue;
 
             }
```
```diff
@@ -267,25 +261,34 @@ where
                 this.check_timeouts();
             }
 
-            // Check for outgoing messages from the socket handle
-            match this.from_socket.poll_recv(cx) {
-                Poll::Ready(Some(cmd)) => {
-                    this.on_send(cmd);
-
-                    continue;
-                }
-                Poll::Ready(None) => {
-                    tracing::debug!(
-                        "socket dropped, shutting down backend and flushing connection"
-                    );
-
-                    if let Some(channel) = this.conn_manager.active_connection() {
-                        let _ = ready!(channel.poll_close_unpin(cx));
-                    }
-
-                    return Poll::Ready(());
-                }
-                Poll::Pending => {}
-            }
+            // Check for outgoing messages from the socket handle.
+            // Only poll for new requests when pending_egress is empty AND we're under HWM to maintain backpressure.
+            let under_hwm = this
+                .options
+                .pending_requests_hwm
+                .map(|hwm| this.pending_requests.len() < hwm)
+                .unwrap_or(true);
+
+            if this.pending_egress.is_none() && under_hwm {
+                match this.from_socket.poll_recv(cx) {
+                    Poll::Ready(Some(cmd)) => {
+                        this.on_send(cmd);
+
+                        continue;
+                    }
+                    Poll::Ready(None) => {
+                        tracing::debug!(
+                            "socket dropped, shutting down backend and flushing connection"
+                        );
+
+                        if let Some(channel) = this.conn_manager.active_connection() {
+                            let _ = ready!(channel.poll_close_unpin(cx));
+                        }
+
+                        return Poll::Ready(());
+                    }
+                    Poll::Pending => {}
+                }
+            }
 
             return Poll::Pending;
```
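The gate itself is a pure function of the in-flight count and an optional limit. Here is a minimal sketch of that check in isolation; `ReqOptions` is pared down to the one field this diff references (its definition is not shown in the diff, so the `Option<usize>` type is an assumption):

```rust
/// Sketch only: the real `ReqOptions` has more fields than shown here.
struct ReqOptions {
    /// High-water mark for in-flight requests. `None` disables the limit (assumed).
    pending_requests_hwm: Option<usize>,
}

/// Returns `true` when the driver may accept another request from the socket.
fn under_hwm(options: &ReqOptions, in_flight: usize) -> bool {
    options.pending_requests_hwm.map(|hwm| in_flight < hwm).unwrap_or(true)
}

fn main() {
    let opts = ReqOptions { pending_requests_hwm: Some(2) };
    assert!(under_hwm(&opts, 0)); // room for more requests
    assert!(under_hwm(&opts, 1));
    assert!(!under_hwm(&opts, 2)); // at the mark: stop polling the channel

    // With no HWM configured, the driver always polls for new commands.
    let unlimited = ReqOptions { pending_requests_hwm: None };
    assert!(under_hwm(&unlimited, 10_000));
}
```

Because the driver simply stops polling `from_socket` while at the mark, backpressure propagates upstream: assuming the channel is bounded, it fills up and socket-side sends have to wait until responses drain `pending_requests`.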
Review comment: is this still relevant?
Reply: Yes, because we would still have requests in `pending_requests`. This only flushes `pending_egress`, whereas before it was a flush of `egress_queue`; in a way this is the same as an `egress_queue` with size 1. The TODO just signals that we never send any response for the outstanding `pending_requests`; we could, for example, send a connection reset instead... or at least that's what I understand here.
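The "same as `egress_queue` with size 1" observation can be made concrete: an `Option<T>` is exactly a queue with capacity one. A minimal illustrative sketch (the `SingleSlot` type is hypothetical, not in the codebase):

```rust
/// Illustrative only: an `Option<T>` behaves like a queue of capacity 1.
struct SingleSlot<T>(Option<T>);

impl<T> SingleSlot<T> {
    /// Analogue of `VecDeque::push_back`; returns the item if the slot is full.
    fn push_back(&mut self, item: T) -> Result<(), T> {
        if self.0.is_some() {
            Err(item) // the "queue" is full at length 1
        } else {
            self.0 = Some(item);
            Ok(())
        }
    }

    /// Analogue of `VecDeque::pop_front`.
    fn pop_front(&mut self) -> Option<T> {
        self.0.take()
    }
}

fn main() {
    let mut slot = SingleSlot(None);
    assert!(slot.push_back(1).is_ok());
    assert_eq!(slot.push_back(2), Err(2)); // second push rejected, like a full bounded queue
    assert_eq!(slot.pop_front(), Some(1));
    assert_eq!(slot.pop_front(), None);
}
```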