@@ -34,9 +34,11 @@ pub const Loop = struct {
    alloc: std.mem.Allocator, // TODO: unmanaged version?
    io: IO,

-    // Used to track how many callbacks are to be called and wait until all
-    // event are finished.
-    events_nb: usize,
+    // number of pending network events we have
+    pending_network_count: usize,
+
+    // number of pending timeout events we have
+    pending_timeout_count: usize,

    // Used to stop repeating timeouts when loop.run is called.
    stopping: bool,
@@ -66,8 +68,9 @@ pub const Loop = struct {
            .alloc = alloc,
            .cancelled = .{},
            .io = try IO.init(32, 0),
-            .events_nb = 0,
            .stopping = false,
+            .pending_network_count = 0,
+            .pending_timeout_count = 0,
            .timeout_pool = MemoryPool(ContextTimeout).init(alloc),
            .event_callback_pool = MemoryPool(EventCallbackContext).init(alloc),
        };
@@ -78,7 +81,7 @@ pub const Loop = struct {

        // run tail events. We do run the tail events to ensure all the
        // contexts are correctly freed.
-        while (self.eventsNb() > 0) {
+        while (self.hasPendingEvents()) {
            self.io.run_for_ns(10 * std.time.ns_per_ms) catch |err| {
                log.err(.loop, "deinit", .{ .err = err });
                break;
@@ -93,6 +96,21 @@ pub const Loop = struct {
        self.cancelled.deinit(self.alloc);
    }

+    // We can shut down once all the pending network IO is complete.
+    // In debug mode we also wait until all the pending timeouts are complete,
+    // but we only do this so that the `timeoutCallback` can free all allocated
+    // memory and we won't report a leak.
+    fn hasPendingEvents(self: *const Self) bool {
+        if (self.pending_network_count > 0) {
+            return true;
+        }
+
+        if (builtin.mode != .Debug) {
+            return false;
+        }
+        return self.pending_timeout_count > 0;
+    }
+
    // Retrieve all registered I/O events completed by the OS kernel,
    // and execute their callbacks sequentially.
    // Stops when there are no more I/O events registered on the loop.
@@ -103,25 +121,12 @@ pub const Loop = struct {
        self.stopping = true;
        defer self.stopping = false;

-        while (self.eventsNb() > 0) {
+        while (self.pending_network_count > 0) {
            try self.io.run_for_ns(10 * std.time.ns_per_ms);
            // at each iteration we might have new events registered by previous callbacks
        }
    }

-    // Register events atomically
-    // - add 1 event and return previous value
-    fn addEvent(self: *Self) void {
-        _ = @atomicRmw(usize, &self.events_nb, .Add, 1, .acq_rel);
-    }
-    // - remove 1 event and return previous value
-    fn removeEvent(self: *Self) void {
-        _ = @atomicRmw(usize, &self.events_nb, .Sub, 1, .acq_rel);
-    }
-    // - get the number of current events
-    fn eventsNb(self: *Self) usize {
-        return @atomicLoad(usize, &self.events_nb, .seq_cst);
-    }

    // JS callbacks APIs
    // -----------------
@@ -152,7 +157,7 @@ pub const Loop = struct {
        const loop = ctx.loop;

        if (ctx.initial) {
-            loop.removeEvent();
+            loop.pending_timeout_count -= 1;
        }

        defer {
@@ -207,7 +212,7 @@ pub const Loop = struct {
            .callback_node = callback_node,
        };

-        self.addEvent();
+        self.pending_timeout_count += 1;
        self.scheduleTimeout(nanoseconds, ctx, completion);
        return @intFromPtr(completion);
    }
@@ -244,17 +249,18 @@ pub const Loop = struct {
    ) !void {
        const onConnect = struct {
            fn onConnect(callback: *EventCallbackContext, completion_: *Completion, res: ConnectError!void) void {
+                callback.loop.pending_network_count -= 1;
                defer callback.loop.event_callback_pool.destroy(callback);
-                callback.loop.removeEvent();
                cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
            }
        }.onConnect;

+
        const callback = try self.event_callback_pool.create();
        errdefer self.event_callback_pool.destroy(callback);
        callback.* = .{ .loop = self, .ctx = ctx };

-        self.addEvent();
+        self.pending_network_count += 1;
        self.io.connect(*EventCallbackContext, callback, onConnect, completion, socket, address);
    }

@@ -271,8 +277,8 @@ pub const Loop = struct {
    ) !void {
        const onSend = struct {
            fn onSend(callback: *EventCallbackContext, completion_: *Completion, res: SendError!usize) void {
+                callback.loop.pending_network_count -= 1;
                defer callback.loop.event_callback_pool.destroy(callback);
-                callback.loop.removeEvent();
                cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
            }
        }.onSend;
@@ -281,7 +287,7 @@ pub const Loop = struct {
        errdefer self.event_callback_pool.destroy(callback);
        callback.* = .{ .loop = self, .ctx = ctx };

-        self.addEvent();
+        self.pending_network_count += 1;
        self.io.send(*EventCallbackContext, callback, onSend, completion, socket, buf);
    }

@@ -298,17 +304,16 @@ pub const Loop = struct {
    ) !void {
        const onRecv = struct {
            fn onRecv(callback: *EventCallbackContext, completion_: *Completion, res: RecvError!usize) void {
+                callback.loop.pending_network_count -= 1;
                defer callback.loop.event_callback_pool.destroy(callback);
-                callback.loop.removeEvent();
                cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
            }
        }.onRecv;

        const callback = try self.event_callback_pool.create();
        errdefer self.event_callback_pool.destroy(callback);
        callback.* = .{ .loop = self, .ctx = ctx };
-
-        self.addEvent();
+        self.pending_network_count += 1;
        self.io.recv(*EventCallbackContext, callback, onRecv, completion, socket, buf);
    }
};
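
Taken together, the commit replaces the single atomic events_nb counter with two plain counters and gates shutdown on them. Below is a condensed, standalone sketch of that shutdown rule, not part of the commit: the PendingCounts stand-in struct and the test are illustrative only, while the real Loop also owns the IO instance, the memory pools, and the cancelled set. It can be run directly with zig test.

const std = @import("std");
const builtin = @import("builtin");

// Stand-in for the two counters introduced by this commit (hypothetical,
// stripped of the IO instance, memory pools and cancellation set).
const PendingCounts = struct {
    pending_network_count: usize = 0,
    pending_timeout_count: usize = 0,

    // Mirrors hasPendingEvents: pending network IO always blocks shutdown;
    // pending timeouts only do so in Debug builds, so timeoutCallback gets a
    // chance to free its allocations and no leak is reported.
    fn hasPendingEvents(self: *const PendingCounts) bool {
        if (self.pending_network_count > 0) return true;
        if (builtin.mode != .Debug) return false;
        return self.pending_timeout_count > 0;
    }
};

test "timeouts only delay shutdown in Debug builds" {
    var counts = PendingCounts{};
    try std.testing.expect(!counts.hasPendingEvents());

    // A pending timeout blocks shutdown only when compiled in Debug mode.
    counts.pending_timeout_count = 1;
    try std.testing.expectEqual(builtin.mode == .Debug, counts.hasPendingEvents());

    // Pending network IO blocks shutdown in every build mode.
    counts.pending_network_count = 1;
    try std.testing.expect(counts.hasPendingEvents());
}

Splitting the counter this way means run() only spins on outstanding network completions, while deinit() additionally drains timeouts in Debug builds so timeoutCallback can release its pooled allocations before the leak check.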