Skip to content

Commit d27f5d2

Browse files
Merge pull request #296 from lightpanda-io/loop-tail-event
loop: ensure all contexts are free on loop's deinit
2 parents 8fd9176 + 84a2d05 commit d27f5d2

File tree

1 file changed

+67
-56
lines changed

1 file changed

+67
-56
lines changed

src/loop.zig

Lines changed: 67 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ fn report(comptime fmt: []const u8, args: anytype) void {
3939
// I/O APIs based on async/await might be added in the future.
4040
pub const SingleThreaded = struct {
4141
alloc: std.mem.Allocator, // TODO: unmanaged version ?
42-
io: *IO,
43-
events_nb: *usize,
42+
io: IO,
43+
js_events_nb: usize,
44+
zig_events_nb: usize,
4445
cbk_error: bool = false,
4546

4647
// js_ctx_id is incremented each time the loop is reset for JS.
@@ -63,20 +64,25 @@ pub const SingleThreaded = struct {
6364
pub const SendError = IO.SendError;
6465

6566
pub fn init(alloc: std.mem.Allocator) !Self {
66-
const io = try alloc.create(IO);
67-
errdefer alloc.destroy(io);
68-
69-
io.* = try IO.init(32, 0);
70-
const events_nb = try alloc.create(usize);
71-
events_nb.* = 0;
72-
return Self{ .alloc = alloc, .io = io, .events_nb = events_nb };
67+
return Self{
68+
.alloc = alloc,
69+
.io = try IO.init(32, 0),
70+
.js_events_nb = 0,
71+
.zig_events_nb = 0,
72+
};
7373
}
7474

7575
pub fn deinit(self: *Self) void {
76+
// run tail events. We do run the tail events to ensure all the
77+
// contexts are correcly free.
78+
while (self.eventsNb(.js) > 0 or self.eventsNb(.zig) > 0) {
79+
self.io.run_for_ns(10 * std.time.ns_per_ms) catch |err| {
80+
log.err("deinit run tail events: {any}", .{err});
81+
break;
82+
};
83+
}
7684
self.cancelAll();
7785
self.io.deinit();
78-
self.alloc.destroy(self.io);
79-
self.alloc.destroy(self.events_nb);
8086
}
8187

8288
// Retrieve all registred I/O events completed by OS kernel,
@@ -85,7 +91,7 @@ pub const SingleThreaded = struct {
8591
// Note that I/O events callbacks might register more I/O events
8692
// on the go when they are executed (ie. nested I/O events).
8793
pub fn run(self: *Self) !void {
88-
while (self.eventsNb() > 0) {
94+
while (self.eventsNb(.js) > 0) {
8995
try self.io.run_for_ns(10 * std.time.ns_per_ms);
9096
// at each iteration we might have new events registred by previous callbacks
9197
}
@@ -98,21 +104,30 @@ pub const SingleThreaded = struct {
98104
}
99105
}
100106

107+
const Event = enum { js, zig };
108+
109+
fn eventsPtr(self: *Self, comptime event: Event) *usize {
110+
return switch (event) {
111+
.zig => &self.zig_events_nb,
112+
.js => &self.js_events_nb,
113+
};
114+
}
115+
101116
// Register events atomically
102117
// - add 1 event and return previous value
103-
fn addEvent(self: *Self) usize {
104-
return @atomicRmw(usize, self.events_nb, .Add, 1, .acq_rel);
118+
fn addEvent(self: *Self, comptime event: Event) usize {
119+
return @atomicRmw(usize, self.eventsPtr(event), .Add, 1, .acq_rel);
105120
}
106121
// - remove 1 event and return previous value
107-
fn removeEvent(self: *Self) usize {
108-
return @atomicRmw(usize, self.events_nb, .Sub, 1, .acq_rel);
122+
fn removeEvent(self: *Self, comptime event: Event) usize {
123+
return @atomicRmw(usize, self.eventsPtr(event), .Sub, 1, .acq_rel);
109124
}
110125
// - get the number of current events
111-
fn eventsNb(self: *Self) usize {
112-
return @atomicLoad(usize, self.events_nb, .seq_cst);
126+
fn eventsNb(self: *Self, comptime event: Event) usize {
127+
return @atomicLoad(usize, self.eventsPtr(event), .seq_cst);
113128
}
114-
fn resetEvents(self: *Self) void {
115-
@atomicStore(usize, self.events_nb, 0, .unordered);
129+
fn resetEvents(self: *Self, comptime event: Event) void {
130+
@atomicStore(usize, self.eventsPtr(event), 0, .unordered);
116131
}
117132

118133
fn freeCbk(self: *Self, completion: *IO.Completion, ctx: anytype) void {
@@ -136,18 +151,20 @@ pub const SingleThreaded = struct {
136151
completion: *IO.Completion,
137152
result: IO.TimeoutError!void,
138153
) void {
139-
defer ctx.loop.freeCbk(completion, ctx);
154+
defer {
155+
const old_events_nb = ctx.loop.removeEvent(.js);
156+
if (builtin.is_test) {
157+
report("timeout done, remaining events: {d}", .{old_events_nb - 1});
158+
}
159+
160+
ctx.loop.freeCbk(completion, ctx);
161+
}
140162

141163
// If the loop's context id has changed, don't call the js callback
142164
// function. The callback's memory has already be cleaned and the
143165
// events nb reset.
144166
if (ctx.js_ctx_id != ctx.loop.js_ctx_id) return;
145167

146-
const old_events_nb = ctx.loop.removeEvent();
147-
if (builtin.is_test) {
148-
report("timeout done, remaining events: {d}", .{old_events_nb - 1});
149-
}
150-
151168
// TODO: return the error to the callback
152169
result catch |err| {
153170
switch (err) {
@@ -175,7 +192,7 @@ pub const SingleThreaded = struct {
175192
.js_cbk = js_cbk,
176193
.js_ctx_id = self.js_ctx_id,
177194
};
178-
const old_events_nb = self.addEvent();
195+
const old_events_nb = self.addEvent(.js);
179196
self.io.timeout(*ContextTimeout, ctx, timeoutCallback, completion, nanoseconds);
180197
if (builtin.is_test) {
181198
report("start timeout {d} for {d} nanoseconds", .{ old_events_nb + 1, nanoseconds });
@@ -195,18 +212,20 @@ pub const SingleThreaded = struct {
195212
completion: *IO.Completion,
196213
result: IO.CancelOneError!void,
197214
) void {
198-
defer ctx.loop.freeCbk(completion, ctx);
215+
defer {
216+
const old_events_nb = ctx.loop.removeEvent(.js);
217+
if (builtin.is_test) {
218+
report("cancel done, remaining events: {d}", .{old_events_nb - 1});
219+
}
220+
221+
ctx.loop.freeCbk(completion, ctx);
222+
}
199223

200224
// If the loop's context id has changed, don't call the js callback
201225
// function. The callback's memory has already be cleaned and the
202226
// events nb reset.
203227
if (ctx.js_ctx_id != ctx.loop.js_ctx_id) return;
204228

205-
const old_events_nb = ctx.loop.removeEvent();
206-
if (builtin.is_test) {
207-
report("cancel done, remaining events: {d}", .{old_events_nb - 1});
208-
}
209-
210229
// TODO: return the error to the callback
211230
result catch |err| {
212231
switch (err) {
@@ -237,28 +256,27 @@ pub const SingleThreaded = struct {
237256
.js_ctx_id = self.js_ctx_id,
238257
};
239258

240-
const old_events_nb = self.addEvent();
259+
const old_events_nb = self.addEvent(.js);
241260
self.io.cancel_one(*ContextCancel, ctx, cancelCallback, completion, comp_cancel);
242261
if (builtin.is_test) {
243262
report("cancel {d}", .{old_events_nb + 1});
244263
}
245264
}
246265

247-
pub fn cancelAll(self: *Self) void {
248-
self.resetEvents();
266+
fn cancelAll(self: *Self) void {
267+
self.resetEvents(.js);
268+
self.resetEvents(.zig);
249269
self.io.cancel_all();
250270
}
251271

252272
// Reset all existing JS callbacks.
253273
pub fn resetJS(self: *Self) void {
254274
self.js_ctx_id += 1;
255-
self.resetEvents();
256275
}
257276

258277
// Reset all existing Zig callbacks.
259278
pub fn resetZig(self: *Self) void {
260279
self.zig_ctx_id += 1;
261-
self.resetEvents();
262280
}
263281

264282
// IO callbacks APIs
@@ -275,15 +293,15 @@ pub const SingleThreaded = struct {
275293
socket: std.posix.socket_t,
276294
address: std.net.Address,
277295
) void {
278-
const old_events_nb = self.addEvent();
296+
const old_events_nb = self.addEvent(.js);
279297
self.io.connect(*Ctx, ctx, cbk, completion, socket, address);
280298
if (builtin.is_test) {
281299
report("start connect {d}", .{old_events_nb + 1});
282300
}
283301
}
284302

285303
pub fn onConnect(self: *Self, _: ConnectError!void) void {
286-
const old_events_nb = self.removeEvent();
304+
const old_events_nb = self.removeEvent(.js);
287305
if (builtin.is_test) {
288306
report("connect done, remaining events: {d}", .{old_events_nb - 1});
289307
}
@@ -300,15 +318,15 @@ pub const SingleThreaded = struct {
300318
socket: std.posix.socket_t,
301319
buf: []const u8,
302320
) void {
303-
const old_events_nb = self.addEvent();
321+
const old_events_nb = self.addEvent(.js);
304322
self.io.send(*Ctx, ctx, cbk, completion, socket, buf);
305323
if (builtin.is_test) {
306324
report("start send {d}", .{old_events_nb + 1});
307325
}
308326
}
309327

310328
pub fn onSend(self: *Self, _: SendError!usize) void {
311-
const old_events_nb = self.removeEvent();
329+
const old_events_nb = self.removeEvent(.js);
312330
if (builtin.is_test) {
313331
report("send done, remaining events: {d}", .{old_events_nb - 1});
314332
}
@@ -325,15 +343,15 @@ pub const SingleThreaded = struct {
325343
socket: std.posix.socket_t,
326344
buf: []u8,
327345
) void {
328-
const old_events_nb = self.addEvent();
346+
const old_events_nb = self.addEvent(.js);
329347
self.io.recv(*Ctx, ctx, cbk, completion, socket, buf);
330348
if (builtin.is_test) {
331349
report("start recv {d}", .{old_events_nb + 1});
332350
}
333351
}
334352

335353
pub fn onRecv(self: *Self, _: RecvError!usize) void {
336-
const old_events_nb = self.removeEvent();
354+
const old_events_nb = self.removeEvent(.js);
337355
if (builtin.is_test) {
338356
report("recv done, remaining events: {d}", .{old_events_nb - 1});
339357
}
@@ -356,19 +374,16 @@ pub const SingleThreaded = struct {
356374
completion: *IO.Completion,
357375
result: IO.TimeoutError!void,
358376
) void {
359-
defer ctx.loop.freeCbk(completion, ctx);
377+
defer {
378+
_ = ctx.loop.removeEvent(.zig);
379+
ctx.loop.freeCbk(completion, ctx);
380+
}
360381

361382
// If the loop's context id has changed, don't call the js callback
362383
// function. The callback's memory has already be cleaned and the
363384
// events nb reset.
364385
if (ctx.zig_ctx_id != ctx.loop.zig_ctx_id) return;
365386

366-
// We don't remove event here b/c we don't want the main loop to wait for
367-
// the timeout is done.
368-
// This is mainly due b/c the usage of zigTimeout is used to process
369-
// background tasks.
370-
//_ = ctx.loop.removeEvent();
371-
372387
result catch |err| {
373388
switch (err) {
374389
error.Canceled => {},
@@ -403,11 +418,7 @@ pub const SingleThreaded = struct {
403418
}.wrapper,
404419
};
405420

406-
// We don't add event here b/c we don't want the main loop to wait for
407-
// the timeout is done.
408-
// This is mainly due b/c the usage of zigTimeout is used to process
409-
// background tasks.
410-
// _ = self.addEvent();
421+
_ = self.addEvent(.zig);
411422

412423
self.io.timeout(*ContextZigTimeout, ctxtimeout, zigTimeoutCallback, completion, nanoseconds);
413424
}

0 commit comments

Comments
 (0)