Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ bench-pi:
'./_build/default/benchs/pi.exe -n $(PI_NSTEPS) -j 16 -mode forkjoin -kind=pool' \
'./_build/default/benchs/pi.exe -n $(PI_NSTEPS) -j 20 -mode forkjoin -kind=pool'

bench-repro-41:
dune build $(DUNE_OPTS_BENCH) examples/repro_41/run.exe
hyperfine --warmup=1 \
"./_build/default/examples/repro_41/run.exe 4 domainslib" \
"./_build/default/examples/repro_41/run.exe 4 moonpool" \
"./_build/default/examples/repro_41/run.exe 5 moonpool" \
"./_build/default/examples/repro_41/run.exe 5 seq"

.PHONY: test clean bench-fib bench-pi

VERSION=$(shell awk '/^version:/ {print $$2}' moonpool.opam)
Expand Down
44 changes: 2 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,49 +173,9 @@ val expected_sum : int = 5050
We have a `Exn_bt.t` type that comes in handy in many places. It bundles together
an exception and the backtrace associated with the place the exception was caught.

### Fibers
### Local storage

On OCaml 5, Moonpool comes with a library `moonpool.fib` (module `Moonpool_fib`)
which provides _lightweight fibers_
that can run on any Moonpool runner.
These fibers are a sort of lightweight thread, dispatched on the runner's
background thread(s).
Fibers rely on effects to implement `Fiber.await`, suspending themselves until the `await`-ed fiber
is done.

```ocaml
# #require "moonpool.fib";;
...

# (* convenient alias *)
module F = Moonpool_fib;;
module F = Moonpool_fib
# F.main (fun _runner ->
let f1 = F.spawn (fun () -> fib 10) in
let f2 = F.spawn (fun () -> fib 15) in
F.await f1 + F.await f2);;
- : int = 1076
```

Fibers form a _tree_, where a fiber calling `Fiber.spawn` to start a sub-fiber is
the sub-fiber's _parent_.
When a parent fails, all its children are cancelled (forced to fail).
This is a simple form of [Structured Concurrency](https://en.wikipedia.org/wiki/Structured_concurrency).

Like a future, a fiber eventually _resolves_ into a value (or an `Exn_bt.t`) that it's possible
to `await`. With `Fiber.res : 'a Fiber.t -> 'a Fut.t` it's possible to access that result
as a regular future, too.
However, this resolution is only done after all the children of the fiber have
resolved — the lifetime of fibers forms a well-nested tree in that sense.

When a fiber is suspended because it `await`s another fiber (or future), the scheduler's
thread on which it was running becomes available again and can go on process another task.
When the fiber resumes, it will automatically be re-scheduled on the same runner it started on.
This means fibers on pool P1 can await fibers from pool P2 and still be resumed on P1.

In addition to all that, fibers provide _fiber local storage_ (like thread-local storage, but per fiber).
This storage is inherited in `spawn` (as a shallow copy only — it's advisable to only
put persistent data in storage to avoid confusing aliasing).
Moonpool, via picos, provides _task local storage_ (like thread-local storage, but per task).
The storage is convenient for carrying around context for cross-cutting concerns such
as logging or tracing (e.g. a log tag for the current user or request ID, or a tracing
scope).
Expand Down
2 changes: 1 addition & 1 deletion dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
(flags :standard -strict-sequence -warn-error -a+8 -w +a-4-40-42-70)))

(mdx
(libraries moonpool moonpool.forkjoin moonpool.fib threads)
(libraries moonpool moonpool.forkjoin threads)
(package moonpool)
(enabled_if
(>= %{ocaml_version} 5.0)))
15 changes: 7 additions & 8 deletions examples/discuss1.ml
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
(** Example from
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 *)
(** NOTE: this was an example from
https://discuss.ocaml.org/t/confused-about-moonpool-cancellation/15381 but
there is no cancelation anymore :) *)

let ( let@ ) = ( @@ )

