Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ on:
branches:
- master
pull_request:
branches:
- master
jobs:
run:
name: Build
Expand All @@ -16,8 +14,9 @@ jobs:
#- windows-latest # depext issue for gnuplot/jemalloc?
#- macos-latest # slow
ocaml-compiler:
- 4.12.x
- 5.0.x
- 4.14.x
- 5.1.x
- 5.4.x
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
Expand Down
4 changes: 2 additions & 2 deletions benchpress-lsp.opam
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ build: [
depends: [
"dune" { >= "1.11" }
"containers" { >= "3.0" & < "4.0" }
"containers-thread" { >= "3.0" & < "4.0" }
"moonpool" { >= "0.8" }
"benchpress" { = version }
"linol" { >= "0.4" & < "0.5" }
"linol" { >= "0.10" & < "0.11" }
]
homepage: "https://github.com/sneeuwballen/benchpress/"
dev-repo: "git+https://github.com/sneeuwballen/benchpress.git"
Expand Down
4 changes: 2 additions & 2 deletions benchpress.opam
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ depends: [
"dune" { >= "2.2" }
"base-unix"
"containers" { >= "3.3" & < "4.0" }
"containers-thread" { >= "3.0" & < "4.0" }
"moonpool" { >= "0.8" }
"re" { >= "1.8" & < "2.0" }
"csv"
"cmdliner" {>= "1.1.0"}
Expand All @@ -29,7 +29,7 @@ depends: [
"sqlite3_utils" { >= "0.4" & < "0.6" }
"printbox" { >= "0.6" }
"printbox-text" { >= "0.6" }
"ocaml" {>= "4.12" }
"ocaml" {>= "4.14" }
]
homepage: "https://github.com/sneeuwballen/benchpress/"
dev-repo: "git+https://github.com/sneeuwballen/benchpress.git"
Expand Down
8 changes: 4 additions & 4 deletions src/bin/Run_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ let execute_run_prover_action ?j ?cpus ?timestamp ?pp_results ?dyn ?limits
Error.guard
(Error.wrapf "run prover action@ `@[%a@]`" Action.pp_run_provers r)
@@ fun () ->
let interrupted = CCLock.create false in
let interrupted = Moonpool.Lock.create false in
let r =
Exec_action.Exec_run_provers.expand ?dyn ?j ?cpus ?proof_dir ?limits defs
r.limits r.j r.pattern r.dirs r.provers
Expand All @@ -23,7 +23,7 @@ let execute_run_prover_action ?j ?cpus ?timestamp ?pp_results ?dyn ?limits
let result =
Error.guard (Error.wrapf "running %d tests" len) @@ fun () ->
Exec_action.Exec_run_provers.run ~uuid ?timestamp
~interrupted:(fun () -> CCLock.get interrupted)
~interrupted:(fun () -> Moonpool.Lock.get interrupted)
~on_solve:progress#on_res ~save ~wal_mode
~on_start_proof_check:(fun () -> progress#on_start_proof_check)
~on_proof_check:progress#on_proof_check_res
Expand All @@ -40,7 +40,7 @@ let execute_submit_job_action ?pp_results ?j ?timestamp ?dyn ?limits ?proof_dir
(Error.wrapf "run provers with slurm action@ `@[%a@]`"
Action.pp_run_provers_slurm r)
@@ fun () ->
let interrupted = CCLock.create false in
let interrupted = Moonpool.Lock.create false in
let exp_r =
Exec_action.Exec_run_provers.expand ~slurm:true ?dyn ?j ?proof_dir ?limits
defs r.limits r.j r.pattern r.dirs r.provers
Expand All @@ -53,7 +53,7 @@ let execute_submit_job_action ?pp_results ?j ?timestamp ?dyn ?limits ?proof_dir
let result =
Error.guard (Error.wrapf "running %d tests" len) @@ fun () ->
Exec_action.Exec_run_provers.run_sbatch_job ~uuid ?timestamp
~interrupted:(fun () -> CCLock.get interrupted)
~interrupted:(fun () -> Moonpool.Lock.get interrupted)
?partition:r.partition ~nodes:r.nodes ~addr:r.addr ~port:r.port
~ntasks:r.ntasks ~save ~wal_mode ~on_solve:progress#on_res
~on_start_proof_check:(fun () -> progress#on_start_proof_check)
Expand Down
12 changes: 6 additions & 6 deletions src/core/Exec_action.ml
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ end = struct
in
(* insert into DB here *)
let ev_prover = Run_event.mk_prover result in
CCLock.with_lock db (fun db -> Run_event.to_db db ev_prover);
Moonpool.Lock.with_ db (fun db -> Run_event.to_db db ev_prover);

let ev_proof =
match result.res, proof_file with
Expand Down Expand Up @@ -320,7 +320,7 @@ end = struct
on_proof_check res;

(* insert into DB here *)
CCLock.with_lock db (fun db -> Run_event.to_db db ev_checker);
Moonpool.Lock.with_ db (fun db -> Run_event.to_db db ev_checker);
[ ev_checker ]
| _ -> []
in
Expand All @@ -339,7 +339,7 @@ end = struct
in
(* run provers *)
let res_l =
let db = CCLock.create db in
let db = Moonpool.Lock.create db in
match self.j with
| Bounded j ->
Misc.Par_map.map_p ~j
Expand Down Expand Up @@ -446,7 +446,7 @@ end = struct
let nb_resps = ref 0 in
let resps_ref = ref [] in
let resps_lock = Mutex.create () in
let db_wl = CCLock.create db in
let db_wl = Moonpool.Lock.create db in
let add_resps evl =
Mutex.lock resps_lock;
nb_resps := !nb_resps + List.length evl;
Expand All @@ -456,7 +456,7 @@ end = struct
(match ev with
| Run_event.Prover_run r -> on_solve r
| Checker_run r -> on_proof_check r);
CCLock.with_lock db_wl (fun db -> Run_event.to_db db ev))
Moonpool.Lock.with_ db_wl (fun db -> Run_event.to_db db ev))
evl;
Mutex.unlock resps_lock
in
Expand Down Expand Up @@ -574,7 +574,7 @@ end = struct
in

let sock, used_port = Misc.mk_socket (Unix.ADDR_INET (addr, port)) in
ignore (CCThread.spawn (fun () -> Misc.start_server nodes server_loop sock));
ignore (Thread.create (fun () -> Misc.start_server nodes server_loop sock) ());

Log.debug (fun k ->
k "Spawned the thread that establishes a server listening at: %s:%s."
Expand Down
98 changes: 61 additions & 37 deletions src/core/Misc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ module Db = Sqlite3_utils
module PB = PrintBox

let spf = Printf.sprintf
let _lock = CCLock.create ()
let _lock = Moonpool.Lock.create ()
let now_s () = Ptime_clock.now () |> Ptime.to_float_s
let reset_line = "\x1b[2K\r"
let synchronized f = CCLock.with_lock _lock f
let synchronized f = Moonpool.Lock.with_ _lock f

module Log_report = struct
let buf_fmt () =
Expand Down Expand Up @@ -41,7 +41,7 @@ module Log_report = struct
| Logs.App -> Fmt.fprintf out "[@{<blue>app@}:%s]" src

let reporter () =
let lock = CCLock.create () in
let lock = Moonpool.Lock.create () in
Fmt.set_color_default true;
let buf_out, buf_flush = buf_fmt () in
let pp_header fmt header =
Expand All @@ -67,7 +67,7 @@ module Log_report = struct
let report src level ~over k msgf =
let k _ =
let write_str out s =
CCLock.with_lock lock @@ fun () ->
Moonpool.Lock.with_ lock @@ fun () ->
Printf.fprintf out "%s%s%!" reset_line s
in
let msg = buf_flush () in
Expand Down Expand Up @@ -301,22 +301,27 @@ module Par_map = struct
let map_p ~j f l =
if j < 1 then invalid_arg "map_p: ~j";
die_on_sigterm ();
(* NOTE: for some reason the pool seems to spawn one too many thread
in some cases. So we add a guard to respect [-j] properly. *)
let sem = CCSemaphore.create j in
let f_with_sem x = CCSemaphore.with_acquire ~n:1 sem ~f:(fun () -> f x) in
Logs.debug (fun k -> k "par-map: create pool j=%d" j);
let module P = CCPool.Make (struct
let max_size = j
end) in
let res =
match l with
| [] -> []
| _ -> P.Fut.map_l (fun x -> P.Fut.make1 f_with_sem x) l |> P.Fut.get
in
Logs.debug (fun k -> k "par-map: stop pool");
P.stop ();
res
match l with
| [] -> []
| _ ->
Logs.debug (fun k -> k "par-map: create pool j=%d" j);
let pool = Moonpool.Fifo_pool.create ~num_threads:j () in
let res =
try
let futs = List.map (fun x -> Moonpool.Fut.spawn ~on:pool (fun () -> f x)) l in
let results = List.map Moonpool.Fut.wait_block futs in
List.map (function
| Ok x -> x
| Error (exn, bt) -> Printexc.raise_with_backtrace exn bt
) results
with e ->
Logs.debug (fun k -> k "par-map: shutdown pool (exception)");
Moonpool.Fifo_pool.shutdown pool;
raise e
in
Logs.debug (fun k -> k "par-map: shutdown pool");
Moonpool.Fifo_pool.shutdown pool;
res

(* Map on the list [l] with each call to [f] being associated one of the
resources from [resources] that is guaranteed not to be used concurrently
Expand All @@ -329,21 +334,35 @@ module Par_map = struct
invalid_arg "map_with_resource: ~resources";
die_on_sigterm ();
let jobs = List.length resources in
let queue = CCBlockingQueue.create jobs in
List.iter (CCBlockingQueue.push queue) resources;
let f x =
let resource = CCBlockingQueue.take queue in
let queue = Moonpool.Blocking_queue.create () in
List.iter (Moonpool.Blocking_queue.push queue) resources;
let f_with_resource x =
let resource = Moonpool.Blocking_queue.pop queue in
Fun.protect
~finally:(fun () -> CCBlockingQueue.push queue resource)
~finally:(fun () -> Moonpool.Blocking_queue.push queue resource)
(fun () -> f resource x)
in
Logs.debug (fun m -> m "par-map: create pool j=%d" jobs);
let module P = CCPool.Make (struct
let max_size = jobs
end) in
let res = P.Fut.map_l (P.Fut.make1 f) l |> P.Fut.get in
Logs.debug (fun m -> m "par-map: stop pool");
P.stop ();
let pool = Moonpool.Fifo_pool.create ~num_threads:jobs () in
let res =
try
let futs = List.map (fun x ->
Moonpool.Fut.spawn ~on:pool (fun () -> f_with_resource x)
) l in
let results = List.map Moonpool.Fut.wait_block futs in
Moonpool.Blocking_queue.close queue;
List.map (function
| Ok x -> x
| Error (exn, bt) -> Printexc.raise_with_backtrace exn bt
) results
with e ->
Logs.debug (fun m -> m "par-map: shutdown pool (exception)");
Moonpool.Blocking_queue.close queue;
Moonpool.Fifo_pool.shutdown pool;
raise e
in
Logs.debug (fun m -> m "par-map: shutdown pool");
Moonpool.Fifo_pool.shutdown pool;
res
end

Expand Down Expand Up @@ -464,12 +483,17 @@ let mk_socket sockaddr =
let start_server n server_fun sock =
let open Unix in
listen sock 5;
CCThread.Arr.join
@@ CCThread.Arr.spawn n (fun _ ->
let s, _caller = accept_non_intr sock in
let inchan = in_channel_of_descr s in
let outchan = out_channel_of_descr s in
server_fun inchan outchan)
let threads =
List.init n (fun _ ->
Thread.create (fun () ->
let s, _caller = accept_non_intr sock in
let inchan = in_channel_of_descr s in
let outchan = out_channel_of_descr s in
server_fun inchan outchan
) ()
)
in
List.iter Thread.join threads

(** [establish_server n server_fun sockaddr] same as
[Unix.establish_server], but it uses threads instead of forking the process
Expand Down
12 changes: 6 additions & 6 deletions src/core/Task_queue.ml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(** {1 Task queue for the server} *)

open Common
module M = CCLock
module M = Moonpool.Lock
module Log = (val Logs.src_log (Logs.Src.create "benchpress.task-queue"))

type job = {
Expand Down Expand Up @@ -40,14 +40,14 @@ type api_job = { mutable aj_last_seen: float; mutable aj_interrupted: bool }

type t = {
defs: Definitions.t M.t;
jobs: job CCBlockingQueue.t;
jobs: job Moonpool.Blocking_queue.t;
jobs_tbl: (string, Job.t) Hashtbl.t;
api_jobs: (string, api_job) Hashtbl.t; (* last seen+descr *)
cur: job option M.t;
}

let defs self = self.defs
let size self = CCBlockingQueue.size self.jobs
let size self = Moonpool.Blocking_queue.size self.jobs
let cur_job self = M.get self.cur

let interrupt self ~uuid : bool =
Expand All @@ -62,7 +62,7 @@ let interrupt self ~uuid : bool =

let create ?(defs = Definitions.empty) () : t =
{
jobs = CCBlockingQueue.create 64;
jobs = Moonpool.Blocking_queue.create ();
defs = M.create defs;
jobs_tbl = Hashtbl.create 8;
api_jobs = Hashtbl.create 8;
Expand All @@ -85,11 +85,11 @@ let push self task : unit =
}
in
Hashtbl.add self.jobs_tbl j_uuid j;
CCBlockingQueue.push self.jobs j
Moonpool.Blocking_queue.push self.jobs j

let loop self =
while true do
let job = CCBlockingQueue.take self.jobs in
let job = Moonpool.Blocking_queue.pop self.jobs in
Profile.with_ "task-queue.job" @@ fun () ->
job.j_started_time <- Unix.gettimeofday ();
M.set self.cur (Some job);
Expand Down
2 changes: 1 addition & 1 deletion src/core/Task_queue.mli
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type t

val create : ?defs:Definitions.t -> unit -> t

val defs : t -> Definitions.t CCLock.t
val defs : t -> Definitions.t Moonpool.Lock.t
(** List of definitions available for tasks *)

val size : t -> int
Expand Down
2 changes: 1 addition & 1 deletion src/core/dune
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
(synopsis
"Benchpress core library, with all the data structures and functions")
(wrapped true)
(libraries containers containers.unix containers-thread re re.perl csv iter
(libraries containers containers.unix moonpool re re.perl csv iter
printbox printbox-text logs logs.cli gnuplot ptime ptime.clock.os uuidm
sqlite3 sqlite3_utils cmdliner pp_loc processor)
(flags :standard -w -5 -warn-error -a+8 -strict-sequence))
Expand Down
8 changes: 4 additions & 4 deletions src/core/profile.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ let spf = Printf.sprintf

type span = float

let out_ : out_channel CCLock.t option ref = ref None
let out_ : out_channel Moonpool.Lock.t option ref = ref None
let[@inline] enabled () = !out_ != None
let start_ = Unix.gettimeofday ()
let[@inline] now_us () = (Unix.gettimeofday () -. start_) *. 1e6
Expand All @@ -15,11 +15,11 @@ let enable () =
(* let oc = Unix.open_process_out "gzip --synchronous -c - > trace.json.gz" in *)
let oc = open_out "trace.json" in
output_string oc "[";
let out = CCLock.create oc in
let out = Moonpool.Lock.create oc in
out_ := Some out;
at_exit (fun () ->
out_ := None;
CCLock.with_lock out (fun oc ->
Moonpool.Lock.with_ out (fun oc ->
flush oc;
close_out_noerr oc))
)
Expand All @@ -44,7 +44,7 @@ let exit_real_ ?(args = []) sp name =
{json|{"ph":"X","name":%S,"pid":1,"tid":%d,"ts":%.1f,"dur":%.1f%s},|json}
name tid sp dur args
in
CCLock.with_lock out (fun oc ->
Moonpool.Lock.with_ out (fun oc ->
output_string oc s;
output_char oc '\n';
flush oc)
Expand Down
Loading