Skip to content

Commit d5e7ebd

Browse files
Merge pull request #295 from lightpanda-io/fix_cdp_full_async
Fix cdp full async
2 parents 1e64513 + 625c174 commit d5e7ebd

File tree

2 files changed

+77
-49
lines changed

2 files changed

+77
-49
lines changed

src/server.zig

Lines changed: 76 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,29 @@
1717
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1818

1919
const std = @import("std");
20+
const builtin = @import("builtin");
2021

2122
const jsruntime = @import("jsruntime");
2223
const Completion = jsruntime.IO.Completion;
2324
const AcceptError = jsruntime.IO.AcceptError;
2425
const RecvError = jsruntime.IO.RecvError;
2526
const SendError = jsruntime.IO.SendError;
2627
const CloseError = jsruntime.IO.CloseError;
28+
const CancelError = jsruntime.IO.CancelError;
2729
const TimeoutError = jsruntime.IO.TimeoutError;
2830

2931
const MsgBuffer = @import("msg.zig").MsgBuffer;
3032
const Browser = @import("browser/browser.zig").Browser;
3133
const cdp = @import("cdp/cdp.zig");
3234

3335
const NoError = error{NoError};
34-
const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError;
36+
const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError || CancelError;
3537
const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError;
3638

3739
const TimeoutCheck = std.time.ns_per_ms * 100;
3840

3941
const log = std.log.scoped(.server);
42+
const isLinux = builtin.target.os.tag == .linux;
4043

