Skip to content

Commit bafdca3

Browse files
MsgBuffer to handle both combined and multipart read
Signed-off-by: Francis Bouvier <[email protected]>
1 parent ba12945 commit bafdca3

File tree

1 file changed

+141
-39
lines changed

1 file changed

+141
-39
lines changed

src/server.zig

Lines changed: 141 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,17 @@ const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError;
1818
// I/O Recv
1919
// --------
2020

21+
const BufReadSize = 1024;
22+
2123
pub const Cmd = struct {
2224

2325
// internal fields
2426
socket: std.os.socket_t,
2527
buf: []u8, // only for read operations
2628
err: ?Error = null,
2729

30+
msg_buf: *MsgBuffer,
31+
2832
// CDP
2933
state: cdp.State = .{},
3034

@@ -57,7 +61,7 @@ pub const Cmd = struct {
5761
}
5862

5963
// read and execute input
60-
readInput(input, Cmd.do, self) catch unreachable;
64+
self.msg_buf.read(self.alloc(), input, self, Cmd.do) catch unreachable;
6165

6266
// continue receving incomming messages asynchronously
6367
self.loop().io.recv(*Cmd, self, cbk, completion, self.socket, self.buf);
@@ -74,7 +78,7 @@ pub const Cmd = struct {
7478
return self.browser.currentSession().loop;
7579
}
7680

77-
fn do(self: *Cmd, cmd: []const u8) !void {
81+
fn do(self: *Cmd, cmd: []const u8) anyerror!void {
7882
const res = try cdp.do(self.alloc(), cmd, self);
7983

8084
// send result
@@ -85,60 +89,153 @@ pub const Cmd = struct {
8589
}
8690
};
8791

88-
fn readInput(buf: []const u8, func: anytype, data: anytype) !void {
89-
var input = buf;
92+
/// MsgBuffer return messages from a raw text read stream,
93+
/// according to the following format `<msg_size>:<msg>`.
94+
/// It handles both:
95+
/// - combined messages in one read
96+
/// - single message in several read (multipart)
97+
/// It is safe (and good practice) to reuse the same MsgBuffer
98+
/// on several reads of the same stream.
99+
const MsgBuffer = struct {
100+
size: usize = 0,
101+
buf: []u8,
102+
pos: usize = 0,
103+
104+
fn init(alloc: std.mem.Allocator, size: usize) std.mem.Allocator.Error!MsgBuffer {
105+
const buf = try alloc.alloc(u8, size);
106+
return .{ .buf = buf };
107+
}
90108

91-
while (true) {
92-
var cmd: []const u8 = undefined;
93-
94-
// size msg
95-
var size_msg: usize = undefined;
96-
97-
// parse json msg size
98-
const size_pos = std.mem.indexOfScalar(u8, input, ':').?;
99-
std.log.debug("msg size pos: {d}", .{size_pos});
100-
const size_str = input[0..size_pos];
101-
input = input[size_pos + 1 ..];
102-
size_msg = try std.fmt.parseInt(u32, size_str, 10);
103-
// }
104-
std.log.debug("msg size: {d}", .{size_msg});
105-
106-
// handle several JSON msg in 1 read
107-
const is_multi = input.len > size_msg;
108-
std.log.debug("is_multi: {any}", .{is_multi});
109-
cmd = input[0..size_msg];
110-
std.log.debug("cmd: {s}", .{cmd[0..@min(BufReadSize, size_msg)]});
111-
if (is_multi) {
112-
input = input[size_msg..];
113-
std.log.debug("rest: {s}", .{input});
114-
}
109+
fn deinit(self: MsgBuffer, alloc: std.mem.Allocator) void {
110+
alloc.free(self.buf);
111+
}
115112

116-
try @call(.auto, func, .{ data, cmd });
113+
fn isFinished(self: *MsgBuffer) bool {
114+
return self.pos >= self.size;
115+
}
117116

118-
if (!is_multi) break;
117+
fn isEmpty(self: MsgBuffer) bool {
118+
return self.size == 0 and self.pos == 0;
119+
}
119120

120-
// TODO: handle 1 read smaller than a complete JSON msg
121+
fn reset(self: *MsgBuffer) void {
122+
self.size = 0;
123+
self.pos = 0;
121124
}
122-
}
125+
126+
// read input
127+
// - `do_func` is a callback to execute on each message of the input
128+
// - `data` is a arbitrary payload that will be passed to the callback along with
129+
// the message itself
130+
fn read(
131+
self: *MsgBuffer,
132+
alloc: std.mem.Allocator,
133+
input: []const u8,
134+
data: anytype,
135+
comptime do_func: fn (data: @TypeOf(data), msg: []const u8) anyerror!void,
136+
) !void {
137+
var _input = input; // make input writable
138+
139+
while (true) {
140+
var msg: []const u8 = undefined;
141+
142+
// msg size
143+
var msg_size: usize = undefined;
144+
if (self.isEmpty()) {
145+
// parse msg size metadata
146+
const size_pos = std.mem.indexOfScalar(u8, _input, ':').?;
147+
const size_str = _input[0..size_pos];
148+
msg_size = try std.fmt.parseInt(u32, size_str, 10);
149+
_input = _input[size_pos + 1 ..];
150+
} else {
151+
msg_size = self.size;
152+
}
153+
154+
// multipart
155+
const is_multipart = !self.isEmpty() or _input.len < msg_size;
156+
if (is_multipart) {
157+
158+
// set msg size on empty MsgBuffer
159+
if (self.isEmpty()) {
160+
self.size = msg_size;
161+
}
162+
163+
// get the new position of the cursor
164+
const new_pos = self.pos + _input.len;
165+
166+
// check if the current input can fit in MsgBuffer
167+
if (new_pos > self.buf.len) {
168+
// max_size is the max between msg size and current new cursor position
169+
const max_size = @max(self.size, new_pos);
170+
// resize the MsgBuffer to fit
171+
self.buf = try alloc.realloc(self.buf, max_size);
172+
}
173+
174+
// copy the current input into MsgBuffer
175+
@memcpy(self.buf[self.pos..new_pos], _input[0..]);
176+
177+
// set the new cursor position
178+
self.pos = new_pos;
179+
180+
// if multipart is not finished, go fetch the next input
181+
if (!self.isFinished()) return;
182+
183+
// otherwhise multipart is finished, use its buffer as input
184+
_input = self.buf[0..self.pos];
185+
self.reset();
186+
}
187+
188+
// handle several JSON msg in 1 read
189+
const is_combined = _input.len > msg_size;
190+
msg = _input[0..msg_size];
191+
std.log.debug("msg: {s}", .{msg[0..@min(BufReadSize, msg_size)]});
192+
if (is_combined) {
193+
_input = _input[msg_size..];
194+
}
195+
196+
try @call(.auto, do_func, .{ data, msg });
197+
198+
if (!is_combined) break;
199+
}
200+
}
201+
};
123202

124203
fn doTest(nb: *u8, _: []const u8) anyerror!void {
125204
nb.* += 1;
126205
}
127206

128-
test {
207+
test "MsgBuffer" {
129208
const Case = struct {
130209
input: []const u8,
131210
nb: u8,
132211
};
212+
const alloc = std.testing.allocator;
133213
const cases = [_]Case{
134214
// simple
135215
.{ .input = "2:ok", .nb = 1 },
136-
// multi
216+
// combined
137217
.{ .input = "2:ok3:foo7:bar2:ok", .nb = 3 }, // "bar2:ok" is a message, no need to escape "2:" here
218+
// multipart
219+
.{ .input = "9:multi", .nb = 0 },
220+
.{ .input = "part", .nb = 1 },
221+
// multipart & combined
222+
.{ .input = "9:multi", .nb = 0 },
223+
.{ .input = "part2:ok", .nb = 2 },
224+
// several multipart
225+
.{ .input = "23:multi", .nb = 0 },
226+
.{ .input = "several", .nb = 0 },
227+
.{ .input = "complex", .nb = 0 },
228+
.{ .input = "part", .nb = 1 },
229+
// combined & multipart
230+
.{ .input = "2:ok9:multi", .nb = 1 },
231+
.{ .input = "part", .nb = 1 },
138232
};
233+
var nb: u8 = undefined;
234+
var msg_buf = try MsgBuffer.init(alloc, 10);
235+
defer msg_buf.deinit(alloc);
139236
for (cases) |case| {
140-
var nb: u8 = 0;
141-
try readInput(case.input, doTest, &nb);
237+
nb = 0;
238+
try msg_buf.read(alloc, case.input, &nb, doTest);
142239
try std.testing.expect(nb == case.nb);
143240
}
144241
}
@@ -232,22 +329,27 @@ const Accept = struct {
232329
// ------
233330

234331
pub fn listen(browser: *Browser, socket: std.os.socket_t) anyerror!void {
332+
const loop = browser.currentSession().loop;
333+
334+
// MsgBuffer
335+
var msg_buf = try MsgBuffer.init(loop.alloc, BufReadSize);
336+
defer msg_buf.deinit(loop.alloc);
235337

236338
// create I/O contexts and callbacks
237339
// for accepting connections and receving messages
238-
var input: [1024]u8 = undefined;
340+
var ctxInput: [BufReadSize]u8 = undefined;
239341
var cmd = Cmd{
240342
.browser = browser,
241343
.socket = undefined,
242-
.buf = &input,
344+
.buf = &ctxInput,
345+
.msg_buf = &msg_buf,
243346
};
244347
var accept = Accept{
245348
.cmd = &cmd,
246349
.socket = socket,
247350
};
248351

249352
// accepting connection asynchronously on internal server
250-
const loop = browser.currentSession().loop;
251353
var completion: Completion = undefined;
252354
loop.io.accept(*Accept, &accept, Accept.cbk, &completion, socket);
253355

0 commit comments

Comments
 (0)