Atomic reconnects for capnp-client #8578
Conversation
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
Pull request overview
This PR implements atomic reconnection logic for the Cap'n Proto client to prevent multiple concurrent goroutines from triggering simultaneous reconnections when a write error occurs. The key changes introduce connection state management with read-write locking to ensure only one goroutine reconnects at a time.
Key changes:
- Introduced a connection state machine (`connStateDisconnected`, `connStateError`, `connStateConnected`) with RWMutex-based synchronization
- Refactored the reconnection logic to hold a write lock during connection/reconnection and a read lock during writes
- Added test coverage for concurrent requests with intermittent failures to verify atomic reconnection behavior
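
The state names below come from the bullets above; the surrounding `conn` struct is a hypothetical stand-in for the client's internal connection type, shown only to illustrate the RWMutex-guarded state machine:

```go
package example

import "sync"

// connState models the connection lifecycle described in the PR overview.
type connState int

const (
	connStateDisconnected connState = iota
	connStateConnected
	connStateError
)

// conn guards the state with an RWMutex: (re)connecting takes the write lock,
// while concurrent remote writes share the read lock.
type conn struct {
	mu    sync.RWMutex
	state connState
}
```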
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| pkg/receive/writecapnp/client.go | Implements atomic reconnection with state machine and RWMutex, refactors RemoteWrite to use retry loop instead of recursive reconnect |
| pkg/receive/capnproto_server_test.go | Adds test with faulty handler to verify atomic reconnection under concurrent load |
| pkg/receive/capnp_server.go | Changes NewCapNProtoServer signature to accept Writer_Server interface instead of concrete type |
    rpcConn := rpc.NewConn(codec, nil)
    r.writer = Writer(rpcConn.Bootstrap(ctx))
The RPC connection (rpcConn) is created but never explicitly closed or stored for cleanup. When the connection state changes to connStateError or when reconnecting, the previous rpcConn instance is lost without being closed, potentially leading to resource leaks.
Consider storing the rpcConn in the conn struct (similar to how closer is stored) and ensuring it's properly closed in the close() method. Alternatively, ensure that closing the codec also properly closes the RPC connection.
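
A minimal, stdlib-only sketch of the cleanup this comment asks for; the `conn` struct, its fields, and `close()` are hypothetical stand-ins for the client's internals, with the RPC connection represented as an `io.Closer`:

```go
package example

import (
	"errors"
	"io"
	"sync"
)

// conn is a hypothetical stand-in for the client's connection state.
type conn struct {
	mu      sync.Mutex
	closer  io.Closer // transport codec
	rpcConn io.Closer // RPC connection, stored so it can be torn down later
}

// close shuts down both the RPC connection and the underlying codec, so
// neither is leaked when the client reconnects or is closed.
func (c *conn) close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	var errs []error
	if c.rpcConn != nil {
		errs = append(errs, c.rpcConn.Close())
		c.rpcConn = nil
	}
	if c.closer != nil {
		errs = append(errs, c.closer.Close())
		c.closer = nil
	}
	return errors.Join(errs...)
}
```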
pkg/receive/writecapnp/client.go
Outdated
        return nil, status.Error(codes.InvalidArgument, "rpc failed")
    case WriteError_internal:
        extraContext, err := s.ExtraErrorContext()
        return nil, status.Error(codes.Internal, "rpc failed")
The handling of WriteError_internal no longer includes the extra error context that was previously retrieved and included in the error message. The old code would fetch ExtraErrorContext() and include it in the error message for better debuggability.
Now the code just returns a generic "rpc failed" message. This reduces the debuggability of internal errors. Consider retrieving and including the extra error context similar to the previous implementation.
Suggested change:

    - return nil, status.Error(codes.Internal, "rpc failed")
    + return nil, status.Error(codes.Internal, "rpc failed: "+resp.ExtraErrorContext())
    }
    r.mu.Unlock()
    return nil
    r.state = connStateDisconnected
The writer field is not reset when closing the connection. After close() is called, r.writer still holds a reference to the old RPC client object, but r.state is set to connStateDisconnected. This could lead to issues if there's any code path that tries to use the writer based on the state check.
Consider setting r.writer to a zero value after closing to ensure no stale references remain:
`r.writer = Writer{}`

Suggested change:

    r.state = connStateDisconnected
    r.writer = Writer{}
    arena := capnp.SingleSegment(nil)
    defer arena.Release()
The arena is released via defer on line 121, but the function returns resp which was created from result.Struct() on line 142. If resp references memory from the arena's segment, this could lead to use-after-free since the arena is released before the caller can use the returned response.
Cap'n Proto messages typically need their backing segments to remain alive for as long as the structs are in use. Consider whether the arena should be released at all here, or if its lifecycle should be tied to the response's lifecycle (e.g., via the release function).
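
One way to express that suggestion, sketched with hypothetical types rather than the real Cap'n Proto API: return a release callback alongside the response so the caller controls when the backing memory is freed.

```go
package example

// arena and response are hypothetical stand-ins for the Cap'n Proto arena and
// the response struct backed by its segment.
type arena struct{ buf []byte }

func (a *arena) Release() { a.buf = nil }

type response struct{ data []byte }

// remoteWrite builds a response backed by the arena and, instead of releasing
// the arena via defer, hands the release back to the caller.
func remoteWrite() (resp response, release func(), err error) {
	a := &arena{buf: make([]byte, 1024)}
	return response{data: a.buf[:0]}, a.Release, nil
}

func useRemoteWrite() {
	resp, release, _ := remoteWrite()
	defer release() // backing memory stays valid while resp is in use
	_ = resp
}
```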
pkg/receive/capnproto_server_test.go
Outdated
    type faultyHandler struct {
        mu      sync.Mutex
        numReqs int
The numReqs field is declared but never used. It's not initialized in the constructor and not accessed anywhere in the code. This appears to be unused code that should be removed.
Suggested change (remove the unused field):

    - numReqs int
pkg/receive/capnproto_server_test.go
Outdated
        wg.Wait()
    }
    require.NoError(t, client.Close())
    require.NoError(t, errors.Join(listener.Close()))
Using errors.Join() with a single error is unnecessary. The listener.Close() already returns an error, so you can directly use require.NoError(t, listener.Close()) without wrapping it in errors.Join().
errors.Join() is meant for combining multiple errors, but here it's being used with a single error which adds unnecessary overhead.
Suggested change:

    - require.NoError(t, errors.Join(listener.Close()))
    + require.NoError(t, listener.Close())
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
    r.closer = codec

    rpcConn := rpc.NewConn(codec, nil)
    r.writer = Writer(rpcConn.Bootstrap(ctx))
Do we need to call Resolve() here like in https://github.com/thanos-io/thanos/pull/8491/files?
This PR resolves the problem where connections are terminated while other requests are in flight. I can close this for now and we will test #8491 once we have some spare time. I don't think #8491 solves the same problem, though.

I think this is probably still needed - I tried that other PR and it works well, but now I see multiple attempts to reconnect, and I think this PR addresses that issue, no? 🤔

Yes, that was the idea. OK, let me rebase it and push another version.

Do we have any plans to merge this? This looks great to me.
The reconnect section in the capnproto client is not atomic. When we get a write error we close the connection for all in-flight requests, which causes a flurry of reconnects to the target receiver.
This commit changes the logic to hold a write-lock while (re)connecting, and a read-lock while writing to a receiver. That way we won't have both operations happening at the same time.
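
A stdlib-only sketch of that locking discipline; the `client` type, `dial`, and `send` are hypothetical stand-ins, not the actual Thanos client code:

```go
package example

import (
	"context"
	"errors"
	"sync"
)

type connState int

const (
	connStateDisconnected connState = iota
	connStateConnected
	connStateError
)

type client struct {
	mu    sync.RWMutex
	state connState
	dial  func(ctx context.Context) error // hypothetical (re)connect
	send  func(ctx context.Context) error // hypothetical transport write
}

// remoteWrite retries a bounded number of times. Only one goroutine holds the
// write lock while (re)connecting; concurrent writers share the read lock, so
// a single write error cannot trigger a flurry of simultaneous reconnects.
func (c *client) remoteWrite(ctx context.Context) error {
	const maxAttempts = 2
	var lastErr error
	for i := 0; i < maxAttempts; i++ {
		// (Re)connect under the write lock if needed.
		c.mu.Lock()
		if c.state != connStateConnected {
			if err := c.dial(ctx); err != nil {
				c.mu.Unlock()
				lastErr = err
				continue
			}
			c.state = connStateConnected
		}
		c.mu.Unlock()

		// Write under the read lock: concurrent requests proceed in
		// parallel, but no reconnect can happen mid-write.
		c.mu.RLock()
		err := c.send(ctx)
		c.mu.RUnlock()
		if err == nil {
			return nil
		}

		// Mark the connection broken; the next attempt (possibly from
		// another goroutine) reconnects under the write lock.
		c.mu.Lock()
		c.state = connStateError
		c.mu.Unlock()
		lastErr = err
	}
	return errors.Join(errors.New("remote write failed"), lastErr)
}
```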