Skip to content

Commit c35c09d

Browse files
server: timeout mechanism
Signed-off-by: Francis Bouvier <[email protected]>
1 parent 49adb61 commit c35c09d

File tree

1 file changed

+126
-32
lines changed

1 file changed

+126
-32
lines changed

src/server.zig

Lines changed: 126 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ const NoError = error{NoError};
3434
const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError;
3535
const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError;
3636

37+
const TimeoutCheck = std.time.ns_per_ms * 100;
38+
const TimeoutRead = std.time.ns_per_s * 3;
39+
3740
// I/O Recv
3841
// --------
3942

@@ -48,27 +51,36 @@ pub const Cmd = struct {
4851
buf: []u8, // only for read operations
4952
err: ?Error = null,
5053

51-
completion: *Completion,
52-
acceptCtx: *Accept = undefined,
53-
5454
msg_buf: *MsgBuffer,
5555

56+
// I/O fields
57+
read_completion: *Completion,
58+
timeout_completion: *Completion,
59+
acceptCtx: *Accept = undefined,
60+
last_active: ?std.time.Instant = null,
61+
5662
// CDP
5763
state: cdp.State = .{},
5864

5965
// JS fields
6066
browser: *Browser, // TODO: is pointer mandatory here?
67+
sessionNew: bool,
6168
// try_catch: public.TryCatch, // TODO
6269

63-
fn cbk(self: *Cmd, completion: *Completion, result: RecvError!usize) void {
70+
// callbacks
71+
// ---------
72+
73+
fn readCbk(self: *Cmd, completion: *Completion, result: RecvError!usize) void {
74+
std.debug.assert(completion == self.read_completion);
75+
6476
const size = result catch |err| {
6577
self.err = err;
6678
return;
6779
};
6880

6981
if (size == 0) {
7082
// continue receving incomming messages asynchronously
71-
self.loop.io.recv(*Cmd, self, cbk, completion, self.socket, self.buf);
83+
self.loop.io.recv(*Cmd, self, Cmd.readCbk, self.read_completion, self.socket, self.buf);
7284
return;
7385
}
7486

@@ -79,12 +91,6 @@ pub const Cmd = struct {
7991
std.debug.print("\ninput size: {d}, content: {s}\n", .{ size, content });
8092
}
8193

82-
// close on exit command
83-
if (std.mem.eql(u8, input, "exit")) {
84-
self.err = error.NoError;
85-
return;
86-
}
87-
8894
// read and execute input
8995
self.msg_buf.read(self.alloc(), input, self, Cmd.do) catch |err| {
9096
if (err != error.Closed) {
@@ -93,16 +99,93 @@ pub const Cmd = struct {
9399
return;
94100
};
95101

102+
// set connection timestamp
103+
self.last_active = std.time.Instant.now() catch |err| {
104+
std.log.err("read timestamp error: {any}", .{err});
105+
return;
106+
};
107+
96108
// continue receving incomming messages asynchronously
97-
self.loop.io.recv(*Cmd, self, cbk, completion, self.socket, self.buf);
109+
self.loop.io.recv(*Cmd, self, Cmd.readCbk, self.read_completion, self.socket, self.buf);
110+
}
111+
112+
fn timeoutCbk(self: *Cmd, completion: *Completion, result: TimeoutError!void) void {
113+
std.debug.assert(completion == self.timeout_completion);
114+
115+
_ = result catch |err| {
116+
self.err = err;
117+
return;
118+
};
119+
120+
if (self.isClosed()) {
121+
// conn is already closed, ignore timeout
122+
return;
123+
}
124+
125+
// check time since last read
126+
const now = std.time.Instant.now() catch |err| {
127+
std.log.err("timeout timestamp error: {any}", .{err});
128+
return;
129+
};
130+
131+
if (now.since(self.last_active.?) > TimeoutRead) {
132+
// closing
133+
std.log.debug("conn timeout, closing...", .{});
134+
135+
// NOTE: we should cancel the current read
136+
// but it seems that's just closing the connection is enough
137+
// (and cancel does not work on MacOS)
138+
139+
// close current connection
140+
self.loop.io.close(*Cmd, self, Cmd.closeCbk, self.timeout_completion, self.socket);
141+
return;
142+
}
143+
144+
// continue checking timeout
145+
self.loop.io.timeout(*Cmd, self, Cmd.timeoutCbk, self.timeout_completion, TimeoutCheck);
146+
}
147+
148+
fn closeCbk(self: *Cmd, completion: *Completion, result: CloseError!void) void {
149+
_ = completion;
150+
// NOTE: completion can be either self.completion or self.timeout_completion
151+
152+
_ = result catch |err| {
153+
self.err = err;
154+
return;
155+
};
156+
157+
// conn is closed
158+
self.last_active = null;
159+
160+
// restart a new browser session in case of re-connect
161+
if (!self.sessionNew) {
162+
self.newSession() catch |err| {
163+
std.log.err("new session error: {any}", .{err});
164+
return;
165+
};
166+
}
167+
168+
std.log.debug("conn closed", .{});
169+
std.log.debug("accepting new conn...", .{});
170+
171+
// continue accepting incoming requests
172+
self.loop.io.accept(*Accept, self.acceptCtx, Accept.cbk, self.read_completion, self.acceptCtx.socket);
98173
}
99174

100175
// shortcuts
176+
// ---------
177+
178+
inline fn isClosed(self: *Cmd) bool {
179+
// last_active is first saved on acceptCbk
180+
return self.last_active == null;
181+
}
182+
183+
// allocator of the current session
101184
inline fn alloc(self: *Cmd) std.mem.Allocator {
102-
// TODO: should we return the allocator from the page instead?
103185
return self.browser.currentSession().alloc;
104186
}
105187

188+
// JS env of the current session
106189
inline fn env(self: Cmd) public.Env {
107190
return self.browser.currentSession().env;
108191
}
@@ -114,14 +197,19 @@ pub const Cmd = struct {
114197

115198
// close cmd
116199
if (std.mem.eql(u8, cmd, "close")) {
117-
self.loop.io.close(*Cmd, self, Cmd.closeCbk, self.completion, self.socket);
200+
// close connection
201+
std.log.debug("close cmd, closing...", .{});
202+
self.loop.io.close(*Cmd, self, Cmd.closeCbk, self.read_completion, self.socket);
118203
return error.Closed;
119204
}
120205

121-
// cdp cmd
206+
if (self.sessionNew) self.sessionNew = false;
207+
208+
// cdp end cmd
122209
const res = cdp.do(self.alloc(), cmd, self) catch |err| {
123-
// cdp end
124210
if (err == error.DisposeBrowserContext) {
211+
// restart a new browser session
212+
std.log.debug("cdp end cmd", .{});
125213
try self.newSession();
126214
return;
127215
}
@@ -136,24 +224,15 @@ pub const Cmd = struct {
136224
}
137225

138226
fn newSession(self: *Cmd) !void {
139-
std.log.info("new session", .{});
140227
try self.browser.newSession(self.alloc(), self.loop);
141228
const cmd_opaque = @as(*anyopaque, @ptrCast(self));
142229
try self.browser.currentSession().setInspector(cmd_opaque, Cmd.onInspectorResp, Cmd.onInspectorNotif);
230+
self.sessionNew = true;
231+
std.log.debug("new session", .{});
143232
}
144233

145-
fn closeCbk(self: *Cmd, completion: *Completion, result: CloseError!void) void {
146-
_ = result catch |err| {
147-
self.err = err;
148-
return;
149-
};
150-
std.log.debug("conn closed", .{});
151-
152-
// continue accepting incoming requests
153-
self.loop.io.accept(*Accept, self.acceptCtx, Accept.cbk, completion, self.acceptCtx.socket);
154-
}
155-
156-
// Inspector
234+
// inspector
235+
// ---------
157236

158237
pub fn sendInspector(self: *Cmd, msg: []const u8) void {
159238
if (self.env().getInspector()) |inspector| {
@@ -271,13 +350,22 @@ const Accept = struct {
271350
socket: std.posix.socket_t,
272351

273352
fn cbk(self: *Accept, completion: *Completion, result: AcceptError!std.posix.socket_t) void {
353+
std.debug.assert(completion == self.cmd.read_completion);
354+
274355
self.cmd.socket = result catch |err| {
275356
self.cmd.err = err;
276357
return;
277358
};
278359

360+
// set connection timestamp and timeout
361+
self.cmd.last_active = std.time.Instant.now() catch |err| {
362+
std.log.err("accept timestamp error: {any}", .{err});
363+
return;
364+
};
365+
self.cmd.loop.io.timeout(*Cmd, self.cmd, Cmd.timeoutCbk, self.cmd.timeout_completion, TimeoutCheck);
366+
279367
// receving incomming messages asynchronously
280-
self.cmd.loop.io.recv(*Cmd, self.cmd, Cmd.cbk, completion, self.cmd.socket, self.cmd.buf);
368+
self.cmd.loop.io.recv(*Cmd, self.cmd, Cmd.readCbk, self.cmd.read_completion, self.cmd.socket, self.cmd.buf);
281369
}
282370
};
283371

@@ -290,17 +378,22 @@ pub fn listen(browser: *Browser, loop: *public.Loop, socket: std.posix.socket_t)
290378
var msg_buf = try MsgBuffer.init(loop.alloc, BufReadSize * 256); // 256KB
291379
defer msg_buf.deinit(loop.alloc);
292380

381+
// create I/O completions
382+
var completion: Completion = undefined;
383+
var timeout_completion: Completion = undefined;
384+
293385
// create I/O contexts and callbacks
294386
// for accepting connections and receving messages
295387
var ctxInput: [BufReadSize]u8 = undefined;
296-
var completion: Completion = undefined;
297388
var cmd = Cmd{
298389
.loop = loop,
299390
.browser = browser,
391+
.sessionNew = true,
300392
.socket = undefined,
301393
.buf = &ctxInput,
302394
.msg_buf = &msg_buf,
303-
.completion = &completion,
395+
.read_completion = &completion,
396+
.timeout_completion = &timeout_completion,
304397
};
305398
const cmd_opaque = @as(*anyopaque, @ptrCast(&cmd));
306399
try browser.currentSession().setInspector(cmd_opaque, Cmd.onInspectorResp, Cmd.onInspectorNotif);
@@ -312,6 +405,7 @@ pub fn listen(browser: *Browser, loop: *public.Loop, socket: std.posix.socket_t)
312405
cmd.acceptCtx = &accept;
313406

314407
// accepting connection asynchronously on internal server
408+
std.log.debug("accepting new conn...", .{});
315409
loop.io.accept(*Accept, &accept, Accept.cbk, &completion, socket);
316410

317411
// infinite loop on I/O events, either:

0 commit comments

Comments
 (0)