Skip to content

Commit d98dade

Browse files
authored
Merge pull request #37 from c-cube/simon/lwt-main-runner-2025-07-09
change moonpool-lwt to make it a lwt-engine based runner
2 parents d79200f + 2dbbad4 commit d98dade

35 files changed

+3162
-1259
lines changed

src/core/fifo_pool.ml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ type worker_state = {
2828

2929
let[@inline] size_ (self : state) = Array.length self.threads
3030
let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q
31-
let k_worker_state : worker_state TLS.t = TLS.create ()
3231

3332
(*
3433
get_thread_state = TLS.get_opt k_worker_state
@@ -71,12 +70,6 @@ let schedule_w (self : worker_state) (task : task_full) : unit =
7170
let get_next_task (self : worker_state) =
7271
try Bb_queue.pop self.st.q with Bb_queue.Closed -> raise WL.No_more_tasks
7372

74-
let get_thread_state () =
75-
match TLS.get_exn k_worker_state with
76-
| st -> st
77-
| exception TLS.Not_set ->
78-
failwith "Moonpool: get_thread_state called from outside a runner."
79-
8073
let before_start (self : worker_state) =
8174
let t_id = Thread.id @@ Thread.self () in
8275
self.st.on_init_thread ~dom_id:self.dom_idx ~t_id ();
@@ -103,7 +96,6 @@ let worker_ops : worker_state WL.ops =
10396
WL.schedule = schedule_w;
10497
runner;
10598
get_next_task;
106-
get_thread_state;
10799
around_task;
108100
on_exn;
109101
before_start;

src/core/worker_loop_.ml

Lines changed: 86 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ exception No_more_tasks
2121
type 'st ops = {
2222
schedule: 'st -> task_full -> unit;
2323
get_next_task: 'st -> task_full; (** @raise No_more_tasks *)
24-
get_thread_state: unit -> 'st;
25-
(** Access current thread's worker state from any worker *)
2624
around_task: 'st -> around_task;
2725
on_exn: 'st -> Exn_bt.t -> unit;
2826
runner: 'st -> Runner.t;
@@ -41,8 +39,8 @@ let[@inline] raise_with_bt exn =
4139
let bt = Printexc.get_raw_backtrace () in
4240
Printexc.raise_with_backtrace exn bt
4341

44-
let with_handler (type st arg) ~(ops : st ops) (self : st) :
45-
(unit -> unit) -> unit =
42+
let with_handler (type st) ~(ops : st ops) (self : st) : (unit -> unit) -> unit
43+
=
4644
let current =
4745
Some
4846
(fun k ->
@@ -85,8 +83,8 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) :
8583
let fiber = get_current_fiber_exn () in
8684
(* when triggers is signaled, reschedule task *)
8785
if not (Picos.Fiber.try_suspend fiber trigger fiber k reschedule) then
88-
(* trigger was already signaled, run task now *)
89-
Picos.Fiber.resume fiber k)
86+
(* trigger was already signaled, reschedule task now *)
87+
reschedule trigger fiber k)
9088
| Picos.Computation.Cancel_after _r ->
9189
Some
9290
(fun k ->
@@ -98,31 +96,28 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) :
9896
let handler = Effect.Deep.{ retc = Fun.id; exnc = raise_with_bt; effc } in
9997
fun f -> Effect.Deep.match_with f () handler
10098

101-
let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
102-
if block_signals then (
103-
try
104-
ignore
105-
(Unix.sigprocmask SIG_BLOCK
106-
[
107-
Sys.sigterm;
108-
Sys.sigpipe;
109-
Sys.sigint;
110-
Sys.sigchld;
111-
Sys.sigalrm;
112-
Sys.sigusr1;
113-
Sys.sigusr2;
114-
]
115-
: _ list)
116-
with _ -> ()
117-
);
118-
119-
let cur_fiber : fiber ref = ref _dummy_fiber in
120-
let runner = ops.runner self in
121-
TLS.set Runner.For_runner_implementors.k_cur_runner runner;
122-
123-
let (AT_pair (before_task, after_task)) = ops.around_task self in
99+
module type FINE_GRAINED_ARGS = sig
100+
type st
101+
102+
val ops : st ops
103+
val st : st
104+
end
105+
106+
module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
107+
open Args
108+
109+
let cur_fiber : fiber ref = ref _dummy_fiber
110+
let runner = ops.runner st
111+
112+
type state =
113+
| New
114+
| Ready
115+
| Torn_down
116+
117+
let state = ref New
124118

125119
let run_task (task : task_full) : unit =
120+
let (AT_pair (before_task, after_task)) = ops.around_task st in
126121
let fiber =
127122
match task with
128123
| T_start { fiber; _ } | T_resume { fiber; _ } -> fiber
@@ -136,32 +131,82 @@ let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
136131
assert (task != _dummy_task);
137132
(try
138133
match task with
139-
| T_start { fiber = _; f } -> with_handler ~ops self f
134+
| T_start { fiber = _; f } -> with_handler ~ops st f
140135
| T_resume { fiber = _; k } ->
141136
(* this is already in an effect handler *)
142137
k ()
143138
with e ->
144139
let bt = Printexc.get_raw_backtrace () in
145140
let ebt = Exn_bt.make e bt in
146-
ops.on_exn self ebt);
141+
ops.on_exn st ebt);
147142

