1717// along with this program. If not, see <https://www.gnu.org/licenses/>.
1818
1919const std = @import ("std" );
20+ const builtin = @import ("builtin" );
2021const MemoryPool = std .heap .MemoryPool ;
2122
2223pub const IO = @import ("tigerbeetle-io" ).IO ;
@@ -35,8 +36,12 @@ const log = std.log.scoped(.loop);
3536pub const Loop = struct {
3637 alloc : std.mem.Allocator , // TODO: unmanaged version ?
3738 io : IO ,
39+
40+ // both events_nb are used to track how many callbacks are to be called.
41+ // We use these counters to wait until all the events are finished.
3842 js_events_nb : usize ,
3943 zig_events_nb : usize ,
44+
4045 cbk_error : bool = false ,
4146
4247 // js_ctx_id is incremented each time the loop is reset for JS.
@@ -51,6 +56,11 @@ pub const Loop = struct {
5156 // This is a weak way to cancel all future Zig callbacks.
5257 zig_ctx_id : u32 = 0 ,
5358
59+ // The MacOS event loop doesn't support cancellation. We use this to track
60+ // cancellation ids and, on the timeout callback, we can can check here
61+ // to see if it's been cancelled.
62+ cancelled : std .AutoHashMapUnmanaged (usize , void ),
63+
5464 cancel_pool : MemoryPool (ContextCancel ),
5565 timeout_pool : MemoryPool (ContextTimeout ),
5666 event_callback_pool : MemoryPool (EventCallbackContext ),
@@ -65,6 +75,7 @@ pub const Loop = struct {
6575 pub fn init (alloc : std.mem.Allocator ) ! Self {
6676 return Self {
6777 .alloc = alloc ,
78+ .cancelled = .{},
6879 .io = try IO .init (32 , 0 ),
6980 .js_events_nb = 0 ,
7081 .zig_events_nb = 0 ,
@@ -75,6 +86,12 @@ pub const Loop = struct {
7586 }
7687
7788 pub fn deinit (self : * Self ) void {
89+ // first disable callbacks for existing events.
90+ // We don't want a callback re-create a setTimeout, it could create an
91+ // infinite loop on wait for events.
92+ self .resetJS ();
93+ self .resetZig ();
94+
7895 // run tail events. We do run the tail events to ensure all the
7996 // contexts are correcly free.
8097 while (self .eventsNb (.js ) > 0 or self .eventsNb (.zig ) > 0 ) {
@@ -83,11 +100,14 @@ pub const Loop = struct {
83100 break ;
84101 };
85102 }
86- self .cancelAll ();
103+ if (comptime CANCEL_SUPPORTED ) {
104+ self .io .cancel_all ();
105+ }
87106 self .io .deinit ();
88107 self .cancel_pool .deinit ();
89108 self .timeout_pool .deinit ();
90109 self .event_callback_pool .deinit ();
110+ self .cancelled .deinit (self .alloc );
91111 }
92112
93113 // Retrieve all registred I/O events completed by OS kernel,
@@ -131,9 +151,6 @@ pub const Loop = struct {
131151 fn eventsNb (self : * Self , comptime event : Event ) usize {
132152 return @atomicLoad (usize , self .eventsPtr (event ), .seq_cst );
133153 }
134- fn resetEvents (self : * Self , comptime event : Event ) void {
135- @atomicStore (usize , self .eventsPtr (event ), 0 , .unordered );
136- }
137154
138155 // JS callbacks APIs
139156 // -----------------
@@ -158,6 +175,12 @@ pub const Loop = struct {
158175 loop .alloc .destroy (completion );
159176 }
160177
178+ if (comptime CANCEL_SUPPORTED == false ) {
179+ if (loop .cancelled .remove (@intFromPtr (completion ))) {
180+ return ;
181+ }
182+ }
183+
161184 // If the loop's context id has changed, don't call the js callback
162185 // function. The callback's memory has already be cleaned and the
163186 // events nb reset.
@@ -175,7 +198,7 @@ pub const Loop = struct {
175198 // js callback
176199 if (ctx .js_cbk ) | * js_cbk | {
177200 js_cbk .call (null ) catch {
178- ctx . loop .cbk_error = true ;
201+ loop .cbk_error = true ;
179202 };
180203 }
181204 }
@@ -234,19 +257,26 @@ pub const Loop = struct {
234257 // js callback
235258 if (ctx .js_cbk ) | * js_cbk | {
236259 js_cbk .call (null ) catch {
237- ctx . loop .cbk_error = true ;
260+ loop .cbk_error = true ;
238261 };
239262 }
240263 }
241264
242265 pub fn cancel (self : * Self , id : usize , js_cbk : ? JSCallback ) ! void {
243- if (IO .supports_cancel == false ) {
266+ const alloc = self .alloc ;
267+ if (comptime CANCEL_SUPPORTED == false ) {
268+ try self .cancelled .put (alloc , id , {});
269+ if (js_cbk ) | cbk | {
270+ cbk .call (null ) catch {
271+ self .cbk_error = true ;
272+ };
273+ }
244274 return ;
245275 }
246276 const comp_cancel : * IO.Completion = @ptrFromInt (id );
247277
248- const completion = try self . alloc .create (Completion );
249- errdefer self . alloc .destroy (completion );
278+ const completion = try alloc .create (Completion );
279+ errdefer alloc .destroy (completion );
250280 completion .* = undefined ;
251281
252282 const ctx = self .alloc .create (ContextCancel ) catch unreachable ;
@@ -260,18 +290,17 @@ pub const Loop = struct {
260290 self .io .cancel_one (* ContextCancel , ctx , cancelCallback , completion , comp_cancel );
261291 }
262292
263- fn cancelAll (self : * Self ) void {
264- self .resetEvents (.js );
265- self .resetEvents (.zig );
266- self .io .cancel_all ();
267- }
268-
269293 // Reset all existing JS callbacks.
294+ // The existing events will happen and their memory will be cleanup but the
295+ // corresponding callbacks will not be called.
270296 pub fn resetJS (self : * Self ) void {
271297 self .js_ctx_id += 1 ;
298+ self .cancelled .clearRetainingCapacity ();
272299 }
273300
274301 // Reset all existing Zig callbacks.
302+ // The existing events will happen and their memory will be cleanup but the
303+ // corresponding callbacks will not be called.
275304 pub fn resetZig (self : * Self ) void {
276305 self .zig_ctx_id += 1 ;
277306 }
@@ -365,6 +394,7 @@ pub const Loop = struct {
365394 const ContextZigTimeout = struct {
366395 loop : * Self ,
367396 zig_ctx_id : u32 ,
397+
368398 context : * anyopaque ,
369399 callback : * const fn (
370400 context : ? * anyopaque ,
@@ -431,3 +461,9 @@ const EventCallbackContext = struct {
431461 ctx : * anyopaque ,
432462 loop : * Loop ,
433463};
464+
465+ const CANCEL_SUPPORTED = switch (builtin .target .os .tag ) {
466+ .linux = > true ,
467+ .macos , .tvos , .watchos , .ios = > false ,
468+ else = > @compileError ("IO is not supported for platform" ),
469+ };
0 commit comments