let () =
let@ () = Trace_tef.with_setup () in
let@ _ = Moonpool_fib.main in
let@ _ = Moonpool.main in

(* let@ runner = Moonpool.Ws_pool.with_ () in *)
let@ runner = Moonpool.Background_thread.with_ () in

(* Pretend this is some long-running read loop *)
for i = 1 to 10 do
Printf.printf "MAIN LOOP %d\n%!" i;
Moonpool_fib.check_if_cancelled ();
let _ : _ Moonpool_fib.t =
Moonpool_fib.spawn ~on:runner ~protect:false (fun () ->
let _ : _ Moonpool.Fut.t =
Moonpool.Fut.spawn ~on:runner (fun () ->
Printf.printf "RUN FIBER %d\n%!" i;
Moonpool_fib.check_if_cancelled ();
Format.printf "FIBER %d NOT CANCELLED YET@." i;
failwith "BOOM")
in
Moonpool_fib.yield ();
Moonpool.Fut.yield ();
(* Thread.delay 0.2; *)
(* Thread.delay 0.0001; *)
()
Expand Down
1 change: 0 additions & 1 deletion examples/dune
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
;(package moonpool)
(libraries
moonpool
moonpool.fib
trace
trace-tef
;tracy-client.trace
Expand Down
5 changes: 5 additions & 0 deletions examples/repro_41/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
(executables
(names run)
(enabled_if
(>= %{ocaml_version} 5.0))
(libraries moonpool trace trace-tef domainslib))
54 changes: 54 additions & 0 deletions examples/repro_41/run.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
(* fibo.ml *)
let cutoff = 25
let input = 40

let rec fibo_seq n =
if n <= 1 then
n
else
fibo_seq (n - 1) + fibo_seq (n - 2)

let rec fibo_domainslib ctx n =
if n <= cutoff then
fibo_seq n
else
let open Domainslib in
let fut1 = Task.async ctx (fun () -> fibo_domainslib ctx (n - 1)) in
let fut2 = Task.async ctx (fun () -> fibo_domainslib ctx (n - 2)) in
Task.await ctx fut1 + Task.await ctx fut2

let rec fibo_moonpool ctx n =
if n <= cutoff then
fibo_seq n
else
let open Moonpool in
let fut1 = Fut.spawn ~on:ctx (fun () -> fibo_moonpool ctx (n - 1)) in
let fut2 = Fut.spawn ~on:ctx (fun () -> fibo_moonpool ctx (n - 2)) in
Fut.await fut1 + Fut.await fut2

let usage =
"fibo.exe <num_domains> [ domainslib | moonpool | moonpool_fifo | seq ]"

let num_domains = try int_of_string Sys.argv.(1) with _ -> failwith usage
let implem = try Sys.argv.(2) with _ -> failwith usage

let () =
let output =
match implem with
| "moonpool" ->
let open Moonpool in
let ctx = Ws_pool.create ~num_threads:num_domains () in
Ws_pool.run_wait_block ctx (fun () -> fibo_moonpool ctx input)
| "moonpool_fifo" ->
let open Moonpool in
let ctx = Fifo_pool.create ~num_threads:num_domains () in
Ws_pool.run_wait_block ctx (fun () -> fibo_moonpool ctx input)
| "domainslib" ->
let open Domainslib in
let pool = Task.setup_pool ~num_domains () in
Task.run pool (fun () -> fibo_domainslib pool input)
| "seq" -> fibo_seq input
| _ -> failwith usage
in
print_int output;
print_newline ()
13 changes: 5 additions & 8 deletions src/core/background_thread.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,15 @@ type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?name:string ->
'a
(** Arguments used in {!create}. See {!create} for explanations. *)

let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name () : t =
Fifo_pool.create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name
~num_threads:1 ()
let create ?on_init_thread ?on_exit_thread ?on_exn ?name () : t =
Fifo_pool.create ?on_init_thread ?on_exit_thread ?on_exn ?name ~num_threads:1
()

let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name () f =
let pool =
create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?name ()
in
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?name () f =
let pool = create ?on_init_thread ?on_exit_thread ?on_exn ?name () in
let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in
f pool
1 change: 0 additions & 1 deletion src/core/background_thread.mli
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?name:string ->
'a
(** Arguments used in {!create}. See {!create} for explanations. *)
Expand Down
28 changes: 5 additions & 23 deletions src/core/fifo_pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ let ( let@ ) = ( @@ )
type state = {
threads: Thread.t array;
q: task_full Bb_queue.t; (** Queue for tasks. *)
around_task: WL.around_task;
mutable as_runner: t;
(* init options *)
name: string option;
Expand Down Expand Up @@ -43,13 +42,10 @@ type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?num_threads:int ->
?name:string ->
'a

let default_around_task_ : WL.around_task = AT_pair (ignore, fun _ _ -> ())

(** Run [task] as is, on the pool. *)
let schedule_ (self : state) (task : task_full) : unit =
try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown
Expand Down Expand Up @@ -88,35 +84,25 @@ let cleanup (self : worker_state) : unit =

let worker_ops : worker_state WL.ops =
let runner (st : worker_state) = st.st.as_runner in
let around_task st = st.st.around_task in
let on_exn (st : worker_state) (ebt : Exn_bt.t) =
st.st.on_exn (Exn_bt.exn ebt) (Exn_bt.bt ebt)
in
{
WL.schedule = schedule_w;
runner;
get_next_task;
around_task;
on_exn;
before_start;
cleanup;
}

let create_ ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
?around_task ~threads ?name () : state =
(* wrapper *)
let around_task =
match around_task with
| Some (f, g) -> WL.AT_pair (f, g)
| None -> default_around_task_
in

~threads ?name () : state =
let self =
{
threads;
q = Bb_queue.create ();
around_task;
as_runner = Runner.dummy;
name;
on_init_thread;
Expand All @@ -127,8 +113,7 @@ let create_ ?(on_init_thread = default_thread_init_exit_)
self.as_runner <- runner_of_state self;
self

let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
?name () : t =
let create ?on_init_thread ?on_exit_thread ?on_exn ?num_threads ?name () : t =
let num_domains = Domain_pool_.max_number_of_domains () in

(* number of threads to run *)
Expand All @@ -140,8 +125,7 @@ let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
let pool =
let dummy_thread = Thread.self () in
let threads = Array.make num_threads dummy_thread in
create_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ~threads ?name
()
create_ ?on_init_thread ?on_exit_thread ?on_exn ~threads ?name ()
in
let runner = runner_of_state pool in

Expand Down Expand Up @@ -181,11 +165,9 @@ let create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads

runner

let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
?name () f =
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?num_threads ?name () f =
let pool =
create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?num_threads
?name ()
create ?on_init_thread ?on_exit_thread ?on_exn ?num_threads ?name ()
in
let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in
f pool
Expand Down
4 changes: 0 additions & 4 deletions src/core/fifo_pool.mli
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?num_threads:int ->
?name:string ->
'a
Expand All @@ -35,9 +34,6 @@ val create : (unit -> t, _) create_args
[Domain.recommended_domain_count()], ie one worker per CPU core. On OCaml
4 the default is [4] (since there is only one domain).
@param on_exit_thread called at the end of each worker thread in the pool.
@param around_task
a pair of [before, after] functions ran around each task. See
{!Pool.create_args}.
@param name name for the pool, used in tracing (since 0.6) *)

val with_ : (unit -> (t -> 'a) -> 'a, _) create_args
Expand Down
2 changes: 2 additions & 0 deletions src/core/fut.ml
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ let await (self : 'a t) : 'a =
(* un-suspended: we should have a result! *)
get_or_fail_exn self

let yield = Picos.Fiber.yield

module Infix = struct
let[@inline] ( >|= ) x f = map ~f x
let[@inline] ( >>= ) x f = bind ~f x
Expand Down
Loading