Skip to content

Commit 52c26a2

Browse files
committed
loop: ensure all contexts are free on loop's deinit
We add a new zig event count and a loop during deinit to ensure all contexts are correcly free.
1 parent 8fd9176 commit 52c26a2

File tree

1 file changed

+76
-50
lines changed

1 file changed

+76
-50
lines changed

src/loop.zig

Lines changed: 76 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ fn report(comptime fmt: []const u8, args: anytype) void {
4040
pub const SingleThreaded = struct {
4141
alloc: std.mem.Allocator, // TODO: unmanaged version ?
4242
io: *IO,
43-
events_nb: *usize,
43+
js_events_nb: *usize,
44+
zig_events_nb: *usize,
4445
cbk_error: bool = false,
4546

4647
// js_ctx_id is incremented each time the loop is reset for JS.
@@ -67,16 +68,36 @@ pub const SingleThreaded = struct {
6768
errdefer alloc.destroy(io);
6869

6970
io.* = try IO.init(32, 0);
70-
const events_nb = try alloc.create(usize);
71-
events_nb.* = 0;
72-
return Self{ .alloc = alloc, .io = io, .events_nb = events_nb };
71+
72+
const js_events_nb = try alloc.create(usize);
73+
js_events_nb.* = 0;
74+
const zig_events_nb = try alloc.create(usize);
75+
zig_events_nb.* = 0;
76+
77+
return Self{
78+
.alloc = alloc,
79+
.io = io,
80+
.js_events_nb = js_events_nb,
81+
.zig_events_nb = zig_events_nb,
82+
};
7383
}
7484

7585
pub fn deinit(self: *Self) void {
86+
// run tail events. We do run the tail events to ensure all the
87+
// contexts are correcly free.
88+
while (self.eventsNb(.js) > 0 or self.eventsNb(.zig) > 0) {
89+
self.io.run_for_ns(10 * std.time.ns_per_ms) catch |err| {
90+
log.err("deinit run tail events: {any}", .{err});
91+
break;
92+
};
93+
}
94+
7695
self.cancelAll();
96+
7797
self.io.deinit();
7898
self.alloc.destroy(self.io);
79-
self.alloc.destroy(self.events_nb);
99+
self.alloc.destroy(self.js_events_nb);
100+
self.alloc.destroy(self.zig_events_nb);
80101
}
81102

82103
// Retrieve all registred I/O events completed by OS kernel,
@@ -85,7 +106,7 @@ pub const SingleThreaded = struct {
85106
// Note that I/O events callbacks might register more I/O events
86107
// on the go when they are executed (ie. nested I/O events).
87108
pub fn run(self: *Self) !void {
88-
while (self.eventsNb() > 0) {
109+
while (self.eventsNb(.js) > 0) {
89110
try self.io.run_for_ns(10 * std.time.ns_per_ms);
90111
// at each iteration we might have new events registred by previous callbacks
91112
}
@@ -98,21 +119,30 @@ pub const SingleThreaded = struct {
98119
}
99120
}
100121

122+
const Event = enum { js, zig };
123+
124+
fn eventsPtr(self: *const Self, event: Event) *usize {
125+
return switch (event) {
126+
.zig => self.zig_events_nb,
127+
.js => self.js_events_nb,
128+
};
129+
}
130+
101131
// Register events atomically
102132
// - add 1 event and return previous value
103-
fn addEvent(self: *Self) usize {
104-
return @atomicRmw(usize, self.events_nb, .Add, 1, .acq_rel);
133+
fn addEvent(self: *Self, event: Event) usize {
134+
return @atomicRmw(usize, self.eventsPtr(event), .Add, 1, .acq_rel);
105135
}
106136
// - remove 1 event and return previous value
107-
fn removeEvent(self: *Self) usize {
108-
return @atomicRmw(usize, self.events_nb, .Sub, 1, .acq_rel);
137+
fn removeEvent(self: *Self, event: Event) usize {
138+
return @atomicRmw(usize, self.eventsPtr(event), .Sub, 1, .acq_rel);
109139
}
110140
// - get the number of current events
111-
fn eventsNb(self: *Self) usize {
112-
return @atomicLoad(usize, self.events_nb, .seq_cst);
141+
fn eventsNb(self: *Self, event: Event) usize {
142+
return @atomicLoad(usize, self.eventsPtr(event), .seq_cst);
113143
}
114-
fn resetEvents(self: *Self) void {
115-
@atomicStore(usize, self.events_nb, 0, .unordered);
144+
fn resetEvents(self: *Self, event: Event) void {
145+
@atomicStore(usize, self.eventsPtr(event), 0, .unordered);
116146
}
117147

118148
fn freeCbk(self: *Self, completion: *IO.Completion, ctx: anytype) void {
@@ -136,18 +166,20 @@ pub const SingleThreaded = struct {
136166
completion: *IO.Completion,
137167
result: IO.TimeoutError!void,
138168
) void {
139-
defer ctx.loop.freeCbk(completion, ctx);
169+
defer {
170+
const old_events_nb = ctx.loop.removeEvent(.js);
171+
if (builtin.is_test) {
172+
report("timeout done, remaining events: {d}", .{old_events_nb - 1});
173+
}
174+
175+
ctx.loop.freeCbk(completion, ctx);
176+
}
140177

141178
// If the loop's context id has changed, don't call the js callback
142179
// function. The callback's memory has already be cleaned and the
143180
// events nb reset.
144181
if (ctx.js_ctx_id != ctx.loop.js_ctx_id) return;
145182

146-
const old_events_nb = ctx.loop.removeEvent();
147-
if (builtin.is_test) {
148-
report("timeout done, remaining events: {d}", .{old_events_nb - 1});
149-
}
150-
151183
// TODO: return the error to the callback
152184
result catch |err| {
153185
switch (err) {
@@ -175,7 +207,7 @@ pub const SingleThreaded = struct {
175207
.js_cbk = js_cbk,
176208
.js_ctx_id = self.js_ctx_id,
177209
};
178-
const old_events_nb = self.addEvent();
210+
const old_events_nb = self.addEvent(.js);
179211
self.io.timeout(*ContextTimeout, ctx, timeoutCallback, completion, nanoseconds);
180212
if (builtin.is_test) {
181213
report("start timeout {d} for {d} nanoseconds", .{ old_events_nb + 1, nanoseconds });
@@ -195,18 +227,20 @@ pub const SingleThreaded = struct {
195227
completion: *IO.Completion,
196228
result: IO.CancelOneError!void,
197229
) void {
198-
defer ctx.loop.freeCbk(completion, ctx);
230+
defer {
231+
const old_events_nb = ctx.loop.removeEvent(.js);
232+
if (builtin.is_test) {
233+
report("cancel done, remaining events: {d}", .{old_events_nb - 1});
234+
}
235+
236+
ctx.loop.freeCbk(completion, ctx);
237+
}
199238

200239
// If the loop's context id has changed, don't call the js callback
201240
// function. The callback's memory has already be cleaned and the
202241
// events nb reset.
203242
if (ctx.js_ctx_id != ctx.loop.js_ctx_id) return;
204243

205-
const old_events_nb = ctx.loop.removeEvent();
206-
if (builtin.is_test) {
207-
report("cancel done, remaining events: {d}", .{old_events_nb - 1});
208-
}
209-
210244
// TODO: return the error to the callback
211245
result catch |err| {
212246
switch (err) {
@@ -237,28 +271,27 @@ pub const SingleThreaded = struct {
237271
.js_ctx_id = self.js_ctx_id,
238272
};
239273

240-
const old_events_nb = self.addEvent();
274+
const old_events_nb = self.addEvent(.js);
241275
self.io.cancel_one(*ContextCancel, ctx, cancelCallback, completion, comp_cancel);
242276
if (builtin.is_test) {
243277
report("cancel {d}", .{old_events_nb + 1});
244278
}
245279
}
246280

247-
pub fn cancelAll(self: *Self) void {
248-
self.resetEvents();
281+
fn cancelAll(self: *Self) void {
282+
self.resetEvents(.js);
283+
self.resetEvents(.zig);
249284
self.io.cancel_all();
250285
}
251286

252287
// Reset all existing JS callbacks.
253288
pub fn resetJS(self: *Self) void {
254289
self.js_ctx_id += 1;
255-
self.resetEvents();
256290
}
257291

258292
// Reset all existing Zig callbacks.
259293
pub fn resetZig(self: *Self) void {
260294
self.zig_ctx_id += 1;
261-
self.resetEvents();
262295
}
263296

264297
// IO callbacks APIs
@@ -275,15 +308,15 @@ pub const SingleThreaded = struct {
275308
socket: std.posix.socket_t,
276309
address: std.net.Address,
277310
) void {
278-
const old_events_nb = self.addEvent();
311+
const old_events_nb = self.addEvent(.js);
279312
self.io.connect(*Ctx, ctx, cbk, completion, socket, address);
280313
if (builtin.is_test) {
281314
report("start connect {d}", .{old_events_nb + 1});
282315
}
283316
}
284317

285318
pub fn onConnect(self: *Self, _: ConnectError!void) void {
286-
const old_events_nb = self.removeEvent();
319+
const old_events_nb = self.removeEvent(.js);
287320
if (builtin.is_test) {
288321
report("connect done, remaining events: {d}", .{old_events_nb - 1});
289322
}
@@ -300,15 +333,15 @@ pub const SingleThreaded = struct {
300333
socket: std.posix.socket_t,
301334
buf: []const u8,
302335
) void {
303-
const old_events_nb = self.addEvent();
336+
const old_events_nb = self.addEvent(.js);
304337
self.io.send(*Ctx, ctx, cbk, completion, socket, buf);
305338
if (builtin.is_test) {
306339
report("start send {d}", .{old_events_nb + 1});
307340
}
308341
}
309342

310343
pub fn onSend(self: *Self, _: SendError!usize) void {
311-
const old_events_nb = self.removeEvent();
344+
const old_events_nb = self.removeEvent(.js);
312345
if (builtin.is_test) {
313346
report("send done, remaining events: {d}", .{old_events_nb - 1});
314347
}
@@ -325,15 +358,15 @@ pub const SingleThreaded = struct {
325358
socket: std.posix.socket_t,
326359
buf: []u8,
327360
) void {
328-
const old_events_nb = self.addEvent();
361+
const old_events_nb = self.addEvent(.js);
329362
self.io.recv(*Ctx, ctx, cbk, completion, socket, buf);
330363
if (builtin.is_test) {
331364
report("start recv {d}", .{old_events_nb + 1});
332365
}
333366
}
334367

335368
pub fn onRecv(self: *Self, _: RecvError!usize) void {
336-
const old_events_nb = self.removeEvent();
369+
const old_events_nb = self.removeEvent(.js);
337370
if (builtin.is_test) {
338371
report("recv done, remaining events: {d}", .{old_events_nb - 1});
339372
}
@@ -356,19 +389,16 @@ pub const SingleThreaded = struct {
356389
completion: *IO.Completion,
357390
result: IO.TimeoutError!void,
358391
) void {
359-
defer ctx.loop.freeCbk(completion, ctx);
392+
defer {
393+
_ = ctx.loop.removeEvent(.zig);
394+
ctx.loop.freeCbk(completion, ctx);
395+
}
360396

361397
// If the loop's context id has changed, don't call the js callback
362398
// function. The callback's memory has already be cleaned and the
363399
// events nb reset.
364400
if (ctx.zig_ctx_id != ctx.loop.zig_ctx_id) return;
365401

366-
// We don't remove event here b/c we don't want the main loop to wait for
367-
// the timeout is done.
368-
// This is mainly due b/c the usage of zigTimeout is used to process
369-
// background tasks.
370-
//_ = ctx.loop.removeEvent();
371-
372402
result catch |err| {
373403
switch (err) {
374404
error.Canceled => {},
@@ -403,11 +433,7 @@ pub const SingleThreaded = struct {
403433
}.wrapper,
404434
};
405435

406-
// We don't add event here b/c we don't want the main loop to wait for
407-
// the timeout is done.
408-
// This is mainly due b/c the usage of zigTimeout is used to process
409-
// background tasks.
410-
// _ = self.addEvent();
436+
_ = self.addEvent(.zig);
411437

412438
self.io.timeout(*ContextZigTimeout, ctxtimeout, zigTimeoutCallback, completion, nanoseconds);
413439
}

0 commit comments

Comments
 (0)