
Add H2 connection pool with per-authority multiplexing #4487

Open

muhamadazmy wants to merge 2 commits into restatedev:main from muhamadazmy:pr4487

Conversation

@muhamadazmy
Contributor

@muhamadazmy muhamadazmy commented Mar 13, 2026

Add H2 connection pool with per-authority multiplexing

Summary

  • Add AuthorityPool that manages multiple H2 connections to a single authority (scheme+host+port), creating connections on demand when streams are exhausted and evicting failed ones
  • Add Pool that routes requests to the correct AuthorityPool via a DashMap<ConnectionInfo, AuthorityPool>
  • Add PoolBuilder with configurable max_connections and init_max_streams per authority
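The routing described in the bullets above can be sketched as a toy model. Everything below uses plain std types only; `ToyPool`, `checkout`, and the stream counting are illustrative stand-ins for the idea, not the PR's actual `AuthorityPool`/`Pool` types:

```rust
use std::collections::HashMap;

// An authority key: (scheme, host, port).
type Authority = (String, String, u16);

// Toy model of per-authority routing: the pool maps each authority to
// its own list of connection slots, opening a new connection on demand
// when every existing one has exhausted its streams.
struct ToyPool {
    max_streams_per_conn: usize,
    // per authority: in-flight stream count per connection
    authorities: HashMap<Authority, Vec<usize>>,
}

impl ToyPool {
    fn new(max_streams_per_conn: usize) -> Self {
        Self {
            max_streams_per_conn,
            authorities: HashMap::new(),
        }
    }

    /// Returns the index of the connection chosen for this request,
    /// opening a new connection when all existing ones are saturated.
    fn checkout(&mut self, authority: Authority) -> usize {
        let max = self.max_streams_per_conn;
        let conns = self.authorities.entry(authority).or_default();
        if let Some(idx) = conns.iter().position(|&inflight| inflight < max) {
            conns[idx] += 1;
            idx
        } else {
            conns.push(1);
            conns.len() - 1
        }
    }
}

fn main() {
    let mut pool = ToyPool::new(2);
    let a = ("http".to_string(), "svc".to_string(), 8080u16);
    assert_eq!(pool.checkout(a.clone()), 0); // first stream goes to conn 0
    assert_eq!(pool.checkout(a.clone()), 0); // conn 0 now saturated
    assert_eq!(pool.checkout(a.clone()), 1); // new conn opened on demand
}
```

The real pool additionally evicts failed connections and caps the number of connections per authority via `max_connections`; this sketch only shows the create-on-exhaustion routing.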

Stack created with Sapling. Best reviewed with ReviewStack.

@claude

claude bot commented Mar 17, 2026

⚠️ Code review skipped — your organization's overage spend limit has been reached.

Code review is billed via overage credits. To resume reviews, an organization admin can raise the monthly limit in Settings → Usage.

Once credits are available, reopen this pull request to trigger a review.

Contributor

@tillrohrmann tillrohrmann left a comment


Thanks a lot for creating this PR @muhamadazmy. The design of connection pools per authority makes a lot of sense. Where I was a little unsure is the poll-ready behavior of the AuthorityPool: we seem to prefer waiting on a single connection, and we repeatedly sort the whole list of connections when trying to find new ones. Maybe add a high-level explanation of the strategy you were following there; that will help me review the code for correctness. Please also post the results of the benchmarks.

// Helpers
// ---------------------------------------------------------------------------

const MAX_CONCURRENT_STREAMS: u32 = 100;
Contributor


nit: I would put constants like this at the beginning of the file. Otherwise, when reading the code, it is not clear where the value comes from.

}

/// Send a request with body via raw h2 and drain the echoed response.
async fn raw_h2_body_request(send_request: &h2::client::SendRequest<Bytes>, payload: Bytes) {
Contributor


Could raw_h2_empty_request be raw_h2_body_request(..., Bytes::default)?
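If the signatures line up, the empty variant could simply delegate. The shape of that refactor, with stand-in types (`Vec<u8>` plays the role of `Bytes` here; the real helpers take h2's `SendRequest<Bytes>` and are async):

```rust
// Stand-in for raw_h2_body_request: "send the request and drain the
// echoed response" is modeled as returning the payload length.
fn raw_body_request(payload: Vec<u8>) -> usize {
    payload.len()
}

// The reviewer's suggestion: the empty variant delegates with a
// default (empty) payload instead of duplicating the send logic.
fn raw_empty_request() -> usize {
    raw_body_request(Vec::default())
}

fn main() {
    assert_eq!(raw_empty_request(), 0);
    assert_eq!(raw_body_request(vec![1, 2, 3]), 3);
}
```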

Comment on lines +105 to +139
/// A connector that creates in-memory duplex streams and spawns an H2 echo
/// server on the other end. Used by the custom pool benchmarks.
#[derive(Clone)]
struct TestConnector {
    config: Arc<ServerConfig>,
}

impl TestConnector {
    fn new(max_concurrent_streams: u32) -> Self {
        Self {
            config: Arc::new(ServerConfig {
                max_concurrent_streams,
            }),
        }
    }
}

impl Service<Uri> for TestConnector {
    type Response = DuplexStream;
    type Error = io::Error;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, _req: Uri) -> Self::Future {
        let config = Arc::clone(&self.config);
        Box::pin(async move {
            let (client, server) = tokio::io::duplex(64 * 1024);
            tokio::spawn(run_server(server, config));
            Ok(client)
        })
    }
}
Contributor


This looks quite similar to the testing utilities found at other newly added places (like in conn.rs). I think with this PR we now have 4 TestConnectors in the code base that look quite similar. Could this be deduplicated?

Contributor Author


We can definitely deduplicate it. The only reason I kept it separate was in case I wanted to customize the connector per module, but I ended up using the exact same one everywhere. Will clean up!

Comment on lines +217 to +253
/// Runs an H2 echo server: for each request, echoes the request body back
/// and sends empty trailers when done.
async fn run_server(stream: DuplexStream, config: Arc<ServerConfig>) {
    let mut h2 = h2::server::Builder::new()
        .max_concurrent_streams(config.max_concurrent_streams)
        .handshake::<_, Bytes>(stream)
        .await
        .unwrap();

    while let Some(request) = h2.accept().await {
        let (request, mut respond) = request.unwrap();
        tokio::spawn(async move {
            let response = http::Response::builder()
                .status(StatusCode::OK)
                .body(())
                .unwrap();
            let mut send_stream = respond.send_response(response, false).unwrap();
            let mut recv_body = request.into_body();

            while let Some(data) = recv_body.data().await {
                let data = data.unwrap();
                recv_body
                    .flow_control()
                    .release_capacity(data.len())
                    .unwrap();

                send_stream.reserve_capacity(data.len());
                let _ = futures::future::poll_fn(|cx| send_stream.poll_capacity(cx)).await;
                if send_stream.send_data(data, false).is_err() {
                    return;
                }
            }

            let _ = send_stream.send_trailers(http::HeaderMap::new());
        });
    }
}
Contributor


Same with this function. As a general comment: if there are utilities we can reuse across tests because they implement the same behavior, then let's do it.

        .build(TestConnector::new(MAX_CONCURRENT_STREAMS))
}

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
Contributor


This looks identical to GenericError.

    let pool = make_pool(10, 4);
    let mut handles = tokio::task::JoinSet::default();

    for i in 0u8..5 {
Contributor


Let's stress the pool a bit more by making more concurrent requests.

/// Only meaningful when `keep_alive_interval` is `Some`. Defaults to 20 s.
pub(crate) keep_alive_timeout: Duration,
/// How often to send HTTP/2 PING frames to keep idle connections alive.
/// `None` disables keep-alive pings entirely. Defaults to `None`.
Contributor


Why are we disabling the keep-alives by default? Are they expensive? If not, then enabling them by default has the benefit that the system is more resilient against badly behaving service deployments/infrastructure.

Contributor Author


I followed the same config semantics as hyper, which also has it disabled by default.

Contributor

@tillrohrmann tillrohrmann Mar 31, 2026


What would be the sane default for Restate? Are we configuring this value to always be Some in the code bits that use the service client?
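One possible answer, as a hedged sketch: pick a Restate-wide default in the options struct itself rather than relying on every caller to set it. The field names below come from this diff, but `PoolOptions` and the 40-second interval are illustrative suggestions, not something the PR actually defines:

```
// Sketch only: enabling pings by default makes dead peers detectable
// even on idle connections. Concrete values would need benchmarking.
PoolOptions {
    keep_alive_interval: Some(Duration::from_secs(40)),
    keep_alive_timeout: Duration::from_secs(20), // matches the documented 20 s default
    ..Default::default()
}
```

The trade-off is a small amount of background PING traffic per idle connection versus faster detection of half-open connections behind misbehaving load balancers.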

Comment on lines +183 to +186
        let conn = self
            .ready
            .as_mut()
            .expect("call() invoked without prior poll_ready()");
Contributor


Would it make the API easier to use if we passed in the Connection to the call method and we obtain the Connection from poll_ready()?
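The alternative API shape this comment describes can be modeled with plain std types (`PoolLike` and `Conn` are illustrative stand-ins, not the PR's types): readiness hands the connection out, and `call` consumes it, so there is no hidden `ready` slot that can be observed in the wrong order and no `expect` needed:

```rust
// A connection handle produced by the readiness check.
struct Conn(u32);

// Stand-in for the pool: poll_ready returns the Connection instead of
// stashing it in an Option field for a later call().
struct PoolLike {
    next: u32,
}

impl PoolLike {
    fn poll_ready(&mut self) -> Conn {
        self.next += 1;
        Conn(self.next)
    }

    // call() takes the connection explicitly; the type system now
    // enforces "poll_ready before call" instead of a runtime expect.
    fn call(&mut self, conn: Conn, req: &str) -> String {
        format!("{} via conn {}", req, conn.0)
    }
}

fn main() {
    let mut pool = PoolLike { next: 0 };
    let conn = pool.poll_ready();
    assert_eq!(pool.call(conn, "GET /"), "GET / via conn 1");
}
```

In real Tower terms this would deviate from the standard `Service` trait (whose `poll_ready` returns `Poll<Result<(), E>>`), so it would be an internal API rather than a `Service` impl.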

    }

    fn key(self) -> u64 {
        let mut hasher = std::hash::DefaultHasher::new();
Contributor


Why did you choose the DefaultHasher? How are other hashers performing in terms of value distribution and performance?


    let mut authority_pool = self
        .authorities
        .entry(extractor.key())
Contributor


While a collision between different authorities is unlikely, what motivated you to hash the scheme + authority? Did you observe high resource usage when keying by scheme and authority directly? I am wondering whether this is a case of premature optimization: it is safer to key by scheme + authority directly since the gains of hashing are marginal, and the downside is that it will be impossible for a user (and for us) to debug if two authorities map to the same hash value.
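For reference, a std-only sketch of the two options: `key` mirrors the diff's `DefaultHasher` usage (a cheap-to-compare `u64`, but irreversible and collidable in principle), while the tuple-keyed map is the suggested alternative:

```rust
use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher};

// Option 1 (what the diff does): collapse (scheme, host, port) into a
// u64. Cheap comparisons, but not reversible when debugging, and two
// authorities could in principle collide.
fn key(scheme: &str, host: &str, port: u16) -> u64 {
    let mut hasher = DefaultHasher::new();
    (scheme, host, port).hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Deterministic within a process, so lookups work:
    assert_eq!(key("http", "svc", 80), key("http", "svc", 80));
    assert_ne!(key("http", "svc", 80), key("https", "svc", 443));

    // Option 2 (the suggestion): key by the tuple directly. HashMap
    // hashes internally anyway, and the stored key stays inspectable.
    let mut direct: HashMap<(String, String, u16), usize> = HashMap::new();
    direct.insert(("http".into(), "svc".into(), 80), 1);
    assert_eq!(direct.get(&("http".into(), "svc".into(), 80)), Some(&1));
}
```

Since `HashMap`/`DashMap` hash the key on every lookup either way, pre-hashing mainly saves key storage and comparison cost, which is marginal for short authority strings.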

@muhamadazmy force-pushed the pr4487 branch 3 times, most recently from 02359b2 to 64c91cb on March 30, 2026 at 15:41
## Summary (commit 1)

- Introduce Connection<C>, a Tower Service-based HTTP/2 connection that multiplexes requests over a single H2 session with semaphore-backed concurrency control
- Add TcpConnector service and ConnectionInfo/IntoConnectionInfo abstractions for URI-based TCP connection establishment

## Summary (commit 2)

- Add AuthorityPool<C> that manages multiple H2 connections to a single authority (scheme+host+port), creating connections on demand when streams are exhausted and evicting failed ones
- Add Pool<C> that routes requests to the correct AuthorityPool via a DashMap<ConnectionInfo, AuthorityPool<C>>
- Add PoolBuilder with configurable max_connections and init_max_streams per authority