feat(rpc): add webzockets dependency for json rpc websockets #1222
jbuckmccready wants to merge 20 commits into main
Conversation
Force-pushed 87e052b to 35e2914
- Add webzockets as a local path dependency in build.zig.zon
- Wire up webzockets module in build.zig
- Re-export webzockets module from src/rpc/lib.zig to include in build
- Update libxev dependency with fix for error returns (compatible with zig 0.14): Syndica/libxev@11a00ee
Force-pushed 35e2914 to 8e6b2f6
Codecov Report: ✅ All modified and coverable lines are covered by tests.
Not particularly useful and takes up a lot of space in the README.md.
- Define test_webzockets job for Linux environment
- Define test_webzockets_macos job for macOS environment
- Update workflows to run webzockets tests on both platforms
- add RawClient wait helpers for message type, close frames, and disconnects
- make read timeout configurable and default to 2s to reduce jitter flakes
- update close, fragmentation, handshake, and timeout tests to use bounded waits
src/rpc/webzockets/src/buffer.zig
Outdated
```zig
/// Growable buffer pool with runtime-configurable buffer size.
/// Uses an intrusive free list (like std.heap.MemoryPool) for efficient reuse.
/// Thread-safe for shared use across connections.
///
/// Pool grows beyond preheat size as needed. Use preheat to pre-allocate
/// buffers for expected concurrent usage, avoiding allocations in the
/// common case.
pub const BufferPool = struct {
```
Not convinced we need something like this; we already have e.g. arena allocators.
I don't quite follow: how would I use an arena allocator effectively for a per-client server-side read buffer?
This pooling is maybe an over-optimization; the approach is biased toward the patterns I saw in networking around edge-gateway.
The idea is that if you know the use case for the server/API, you can tune the buffer pool size to the message sizes you expect from clients. Then a client connecting, sending a message, and disconnecting turns into a single pop and push rather than a trip to a more generic/global allocator and relying on it to pool efficiently.
For our very specific use case of broadcasting jrpc subscriptions, I don't think this buffer pool will be used at all, since client messages will always fit within the embedded read buffer each client struct has. But this should be effective for a more general web server use case and doesn't add much complexity.
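To make the pop/push idea concrete, here is a minimal single-threaded sketch of an intrusive free-list pool; the names and the comptime-size parameter are illustrative, not the PR's actual BufferPool:

```zig
const std = @import("std");

/// A released buffer stores the free-list next-pointer in its own first
/// bytes, so the steady-state connect/message/disconnect cycle is a single
/// pop and push instead of a trip to a general-purpose allocator.
pub fn BufferPool(comptime buffer_size: usize) type {
    return struct {
        const Self = @This();
        const Node = struct { next: ?*Node };

        comptime {
            std.debug.assert(buffer_size >= @sizeOf(Node));
        }

        allocator: std.mem.Allocator,
        free_list: ?*Node = null,

        pub fn acquire(self: *Self) ![]u8 {
            if (self.free_list) |node| {
                // Fast path: pop a previously released buffer.
                self.free_list = node.next;
                return @as([*]u8, @ptrCast(node))[0..buffer_size];
            }
            // Pool grows on demand; align so the buffer can hold a Node.
            return self.allocator.alignedAlloc(u8, @alignOf(Node), buffer_size);
        }

        pub fn release(self: *Self, buf: []u8) void {
            std.debug.assert(buf.len == buffer_size);
            // Fast path: push onto the intrusive free list.
            const node: *Node = @ptrCast(@alignCast(buf.ptr));
            node.next = self.free_list;
            self.free_list = node;
        }
    };
}
```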
src/rpc/webzockets/src/buffer.zig
Outdated
```zig
/// Mutex for thread-safe access.
mutex: std.Thread.Mutex,
```
We can do without multi-threading here - if we do have a pool (or arena), per-thread is probably better.
Yeah, I'll change it.
My original thinking was that the uncontested path is extremely fast anyway, so this would let more IO loops share something that stays largely uncontested even under high volume, and it would avoid dead/unbalanced distributions across threads. But that's something easy to refine later.
The current code does bad things with the lock too (it holds it while allocating a new buffer...). EDIT: oh right, it would have to if it's backed by the arena.
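For reference, the lock-scoping fix discussed here, as a variant of the acquire in the sketch earlier in this thread (assuming a mutex field is added): pop under the mutex, allocate outside it. This is only sound if the backing allocator is itself thread-safe, which an arena is not (hence the EDIT).

```zig
fn acquire(self: *Self) ![]u8 {
    self.mutex.lock();
    const reused = self.free_list;
    if (reused) |node| self.free_list = node.next;
    self.mutex.unlock();
    if (reused) |node| return @as([*]u8, @ptrCast(node))[0..buffer_size];
    // Slow path runs unlocked, so the backing allocator must be thread-safe.
    return self.allocator.alignedAlloc(u8, @alignOf(Node), buffer_size);
}
```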
```zig
// ============================================================================
// AutobahnRunner — orchestrates sequential test-case execution
// ============================================================================

// ============================================================================
// AutobahnClientHandler — echo handler for the Autobahn testsuite
// ============================================================================
```
```zig
pub fn main() !void {
    const allocator = std.heap.c_allocator;
```
Let's avoid the c_allocator and prefer the gpa
I can update the autobahn server as well. I can either change to GPA but still with no graceful exit (so no leak check), or change to GPA and add a sigint handler, graceful shutdown, and a leak check (this adds some lines). LMK
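For reference, a minimal sketch of the GPA option; the leak check only fires if main actually returns, which is why the graceful shutdown matters:

```zig
const std = @import("std");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    // deinit() reports leaks, but only runs on a graceful exit; a sigint
    // handler that stops the event loop is needed for this to be meaningful.
    defer std.debug.assert(gpa.deinit() == .ok);
    const allocator = gpa.allocator();
    _ = allocator; // run the Autobahn client/server with `allocator` here
}
```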
```zig
/// Manages read buffer state with automatic tier escalation:
/// - Starts with an embedded buffer for small messages
/// - Upgrades to pooled or dynamic buffer when needed
/// - Retains larger buffer for subsequent messages (performance optimization)
/// - Restores to embedded only on explicit reset()
```
Not convinced that we need any of this
```zig
/// Type-erased context pointer for the shutdown completion callback.
shutdown_userdata: ?*anyopaque,
```
src/rpc/webzockets/src/mask.zig
Outdated
```zig
.stage2_llvm, .stage2_c => true,
else => false,
};
```
Is this correct? If so, can you link something on the issue tracker? Worth noting we only use the llvm backend.
I copied from: https://github.com/karlseguin/websocket.zig/blob/97fefafa59cc78ce177cff540b8685cd7f699276/src/proto.zig#L7-L10
I didn't drill into the compiler to see the details. I put it in mostly just to note a place where disabling may be needed; for our use case it's not needed.
src/rpc/webzockets/src/mask.zig
Outdated
```zig
pub fn mask(m: []const u8, payload: []u8) void {
    std.debug.assert(m.len == 4);
```
if m must be len 4, then why not e.g. *const [4]u8
Yeah, and style-wise should I pass by value or *const here? It likely compiles to the same thing, but style-wise is there a preference for small values/arrays like this?
Removed the conditional backend stuff and changed to pass by value.
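For reference, a by-value signature as settled on here; the loop body is a sketch of the RFC 6455 XOR masking, not necessarily the PR's exact implementation:

```zig
/// Mask or unmask a WebSocket payload in place by XOR-ing each byte with
/// the 4-byte masking key, cycling through the key (RFC 6455, section 5.3).
pub fn mask(m: [4]u8, payload: []u8) void {
    for (payload, 0..) |*byte, i| {
        byte.* ^= m[i % 4];
    }
}
```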
```zig
test {
    // Uncomment to see logs during tests
    // std.testing.log_level = .debug;
    std.testing.refAllDecls(@This());
    std.testing.refAllDecls(@import("control_queue.zig"));
}
```
We should be moving away from refAllDecls
Should there be an explicit import for each file inside the test block? I'm not familiar with Zig on this.
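The usual replacement pattern is a test block that references each file explicitly; referencing a file is enough for its tests to be analyzed. Only control_queue.zig is known from the snippet above; the other file name below is hypothetical:

```zig
test {
    // Each referenced file contributes its tests to the build, without
    // refAllDecls forcing analysis of every public declaration.
    _ = @import("control_queue.zig");
    _ = @import("reader.zig"); // hypothetical: one line per source file
}
```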
Force-pushed 83cd274 to 96a343e
Also removes the need for the pool_buffer_size field in Reader.
- remove pending_data from client and server write state
- add outstandingUserWrite helper for busy, defer, and cleanup checks
- clear header_len in finishWrite before invoking onWriteComplete
Update libxev dependency hash in both root and webzockets build.zig.zon
Force-pushed 46fbfc7 to 0639fd4
…erver
- Add pauseReads/resumeReads/peekBufferedBytes API to both connections
- Add onBytesRead handler callback for observing raw TCP data arrival
- Add re-entrancy guard to processMessages to prevent recursive dispatch
- Improve reader with availableSpace, compactIfFull, early compaction heuristic
- Add comprehensive e2e tests for burst processing, mid-stream pause, close-while-paused, re-entrancy detection, and small-buffer scenarios
Force-pushed 0639fd4 to 0028053
- Document onBytesRead handler callback
- Add pauseReads, resumeReads, peekBufferedBytes connection methods
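A hedged usage sketch of these methods for backpressure. pauseReads, resumeReads, and peekBufferedBytes are the names from this commit; Connection, Downstream, and the handler shape are stubs invented for illustration, not webzockets types:

```zig
const Connection = struct {
    fn pauseReads(_: *Connection) void {}
    fn resumeReads(_: *Connection) void {}
    fn peekBufferedBytes(_: *const Connection) usize {
        return 0;
    }
};

const Downstream = struct {
    fn enqueue(_: *Downstream, _: []const u8) error{Full}!void {}
    fn drained(_: *const Downstream) bool {
        return true;
    }
};

fn onMessage(conn: *Connection, downstream: *Downstream, msg: []const u8) void {
    downstream.enqueue(msg) catch {
        // Backpressure: stop consuming socket bytes until downstream drains.
        conn.pauseReads();
        return;
    };
    if (downstream.drained() and conn.peekBufferedBytes() > 0) {
        // Bytes that arrived while paused are processed after resuming.
        conn.resumeReads();
    }
}
```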
- Delete buffer.zig and update all code and docs
- Reader now escalates directly from embedded buffer to dynamic allocation
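A minimal sketch of the resulting two-tier escalation; the field names and the embedded size are assumptions, not the real Reader:

```zig
const std = @import("std");

const Reader = struct {
    embedded: [4096]u8 = undefined, // assumed size
    dynamic: ?[]u8 = null,

    /// Small messages stay in the embedded array; larger ones escalate to a
    /// heap allocation, which is retained for subsequent messages.
    fn buffer(self: *Reader, allocator: std.mem.Allocator, needed: usize) ![]u8 {
        if (needed <= self.embedded.len and self.dynamic == null) {
            return self.embedded[0..needed];
        }
        if (self.dynamic == null or self.dynamic.?.len < needed) {
            if (self.dynamic) |old| {
                allocator.free(old);
                self.dynamic = null;
            }
            self.dynamic = try allocator.alloc(u8, needed);
        }
        return self.dynamic.?[0..needed];
    }
};
```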
Summary
Adds the webzockets package at path src/rpc/webzockets. webzockets implements websockets in zig, built on top of libxev for network IO. See src/rpc/webzockets/README.md for details on the package itself.

This only adds the websocket implementation needed to implement the json RPC websocket API; the json RPC API will be in a different PR.
Changes
- Updated the libxev version used by sig in build.zig.zon to a new commit (current HEAD of the zig-0.14 branch): Syndica/libxev@11a00ee. This commit is on top of a slightly later version of libxev than we were previously using that is still compatible with zig 0.14.1, and it fixes error returns for the kqueue backend (associated upstream pull request: mitchellh/libxev#207).
- Updated build.zig and build.zig.zon with the webzockets dependency, and added an import to src/rpc/lib.zig so webzockets is part of the build.
- Ran docs/generate.py so the webzockets README.md is included in the published docs.
- Updated scripts/style.py to skip cache dirs.

Update 2026-02-13:
Fixed another bug in libxev where io_uring gave an unexpected error on shutdown when the socket was already closed.
Upstream PR: mitchellh/libxev#211
Syndica fork commit: Syndica/libxev@c16162f
NOTE: for the kqueue/epoll backends it could just issue std.posix.shutdown directly to avoid the event loop processing entirely, but the io_uring backend actually queues the shutdown command to avoid a syscall.
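A sketch of the direct-syscall path the NOTE describes for the kqueue/epoll case; fd is an assumed already-open socket, and this is not the libxev code:

```zig
const std = @import("std");

fn shutdownWrites(fd: std.posix.socket_t) !void {
    std.posix.shutdown(fd, .send) catch |err| switch (err) {
        // The already-closed case that produced the unexpected error above.
        error.SocketNotConnected => {},
        else => return err,
    };
}
```

Update 2026-02-14: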
Added read pause/resume (0028053)
Removed BufferPool (51641cc); it will not be used by the jrpc server (the embedded read buffer will cover sub/unsub requests) and it adds complexity.