Conversation

Contributor

@Karrq Karrq commented Jan 12, 2026

This PR attempts to address the issue outlined in #13.
It does so by using the SubscriberSession pattern: one pending outgoing message is kept in the driver, and the rest stay in the originating channel.

Additionally, there's a configurable HWM for the number of pending requests per peer, so that only up to N requests are pulled into the PeerState. This effectively allows HWM + Framed buffer size messages to be received on the network socket before we stop polling data from it.
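
For illustration, a minimal sketch of the driver state this implies (field and type names here are hypothetical, not the actual crate API):

use std::collections::HashMap;

/// Illustrative driver state for the pattern described above; names are
/// placeholders, not the real msg-socket types.
struct ReqDriverSketch<M> {
    /// At most one message staged for the socket; everything else stays
    /// queued in the bounded channel that produced it.
    pending_egress: Option<M>,
    /// Requests sent but still awaiting a response, keyed by message id.
    pending_requests: HashMap<u32, M>,
    /// High-water mark: stop pulling new work once this many requests are in flight.
    pending_requests_hwm: usize,
}

impl<M> ReqDriverSketch<M> {
    /// The driver only polls the incoming channel when there is room for more work.
    fn can_pull_more(&self) -> bool {
        self.pending_egress.is_none() && self.pending_requests.len() < self.pending_requests_hwm
    }
}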

@Karrq Karrq requested a review from mempirate January 12, 2026 17:36
@Karrq Karrq self-assigned this Jan 12, 2026
@Karrq Karrq linked an issue Jan 12, 2026 that may be closed by this pull request
Karrq added 2 commits January 13, 2026 13:33
docs: clarify

fix: default buffer size from crate

test(hwm): fix behaviour to use hwm error
fix: cleanup unused error value
Contributor

@mempirate mempirate left a comment


On a good track, but I was thinking about it slightly differently: the high-water mark should entail both queued requests (i.e. in the channel between socket frontend and driver, currently defined by DEFAULT_BUFFER_SIZE), and pending requests waiting for a response.

I realize this may be better to split up into 2 options: with_max_queue_size and with_max_pending_requests. Theoretically, the queue should only start filling up once max_pending_requests is hit, or if the driver is just too slow at processing messages.

Wdyt?
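
For concreteness, the two options could look roughly like this as builder methods (a sketch only; names and defaults are up for discussion):

#[derive(Debug, Clone)]
pub struct ReqOptionsSketch {
    /// Size of the queue between the socket frontend and the driver.
    max_queue_size: usize,
    /// Maximum number of requests allowed to be in flight awaiting a response.
    max_pending_requests: usize,
}

impl ReqOptionsSketch {
    /// Bound the frontend-to-driver queue.
    pub fn with_max_queue_size(mut self, size: usize) -> Self {
        self.max_queue_size = size;
        self
    }

    /// Bound the number of in-flight requests.
    pub fn with_max_pending_requests(mut self, n: usize) -> Self {
        self.max_pending_requests = n;
        self
    }
}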


// We might be able to write more queued messages to the buffer.
// Continue to potentially send more or flush
continue;
Contributor


This continue is now redundant - we won't be able to write more since pending_egress is now None. However, in the error case above, we should continue to poll the conn_manager.

Contributor


Also, in the error case, you should put the message back into pending_egress or it gets lost

Contributor Author


Hmm, not super sure I understand.
If we continue after writing pending_egress, we will poll conn_manager again and also poll the given channel to drive it.

At the end, since we don't have pending_egress, we will get another message when polling from_socket and process that one...
Basically we try to do as much work as possible before getting "stuck" waiting for the underlying I/O, no?

> Also, in the error case, you should put the message back into pending_egress or it gets lost

Does that also apply to all other error cases then, like when failing to flush? I need to double-check how the channel works here.

Contributor


> If we continue after writing pending_egress, we will poll conn_manager again and also poll the given channel to drive it.

But we already have the channel in this code path, so there's no need to poll it again. The conn_manager is responsible for ensuring we stay connected in case of a disconnect, so when we get here we don't have to poll it.

> At the end, since we don't have pending_egress, we will get another message when polling from_socket and process that one...
> Basically we try to do as much work as possible before getting "stuck" waiting for the underlying I/O, no?

Yes, but in this case what work are you doing by continuing after writing to the channel?

> Does that also apply to all other error cases then, like when failing to flush? I need to double-check how the channel works here.

That's a good catch; technically we should, yes. Maybe there's a way to take the Framed buffer and cache it to be written again when we reconnect?
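
A rough sketch of what that could look like with tokio_util's Framed (LengthDelimitedCodec and TcpStream used purely for illustration; this assumes the write buffer only contains whole encoded frames at the point of failure):

use bytes::BytesMut;
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, LengthDelimitedCodec};

/// Take whatever is still sitting in the Framed write buffer so it can be
/// replayed on the next connection.
fn take_unsent_bytes(framed: &mut Framed<TcpStream, LengthDelimitedCodec>) -> BytesMut {
    framed.write_buffer_mut().split()
}

/// Put the cached frames at the front of the new connection's write buffer,
/// keeping anything already queued there after them.
fn restore_unsent_bytes(framed: &mut Framed<TcpStream, LengthDelimitedCodec>, mut cached: BytesMut) {
    cached.extend_from_slice(framed.write_buffer());
    *framed.write_buffer_mut() = cached;
}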

Contributor Author


Oh, I thought the conn_manager handled multiple channels, depending on what was ready. Will account for it then.

Contributor Author


Ok, I understand it now. If we were to still continue, we'd just get the channel again and poll for incoming messages, but not really prepare anything else to write out. Honestly that's ok, but we can instead drive the rest, which also includes getting a new message from the queue into pending_egress...
In this scenario, we'll try flushing after we reach the write buffer size or after linger_timer anyway, with potentially N messages written (if the write and from_socket keep being ready).

Contributor Author


OTOH, wrt restoring the egress on error/flush, I'm unsure. I think it needs further testing, so I'd leave it for a new issue.
I think we'd change the semantics too much right now, since it has been like this even before the pending_egress addition, just with multiple messages at once.

Contributor Author

Karrq commented Jan 13, 2026

> The queue should only start filling up once max_pending_requests is hit, or if the driver is just too slow at processing messages

This is already the case: the channel queue starts to fill up after the HWM limit is reached, since the driver won't poll for new messages anymore.

> I realize this may be better to split up into 2 options

Agreed, for maximum flexibility. I will remove the unbounded growth and put something like 8k items each as a default. Shouldn't be an issue since it's easily configured.

@mempirate
Contributor

@claude review this PR for correctness


claude bot commented Jan 14, 2026

Claude finished @mempirate's task in 1m 48s


Code Review - High-Water Marks Implementation

I've reviewed the PR implementation for driver high-water marks. Here's my detailed analysis:


Architecture & Design ✅

The implementation correctly uses the SubscriberSession pattern:

  • 1 pending outgoing message in the driver (pending_egress)
  • Rest queued in the originating channel
  • This prevents unbounded memory growth in the driver

The HWM is split into two configurable options as discussed:

  • pending_requests_hwm - limits pending requests/responses in flight
  • channel_size (req) - controls the queue size between socket and driver

Correctness Issues Found

1. ⚠️ Message Loss on Send Error (req/driver.rs:217-233)

Location: msg-socket/src/req/driver.rs:217-233

When start_send_unpin fails, the message in pending_egress is dropped:

if let Some(msg) = this.pending_egress.take() {
    // ...
    match channel.start_send_unpin(msg.inner) {
        Ok(_) => {
            this.socket_state.stats.specific.increment_tx(size);
        }
        Err(e) => {
            tracing::error!(err = ?e, "Failed to send message to socket");
            // Message is lost here!
            this.conn_manager.reset_connection();
            continue;
        }
    }
}

Problem: The message is taken from pending_egress but not restored on error. The request will hang forever since the pending_requests entry exists but will never receive a response.

Impact: User requests will timeout instead of getting immediate errors. The pending_requests map will accumulate orphaned entries.

Fix: Restore the message to pending_egress on error, or remove the request from pending_requests and notify the sender.

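One possible shape of the first option: keep a cheap copy so the staged message can be restored (a sketch against the snippet above, assuming the wire message is Bytes-backed and cheap to clone):

if let Some(msg) = this.pending_egress.take() {
    // Keep a handle to the message so it can be put back if the send fails.
    let backup = msg.clone();
    // ... compute `size` as before ...
    match channel.start_send_unpin(msg.inner) {
        Ok(_) => {
            this.socket_state.stats.specific.increment_tx(size);
        }
        Err(e) => {
            tracing::error!(err = ?e, "Failed to send message to socket");
            // Restore the staged message instead of dropping it, so it is
            // retried once the connection is re-established and the
            // pending_requests entry is not orphaned.
            this.pending_egress = Some(backup);
            this.conn_manager.reset_connection();
            continue;
        }
    }
}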


2. ⚠️ Message Loss on Flush Error (req/driver.rs:238-242)

Location: msg-socket/src/req/driver.rs:238-242

Similar issue when flushing fails:

if channel.write_buffer().len() >= this.options.write_buffer_size {
    if let Poll::Ready(Err(e)) = channel.poll_flush_unpin(cx) {
        tracing::error!(err = ?e, "Failed to flush connection");
        this.conn_manager.reset_connection();
        continue; // Messages in write buffer are lost
    }

Problem: Messages already written to the Framed write buffer are lost when flush fails.

Impact: Silent message loss. From the user's perspective, requests were sent but responses never arrive (will timeout).

Note: This is a pre-existing issue (as noted in review discussion) but worth documenting. Consider tracking in a separate issue.


3. 🟡 Potential Race in HWM Check (req/driver.rs:268-270)

Location: msg-socket/src/req/driver.rs:268-270

let under_hwm = this.pending_requests.len() < this.options.pending_requests_hwm;

if this.pending_egress.is_none() && under_hwm {
    match this.from_socket.poll_recv(cx) {

Observation: The under_hwm check happens AFTER processing responses (which reduces pending_requests.len()), so this is correct. However, there's a subtle edge case:

If pending_requests.len() == HWM - 1, we poll from_socket and get a new request. We then call on_send() which adds to both pending_egress and pending_requests, making pending_requests.len() == HWM. This is correct behavior.

Verdict: Implementation is correct. ✅


4. ✅ Rep Driver Implementation Looks Good (rep/driver.rs:468-532)

Location: msg-socket/src/rep/driver.rs:468-532

The Rep side correctly implements backpressure:

let under_hwm = this.pending_requests.len() < this.pending_responses_hwm;

if under_hwm {
    // Poll connection for new requests
} else {
    // Log and don't poll - backpressure applied
}

When at HWM:

  • ✅ Doesn't poll the underlying connection
  • ✅ Still processes completed responses to drain pending_requests
  • ✅ Will wake when responses complete (waker registered on pending_requests.poll_next_unpin)

Verdict: Correct implementation. The waker propagation works because FuturesUnordered will wake the task when futures complete.


Review Comments Addressed

Checking if previous review feedback was incorporated:

  1. Split HWM options - Now separated into max_pending_requests and max_queue_size
  2. Don't drop responses - Rep driver doesn't drop, just stops polling
  3. Return error on full channel - Req returns HighWaterMarkReached when channel is full
  4. Aligned buffer sizes - Both default to DEFAULT_BUFFER_SIZE (8192)
  5. 🟡 Error handling discussion - Acknowledged as needing further work (see issues 1-2 above)

Additional Observations

Test Coverage ✅

The test reqrep_hwm_reached() (msg-socket/tests/it/reqrep.rs:240-322) validates:

  • ✅ HWM enforcement
  • ✅ Channel buffering capacity
  • ✅ Error returned when both limits reached
  • ✅ Expected capacity = HWM + DEFAULT_BUFFER_SIZE

Documentation ✅

Good inline documentation:

  • ✅ Struct field comments explain HWM purpose
  • ✅ Function docs explain behavior
  • ✅ Options builder methods have clear descriptions

Summary

Critical Issues:

  1. Message loss on send error - Must be fixed before merge
  2. Message loss on flush error - Pre-existing, could be separate issue

Overall Assessment:
The core HWM implementation is sound and addresses the architectural concerns from #13. The backpressure mechanism works correctly on both req and rep sides. However, error handling around connection failures needs improvement to prevent message loss.

Recommendation: Address critical issue #1 before merging. Issue #2 can be tracked separately as it's a pre-existing problem, but should be prioritized.


Contributor

@mempirate mempirate left a comment


Naming nits

Comment on lines 79 to 84
/// Currently, this driver will use the following sources of work:
/// 1. Connected peers (scheduled in task 2 and 3)
/// 2. Authentication tasks for connecting peers (scheduled in task 3)
/// 3. Incoming new connections for connecting peers (scheduled in task 5)
/// 4. Process control signals for the underlying transport (doesn't restart the loop)
/// 5. Incoming connections from the underlying transport
Contributor


What does "scheduled in task n" refer to? I think it's not quite obvious, unless those are specific functions or code paths you can name and refer to.

Contributor Author


Ok, I will clarify it, but "task n" means "point number n", and in that sense point number n creates work for point number m.
For example, task 5 (incoming connections from the underlying transport) creates a conn_task which is processed by task 3.

Contributor Author


Please check the new commit :) Rephrased it a bit and got claude to do a small ASCII diagram for it.

Comment on lines 352 to 353
/// TODO: there might be some [`Self::pending_requests`] yet to be processed. TBD how to handle
/// them; for now they're dropped.
Contributor


is this still relevant?

Contributor Author


Yes, because we would still have requests in pending_requests.
This only flushes pending_egress, whereas before it flushed the egress_queue. In a way this is the same as an egress_queue of size 1.
This TODO just signals that we don't really respond with anything to pending_requests; we could, for example, send a connection reset instead... or at least that's my understanding here.

Contributor

@mempirate mempirate left a comment


LGTM! Just 2 nits, but approved.


/// The default buffer size for the socket.
const DEFAULT_BUFFER_SIZE: usize = 1024;
use crate::DEFAULT_BUFFER_SIZE;
Contributor


Let's rename this to DEFAULT_QUEUE_SIZE, more accurate now

Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's also used by the other drivers for various buffers and queues. For example, Rep uses it for the number of pending responses, but also for the channel size for control messages and request notifications.
Req uses it for the queue size, yes, and also for pending requests.
Sub uses it for the notification channel size as well as the size of the channel used to queue messages to the driver...

Overall it's used in many places. I'll group up the usages and dedicate separate constants where needed.
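
Something like the following could come out of that grouping (constant names and values are placeholders for discussion, not what will necessarily land):

/// Default size of the queue between a socket frontend and its driver.
pub const DEFAULT_QUEUE_SIZE: usize = 1024;
/// Default high-water mark for req-side requests awaiting a response.
pub const DEFAULT_PENDING_REQUESTS_HWM: usize = 1024;
/// Default size of internal control and notification channels.
pub const DEFAULT_CONTROL_CHANNEL_SIZE: usize = 1024;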

@thedevbirb thedevbirb merged commit b16003f into main Jan 15, 2026
12 checks passed
@thedevbirb thedevbirb deleted the feat/hwm branch January 15, 2026 16:04