Skip to content

Commit 20dd140

Browse files
cdp: send I/O next read before executing current cmd
Signed-off-by: Francis Bouvier <[email protected]>
1 parent f30501c commit 20dd140

File tree

2 files changed

+81
-82
lines changed

2 files changed

+81
-82
lines changed

src/msg.zig

Lines changed: 61 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -55,89 +55,72 @@ pub const MsgBuffer = struct {
5555
}
5656

5757
// read input
58-
// - `do_func` is a callback to execute on each message of the input
59-
// - `data` is an arbitrary user data that will be forwarded to the do_func callback
60-
pub fn read(
61-
self: *MsgBuffer,
62-
alloc: std.mem.Allocator,
63-
input: []const u8,
64-
data: anytype,
65-
comptime do_func: fn (data: @TypeOf(data), msg: []const u8) anyerror!void,
66-
) !void {
58+
pub fn read(self: *MsgBuffer, alloc: std.mem.Allocator, input: []const u8) !struct {
59+
msg: []const u8,
60+
left: []const u8,
61+
} {
6762
var _input = input; // make input writable
6863

69-
while (true) {
70-
var msg: []const u8 = undefined;
64+
// msg size
65+
var msg_size: usize = undefined;
66+
if (self.isEmpty()) {
67+
// parse msg size metadata
68+
const size_pos = std.mem.indexOfScalar(u8, _input, ':') orelse return error.InputWithoutSize;
69+
const size_str = _input[0..size_pos];
70+
msg_size = try std.fmt.parseInt(u32, size_str, 10);
71+
_input = _input[size_pos + 1 ..];
72+
} else {
73+
msg_size = self.size;
74+
}
7175

72-
// msg size
73-
var msg_size: usize = undefined;
76+
// multipart
77+
const is_multipart = !self.isEmpty() or _input.len < msg_size;
78+
if (is_multipart) {
79+
80+
// set msg size on empty MsgBuffer
7481
if (self.isEmpty()) {
75-
// parse msg size metadata
76-
const size_pos = std.mem.indexOfScalar(u8, _input, ':') orelse return error.InputWithoutSize;
77-
const size_str = _input[0..size_pos];
78-
msg_size = try std.fmt.parseInt(u32, size_str, 10);
79-
_input = _input[size_pos + 1 ..];
80-
} else {
81-
msg_size = self.size;
82+
self.size = msg_size;
8283
}
8384

84-
// multipart
85-
const is_multipart = !self.isEmpty() or _input.len < msg_size;
86-
if (is_multipart) {
87-
88-
// set msg size on empty MsgBuffer
89-
if (self.isEmpty()) {
90-
self.size = msg_size;
91-
}
92-
93-
// get the new position of the cursor
94-
const new_pos = self.pos + _input.len;
95-
96-
// check max limit size
97-
if (new_pos > MaxSize) {
98-
return error.MsgTooBig;
99-
}
100-
101-
// check if the current input can fit in MsgBuffer
102-
if (new_pos > self.buf.len) {
103-
// we want to realloc at least:
104-
// - a size big enough to fit the entire input (ie. new_pos)
105-
// - a size big enough (ie. current size + starting size)
106-
// to avoid multiple reallocation
107-
const new_size = @max(self.buf.len + self.size, new_pos);
108-
// resize the MsgBuffer to fit
109-
self.buf = try alloc.realloc(self.buf, new_size);
110-
}
111-
112-
// copy the current input into MsgBuffer
113-
@memcpy(self.buf[self.pos..new_pos], _input[0..]);
114-
115-
// set the new cursor position
116-
self.pos = new_pos;
117-
118-
// if multipart is not finished, go fetch the next input
119-
if (!self.isFinished()) return;
120-
121-
// otherwhise multipart is finished, use its buffer as input
122-
_input = self.buf[0..self.pos];
123-
self.reset();
85+
// get the new position of the cursor
86+
const new_pos = self.pos + _input.len;
87+
88+
// check max limit size
89+
if (new_pos > MaxSize) {
90+
return error.MsgTooBig;
12491
}
12592

126-
// handle several JSON msg in 1 read
127-
const is_combined = _input.len > msg_size;
128-
msg = _input[0..msg_size];
129-
if (is_combined) {
130-
_input = _input[msg_size..];
93+
// check if the current input can fit in MsgBuffer
94+
if (new_pos > self.buf.len) {
95+
// we want to realloc at least:
96+
// - a size big enough to fit the entire input (ie. new_pos)
97+
// - a size big enough (ie. current msg size + starting buffer size)
98+
// to avoid multiple reallocation
99+
const new_size = @max(self.buf.len + self.size, new_pos);
100+
// resize the MsgBuffer to fit
101+
self.buf = try alloc.realloc(self.buf, new_size);
131102
}
132103

133-
try @call(.auto, do_func, .{ data, msg });
104+
// copy the current input into MsgBuffer
105+
@memcpy(self.buf[self.pos..new_pos], _input[0..]);
134106

135-
if (!is_combined) break;
107+
// set the new cursor position
108+
self.pos = new_pos;
109+
110+
// if multipart is not finished, go fetch the next input
111+
if (!self.isFinished()) return error.MsgMultipart;
112+
113+
// otherwhise multipart is finished, use its buffer as input
114+
_input = self.buf[0..self.pos];
115+
self.reset();
136116
}
117+
118+
// handle several JSON msg in 1 read
119+
return .{ .msg = _input[0..msg_size], .left = _input[msg_size..] };
137120
}
138121
};
139122

140-
fn doTest(nb: *u8, _: []const u8) anyerror!void {
123+
fn doTest(nb: *u8) void {
141124
nb.* += 1;
142125
}
143126

@@ -171,12 +154,19 @@ test "MsgBuffer" {
171154
.{ .input = "2:ok9:multi", .nb = 1 },
172155
.{ .input = "part", .nb = 1 },
173156
};
174-
var nb: u8 = undefined;
175157
var msg_buf = try MsgBuffer.init(alloc, 10);
176158
defer msg_buf.deinit(alloc);
177159
for (cases) |case| {
178-
nb = 0;
179-
try msg_buf.read(alloc, case.input, &nb, doTest);
160+
var nb: u8 = 0;
161+
var input: []const u8 = case.input;
162+
while (input.len > 0) {
163+
const parts = msg_buf.read(alloc, input) catch |err| {
164+
if (err == error.MsgMultipart) break; // go to the next case input
165+
return err;
166+
};
167+
nb += 1;
168+
input = parts.left;
169+
}
180170
try std.testing.expect(nb == case.nb);
181171
}
182172
}

src/server.zig

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -128,17 +128,6 @@ pub const Ctx = struct {
128128
return;
129129
}
130130

131-
// input
132-
const input = self.read_buf[0..size];
133-
134-
// read and execute input
135-
self.msg_buf.read(self.alloc(), input, self, Ctx.do) catch |err| {
136-
if (err != error.Closed) {
137-
log.err("do error: {any}", .{err});
138-
}
139-
return;
140-
};
141-
142131
// set connection timestamp
143132
self.last_active = std.time.Instant.now() catch |err| {
144133
log.err("read timestamp error: {any}", .{err});
@@ -154,6 +143,26 @@ pub const Ctx = struct {
154143
self.conn_socket,
155144
self.read_buf,
156145
);
146+
147+
// read and execute input
148+
var input: []const u8 = self.read_buf[0..size];
149+
while (input.len > 0) {
150+
const parts = self.msg_buf.read(self.alloc(), input) catch |err| {
151+
if (err == error.MsgMultipart) {
152+
return;
153+
} else {
154+
log.err("msg read error: {any}", .{err});
155+
return;
156+
}
157+
};
158+
input = parts.left;
159+
// execute
160+
self.do(parts.msg) catch |err| {
161+
if (err != error.Closed) {
162+
log.err("do error: {any}", .{err});
163+
}
164+
};
165+
}
157166
}
158167

159168
fn timeoutCbk(self: *Ctx, completion: *Completion, result: TimeoutError!void) void {

0 commit comments

Comments
 (0)