Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions bin/import.ml
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,50 @@ let command_alias ?orig_name cmd term name =
Cmd.v (Cmd.info name ~docs:"COMMAND ALIASES" ~doc ~man) term
;;

(* The build system has some global state which makes it unsafe for
multiple instances of it to be executed concurrently, so we ensure
serialization by holding this mutex while running the build system. *)
let build_system_mutex = Fiber.Mutex.create ()
(* Global build coordinator enables true concurrent builds.

RPC and watch mode builds can now execute simultaneously by using
per-build sessions (Build_session.t) managed by the coordinator.

Key changes from mutex-based serialization:
1. Each build gets its own session with isolated state
2. Multiple builds can run concurrently
3. Hooks are per-build (attached to session)
4. Progress and errors are tracked per-build

Path locks remain global (correct - prevents file conflicts). *)
let build_coordinator = lazy (Dune_engine.Build_coordinator.create ())

let build f =
Hooks.End_of_build.once Promote.Diff_promotion.finalize;
Fiber.Mutex.with_lock build_system_mutex ~f:(fun () -> Build_system.run f)
let open Fiber.O in
let coordinator = Lazy.force build_coordinator in
(* Clear promotion database if no builds are active. This ensures sequential
builds behave like the old implementation where the database was cleared
between builds, while concurrent builds can safely merge their entries. *)
let* has_active = Dune_engine.Build_coordinator.has_active_builds coordinator in
let* () =
if has_active then Fiber.return () else Promote.Diff_promotion.clear_db_if_idle ()
in
Dune_engine.Build_coordinator.submit coordinator ~f:(fun ctx ->
(* Register finalize hook for THIS build - capture session in closure
so finalize can run outside the Fiber.Var.set scope *)
Dune_engine.Build_session.add_finalize_hook ctx (fun () ->
Fiber.Var.set
Dune_engine.Build_system.current_session
(Some ctx)
Promote.Diff_promotion.finalize);
(* Run build with this build's context *)
Fiber.finalize
(fun () -> Build_system.run ctx f)
~finally:(fun () ->
(* Run finalize hooks after build completes and errors are reported *)
Dune_engine.Build_session.run_finalize_hooks ctx))
;;

let build_exn f =
Hooks.End_of_build.once Promote.Diff_promotion.finalize;
Fiber.Mutex.with_lock build_system_mutex ~f:(fun () -> Build_system.run_exn f)
let open Fiber.O in
let+ res = build f in
match res with
| Ok res -> res
| Error `Already_reported -> raise Dune_util.Report_error.Already_reported
;;
90 changes: 90 additions & 0 deletions src/dune_engine/build_coordinator.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
open Import
open Fiber.O

(* Coordinates multiple concurrent builds. Allows RPC and watch mode builds
to execute simultaneously while maintaining correctness guarantees. *)

type active_build =
{ context : Build_session.t
; completion : unit Fiber.Ivar.t
}

type t =
{ mutable active_builds : active_build Build_id.Map.t
; mutex : Fiber.Mutex.t
}

let create () = { active_builds = Build_id.Map.empty; mutex = Fiber.Mutex.create () }

let active_build_ids t =
Fiber.Mutex.with_lock t.mutex ~f:(fun () ->
Fiber.return (Build_id.Map.keys t.active_builds))
;;

let get_build_context t build_id =
Fiber.Mutex.with_lock t.mutex ~f:(fun () ->
Fiber.return
(match Build_id.Map.find t.active_builds build_id with
| Some build -> Some build.context
| None -> None))
;;

let get_all_contexts t =
Fiber.Mutex.with_lock t.mutex ~f:(fun () ->
let contexts =
Build_id.Map.to_list t.active_builds
|> List.map ~f:(fun (id, build) -> id, build.context)
in
Fiber.return contexts)
;;

let get_all_progress t =
let* contexts = get_all_contexts t in
Fiber.parallel_map contexts ~f:(fun (id, ctx) ->
let+ progress = Build_session.get_progress ctx in
id, progress)
;;

let get_all_errors t =
let* contexts = get_all_contexts t in
Fiber.parallel_map contexts ~f:(fun (id, ctx) ->
let+ errors = Build_session.get_errors ctx in
id, errors)
;;

let has_active_builds t =
Fiber.Mutex.with_lock t.mutex ~f:(fun () ->
Fiber.return (not (Build_id.Map.is_empty t.active_builds)))
;;

let register_build t context =
Fiber.Mutex.with_lock t.mutex ~f:(fun () ->
let build_id = Build_session.id context in
let completion = Fiber.Ivar.create () in
let active_build = { context; completion } in
t.active_builds <- Build_id.Map.add_exn t.active_builds build_id active_build;
Fiber.return completion)
;;

let unregister_build t build_id =
Fiber.Mutex.with_lock t.mutex ~f:(fun () ->
let* () =
match Build_id.Map.find t.active_builds build_id with
| Some build -> Fiber.Ivar.fill build.completion ()
| None -> Fiber.return ()
in
t.active_builds <- Build_id.Map.remove t.active_builds build_id;
Fiber.return ())
;;

let submit t ~f =
let context = Build_session.create () in
let build_id = Build_session.id context in
let* _completion = register_build t context in
Fiber.finalize (fun () -> f context) ~finally:(fun () -> unregister_build t build_id)
;;

let to_dyn t =
let active_ids = Build_id.Map.keys t.active_builds in
Dyn.record [ "active_builds", Dyn.list Build_id.to_dyn active_ids ]
;;
34 changes: 34 additions & 0 deletions src/dune_engine/build_coordinator.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
(** Coordinates multiple concurrent builds. Allows RPC and watch mode builds
to execute simultaneously while maintaining correctness. *)

type t

(** Create a new build coordinator *)
val create : unit -> t

(** Submit a build request. The function receives a fresh Build_session.t
and should execute the build using that context. Multiple calls to submit
can run concurrently. *)
val submit : t -> f:(Build_session.t -> 'a Fiber.t) -> 'a Fiber.t

(** Get list of currently active build IDs *)
val active_build_ids : t -> Build_id.t list Fiber.t

(** Get the context for a specific build (if still active) *)
val get_build_context : t -> Build_id.t -> Build_session.t option Fiber.t

(** Get all active build contexts *)
val get_all_contexts : t -> (Build_id.t * Build_session.t) list Fiber.t

(** Get progress for all active builds *)
val get_all_progress : t -> (Build_id.t * Build_session.Progress.t) list Fiber.t

(** Get errors for all active builds *)
val get_all_errors : t -> (Build_id.t * Build_system_error.Set.t) list Fiber.t

(** Check if there are any active builds *)
val has_active_builds : t -> bool Fiber.t

(** {1 Debugging} *)

val to_dyn : t -> Dyn.t
29 changes: 29 additions & 0 deletions src/dune_engine/build_id.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
open Import

(* Unique identifier for a build execution.
Allows tracking multiple concurrent builds. *)

type t = int

let counter = ref 0

let create () =
let id = !counter in
counter := id + 1;
id
;;

let equal = Int.equal
let compare = Int.compare
let to_dyn t = Dyn.int t
let hash = Int.hash

module T = struct
type nonrec t = t

let compare = compare
let to_dyn = to_dyn
end

module Map = Map.Make (T)
module Set = Set.Make (T) (Map)
18 changes: 18 additions & 0 deletions src/dune_engine/build_id.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
open Import

(** Unique identifier for a build execution. Allows tracking multiple
concurrent builds and associating progress, errors, and hooks with
specific build instances. *)

type t

(** Create a new unique build ID. Thread-safe via sequential counter. *)
val create : unit -> t

val equal : t -> t -> bool
val compare : t -> t -> Ordering.t
val to_dyn : t -> Dyn.t
val hash : t -> int

module Map : Map.S with type key = t
module Set : Set.S with type elt = t
128 changes: 128 additions & 0 deletions src/dune_engine/build_session.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
open Import
open Fiber.O

(* Per-build context that holds state for a single build execution.
This allows multiple builds to run concurrently without interfering
with each other. *)

module Progress = struct
type t =
{ number_of_rules_discovered : int
; number_of_rules_executed : int
; number_of_rules_failed : int
}

let init =
{ number_of_rules_discovered = 0
; number_of_rules_executed = 0
; number_of_rules_failed = 0
}
;;
end

type t =
{ id : Build_id.t
; progress : Progress.t Fiber.Svar.t
; errors : Build_system_error.Set.t Fiber.Svar.t
; pending_targets : Targets.t ref
; finalize_hooks : (unit -> unit Fiber.t) Queue.t
}

let create () =
{ id = Build_id.create ()
; progress = Fiber.Svar.create Progress.init
; errors = Fiber.Svar.create Build_system_error.Set.empty
; pending_targets = ref Targets.empty
; finalize_hooks = Queue.create ()
}
;;

let id t = t.id

(* Progress tracking *)

let get_progress t = Fiber.return (Fiber.Svar.read t.progress)
let set_progress t progress = Fiber.Svar.write t.progress progress

let update_progress t ~f =
let* current = get_progress t in
set_progress t (f current)
;;

let incr_rules_discovered t =
update_progress t ~f:(fun p ->
{ p with number_of_rules_discovered = p.number_of_rules_discovered + 1 })
;;

let incr_rules_executed t =
update_progress t ~f:(fun p ->
{ p with number_of_rules_executed = p.number_of_rules_executed + 1 })
;;

let incr_rules_failed t =
update_progress t ~f:(fun p ->
{ p with number_of_rules_failed = p.number_of_rules_failed + 1 })
;;

(* Error tracking *)

let get_errors t = Fiber.return (Fiber.Svar.read t.errors)

let add_errors t error_list =
let* current = get_errors t in
let updated = List.fold_left error_list ~init:current ~f:Build_system_error.Set.add in
Fiber.Svar.write t.errors updated
;;

let reset_errors t = Fiber.Svar.write t.errors Build_system_error.Set.empty

(* Hooks *)

let add_finalize_hook t hook = Queue.push t.finalize_hooks hook

let run_finalize_hooks t =
let open Fiber.O in
let hooks = Queue.to_list t.finalize_hooks in
Queue.clear t.finalize_hooks;
let* results =
Fiber.parallel_map hooks ~f:(fun hook -> Fiber.collect_errors (fun () -> hook ()))
in
let exns =
List.filter_map results ~f:(function
| Ok () -> None
| Error exns -> Some exns)
|> List.concat
in
match exns with
| [] -> Fiber.return ()
| [ { Exn_with_backtrace.exn = User_error.E _ as e; backtrace = _ } ] -> raise e
| exns ->
Code_error.raise
"finalize hooks failed"
[ "build_id", Build_id.to_dyn t.id
; "exns", Dyn.list Exn_with_backtrace.to_dyn exns
]
;;

(* Pending targets *)

let add_pending_targets t targets =
t.pending_targets := Targets.combine !(t.pending_targets) targets
;;

let remove_pending_targets t targets =
t.pending_targets := Targets.diff !(t.pending_targets) targets
;;

let cleanup_pending_targets t =
let targets = !(t.pending_targets) in
t.pending_targets := Targets.empty;
Targets.iter targets ~file:Path.Build.unlink_no_err ~dir:(fun p ->
Path.rm_rf (Path.build p))
;;

let to_dyn t =
let open Dyn in
record
[ "id", Build_id.to_dyn t.id; "pending_targets", Targets.to_dyn !(t.pending_targets) ]
;;
Loading
Loading