Skip to content

Commit 0028053

Browse files
committed
feat(webzockets): add pause/resume read flow control for client and server
- Add pauseReads/resumeReads/peekBufferedBytes API to both connections - Add onBytesRead handler callback for observing raw TCP data arrival - Add re-entrancy guard to processMessages to prevent recursive dispatch - Improve reader with availableSpace, compactIfFull, early compaction heuristic - Add comprehensive e2e tests for burst processing, mid-stream pause, close-while-paused, re-entrancy detection, and small-buffer scenarios
1 parent 279a715 commit 0028053

File tree

11 files changed

+1405
-20
lines changed

11 files changed

+1405
-20
lines changed
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
const std = @import("std");
2+
const testing = std.testing;
3+
4+
const servers = @import("../support/test_servers.zig");
5+
const server_handlers = @import("../support/server_handlers.zig");
6+
const clients = @import("../support/test_clients.zig");
7+
const FdLeakDetector = @import("../support/fd_leak.zig").FdLeakDetector;
8+
9+
test "e2e client pause/resume: sequential processing of server burst" {
10+
const fd_check = FdLeakDetector.baseline();
11+
defer fd_check.assertNoLeaks();
12+
13+
const messages = [_][]const u8{ "msg-1", "msg-2", "msg-3" };
14+
var send_ctx: server_handlers.SendMessagesOnOpenHandler.Context = .{ .messages = &messages };
15+
const ts = try servers.startSendMessagesOnOpenServer(testing.allocator, &send_ctx);
16+
defer ts.stop();
17+
18+
// Each server-to-client text frame for "msg-N" (5 bytes payload) is 7 bytes
19+
// on the wire: 2-byte header (FIN+opcode, length) + 5-byte payload.
20+
// 3 messages × 7 bytes = 21 bytes minimum before we resume.
21+
var handler: clients.PauseUntilBufferedClientHandler = .{
22+
.allocator = testing.allocator,
23+
.expected_messages = messages.len,
24+
.resume_threshold = 21,
25+
.results = std.ArrayList(
26+
clients.PauseUntilBufferedClientHandler.RecvResult,
27+
).init(testing.allocator),
28+
};
29+
defer handler.deinit();
30+
31+
var env: clients.TestEnv = undefined;
32+
try env.start();
33+
defer env.deinit();
34+
35+
var conn: clients.TestPauseUntilBufferedClient.Conn = undefined;
36+
var client = env.initClient(clients.TestPauseUntilBufferedClient, &handler, &conn, .{
37+
.address = std.net.Address.initIp4(.{ 127, 0, 0, 1 }, ts.port),
38+
});
39+
40+
try client.connect();
41+
try env.loop.run(.until_done);
42+
43+
try testing.expect(handler.open_called);
44+
try testing.expect(handler.close_called);
45+
try testing.expectEqual(messages.len, handler.results.items.len);
46+
for (messages, handler.results.items) |expected, result| {
47+
try testing.expectEqualSlices(u8, expected, result.data);
48+
}
49+
50+
conn.deinit();
51+
}
52+
53+
test "e2e client pause/resume: pause mid-stream stops dispatch then delivers on resume" {
54+
const fd_check = FdLeakDetector.baseline();
55+
defer fd_check.assertNoLeaks();
56+
57+
const messages = [_][]const u8{ "msg-1", "msg-2", "msg-3", "msg-4" };
58+
var send_ctx: server_handlers.SendMessagesOnOpenHandler.Context = .{ .messages = &messages };
59+
const ts = try servers.startSendMessagesOnOpenServer(testing.allocator, &send_ctx);
60+
defer ts.stop();
61+
62+
// Server sends 4 unmasked text frames: "msg-1".."msg-4" (5 bytes each).
63+
// Each frame = 2 + 5 = 7 bytes → 4 × 7 = 28 bytes threshold.
64+
var handler: clients.PauseMidStreamClientHandler = .{
65+
.allocator = testing.allocator,
66+
.expected_messages = messages.len,
67+
.resume_threshold = 28,
68+
.results = std.ArrayList(
69+
clients.PauseMidStreamClientHandler.RecvResult,
70+
).init(testing.allocator),
71+
};
72+
defer handler.deinit();
73+
74+
var env: clients.TestEnv = undefined;
75+
try env.start();
76+
defer env.deinit();
77+
78+
var conn: clients.TestPauseMidStreamClient.Conn = undefined;
79+
var client = env.initClient(clients.TestPauseMidStreamClient, &handler, &conn, .{
80+
.address = std.net.Address.initIp4(.{ 127, 0, 0, 1 }, ts.port),
81+
});
82+
83+
try client.connect();
84+
try env.loop.run(.until_done);
85+
86+
try testing.expect(handler.open_called);
87+
try testing.expect(handler.close_called);
88+
try testing.expectEqual(messages.len, handler.results.items.len);
89+
for (messages, handler.results.items) |expected, result| {
90+
try testing.expectEqualSlices(u8, expected, result.data);
91+
}
92+
93+
conn.deinit();
94+
}
95+
96+
test "e2e client pause/resume: close while client is paused" {
97+
const fd_check = FdLeakDetector.baseline();
98+
defer fd_check.assertNoLeaks();
99+
100+
const messages = [_][]const u8{"msg-1"};
101+
var send_ctx: server_handlers.SendMessagesOnOpenHandler.Context = .{ .messages = &messages };
102+
const ts = try servers.startSendMessagesOnOpenServer(testing.allocator, &send_ctx);
103+
defer ts.stop();
104+
105+
// Server sends "msg-1" (2 + 5 = 7 bytes) then close frame with code 1000
106+
// + reason "done" (2 + 6 = 8 bytes). Total threshold: 15 bytes.
107+
var handler: clients.PauseUntilBufferedClientHandler = .{
108+
.allocator = testing.allocator,
109+
.expected_messages = 0,
110+
.resume_threshold = 15,
111+
.results = std.ArrayList(
112+
clients.PauseUntilBufferedClientHandler.RecvResult,
113+
).init(testing.allocator),
114+
};
115+
defer handler.deinit();
116+
117+
var env: clients.TestEnv = undefined;
118+
try env.start();
119+
defer env.deinit();
120+
121+
var conn: clients.TestPauseUntilBufferedClient.Conn = undefined;
122+
var client = env.initClient(clients.TestPauseUntilBufferedClient, &handler, &conn, .{
123+
.address = std.net.Address.initIp4(.{ 127, 0, 0, 1 }, ts.port),
124+
});
125+
126+
try client.connect();
127+
try env.loop.run(.until_done);
128+
129+
try testing.expect(handler.open_called);
130+
try testing.expect(handler.close_called);
131+
try testing.expectEqual(@as(usize, 1), handler.results.items.len);
132+
try testing.expectEqualSlices(u8, "msg-1", handler.results.items[0].data);
133+
134+
conn.deinit();
135+
}
136+
137+
test "e2e client pause/resume: no re-entrant onMessage dispatch" {
138+
const fd_check = FdLeakDetector.baseline();
139+
defer fd_check.assertNoLeaks();
140+
141+
const messages = [_][]const u8{ "a", "b", "c", "done" };
142+
var send_ctx: server_handlers.SendMessagesOnOpenHandler.Context = .{ .messages = &messages };
143+
const ts = try servers.startSendMessagesOnOpenServer(testing.allocator, &send_ctx);
144+
defer ts.stop();
145+
146+
// Server sends 4 unmasked text frames: "a" (3B), "b" (3B), "c" (3B), "done" (6B) = 15 bytes.
147+
var handler: clients.ReentrancyDetectClientHandler = .{
148+
.allocator = testing.allocator,
149+
.resume_threshold = 15,
150+
.results = std.ArrayList(
151+
clients.ReentrancyDetectClientHandler.RecvResult,
152+
).init(testing.allocator),
153+
};
154+
defer handler.deinit();
155+
156+
var env: clients.TestEnv = undefined;
157+
try env.start();
158+
defer env.deinit();
159+
160+
var conn: clients.TestReentrancyDetectClient.Conn = undefined;
161+
var client = env.initClient(clients.TestReentrancyDetectClient, &handler, &conn, .{
162+
.address = std.net.Address.initIp4(.{ 127, 0, 0, 1 }, ts.port),
163+
});
164+
165+
try client.connect();
166+
try env.loop.run(.until_done);
167+
168+
try testing.expect(handler.open_called);
169+
try testing.expect(handler.close_called);
170+
try testing.expect(!handler.reentrant_detected);
171+
try testing.expectEqual(messages.len, handler.results.items.len);
172+
for (messages, handler.results.items) |expected, result| {
173+
try testing.expectEqualSlices(u8, expected, result.data);
174+
}
175+
176+
conn.deinit();
177+
}
178+
179+
test "e2e client pause/resume: buffer fills while paused (small read buffer)" {
180+
const fd_check = FdLeakDetector.baseline();
181+
defer fd_check.assertNoLeaks();
182+
183+
var send_ctx: server_handlers.SendMessagesOnOpenHandler.Context = .{
184+
.messages = &servers.small_buf_slices,
185+
};
186+
const ts = try servers.startSendMessagesOnOpenServer(testing.allocator, &send_ctx);
187+
defer ts.stop();
188+
189+
const msg_len = servers.small_buf_msg_len;
190+
const msg_count = servers.small_buf_msg_count;
191+
192+
// 256-byte client read buffer. Server sends 12 unmasked text frames,
193+
// each with 20-byte payload = 22 bytes per frame. 12 × 22 = 264 bytes
194+
// total — exceeds the 256-byte buffer.
195+
// Threshold of 256 ensures we only resume once the buffer is completely
196+
// full and reads have been stopped (freeSpace() == 0).
197+
var handler: clients.PauseUntilBufferedClientHandler = .{
198+
.allocator = testing.allocator,
199+
.expected_messages = msg_count,
200+
.resume_threshold = 256,
201+
.results = std.ArrayList(
202+
clients.PauseUntilBufferedClientHandler.RecvResult,
203+
).init(testing.allocator),
204+
};
205+
defer handler.deinit();
206+
207+
var env: clients.TestEnv = undefined;
208+
try env.start();
209+
defer env.deinit();
210+
211+
var conn: clients.TestPauseUntilBufferedSmallBufClient.Conn = undefined;
212+
var client = env.initClient(clients.TestPauseUntilBufferedSmallBufClient, &handler, &conn, .{
213+
.address = std.net.Address.initIp4(.{ 127, 0, 0, 1 }, ts.port),
214+
});
215+
216+
try client.connect();
217+
try env.loop.run(.until_done);
218+
219+
try testing.expect(handler.open_called);
220+
try testing.expect(handler.close_called);
221+
try testing.expectEqual(@as(usize, msg_count), handler.results.items.len);
222+
223+
for (0..msg_count) |i| {
224+
var expected: [msg_len]u8 = undefined;
225+
const byte = @as(u8, @truncate('A' + i));
226+
@memset(&expected, byte);
227+
try testing.expectEqualSlices(u8, &expected, handler.results.items[i].data);
228+
}
229+
230+
conn.deinit();
231+
}

src/rpc/webzockets/e2e_tests/client/tests.zig

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ comptime {
33
_ = @import("close_tests.zig");
44
_ = @import("ping_pong_tests.zig");
55
_ = @import("max_message_tests.zig");
6+
_ = @import("pause_resume_tests.zig");
67
_ = @import("timeout_tests.zig");
78
}

0 commit comments

Comments
 (0)