Skip to content

Commit b789ba2

Browse files
server: add cancel current recv before accepting new connection
Only on Linux. On MacOS cancel is not supported for now and we do not have any problem with the current recv operation on a closed socket. Signed-off-by: Francis Bouvier <[email protected]>
1 parent c74feb9 commit b789ba2

File tree

1 file changed

+72
-49
lines changed

1 file changed

+72
-49
lines changed

src/server.zig

Lines changed: 72 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,29 @@
1717
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1818

1919
const std = @import("std");
20+
const builtin = @import("builtin");
2021

2122
const jsruntime = @import("jsruntime");
2223
const Completion = jsruntime.IO.Completion;
2324
const AcceptError = jsruntime.IO.AcceptError;
2425
const RecvError = jsruntime.IO.RecvError;
2526
const SendError = jsruntime.IO.SendError;
2627
const CloseError = jsruntime.IO.CloseError;
28+
const CancelError = jsruntime.IO.CancelError;
2729
const TimeoutError = jsruntime.IO.TimeoutError;
2830

2931
const MsgBuffer = @import("msg.zig").MsgBuffer;
3032
const Browser = @import("browser/browser.zig").Browser;
3133
const cdp = @import("cdp/cdp.zig");
3234

3335
const NoError = error{NoError};
34-
const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError;
36+
const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError || CancelError;
3537
const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError;
3638

3739
const TimeoutCheck = std.time.ns_per_ms * 100;
3840

3941
const log = std.log.scoped(.server);
42+
const IsLinux = builtin.target.os.tag == .linux;
4043

