Skip to content

Commit 6cc8f50

Browse files
committed
Parameterize heartbeat
1 parent 86b2b3d commit 6cc8f50

File tree

2 files changed

+72
-27
lines changed

2 files changed

+72
-27
lines changed

lib/picos_mux.multififo/picos_mux_multififo.ml

Lines changed: 55 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
open Picos
22

3+
let[@inline never] heartbeat_delay_out_of_range _ =
4+
invalid_arg "heartbeat_delay should be between 0 and 1 seconds"
5+
6+
let[@inline never] heartbeat_rounds_negative _ =
7+
invalid_arg "heartbeat_rounds must be non-negative"
8+
39
let[@inline never] quota_non_positive _ = invalid_arg "quota must be positive"
410
let[@inline never] already_running () = invalid_arg "already running"
511
let[@inline never] not_worker _ = invalid_arg "not a worker thread"
@@ -25,6 +31,8 @@ type t = {
2531
mutex : Mutex.t;
2632
worker_condition : Condition.t;
2733
heartbeat_condition : Condition.t;
34+
heartbeat_delay : float;
35+
heartbeat_rounds : int;
2836
handler : (unit, unit) Effect.Deep.handler;
2937
quota : int;
3038
mutable threads : [ `Nothing | `Per_thread ] tdt array;
@@ -55,6 +63,16 @@ and _ tdt =
5563

5664
and per_thread = [ `Per_thread ] tdt
5765

66+
let kill t =
67+
if t.state < state_killed then begin
68+
Mutex.lock t.mutex;
69+
let state = t.state in
70+
if state != state lor state_killed then t.state <- state lor state_killed;
71+
Mutex.unlock t.mutex;
72+
Condition.broadcast t.heartbeat_condition;
73+
Condition.broadcast t.worker_condition
74+
end
75+
5876
let per_thread_key = Picos_thread.TLS.create ()
5977

6078
let[@inline] get_per_thread () : per_thread =
@@ -119,16 +137,6 @@ let[@inline never] wakeup_heartbeat t =
119137
let[@inline] wakeup_heartbeat t =
120138
if state_arrhytmia <= t.state then wakeup_heartbeat t
121139

122-
let kill t =
123-
if t.state < state_killed then begin
124-
Mutex.lock t.mutex;
125-
let state = t.state in
126-
if state != state lor state_killed then t.state <- state lor state_killed;
127-
Mutex.unlock t.mutex;
128-
Condition.broadcast t.heartbeat_condition;
129-
Condition.broadcast t.worker_condition
130-
end
131-
132140
let exec ready (Per_thread p : per_thread) t =
133141
p.remaining_quota <- t.quota;
134142
p.fiber <-
@@ -302,21 +310,21 @@ let[@inline never] with_per_thread new_pt fn old_p =
302310
| value -> returned value old_p
303311
| exception exn -> raised exn old_p
304312

305-
let rec heartbeat_thread t nth =
313+
let rec heartbeat_thread t rounds =
306314
if state_idlers lor state_running = t.state && any_fibers_ready t then begin
307315
if Mutex.try_lock t.mutex then begin
308316
t.state <- t.state land lnot state_idlers;
309317
Mutex.unlock t.mutex;
310318
Condition.signal t.worker_condition
311319
end;
312320
Thread.yield ();
313-
heartbeat_thread t 0
321+
heartbeat_thread t t.heartbeat_rounds
314322
end
315323
else begin
316-
if nth < 100 then begin
324+
if 0 < rounds then begin
317325
if t.state <= state_killed then begin
318-
Thread.delay 0.0001;
319-
heartbeat_thread t (nth + 1)
326+
Thread.delay t.heartbeat_delay;
327+
heartbeat_thread t (rounds - 1)
320328
end
321329
end
322330
else begin
@@ -327,14 +335,14 @@ let rec heartbeat_thread t nth =
327335
Condition.wait t.heartbeat_condition t.mutex
328336
end;
329337
Mutex.unlock t.mutex;
330-
heartbeat_thread t 0
338+
heartbeat_thread t t.heartbeat_rounds
331339
end
332-
else heartbeat_thread t nth
340+
else heartbeat_thread t 0
333341
end
334342
end
335343

336344
let heartbeat_thread t =
337-
try heartbeat_thread t 0
345+
try heartbeat_thread t t.heartbeat_rounds
338346
with exn ->
339347
kill t;
340348
t.handler.exnc exn
@@ -441,7 +449,20 @@ let retc () =
441449
p.num_stopped <- p.num_stopped + 1;
442450
next pt
443451

444-
let context ?quota ?fatal_exn_handler () =
452+
let context ?heartbeat_delay ?heartbeat_rounds ?quota ?fatal_exn_handler () =
453+
let heartbeat_delay =
454+
match heartbeat_delay with
455+
| None -> 0.005
456+
| Some delay ->
457+
if not (0.0 <= delay && delay <= 1.0) then
458+
heartbeat_delay_out_of_range ()
459+
else delay
460+
in
461+
let heartbeat_rounds =
462+
match heartbeat_rounds with
463+
| None -> 100
464+
| Some rounds -> if rounds < 0 then heartbeat_rounds_negative () else rounds
465+
in
445466
let quota =
446467
match quota with
447468
| None -> Int.max_int
@@ -468,6 +489,8 @@ let context ?quota ?fatal_exn_handler () =
468489
mutex;
469490
worker_condition;
470491
heartbeat_condition;
492+
heartbeat_delay;
493+
heartbeat_rounds;
471494
handler = { retc; exnc; effc };
472495
quota;
473496
threads = Array.make 15 Nothing;
@@ -533,9 +556,12 @@ let rec run_fiber_on n fiber main runner_main context =
533556
end;
534557
Printexc.raise_with_backtrace exn bt
535558

536-
let run_fiber_on ?quota ?fatal_exn_handler ~n_domains fiber main =
559+
let run_fiber_on ?heartbeat_delay ?heartbeat_rounds ?quota ?fatal_exn_handler
560+
~n_domains fiber main =
537561
if n_domains < 1 then invalid_arg "n_domains must be positive";
538-
let context = context ?quota ?fatal_exn_handler () in
562+
let context =
563+
context ?heartbeat_delay ?heartbeat_rounds ?quota ?fatal_exn_handler ()
564+
in
539565
let runner_main =
540566
if n_domains = 1 then fun () -> None
541567
else
@@ -550,14 +576,17 @@ let run_fiber_on ?quota ?fatal_exn_handler ~n_domains fiber main =
550576
in
551577
run_fiber_on n_domains fiber main runner_main context
552578

553-
let[@inline never] run_on ?quota ?fatal_exn_handler ~n_domains fiber main
554-
computation =
555-
run_fiber_on ?quota ?fatal_exn_handler ~n_domains fiber main;
579+
let[@inline never] run_on ?heartbeat_delay ?heartbeat_rounds ?quota
580+
?fatal_exn_handler ~n_domains fiber main computation =
581+
run_fiber_on ?heartbeat_delay ?heartbeat_rounds ?quota ?fatal_exn_handler
582+
~n_domains fiber main;
556583
Computation.peek_exn computation
557584

558-
let run_on ?quota ?fatal_exn_handler ~n_domains ?forbid main =
585+
let run_on ?heartbeat_delay ?heartbeat_rounds ?quota ?fatal_exn_handler
586+
~n_domains ?forbid main =
559587
let forbid = match forbid with None -> false | Some forbid -> forbid in
560588
let computation = Computation.create ~mode:`LIFO () in
561589
let fiber = Fiber.create ~forbid computation in
562590
let main _ = Computation.capture computation main () in
563-
run_on ?quota ?fatal_exn_handler ~n_domains fiber main computation
591+
run_on ?heartbeat_delay ?heartbeat_rounds ?quota ?fatal_exn_handler ~n_domains
592+
fiber main computation

lib/picos_mux.multififo/picos_mux_multififo.mli

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,22 @@ open Picos
2828
type t
2929
(** Represents a shared context for fifo runners. *)
3030

31-
val context : ?quota:int -> ?fatal_exn_handler:(exn -> unit) -> unit -> t
31+
val context :
32+
?heartbeat_delay:float ->
33+
?heartbeat_rounds:int ->
34+
?quota:int ->
35+
?fatal_exn_handler:(exn -> unit) ->
36+
unit ->
37+
t
3238
(** [context ()] creates a new context for randomized runners. The context must
3339
always be consumed by a call of {{!run} [run ~context ...]}.
3440
41+
The optional [heartbeat_delay], with a default of [0.005], and
42+
[heartbeat_rounds], with a default of [100], specify the delay between
43+
heartbeats and the number of heartbeat rounds to perform when all the
44+
threads appear busy or there doesn't seem to be enough fibers to
45+
redistribute. Note that [heartbeat_delay] may delay program termination.
46+
3547
The optional [quota] argument defaults to [Int.max_int] and determines the
3648
number of effects a fiber is allowed to perform before it is forced to
3749
yield. *)
@@ -54,6 +66,8 @@ val run : ?context:t -> ?forbid:bool -> (unit -> 'a) -> 'a
5466
propagation of cancelation is initially allowed. *)
5567

5668
val run_fiber_on :
69+
?heartbeat_delay:float ->
70+
?heartbeat_rounds:int ->
5771
?quota:int ->
5872
?fatal_exn_handler:(exn -> unit) ->
5973
n_domains:int ->
@@ -64,6 +78,8 @@ val run_fiber_on :
6478
runs the [main] on the current domain and those additional domains. *)
6579

6680
val run_on :
81+
?heartbeat_delay:float ->
82+
?heartbeat_rounds:int ->
6783
?quota:int ->
6884
?fatal_exn_handler:(exn -> unit) ->
6985
n_domains:int ->

0 commit comments

Comments
 (0)