148143
after_task runner _ctx;
149144

150145
cur_fiber := _dummy_fiber;
151146
TLS.set k_cur_fiber _dummy_fiber
152-
in
153147

154-
ops.before_start self;
148+
let setup ~block_signals () : unit =
149+
if !state <> New then invalid_arg "worker_loop.setup: not a new instance";
150+
state := Ready;
151+
152+
if block_signals then (
153+
try
154+
ignore
155+
(Unix.sigprocmask SIG_BLOCK
156+
[
157+
Sys.sigterm;
158+
Sys.sigpipe;
159+
Sys.sigint;
160+
Sys.sigchld;
161+
Sys.sigalrm;
162+
Sys.sigusr1;
163+
Sys.sigusr2;
164+
]
165+
: _ list)
166+
with _ -> ()
167+
);
168+
169+
TLS.set Runner.For_runner_implementors.k_cur_runner runner;
170+
171+
ops.before_start st
172+
173+
let run ?(max_tasks = max_int) () : unit =
174+
if !state <> Ready then invalid_arg "worker_loop.run: not setup";
175+
176+
let continue = ref true in
177+
let n_tasks = ref 0 in
178+
while !continue && !n_tasks < max_tasks do
179+
match ops.get_next_task st with
180+
| task ->
181+
incr n_tasks;
182+
run_task task
183+
| exception No_more_tasks -> continue := false
184+
done
185+
186+
let teardown () =
187+
if !state <> Torn_down then (
188+
state := Torn_down;
189+
cur_fiber := _dummy_fiber;
190+
ops.cleanup st
191+
)
192+
end
155193

156-
let continue = ref true in
194+
let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
195+
let module FG =
196+
Fine_grained
197+
(struct
198+
type nonrec st = st
199+
200+
let ops = ops
201+
let st = self
202+
end)
203+
()
204+
in
205+
FG.setup ~block_signals ();
157206
try
158-
while !continue do
159-
match ops.get_next_task self with
160-
| task -> run_task task
161-
| exception No_more_tasks -> continue := false
162-
done;
163-
ops.cleanup self
207+
FG.run ();
208+
FG.teardown ()
164209
with exn ->
165210
let bt = Printexc.get_raw_backtrace () in
166-
ops.cleanup self;
211+
FG.teardown ();
167212
Printexc.raise_with_backtrace exn bt

src/core/worker_loop_.mli

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,30 @@ exception No_more_tasks
2626
type 'st ops = {
2727
schedule: 'st -> task_full -> unit;
2828
get_next_task: 'st -> task_full;
29-
get_thread_state: unit -> 'st;
3029
around_task: 'st -> around_task;
3130
on_exn: 'st -> Exn_bt.t -> unit;
3231
runner: 'st -> Runner.t;
3332
before_start: 'st -> unit;
3433
cleanup: 'st -> unit;
3534
}
3635

36+
module type FINE_GRAINED_ARGS = sig
37+
type st
38+
39+
val ops : st ops
40+
val st : st
41+
end
42+
43+
module Fine_grained (_ : FINE_GRAINED_ARGS) () : sig
44+
val setup : block_signals:bool -> unit -> unit
45+
(** Just initialize the loop *)
46+
47+
val run : ?max_tasks:int -> unit -> unit
48+
(** Run the loop until no task remains or until [max_tasks] tasks have been
49+
run *)
50+
51+
val teardown : unit -> unit
52+
(** Tear down the loop *)
53+
end
54+
3755
val worker_loop : block_signals:bool -> ops:'st ops -> 'st -> unit

src/core/ws_pool.ml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,6 @@ let k_worker_state : worker_state TLS.t = TLS.create ()
6262
let[@inline] get_current_worker_ () : worker_state option =
6363
TLS.get_opt k_worker_state
6464

65-
let[@inline] get_current_worker_exn () : worker_state =
66-
match TLS.get_exn k_worker_state with
67-
| w -> w
68-
| exception TLS.Not_set ->
69-
failwith "Moonpool: get_current_runner was called from outside a pool."
70-
7165
(** Try to wake up a waiter, if there's any. *)
7266
let[@inline] try_wake_someone_ (self : state) : unit =
7367
if self.n_waiting_nonzero then (
@@ -212,7 +206,6 @@ let worker_ops : worker_state WL.ops =
212206
WL.schedule = schedule_from_w;
213207
runner;
214208
get_next_task;
215-
get_thread_state = get_current_worker_exn;
216209
around_task;
217210
on_exn;
218211
before_start;

src/lwt/IO.ml

Lines changed: 0 additions & 66 deletions
This file was deleted.

0 commit comments

Comments
 (0)