4144
// I/O Main
4245
// --------
@@ -55,6 +58,7 @@ pub const Ctx = struct {
5558
err: ?Error = null,
5659

5760
// I/O fields
61+
accept_completion: *Completion,
5862
conn_completion: *Completion,
5963
timeout_completion: *Completion,
6064
timeout: u64,
@@ -76,13 +80,14 @@ pub const Ctx = struct {
7680
completion: *Completion,
7781
result: AcceptError!std.posix.socket_t,
7882
) void {
79-
std.debug.assert(completion == self.conn_completion);
83+
std.debug.assert(completion == self.acceptCompletion());
8084

8185
self.conn_socket = result catch |err| {
8286
log.err("accept error: {any}", .{err});
8387
self.err = err;
8488
return;
8589
};
90+
log.info("client connected", .{});
8691

8792
// set connection timestamp and timeout
8893
self.last_active = std.time.Instant.now() catch |err| {
@@ -112,6 +117,10 @@ pub const Ctx = struct {
112117
std.debug.assert(completion == self.conn_completion);
113118

114119
const size = result catch |err| {
120+
if (err == error.Canceled) {
121+
log.debug("read canceled", .{});
122+
return;
123+
}
115124
log.err("read error: {any}", .{err});
116125
self.err = err;
117126
return;
@@ -188,21 +197,9 @@ pub const Ctx = struct {
188197
};
189198

190199
if (now.since(self.last_active.?) > self.timeout) {
191-
// closing
192-
log.debug("conn timeout, closing...", .{});
193-
194-
// NOTE: we should cancel the current read
195-
// but it seems that's just closing the connection is enough
196-
// (and cancel does not work on MacOS)
197-
198200
// close current connection
199-
self.loop.io.close(
200-
*Ctx,
201-
self,
202-
Ctx.closeCbk,
203-
self.timeout_completion,
204-
self.conn_socket,
205-
);
201+
log.debug("conn timeout, closing...", .{});
202+
self.cancelAndClose();
206203
return;
207204
}
208205

@@ -216,37 +213,17 @@ pub const Ctx = struct {
216213
);
217214
}
218215

219-
fn closeCbk(self: *Ctx, completion: *Completion, result: CloseError!void) void {
220-
_ = completion;
221-
// NOTE: completion can be either self.conn_completion or self.timeout_completion
216+
fn cancelCbk(self: *Ctx, completion: *Completion, result: CancelError!void) void {
217+
std.debug.assert(completion == self.accept_completion);
222218

223219
_ = result catch |err| {
224-
log.err("close error: {any}", .{err});
220+
log.err("cancel error: {any}", .{err});
225221
self.err = err;
226222
return;
227223
};
224+
log.debug("cancel done", .{});
228225

229-
// conn is closed
230-
self.last_active = null;
231-
232-
// restart a new browser session in case of re-connect
233-
if (!self.sessionNew) {
234-
self.newSession() catch |err| {
235-
log.err("new session error: {any}", .{err});
236-
return;
237-
};
238-
}
239-
240-
log.info("accepting new conn...", .{});
241-
242-
// continue accepting incoming requests
243-
self.loop.io.accept(
244-
*Ctx,
245-
self,
246-
Ctx.acceptCbk,
247-
self.conn_completion,
248-
self.accept_socket,
249-
);
226+
self.close();
250227
}
251228

252229
// shortcuts
@@ -267,6 +244,15 @@ pub const Ctx = struct {
267244
return self.browser.session.env;
268245
}
269246

247+
inline fn acceptCompletion(self: *Ctx) *Completion {
248+
// NOTE: the logical completion to use here is the accept_completion
249+
// as the pipe_connection can be used simulteanously by a recv I/O operation.
250+
// But on MacOS (kqueue) the recv I/O operation on a closed socket leads to a panic
251+
// so we use the pipe_connection to avoid this problem
252+
if (IsLinux) return self.accept_completion;
253+
return self.conn_completion;
254+
}
255+
270256
// actions
271257
// -------
272258

@@ -276,13 +262,7 @@ pub const Ctx = struct {
276262
if (std.mem.eql(u8, cmd, "close")) {
277263
// close connection
278264
log.info("close cmd, closing conn...", .{});
279-
self.loop.io.close(
280-
*Ctx,
281-
self,
282-
Ctx.closeCbk,
283-
self.conn_completion,
284-
self.conn_socket,
285-
);
265+
self.cancelAndClose();
286266
return error.Closed;
287267
}
288268

@@ -307,6 +287,47 @@ pub const Ctx = struct {
307287
}
308288
}
309289

290+
fn cancelAndClose(self: *Ctx) void {
291+
if (IsLinux) { // cancel is only available on Linux
292+
self.loop.io.cancel(
293+
*Ctx,
294+
self,
295+
Ctx.cancelCbk,
296+
self.accept_completion,
297+
self.conn_completion,
298+
);
299+
} else {
300+
self.close();
301+
}
302+
}
303+
304+
fn close(self: *Ctx) void {
305+
std.posix.close(self.conn_socket);
306+
307+
// conn is closed
308+
log.debug("connection closed", .{});
309+
self.last_active = null;
310+
311+
// restart a new browser session in case of re-connect
312+
if (!self.sessionNew) {
313+
self.newSession() catch |err| {
314+
log.err("new session error: {any}", .{err});
315+
return;
316+
};
317+
}
318+
319+
log.info("accepting new conn...", .{});
320+
321+
// continue accepting incoming requests
322+
self.loop.io.accept(
323+
*Ctx,
324+
self,
325+
Ctx.acceptCbk,
326+
self.acceptCompletion(),
327+
self.accept_socket,
328+
);
329+
}
330+
310331
fn newSession(self: *Ctx) !void {
311332
try self.browser.newSession(self.alloc(), self.loop);
312333
try self.browser.session.initInspector(
@@ -430,6 +451,7 @@ pub fn listen(
430451
defer msg_buf.deinit(loop.alloc);
431452

432453
// create I/O completions
454+
var accept_completion: Completion = undefined;
433455
var conn_completion: Completion = undefined;
434456
var timeout_completion: Completion = undefined;
435457

@@ -443,6 +465,7 @@ pub fn listen(
443465
.msg_buf = &msg_buf,
444466
.accept_socket = server_socket,
445467
.timeout = timeout,
468+
.accept_completion = &accept_completion,
446469
.conn_completion = &conn_completion,
447470
.timeout_completion = &timeout_completion,
448471
};
@@ -454,7 +477,7 @@ pub fn listen(
454477

455478
// accepting connection asynchronously on internal server
456479
log.info("accepting new conn...", .{});
457-
loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.conn_completion, ctx.accept_socket);
480+
loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.acceptCompletion(), ctx.accept_socket);
458481

459482
// infinite loop on I/O events, either:
460483
// - cmd from incoming connection on server socket

0 commit comments

Comments
 (0)