4144
// I/O Main
4245
// --------
@@ -55,6 +58,7 @@ pub const Ctx = struct {
5558
err: ?Error = null,
5659

5760
// I/O fields
61+
accept_completion: *Completion,
5862
conn_completion: *Completion,
5963
timeout_completion: *Completion,
6064
timeout: u64,
@@ -76,12 +80,14 @@ pub const Ctx = struct {
7680
completion: *Completion,
7781
result: AcceptError!std.posix.socket_t,
7882
) void {
79-
std.debug.assert(completion == self.conn_completion);
83+
std.debug.assert(completion == self.acceptCompletion());
8084

8185
self.conn_socket = result catch |err| {
86+
log.err("accept error: {any}", .{err});
8287
self.err = err;
8388
return;
8489
};
90+
log.info("client connected", .{});
8591

8692
// set connection timestamp and timeout
8793
self.last_active = std.time.Instant.now() catch |err| {
@@ -111,6 +117,11 @@ pub const Ctx = struct {
111117
std.debug.assert(completion == self.conn_completion);
112118

113119
const size = result catch |err| {
120+
if (err == error.Canceled) {
121+
log.debug("read canceled", .{});
122+
return;
123+
}
124+
log.err("read error: {any}", .{err});
114125
self.err = err;
115126
return;
116127
};
@@ -169,6 +180,7 @@ pub const Ctx = struct {
169180
std.debug.assert(completion == self.timeout_completion);
170181

171182
_ = result catch |err| {
183+
log.err("timeout error: {any}", .{err});
172184
self.err = err;
173185
return;
174186
};
@@ -185,21 +197,9 @@ pub const Ctx = struct {
185197
};
186198

187199
if (now.since(self.last_active.?) > self.timeout) {
188-
// closing
189-
log.debug("conn timeout, closing...", .{});
190-
191-
// NOTE: we should cancel the current read
192-
// but it seems that's just closing the connection is enough
193-
// (and cancel does not work on MacOS)
194-
195200
// close current connection
196-
self.loop.io.close(
197-
*Ctx,
198-
self,
199-
Ctx.closeCbk,
200-
self.timeout_completion,
201-
self.conn_socket,
202-
);
201+
log.debug("conn timeout, closing...", .{});
202+
self.cancelAndClose();
203203
return;
204204
}
205205

@@ -213,36 +213,17 @@ pub const Ctx = struct {
213213
);
214214
}
215215

216-
fn closeCbk(self: *Ctx, completion: *Completion, result: CloseError!void) void {
217-
_ = completion;
218-
// NOTE: completion can be either self.conn_completion or self.timeout_completion
216+
fn cancelCbk(self: *Ctx, completion: *Completion, result: CancelError!void) void {
217+
std.debug.assert(completion == self.accept_completion);
219218

220219
_ = result catch |err| {
220+
log.err("cancel error: {any}", .{err});
221221
self.err = err;
222222
return;
223223
};
224+
log.debug("cancel done", .{});
224225

225-
// conn is closed
226-
self.last_active = null;
227-
228-
// restart a new browser session in case of re-connect
229-
if (!self.sessionNew) {
230-
self.newSession() catch |err| {
231-
log.err("new session error: {any}", .{err});
232-
return;
233-
};
234-
}
235-
236-
log.info("accepting new conn...", .{});
237-
238-
// continue accepting incoming requests
239-
self.loop.io.accept(
240-
*Ctx,
241-
self,
242-
Ctx.acceptCbk,
243-
self.conn_completion,
244-
self.accept_socket,
245-
);
226+
self.close();
246227
}
247228

248229
// shortcuts
@@ -263,6 +244,15 @@ pub const Ctx = struct {
263244
return self.browser.session.env;
264245
}
265246

247+
inline fn acceptCompletion(self: *Ctx) *Completion {
248+
// NOTE: the logical completion to use here is the accept_completion
249+
// as the pipe_connection can be used simulteanously by a recv I/O operation.
250+
// But on MacOS (kqueue) the recv I/O operation on a closed socket leads to a panic
251+
// so we use the pipe_connection to avoid this problem
252+
if (isLinux) return self.accept_completion;
253+
return self.conn_completion;
254+
}
255+
266256
// actions
267257
// -------
268258

@@ -272,13 +262,7 @@ pub const Ctx = struct {
272262
if (std.mem.eql(u8, cmd, "close")) {
273263
// close connection
274264
log.info("close cmd, closing conn...", .{});
275-
self.loop.io.close(
276-
*Ctx,
277-
self,
278-
Ctx.closeCbk,
279-
self.conn_completion,
280-
self.conn_socket,
281-
);
265+
self.cancelAndClose();
282266
return error.Closed;
283267
}
284268

@@ -303,6 +287,47 @@ pub const Ctx = struct {
303287
}
304288
}
305289

290+
fn cancelAndClose(self: *Ctx) void {
291+
if (isLinux) { // cancel is only available on Linux
292+
self.loop.io.cancel(
293+
*Ctx,
294+
self,
295+
Ctx.cancelCbk,
296+
self.accept_completion,
297+
self.conn_completion,
298+
);
299+
} else {
300+
self.close();
301+
}
302+
}
303+
304+
fn close(self: *Ctx) void {
305+
std.posix.close(self.conn_socket);
306+
307+
// conn is closed
308+
log.debug("connection closed", .{});
309+
self.last_active = null;
310+
311+
// restart a new browser session in case of re-connect
312+
if (!self.sessionNew) {
313+
self.newSession() catch |err| {
314+
log.err("new session error: {any}", .{err});
315+
return;
316+
};
317+
}
318+
319+
log.info("accepting new conn...", .{});
320+
321+
// continue accepting incoming requests
322+
self.loop.io.accept(
323+
*Ctx,
324+
self,
325+
Ctx.acceptCbk,
326+
self.acceptCompletion(),
327+
self.accept_socket,
328+
);
329+
}
330+
306331
fn newSession(self: *Ctx) !void {
307332
try self.browser.newSession(self.alloc(), self.loop);
308333
try self.browser.session.initInspector(
@@ -388,6 +413,7 @@ const Send = struct {
388413

389414
fn asyncCbk(self: *Send, _: *Completion, result: SendError!usize) void {
390415
_ = result catch |err| {
416+
log.err("send error: {any}", .{err});
391417
self.ctx.err = err;
392418
return;
393419
};
@@ -425,6 +451,7 @@ pub fn listen(
425451
defer msg_buf.deinit(loop.alloc);
426452

427453
// create I/O completions
454+
var accept_completion: Completion = undefined;
428455
var conn_completion: Completion = undefined;
429456
var timeout_completion: Completion = undefined;
430457

@@ -438,6 +465,7 @@ pub fn listen(
438465
.msg_buf = &msg_buf,
439466
.accept_socket = server_socket,
440467
.timeout = timeout,
468+
.accept_completion = &accept_completion,
441469
.conn_completion = &conn_completion,
442470
.timeout_completion = &timeout_completion,
443471
};
@@ -449,7 +477,7 @@ pub fn listen(
449477

450478
// accepting connection asynchronously on internal server
451479
log.info("accepting new conn...", .{});
452-
loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.conn_completion, ctx.accept_socket);
480+
loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.acceptCompletion(), ctx.accept_socket);
453481

454482
// infinite loop on I/O events, either:
455483
// - cmd from incoming connection on server socket

0 commit comments

Comments
 (0)