Skip to content

Conversation

mehnazyunus
Copy link
Contributor

@mehnazyunus mehnazyunus commented Oct 15, 2025

Release Summary:

Reuse handshakes across application processes to reduce memory utilization.

Resolved issues:

Description of changes:

This PR introduces a manager server and an application server, which are intended to run in separate processes. A client handshakes with and connects to the manager server. The manager server accepts incoming streams and transfers them via Unix domain sockets to the application server. The application server receives these streams and continues communicating with the client. The same handshake can be reused with multiple manager servers.

Call-outs:

Added in comments.

Testing:

Added happy path integration test. The client handshakes once and connects to two manager servers, reusing the same handshake. It then continues communication with separate application servers.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

@mehnazyunus mehnazyunus requested a review from a team as a code owner October 15, 2025 17:08
};

if keys.application.opener.dedup_check().is_err() {
return Ok(ControlFlow::Continue(())).into();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it okay to just continue here? The dedup check implementation does handle emitting events and sending a control packet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Realised the dedup_check function is behind the testing flag, does it make sense to remove that flag and use it here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use the on_decrypt_success function -- and actually decrypt the packet we got.

let acceptor = udp::Acceptor::new(
id,
socket,
&self.stream_sender,
Copy link
Contributor Author

@mehnazyunus mehnazyunus Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Start struct could be fully reused in the manager once we can move this stream sender for UDP to a behavior struct like for TCP. (manager server does not have a stream sender)

return Err(Some(error)).into();
}
WorkerState::Sending { future, event_data } => match future.as_mut().poll(cx) {
Poll::Ready(res) => match res {
Copy link
Contributor Author

@mehnazyunus mehnazyunus Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried reusing the poll_send function here, but wasn't able to resolve the borrowing issues when calling the function with the future from the WorkerState, not sure if there is a way around that.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not worried about it for the initial PR, but we can take a look together at doing so.

let recv_buffer: either::Either<_, Channel> = Either::A(recv_buffer);
let sub = self.env.subscriber();

let map = Map::new(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a placeholder map here to send to accept_stream, afaict it is only used for handling unexpected packets.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll want to avoid creating a new map for each stream -- that's pretty expensive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we create a single placeholder map and reuse that for all streams?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly? I'm not sure what we're using the map for. Maybe a placeholder would work, or we can edit the signatures to have Option<&Map> if that is better/easier.

Poll::Ready(Ok(ControlFlow::Continue(())))
}
Err(err) => {
debug!("Error sending message to socket {:?}", err);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to close the stream here, something like this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that seems right.

/// The ID of the stream
stream_id: u64,

/// The amount of time the TCP stream spent in the queue before being enqueued
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which queue is this? I suspect this is time from TCP accept() to UDS sendmsg()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the "fresh" queue. Can change the comment to "before being sent over UDS"

sojourn_time: core::time::Duration,

/// The number of times the Unix domain socket was blocked on send
#[measure("blocked_count")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this for sending this particular stream? It might make sense to switch this (or add) a #[counter(...)] as well.

Measure will emit histograms.

#[allow(dead_code)]
env: Environment<S>,
#[allow(dead_code)]
acceptor_rt: runtime::Shared<S>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have the allow(dead_code) here? Are these only kept for a Drop impl or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed those fields here, but I'm not sure why we have it in the original server in the first place. fmt/clippy/build do not seem to complain without it.

enable_udp: bool,
enable_tcp: bool,
accept_flavor: accept::Flavor,
socket_path: Option<PathBuf>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't expect accept_flavor, workers, acceptor_addr, and maybe even enable_udp/tcp to be relevant here. Are we actually making use of most of these values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

workers is used in deciding the concurrency of the acceptor runtime. Do we want to use a fixed value instead?
Will remove acceptor_addr.
Wouldn't enable_udp/tcp be required when we add the UDP socket worker?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm hoping we can get rid of the application builder needing an acceptor runtime.

enable-udp/tcp is a good call out though, keeping those makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the runtime. Currently not using enable-udp/tcp, but would probably have to use it in accept when UDP logic is added.

return Err(Some(error)).into();
}
WorkerState::Sending { future, event_data } => match future.as_mut().poll(cx) {
Poll::Ready(res) => match res {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not worried about it for the initial PR, but we can take a look together at doing so.

};

if keys.application.opener.dedup_check().is_err() {
return Ok(ControlFlow::Continue(())).into();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use the on_decrypt_success function -- and actually decrypt the packet we got.

let recv_buffer: either::Either<_, Channel> = Either::A(recv_buffer);
let sub = self.env.subscriber();

let map = Map::new(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll want to avoid creating a new map for each stream -- that's pretty expensive.


impl EncoderValue for Timestamp {
fn encode<E: Encoder>(&self, buffer: &mut E) {
buffer.encode(&self.0.get());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect we shouldn't add these impls. Instead, let's capture SystemTime before/after send/recv on the unix socket.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants