@@ -14,15 +14,19 @@ type ready =
1414 * ((exn * Printexc .raw_backtrace ) option , unit ) Effect.Deep .continuation
1515 | Return of Fiber .t * (unit , unit ) Effect.Deep .continuation
1616
17+ let state_running = 1 lsl 0
18+ let state_idlers = 1 lsl 1
19+ let state_arrhytmia = 1 lsl 2
20+ let state_killed = 1 lsl 3
21+
1722type t = {
18- mutable num_waiters_non_zero : bool ;
19- num_waiters : int ref ;
23+ mutable state : int ;
2024 num_started : int Atomic .t ;
2125 mutex : Mutex .t ;
22- condition : Condition .t ;
26+ worker_condition : Condition .t ;
27+ heartbeat_condition : Condition .t ;
2328 handler : (unit , unit ) Effect.Deep .handler ;
2429 quota : int ;
25- mutable run : bool ;
2630 mutable threads : [ `Nothing | `Per_thread ] tdt array ;
2731 mutable threads_num : int ;
2832}
@@ -100,11 +104,29 @@ let next_index t i =
100104 let i = i + 1 in
101105 if i < t.threads_num then i else 0
102106
103- let [@ inline] relaxed_wakeup t ~known_not_empty ready =
104- if t.num_waiters_non_zero && (known_not_empty || Mpmcq. length ready != 0 ) then begin
107+ let [@ inline never] wakeup_heartbeat t =
108+ Mutex. lock t.mutex;
109+ let state = t.state in
110+ if state_arrhytmia < = state then begin
111+ t.state < - state land lnot state_arrhytmia;
112+ Mutex. unlock t.mutex;
113+ Condition. broadcast t.heartbeat_condition
114+ end
115+ else begin
116+ Mutex. unlock t.mutex
117+ end
118+
119+ let [@ inline] wakeup_heartbeat t =
120+ if state_arrhytmia < = t.state then wakeup_heartbeat t
121+
122+ let kill t =
123+ if t.state < state_killed then begin
105124 Mutex. lock t.mutex;
125+ let state = t.state in
126+ if state != state lor state_killed then t.state < - state lor state_killed;
106127 Mutex. unlock t.mutex;
107- Condition. signal t.condition
128+ Condition. broadcast t.heartbeat_condition;
129+ Condition. broadcast t.worker_condition
108130 end
109131
110132let exec ready (Per_thread p : per_thread ) t =
@@ -149,7 +171,6 @@ let rec next (Per_thread p as pt : per_thread) =
149171 match Mpmcq. pop_exn ready with
150172 | ready ->
151173 let t = p.context in
152- relaxed_wakeup t ~known_not_empty: false p.ready;
153174 exec ready pt t
154175 | exception Mpmcq. Empty ->
155176 p.fiber < - Fiber.Maybe. nothing;
@@ -162,9 +183,7 @@ and try_steal (Per_thread p as pt : per_thread) t i =
162183 | Nothing -> try_steal pt t (next_index t i)
163184 | Per_thread other_p -> begin
164185 match Mpmcq. pop_exn other_p.ready with
165- | ready ->
166- relaxed_wakeup t ~known_not_empty: false other_p.ready;
167- exec ready pt t
186+ | ready -> exec ready pt t
168187 | exception Mpmcq. Empty -> try_steal pt t (next_index t i)
169188 end
170189 end
@@ -173,36 +192,32 @@ and try_steal (Per_thread p as pt : per_thread) t i =
173192and wait (pt : per_thread ) t =
174193 if any_fibers_alive t then begin
175194 Mutex. lock t.mutex;
176- let n = ! (t.num_waiters) + 1 in
177- t.num_waiters := n;
178- if n = 1 then t.num_waiters_non_zero < - true ;
179- if (not (any_fibers_ready t)) && any_fibers_alive t then begin
180- match Condition. wait t.condition t.mutex with
195+ let state = t.state in
196+ if state != state lor state_idlers land lnot state_arrhytmia then
197+ t.state < - state lor state_idlers land lnot state_arrhytmia;
198+ if state_arrhytmia < = state then Condition. broadcast t.heartbeat_condition;
199+ if state < state_killed && not (any_fibers_ready t) then begin
200+ match Condition. wait t.worker_condition t.mutex with
181201 | () ->
182- let n = ! (t.num_waiters) - 1 in
183- t.num_waiters := n;
184- if n = 0 then t.num_waiters_non_zero < - false ;
202+ let state = t.state in
203+ if state != state lor state_idlers then
204+ t.state < - state lor state_idlers ;
185205 Mutex. unlock t.mutex;
186- next pt
206+ if state < state_killed then next pt
187207 | exception async_exn ->
188- let n = ! (t.num_waiters) - 1 in
189- t.num_waiters := n;
190- if n = 0 then t.num_waiters_non_zero < - false ;
208+ let state = t.state in
209+ if state != state lor state_idlers then
210+ t.state < - state lor state_idlers ;
191211 Mutex. unlock t.mutex;
192212 raise async_exn
193213 end
194214 else begin
195- let n = ! (t.num_waiters) - 1 in
196- t.num_waiters := n;
197- if n = 0 then t.num_waiters_non_zero < - false ;
198215 Mutex. unlock t.mutex;
199- next pt
216+ if state < state_killed then next pt
200217 end
201218 end
202219 else begin
203- Mutex. lock t.mutex;
204- Mutex. unlock t.mutex;
205- Condition. broadcast t.condition
220+ kill t
206221 end
207222
208223let default_fatal_exn_handler exn =
@@ -238,25 +253,19 @@ let per_thread context =
238253 match Picos_thread.TLS. get_exn per_thread_key with
239254 | Per_thread p_current when p_original.context == p_current.context ->
240255 (* We are running on a thread of this scheduler *)
241- if Fiber. unsuspend fiber trigger then
242- Mpmcq. push p_current.ready resume
243- else Mpmcq. push_head p_current.ready resume;
244- relaxed_wakeup p_current.context ~known_not_empty: true p_current.ready
256+ let ready = p_current.ready in
257+ if Fiber. unsuspend fiber trigger then Mpmcq. push ready resume
258+ else Mpmcq. push_head ready resume;
259+ let t = p_current.context in
260+ wakeup_heartbeat t
245261 | _ | (exception Picos_thread.TLS. Not_set) ->
246262 (* We are running on a foreign thread *)
247- if Fiber. unsuspend fiber trigger then
248- Mpmcq. push p_original. ready resume
249- else Mpmcq. push_head p_original. ready resume;
263+ let ready = p_original.ready in
264+ if Fiber. unsuspend fiber trigger then Mpmcq. push ready resume
265+ else Mpmcq. push_head ready resume;
250266 let t = p_original.context in
251- let non_zero =
252- match Mutex. lock t.mutex with
253- | () ->
254- let non_zero = t.num_waiters_non_zero in
255- Mutex. unlock t.mutex;
256- non_zero
257- | exception Sys_error _ -> false
258- in
259- if non_zero then Condition. signal t.condition);
267+ wakeup_heartbeat t;
268+ Condition. signal t.worker_condition);
260269 p.return < -
261270 Some
262271 (fun k ->
@@ -293,6 +302,43 @@ let[@inline never] with_per_thread new_pt fn old_p =
293302 | value -> returned value old_p
294303 | exception exn -> raised exn old_p
295304
305+ let rec heartbeat_thread t nth =
306+ if state_idlers lor state_running = t.state && any_fibers_ready t then begin
307+ if Mutex. try_lock t.mutex then begin
308+ t.state < - t.state land lnot state_idlers;
309+ Mutex. unlock t.mutex;
310+ Condition. signal t.worker_condition
311+ end ;
312+ Thread. yield () ;
313+ heartbeat_thread t 0
314+ end
315+ else begin
316+ if nth < 100 then begin
317+ if t.state < = state_killed then begin
318+ Thread. delay 0.0001 ;
319+ heartbeat_thread t (nth + 1 )
320+ end
321+ end
322+ else begin
323+ if Mutex. try_lock t.mutex then begin
324+ let state = t.state in
325+ if state < state_killed then begin
326+ t.state < - state lor state_arrhytmia;
327+ Condition. wait t.heartbeat_condition t.mutex
328+ end ;
329+ Mutex. unlock t.mutex;
330+ heartbeat_thread t 0
331+ end
332+ else heartbeat_thread t nth
333+ end
334+ end
335+
336+ let heartbeat_thread t =
337+ try heartbeat_thread t 0
338+ with exn ->
339+ kill t;
340+ t.handler.exnc exn
341+
296342let with_per_thread t fn =
297343 let (Per_thread new_p as new_pt) = per_thread t in
298344 begin
@@ -308,7 +354,11 @@ let with_per_thread t fn =
308354 end ;
309355 new_p.index < - t.threads_num;
310356 Array. unsafe_set t.threads t.threads_num new_pt;
311- if t.threads_num = 0 then Atomic. incr t.num_started
357+ if t.threads_num = 0 then begin
358+ Atomic. incr t.num_started;
359+ let _ = Thread. create heartbeat_thread t in
360+ ()
361+ end
312362 else Multicore_magic. fence t.num_started;
313363 t.threads_num < - t.threads_num + 1
314364 with
@@ -351,8 +401,7 @@ let effc : type a. a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option =
351401 (* The queue [push] includes a full fence, which means the increment
352402 of [num_started] will happen before increment of [num_stopped]. *)
353403 Mpmcq. push p.ready (Spawn (r.fiber, r.main));
354- let t = p.context in
355- relaxed_wakeup t ~known_not_empty: true p.ready;
404+ wakeup_heartbeat p.context;
356405 p.return
357406 end
358407 | Fiber. Yield -> yield
@@ -403,23 +452,24 @@ let context ?quota ?fatal_exn_handler () =
403452 | None -> default_fatal_exn_handler
404453 | Some handler ->
405454 fun exn ->
455+ let (Per_thread p) = get_per_thread () in
456+ kill p.context;
406457 handler exn ;
407458 raise exn
408459 in
409460 Select. check_configured () ;
410461 let mutex = Mutex. create ()
411- and condition = Condition. create ()
412- and num_waiters = ref 0 |> Multicore_magic. copy_as_padded
413- and num_started = Atomic. make 0 |> Multicore_magic. copy_as_padded in
462+ and worker_condition = Condition. create ()
463+ and heartbeat_condition = Condition. create ()
464+ and num_started = Atomic. make 0 in
414465 {
415- num_waiters_non_zero = false ;
416- num_waiters;
466+ state = 0 ;
417467 num_started;
418468 mutex;
419- condition;
469+ worker_condition;
470+ heartbeat_condition;
420471 handler = { retc; exnc; effc };
421472 quota;
422- run = false ;
423473 threads = Array. make 15 Nothing ;
424474 threads_num = 0 ;
425475 }
@@ -432,12 +482,13 @@ let run_fiber ?context:t_opt fiber main =
432482 let t = match t_opt with None -> context () | Some t -> t in
433483 with_per_thread t @@ fun (Per_thread p ) ->
434484 Mutex. lock t.mutex;
435- if t.run then begin
485+ let state = t.state in
486+ if state = state lor state_running then begin
436487 Mutex. unlock t.mutex;
437488 already_running ()
438489 end
439490 else begin
440- t.run < - true ;
491+ t.state < - state lor state_running ;
441492 Mutex. unlock t.mutex;
442493 p.remaining_quota < - t.quota;
443494 p.fiber < - Fiber.Maybe. of_fiber fiber;
0 commit comments