From 7713680a654248f108737d462f31fed9515ec37e Mon Sep 17 00:00:00 2001 From: Joel Reymont <18791+joelreymont@users.noreply.github.com> Date: Fri, 14 Nov 2025 19:57:28 +0200 Subject: [PATCH 1/5] Add Build_session and Build_coordinator modules Build_session provides per-build isolated state for progress tracking, error accumulation, pending targets, and finalize hooks. Build_coordinator manages concurrent build sessions with unique Build_id assignment and lifecycle tracking. Signed-off-by: Joel Reymont <18791+joelreymont@users.noreply.github.com> --- src/dune_engine/build_coordinator.ml | 90 ++++++++++++++++++ src/dune_engine/build_coordinator.mli | 34 +++++++ src/dune_engine/build_id.ml | 29 ++++++ src/dune_engine/build_id.mli | 18 ++++ src/dune_engine/build_session.ml | 128 ++++++++++++++++++++++++++ src/dune_engine/build_session.mli | 55 +++++++++++ src/dune_engine/dune_engine.ml | 3 + 7 files changed, 357 insertions(+) create mode 100644 src/dune_engine/build_coordinator.ml create mode 100644 src/dune_engine/build_coordinator.mli create mode 100644 src/dune_engine/build_id.ml create mode 100644 src/dune_engine/build_id.mli create mode 100644 src/dune_engine/build_session.ml create mode 100644 src/dune_engine/build_session.mli diff --git a/src/dune_engine/build_coordinator.ml b/src/dune_engine/build_coordinator.ml new file mode 100644 index 00000000000..6ab0555c911 --- /dev/null +++ b/src/dune_engine/build_coordinator.ml @@ -0,0 +1,90 @@ +open Import +open Fiber.O + +(* Coordinates multiple concurrent builds. Allows RPC and watch mode builds + to execute simultaneously while maintaining correctness guarantees. *) + +type active_build = + { context : Build_session.t + ; completion : unit Fiber.Ivar.t + } + +type t = + { mutable active_builds : active_build Build_id.Map.t + ; mutex : Fiber.Mutex.t + } + +let create () = { active_builds = Build_id.Map.empty; mutex = Fiber.Mutex.create () } + +let active_build_ids t = + Fiber.Mutex.with_lock t.mutex ~f:(fun () -> + Fiber.return (Build_id.Map.keys t.active_builds)) +;; + +let get_build_context t build_id = + Fiber.Mutex.with_lock t.mutex ~f:(fun () -> + Fiber.return + (match Build_id.Map.find t.active_builds build_id with + | Some build -> Some build.context + | None -> None)) +;; + +let get_all_contexts t = + Fiber.Mutex.with_lock t.mutex ~f:(fun () -> + let contexts = + Build_id.Map.to_list t.active_builds + |> List.map ~f:(fun (id, build) -> id, build.context) + in + Fiber.return contexts) +;; + +let get_all_progress t = + let* contexts = get_all_contexts t in + Fiber.parallel_map contexts ~f:(fun (id, ctx) -> + let+ progress = Build_session.get_progress ctx in + id, progress) +;; + +let get_all_errors t = + let* contexts = get_all_contexts t in + Fiber.parallel_map contexts ~f:(fun (id, ctx) -> + let+ errors = Build_session.get_errors ctx in + id, errors) +;; + +let has_active_builds t = + Fiber.Mutex.with_lock t.mutex ~f:(fun () -> + Fiber.return (not (Build_id.Map.is_empty t.active_builds))) +;; + +let register_build t context = + Fiber.Mutex.with_lock t.mutex ~f:(fun () -> + let build_id = Build_session.id context in + let completion = Fiber.Ivar.create () in + let active_build = { context; completion } in + t.active_builds <- Build_id.Map.add_exn t.active_builds build_id active_build; + Fiber.return completion) +;; + +let unregister_build t build_id = + Fiber.Mutex.with_lock t.mutex ~f:(fun () -> + let* () = + match Build_id.Map.find t.active_builds build_id with + | Some build -> Fiber.Ivar.fill build.completion () + | None -> 
Fiber.return () + in + t.active_builds <- Build_id.Map.remove t.active_builds build_id; + Fiber.return ()) +;; + +let submit t ~f = + let context = Build_session.create () in + let build_id = Build_session.id context in + let* _completion = register_build t context in + Fiber.finalize (fun () -> f context) ~finally:(fun () -> unregister_build t build_id) +;; + +let to_dyn t = + let active_ids = Build_id.Map.keys t.active_builds in + Dyn.record [ "active_builds", Dyn.list Build_id.to_dyn active_ids ] +;; diff --git a/src/dune_engine/build_coordinator.mli b/src/dune_engine/build_coordinator.mli new file mode 100644 index 00000000000..c9af603728d --- /dev/null +++ b/src/dune_engine/build_coordinator.mli @@ -0,0 +1,34 @@ +(** Coordinates multiple concurrent builds. Allows RPC and watch mode builds + to execute simultaneously while maintaining correctness. *) + +type t + +(** Create a new build coordinator *) +val create : unit -> t + +(** Submit a build request. The function receives a fresh Build_session.t + and should execute the build using that context. Multiple calls to submit + can run concurrently. *) +val submit : t -> f:(Build_session.t -> 'a Fiber.t) -> 'a Fiber.t + +(** Get list of currently active build IDs *) +val active_build_ids : t -> Build_id.t list Fiber.t + +(** Get the context for a specific build (if still active) *) +val get_build_context : t -> Build_id.t -> Build_session.t option Fiber.t + +(** Get all active build contexts *) +val get_all_contexts : t -> (Build_id.t * Build_session.t) list Fiber.t + +(** Get progress for all active builds *) +val get_all_progress : t -> (Build_id.t * Build_session.Progress.t) list Fiber.t + +(** Get errors for all active builds *) +val get_all_errors : t -> (Build_id.t * Build_system_error.Set.t) list Fiber.t + +(** Check if there are any active builds *) +val has_active_builds : t -> bool Fiber.t + +(** {1 Debugging} *) + +val to_dyn : t -> Dyn.t diff --git a/src/dune_engine/build_id.ml b/src/dune_engine/build_id.ml new file mode 100644 index 00000000000..a4464436a7f --- /dev/null +++ b/src/dune_engine/build_id.ml @@ -0,0 +1,29 @@ +open Import + +(* Unique identifier for a build execution. + Allows tracking multiple concurrent builds. *) + +type t = int + +let counter = ref 0 + +let create () = + let id = !counter in + counter := id + 1; + id +;; + +let equal = Int.equal +let compare = Int.compare +let to_dyn t = Dyn.int t +let hash = Int.hash + +module T = struct + type nonrec t = t + + let compare = compare + let to_dyn = to_dyn +end + +module Map = Map.Make (T) +module Set = Set.Make (T) (Map) diff --git a/src/dune_engine/build_id.mli b/src/dune_engine/build_id.mli new file mode 100644 index 00000000000..6839c1dc747 --- /dev/null +++ b/src/dune_engine/build_id.mli @@ -0,0 +1,18 @@ +open Import + +(** Unique identifier for a build execution. Allows tracking multiple + concurrent builds and associating progress, errors, and hooks with + specific build instances. *) + +type t + +(** Create a new unique build ID. Thread-safe via sequential counter. 
*) +val create : unit -> t + +val equal : t -> t -> bool +val compare : t -> t -> Ordering.t +val to_dyn : t -> Dyn.t +val hash : t -> int + +module Map : Map.S with type key = t +module Set : Set.S with type elt = t diff --git a/src/dune_engine/build_session.ml b/src/dune_engine/build_session.ml new file mode 100644 index 00000000000..29cd174a633 --- /dev/null +++ b/src/dune_engine/build_session.ml @@ -0,0 +1,128 @@ +open Import +open Fiber.O + +(* Per-build context that holds state for a single build execution. + This allows multiple builds to run concurrently without interfering + with each other. *) + +module Progress = struct + type t = + { number_of_rules_discovered : int + ; number_of_rules_executed : int + ; number_of_rules_failed : int + } + + let init = + { number_of_rules_discovered = 0 + ; number_of_rules_executed = 0 + ; number_of_rules_failed = 0 + } + ;; +end + +type t = + { id : Build_id.t + ; progress : Progress.t Fiber.Svar.t + ; errors : Build_system_error.Set.t Fiber.Svar.t + ; pending_targets : Targets.t ref + ; finalize_hooks : (unit -> unit Fiber.t) Queue.t + } + +let create () = + { id = Build_id.create () + ; progress = Fiber.Svar.create Progress.init + ; errors = Fiber.Svar.create Build_system_error.Set.empty + ; pending_targets = ref Targets.empty + ; finalize_hooks = Queue.create () + } +;; + +let id t = t.id + +(* Progress tracking *) + +let get_progress t = Fiber.return (Fiber.Svar.read t.progress) +let set_progress t progress = Fiber.Svar.write t.progress progress + +let update_progress t ~f = + let* current = get_progress t in + set_progress t (f current) +;; + +let incr_rules_discovered t = + update_progress t ~f:(fun p -> + { p with number_of_rules_discovered = p.number_of_rules_discovered + 1 }) +;; + +let incr_rules_executed t = + update_progress t ~f:(fun p -> + { p with number_of_rules_executed = p.number_of_rules_executed + 1 }) +;; + +let incr_rules_failed t = + update_progress t ~f:(fun p -> + { p with number_of_rules_failed = p.number_of_rules_failed + 1 }) +;; + +(* Error tracking *) + +let get_errors t = Fiber.return (Fiber.Svar.read t.errors) + +let add_errors t error_list = + let* current = get_errors t in + let updated = List.fold_left error_list ~init:current ~f:Build_system_error.Set.add in + Fiber.Svar.write t.errors updated +;; + +let reset_errors t = Fiber.Svar.write t.errors Build_system_error.Set.empty + +(* Hooks *) + +let add_finalize_hook t hook = Queue.push t.finalize_hooks hook + +let run_finalize_hooks t = + let open Fiber.O in + let hooks = Queue.to_list t.finalize_hooks in + Queue.clear t.finalize_hooks; + let* results = + Fiber.parallel_map hooks ~f:(fun hook -> Fiber.collect_errors (fun () -> hook ())) + in + let exns = + List.filter_map results ~f:(function + | Ok () -> None + | Error exns -> Some exns) + |> List.concat + in + match exns with + | [] -> Fiber.return () + | [ { Exn_with_backtrace.exn = User_error.E _ as e; backtrace = _ } ] -> raise e + | exns -> + Code_error.raise + "finalize hooks failed" + [ "build_id", Build_id.to_dyn t.id + ; "exns", Dyn.list Exn_with_backtrace.to_dyn exns + ] +;; + +(* Pending targets *) + +let add_pending_targets t targets = + t.pending_targets := Targets.combine !(t.pending_targets) targets +;; + +let remove_pending_targets t targets = + t.pending_targets := Targets.diff !(t.pending_targets) targets +;; + +let cleanup_pending_targets t = + let targets = !(t.pending_targets) in + t.pending_targets := Targets.empty; + Targets.iter targets ~file:Path.Build.unlink_no_err ~dir:(fun p -> + 
Path.rm_rf (Path.build p)) +;; + +let to_dyn t = + let open Dyn in + record + [ "id", Build_id.to_dyn t.id; "pending_targets", Targets.to_dyn !(t.pending_targets) ] +;; diff --git a/src/dune_engine/build_session.mli b/src/dune_engine/build_session.mli new file mode 100644 index 00000000000..d66b600a162 --- /dev/null +++ b/src/dune_engine/build_session.mli @@ -0,0 +1,55 @@ +open Import + +(** Per-build context that holds state for a single build execution. + This allows multiple builds to run concurrently without interfering. *) + +module Progress : sig + type t = + { number_of_rules_discovered : int + ; number_of_rules_executed : int + ; number_of_rules_failed : int + } + + val init : t +end + +type t + +(** Create a new build context with fresh state *) +val create : unit -> t + +(** Get the unique ID for this build *) +val id : t -> Build_id.t + +(** {1 Progress tracking} *) + +val get_progress : t -> Progress.t Fiber.t +val set_progress : t -> Progress.t -> unit Fiber.t +val update_progress : t -> f:(Progress.t -> Progress.t) -> unit Fiber.t +val incr_rules_discovered : t -> unit Fiber.t +val incr_rules_executed : t -> unit Fiber.t +val incr_rules_failed : t -> unit Fiber.t + +(** {1 Error tracking} *) + +val get_errors : t -> Build_system_error.Set.t Fiber.t +val add_errors : t -> Build_system_error.t list -> unit Fiber.t +val reset_errors : t -> unit Fiber.t + +(** {1 Finalize hooks} *) + +(** Add a hook to run when this build completes *) +val add_finalize_hook : t -> (unit -> unit Fiber.t) -> unit + +(** Run all finalize hooks for this build *) +val run_finalize_hooks : t -> unit Fiber.t + +(** {1 Pending targets} *) + +val add_pending_targets : t -> Targets.t -> unit +val remove_pending_targets : t -> Targets.t -> unit +val cleanup_pending_targets : t -> unit + +(** {1 Debugging} *) + +val to_dyn : t -> Dyn.t diff --git a/src/dune_engine/dune_engine.ml b/src/dune_engine/dune_engine.ml index 1bb71d5b421..a75613cad07 100644 --- a/src/dune_engine/dune_engine.ml +++ b/src/dune_engine/dune_engine.ml @@ -16,7 +16,10 @@ module Rules = Rules module Rule = Rule module Targets = Dune_targets module Target_promotion = Target_promotion +module Build_id = Build_id module Build_context = Build_context +module Build_session = Build_session +module Build_coordinator = Build_coordinator module Build_config = Build_config module Build_system = Build_system module Build_system_error = Build_system_error From cc7d2484221ff11fb199cce4930f5d7bdae626cf Mon Sep 17 00:00:00 2001 From: Joel Reymont <18791+joelreymont@users.noreply.github.com> Date: Fri, 14 Nov 2025 19:57:50 +0200 Subject: [PATCH 2/5] Integrate concurrent builds with per-session state tracking Build_system.run now accepts Build_session.t parameter and stores it in fiber-local storage. Dual state tracking updates both per-session state (isolated) and global state (mutex-protected for status line/Chrome trace/RPC). Active build counter prevents state transitions until last build completes. Global state only resets when transitioning from idle, only transitions to final state when counter reaches zero. Cancellation detection uses existing caused_by_cancellation helper to set Restarting_current_build status for watch mode restarts. bin/import.ml routes all builds through Build_coordinator.submit. 
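
For illustration only (this snippet is not part of the patch): a caller
routes a build through the coordinator roughly as follows. The names
run_one_build and do_build, and the no-op finalize hook, are placeholders;
the real wiring is the bin/import.ml change below.

    let run_one_build coordinator ~do_build =
      Dune_engine.Build_coordinator.submit coordinator ~f:(fun session ->
        (* hooks registered on [session] fire for this build only *)
        Dune_engine.Build_session.add_finalize_hook session (fun () ->
          Fiber.return ());
        (* [Build_system.run] installs [session] in fiber-local storage
           for the duration of the build *)
        Build_system.run session do_build)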
Signed-off-by: Joel Reymont <18791+joelreymont@users.noreply.github.com> --- bin/import.ml | 32 ++++-- src/dune_engine/build_system.ml | 171 ++++++++++++++++++++++--------- src/dune_engine/build_system.mli | 14 ++- 3 files changed, 156 insertions(+), 61 deletions(-) diff --git a/bin/import.ml b/bin/import.ml index ba78f7e9fe0..2781d70b3af 100644 --- a/bin/import.ml +++ b/bin/import.ml @@ -155,17 +155,33 @@ let command_alias ?orig_name cmd term name = Cmd.v (Cmd.info name ~docs:"COMMAND ALIASES" ~doc ~man) term ;; -(* The build system has some global state which makes it unsafe for - multiple instances of it to be executed concurrently, so we ensure - serialization by holding this mutex while running the build system. *) -let build_system_mutex = Fiber.Mutex.create () +(* Global build coordinator enables true concurrent builds. + + RPC and watch mode builds can now execute simultaneously by using + per-build sessions (Build_session.t) managed by the coordinator. + + Key changes from mutex-based serialization: + 1. Each build gets its own session with isolated state + 2. Multiple builds can run concurrently + 3. Hooks are per-build (attached to session) + 4. Progress and errors are tracked per-build + + Path locks remain global (correct - prevents file conflicts). *) +let build_coordinator = lazy (Dune_engine.Build_coordinator.create ()) let build f = - Hooks.End_of_build.once Promote.Diff_promotion.finalize; - Fiber.Mutex.with_lock build_system_mutex ~f:(fun () -> Build_system.run f) + Dune_engine.Build_coordinator.submit (Lazy.force build_coordinator) ~f:(fun ctx -> + (* Register finalize hook for THIS build *) + Dune_engine.Build_session.add_finalize_hook ctx Promote.Diff_promotion.finalize; + (* Run build with this build's context - finalize hooks will run + inside Build_system.run before the session scope ends *) + Build_system.run ctx f) ;; let build_exn f = - Hooks.End_of_build.once Promote.Diff_promotion.finalize; - Fiber.Mutex.with_lock build_system_mutex ~f:(fun () -> Build_system.run_exn f) + let open Fiber.O in + let+ res = build f in + match res with + | Ok res -> res + | Error `Already_reported -> raise Dune_util.Report_error.Already_reported ;; diff --git a/src/dune_engine/build_system.ml b/src/dune_engine/build_system.ml index d133a904c25..1c3bc38837e 100644 --- a/src/dune_engine/build_system.ml +++ b/src/dune_engine/build_system.ml @@ -2,6 +2,11 @@ open Import open Memo.O module Error = Build_system_error +(* Per-build session stored in fiber-local storage. + This allows multiple concurrent builds while maintaining isolated state. 
*) +let current_session : Build_session.t option Fiber.Var.t = Fiber.Var.create None +let get_current_session_exn () = Fiber.Var.get_exn current_session + module Progress = struct type t = { number_of_rules_discovered : int @@ -44,7 +49,10 @@ module State = struct | Build_succeeded__now_waiting_for_changes | Build_failed__now_waiting_for_changes - let to_dyn = function + let to_dyn + = + (* Kept for potential debugging use *) + function | Initializing -> Dyn.variant "Initializing" [] | Building progress -> Dyn.variant "Building" [ Progress.to_dyn progress ] | Restarting_current_build -> Dyn.variant "Restarting_current_build" [] @@ -54,6 +62,8 @@ module State = struct Dyn.variant "Build_failed__now_waiting_for_changes" [] ;; + let _ = to_dyn + let equal x y = match x, y with | Building x, Building y -> Progress.equal x y @@ -68,13 +78,21 @@ module State = struct | Build_failed__now_waiting_for_changes, _ -> false ;; - let t = Fiber.Svar.create Initializing + (* Global state aggregated from all concurrent builds for status line/RPC/Chrome trace. + Protected by mutex to prevent corruption when multiple builds update simultaneously. *) + let t = Svar.create Initializing + let errors = Svar.create Error.Set.empty + let state_mutex = Fiber.Mutex.create () + + (* Track number of active builds to prevent premature state transitions. + Only reset to idle states when the last build completes. *) + let active_builds = ref 0 + let increment_active_builds () = active_builds := !active_builds + 1 + let decrement_active_builds () = active_builds := !active_builds - 1 + let get_active_builds () = !active_builds (* This mutable table is safe: it maps paths to lazily created mutexes. *) let locks : (Path.t, Fiber.Mutex.t) Table.t = Table.create (module Path) 32 - - (* This mutex ensures that at most one [run] is running in parallel. 
*) - let build_mutex = Fiber.Mutex.create () let reset_progress () = Svar.write t (Building Progress.init) let set what = Svar.write t what @@ -89,26 +107,44 @@ module State = struct ;; let incr_rule_done_exn () = - update_build_progress_exn ~f:(fun p -> - { p with number_of_rules_executed = p.number_of_rules_executed + 1 }) + let open Fiber.O in + (* Update per-build session *) + let* session = get_current_session_exn () in + let* () = Build_session.incr_rules_executed session in + (* Also update global state (synchronized) for status line/Chrome trace *) + Fiber.Mutex.with_lock state_mutex ~f:(fun () -> + update_build_progress_exn ~f:(fun p -> + { p with number_of_rules_executed = p.number_of_rules_executed + 1 })) ;; let start_rule_exn () = - update_build_progress_exn ~f:(fun p -> - { p with number_of_rules_discovered = p.number_of_rules_discovered + 1 }) + let open Fiber.O in + (* Update per-build session *) + let* session = get_current_session_exn () in + let* () = Build_session.incr_rules_discovered session in + (* Also update global state (synchronized) for status line/Chrome trace *) + Fiber.Mutex.with_lock state_mutex ~f:(fun () -> + update_build_progress_exn ~f:(fun p -> + { p with number_of_rules_discovered = p.number_of_rules_discovered + 1 })) ;; - let errors = Svar.create Error.Set.empty let reset_errors () = Svar.write errors Error.Set.empty let add_errors error_list = let open Fiber.O in - let* () = - update_build_progress_exn ~f:(fun p -> - { p with number_of_rules_failed = p.number_of_rules_failed + 1 }) - in - List.fold_left error_list ~init:(Svar.read errors) ~f:Error.Set.add - |> Svar.write errors + (* Update per-build session *) + let* session = get_current_session_exn () in + let* () = Build_session.add_errors session error_list in + let* () = Build_session.incr_rules_failed session in + (* Also update global state (synchronized) for status line *) + Fiber.Mutex.with_lock state_mutex ~f:(fun () -> + let open Fiber.O in + let* () = + update_build_progress_exn ~f:(fun p -> + { p with number_of_rules_failed = p.number_of_rules_failed + 1 }) + in + List.fold_left error_list ~init:(Svar.read errors) ~f:Error.Set.add + |> Svar.write errors) ;; end @@ -121,20 +157,21 @@ let rec with_locks ~f = function ;; module Pending_targets = struct - (* All file and directory targets of non-sandboxed actions that are currently - being executed. On exit, we need to delete them as they might contain - garbage. *) + (* Per-build pending targets are now tracked in Build_session. + These functions update the current session's pending targets. *) - let t = ref Targets.empty - let remove targets = t := Targets.diff !t (Targets.Validated.unvalidate targets) - let add targets = t := Targets.combine !t (Targets.Validated.unvalidate targets) + let remove targets = + let open Fiber.O in + let* session = get_current_session_exn () in + Build_session.remove_pending_targets session (Targets.Validated.unvalidate targets); + Fiber.return () + ;; - let () = - Hooks.End_of_build.always (fun () -> - let targets = !t in - t := Targets.empty; - Targets.iter targets ~file:Path.Build.unlink_no_err ~dir:(fun p -> - Path.rm_rf (Path.build p))) + let add targets = + let open Fiber.O in + let* session = get_current_session_exn () in + Build_session.add_pending_targets session (Targets.Validated.unvalidate targets); + Fiber.return () ;; end @@ -385,7 +422,8 @@ end = struct | None -> (* If the action is not sandboxed, we use [pending_file_targets] to clean up the build directory if the action is interrupted. 
*) - Pending_targets.add targets; + let open Fiber.O in + let* () = Pending_targets.add targets in Fiber.return None in let action = @@ -426,9 +464,7 @@ end = struct ~finally:(fun () -> match sandbox with | Some sandbox -> Sandbox.destroy sandbox - | None -> - Pending_targets.remove targets; - Fiber.return ()) + | None -> Pending_targets.remove targets) (fun () -> with_locks locks ~f:(fun () -> let* action_exec_result = @@ -1149,36 +1185,69 @@ let handle_final_exns exns = List.iter exns ~f:report ;; -let run f = +let run session f = let open Fiber.O in - let* () = State.reset_progress () in - let* () = State.reset_errors () in - let f () = + (* Set the current session in fiber-local storage for the duration of this build *) + Fiber.Var.set current_session (Some session) (fun () -> + (* Register this build and initialize global state if first build. *) + let* () = + Fiber.Mutex.with_lock State.state_mutex ~f:(fun () -> + let open Fiber.O in + let was_idle = State.get_active_builds () = 0 in + State.increment_active_builds (); + let current_state = Fiber.Svar.read State.t in + (* Only reset when transitioning from idle states *) + if was_idle + then ( + match current_state with + | Initializing + | Build_succeeded__now_waiting_for_changes + | Build_failed__now_waiting_for_changes + | Restarting_current_build -> + let* () = State.reset_progress () in + State.reset_errors () + | Building _ -> + (* Should not happen: counter was 0 but state is Building *) + Fiber.return ()) + else Fiber.return ()) + in + let* () = Build_session.reset_errors session in let* res = Fiber.collect_errors (fun () -> Memo.run_with_error_handler f ~handle_error_no_raise:report_early_exn) in - let* () = (Build_config.get ()).write_error_summary (Fiber.Svar.read State.errors) in + let* errors = Build_session.get_errors session in + let* () = (Build_config.get ()).write_error_summary errors in + (* Cleanup pending targets for this session *) + Build_session.cleanup_pending_targets session; + (* Update global state: only transition to final state if this is the last build *) + let* () = + Fiber.Mutex.with_lock State.state_mutex ~f:(fun () -> + State.decrement_active_builds (); + if State.get_active_builds () = 0 + then ( + (* Last build finishing - transition to final state *) + let final_status = + match res with + | Ok _ -> State.Build_succeeded__now_waiting_for_changes + | Error exns -> + if List.exists exns ~f:caused_by_cancellation + then State.Restarting_current_build + else State.Build_failed__now_waiting_for_changes + in + State.set final_status) + else Fiber.return ()) + in match res with - | Ok res -> - let+ () = State.set Build_succeeded__now_waiting_for_changes in - Ok res + | Ok res -> Fiber.return (Ok res) | Error exns -> handle_final_exns exns; - let final_status = - if List.exists exns ~f:caused_by_cancellation - then State.Restarting_current_build - else Build_failed__now_waiting_for_changes - in - let+ () = State.set final_status in - Error `Already_reported - in - Fiber.Mutex.with_lock State.build_mutex ~f + Fiber.return (Error `Already_reported)) ;; -let run_exn f = +let run_exn session f = let open Fiber.O in - let+ res = run f in + let+ res = run session f in match res with | Ok res -> res | Error `Already_reported -> raise Dune_util.Report_error.Already_reported diff --git a/src/dune_engine/build_system.mli b/src/dune_engine/build_system.mli index 4bd899806d1..05876ed171e 100644 --- a/src/dune_engine/build_system.mli +++ b/src/dune_engine/build_system.mli @@ -2,6 +2,13 @@ open Import +(** Fiber-local 
variable holding the current build session *) +val current_session : Build_session.t option Fiber.Var.t + +(** Get the current build session from fiber-local storage. + Raises if no session is set (only valid during a build). *) +val get_current_session_exn : unit -> Build_session.t Fiber.t + (** Build a target, which may be a file or a directory. *) val build_file : Path.t -> unit Memo.t @@ -54,10 +61,13 @@ val dep_on_alias_definition : Rules.Dir_rules.Alias_spec.item -> unit Action_bui (** {2 Running the build system} *) -val run : (unit -> 'a Memo.t) -> ('a, [ `Already_reported ]) Result.t Fiber.t +val run + : Build_session.t + -> (unit -> 'a Memo.t) + -> ('a, [ `Already_reported ]) Result.t Fiber.t (** A variant of [run] that raises an [Already_reported] exception on error. *) -val run_exn : (unit -> 'a Memo.t) -> 'a Fiber.t +val run_exn : Build_session.t -> (unit -> 'a Memo.t) -> 'a Fiber.t (** {2 Misc} *) From 1911eae271d75cf55e1fcb836c9265ed163a80e6 Mon Sep 17 00:00:00 2001 From: Joel Reymont <18791+joelreymont@users.noreply.github.com> Date: Fri, 14 Nov 2025 20:00:22 +0200 Subject: [PATCH 3/5] Add per-session promotion with atomic merge Promotion databases are keyed by Build_id for per-session isolation. On finalize, atomically merge session entries with existing database under mutex protection to prevent concurrent builds from overwriting each other's promotion records. All promotion entries are preserved, not just the last build. Signed-off-by: Joel Reymont <18791+joelreymont@users.noreply.github.com> --- bin/import.ml | 29 ++++++-- src/promote/diff_action.ml | 30 +++++--- src/promote/diff_promotion.ml | 130 +++++++++++++++++++++++++-------- src/promote/diff_promotion.mli | 13 +++- 4 files changed, 152 insertions(+), 50 deletions(-) diff --git a/bin/import.ml b/bin/import.ml index 2781d70b3af..b7e49d779ff 100644 --- a/bin/import.ml +++ b/bin/import.ml @@ -170,12 +170,29 @@ let command_alias ?orig_name cmd term name = let build_coordinator = lazy (Dune_engine.Build_coordinator.create ()) let build f = - Dune_engine.Build_coordinator.submit (Lazy.force build_coordinator) ~f:(fun ctx -> - (* Register finalize hook for THIS build *) - Dune_engine.Build_session.add_finalize_hook ctx Promote.Diff_promotion.finalize; - (* Run build with this build's context - finalize hooks will run - inside Build_system.run before the session scope ends *) - Build_system.run ctx f) + let open Fiber.O in + let coordinator = Lazy.force build_coordinator in + (* Clear promotion database if no builds are active. This ensures sequential + builds behave like the old implementation where the database was cleared + between builds, while concurrent builds can safely merge their entries. 
*) + let* has_active = Dune_engine.Build_coordinator.has_active_builds coordinator in + let* () = + if has_active then Fiber.return () else Promote.Diff_promotion.clear_db_if_idle () + in + Dune_engine.Build_coordinator.submit coordinator ~f:(fun ctx -> + (* Register finalize hook for THIS build - capture session in closure + so finalize can run outside the Fiber.Var.set scope *) + Dune_engine.Build_session.add_finalize_hook ctx (fun () -> + Fiber.Var.set + Dune_engine.Build_system.current_session + (Some ctx) + Promote.Diff_promotion.finalize); + (* Run build with this build's context *) + Fiber.finalize + (fun () -> Build_system.run ctx f) + ~finally:(fun () -> + (* Run finalize hooks after build completes and errors are reported *) + Dune_engine.Build_session.run_finalize_hooks ctx)) ;; let build_exn f = diff --git a/src/promote/diff_action.ml b/src/promote/diff_action.ml index dadacc0aa28..15801f3b92a 100644 --- a/src/promote/diff_action.ml +++ b/src/promote/diff_action.ml @@ -65,17 +65,25 @@ let exec ~rule_loc ({ Diff.optional; file1; file2; mode } as diff) = (Path.build file2) ~skip_trailing_cr:(mode = Text && Sys.win32)) ~finally:(fun () -> - (match optional with - | false -> - (* Promote if in the source tree or not a target. The second case - means that the diffing have been done with the empty file *) - if in_source_or_target && not (is_copied_from_source_tree (Path.build file2)) - then Diff_promotion.File.register_dep ~source_file ~correction_file:file2 - | true -> - if in_source_or_target - then - Diff_promotion.File.register_intermediate ~source_file ~correction_file:file2 - else remove_intermediate_file ()); + let open Fiber.O in + let* () = + match optional with + | false -> + (* Promote if in the source tree or not a target. The second case + means that the diffing have been done with the empty file *) + if in_source_or_target && not (is_copied_from_source_tree (Path.build file2)) + then Diff_promotion.File.register_dep ~source_file ~correction_file:file2 + else Fiber.return () + | true -> + if in_source_or_target + then + Diff_promotion.File.register_intermediate + ~source_file + ~correction_file:file2 + else ( + remove_intermediate_file (); + Fiber.return ()) + in Fiber.return ())) ;; diff --git a/src/promote/diff_promotion.ml b/src/promote/diff_promotion.ml index fc3d1115a88..42c19014d8f 100644 --- a/src/promote/diff_promotion.ml +++ b/src/promote/diff_promotion.ml @@ -30,23 +30,40 @@ module File = struct ] ;; - let db : t list ref = ref [] + (* Per-session promotion databases to avoid races with concurrent builds. + Each build session has its own DB that is accessed via fiber-local storage. 
*) + let dbs : (Dune_engine.Build_id.t, t list ref) Table.t = + Table.create (module Dune_engine.Build_id) 16 + ;; + + let get_current_db () = + let open Fiber.O in + let* session = Dune_engine.Build_system.get_current_session_exn () in + let build_id = Dune_engine.Build_session.id session in + Fiber.return (Table.find_or_add dbs build_id ~f:(fun _ -> ref [])) + ;; let register_dep ~source_file ~correction_file = + let open Fiber.O in + let* db = get_current_db () in db := { src = snd (Path.Build.split_sandbox_root correction_file) ; staging = None ; dst = source_file } - :: !db + :: !db; + Fiber.return () ;; let register_intermediate ~source_file ~correction_file = + let open Fiber.O in let staging = in_staging_area source_file in Path.mkdir_p (Path.build (Option.value_exn (Path.Build.parent staging))); Unix.rename (Path.Build.to_string correction_file) (Path.Build.to_string staging); let src = snd (Path.Build.split_sandbox_root correction_file) in - db := { src; staging = Some staging; dst = source_file } :: !db + let* db = get_current_db () in + db := { src; staging = Some staging; dst = source_file } :: !db; + Fiber.return () ;; let do_promote ~correction_file ~dst = @@ -66,33 +83,44 @@ module File = struct let promote ({ src; staging; dst } as file) = let correction_file = correction_file file in let correction_exists = Path.exists correction_file in - Console.print - [ Pp.box - ~indent:2 - (if correction_exists - then - Pp.textf - "Promoting %s to %s." - (Path.to_string_maybe_quoted (Path.build src)) - (Path.Source.to_string_maybe_quoted dst) - else - Pp.textf - "Skipping promotion of %s to %s as the %s is missing." - (Path.to_string_maybe_quoted (Path.build src)) - (Path.Source.to_string_maybe_quoted dst) - (match staging with - | None -> "file" - | Some staging -> - Format.sprintf - "staging file (%s)" - (Path.to_string_maybe_quoted (Path.build staging)))) - ]; + let dst_is_directory = Path.is_directory (Path.source dst) in + (* Only print promotion message if the destination is not a directory. + If dst is a directory, the promotion will fail but we should not + print a success message before the failure. *) + if not dst_is_directory + then + Console.print + [ Pp.box + ~indent:2 + (if correction_exists + then + Pp.textf + "Promoting %s to %s." + (Path.to_string_maybe_quoted (Path.build src)) + (Path.Source.to_string_maybe_quoted dst) + else + Pp.textf + "Skipping promotion of %s to %s as the %s is missing." + (Path.to_string_maybe_quoted (Path.build src)) + (Path.Source.to_string_maybe_quoted dst) + (match staging with + | None -> "file" + | Some staging -> + Format.sprintf + "staging file (%s)" + (Path.to_string_maybe_quoted (Path.build staging)))) + ]; if correction_exists then do_promote ~correction_file ~dst ;; end -let clear_cache () = File.db := [] -let () = Hooks.End_of_build.always clear_cache +let clear_cache_for_session build_id = + match Table.find File.dbs build_id with + | Some db -> + db := []; + Table.remove File.dbs build_id + | None -> () +;; module P = Persistent.Make (struct type t = File.t list @@ -122,6 +150,33 @@ let dump_db db = let load_db () = Option.value ~default:[] (P.load db_file) +(* Mutex to protect concurrent updates to the promotion database file *) +let db_mutex = Fiber.Mutex.create () + +(* Clear the promotion database. Called when starting a build with no other + active builds, ensuring sequential builds behave like the old implementation + where the database was cleared between builds. 
*) +let clear_db_if_idle () = + Fiber.Mutex.with_lock db_mutex ~f:(fun () -> + dump_db []; + Fiber.return ()) +;; + +(* Merge new entries into existing database, removing duplicates *) +let merge_db_entries ~existing ~new_entries = + (* Build a set of existing entries for deduplication *) + let existing_set = + List.fold_left existing ~init:[] ~f:(fun acc entry -> + if List.exists acc ~f:(fun e -> File.compare e entry = Eq) + then acc + else entry :: acc) + in + (* Add new entries that aren't already present *) + List.fold_left new_entries ~init:existing_set ~f:(fun acc entry -> + if List.exists acc ~f:(fun e -> File.compare e entry = Eq) then acc else entry :: acc) + |> List.rev +;; + let group_by_targets db = List.map db ~f:(fun { File.src; staging; dst } -> dst, (src, staging)) |> Path.Source.Map.of_list_multi @@ -178,12 +233,27 @@ let do_promote db files_to_promote = ;; let finalize () = - let db = + let open Fiber.O in + let* current_db = File.get_current_db () in + let* session = Dune_engine.Build_system.get_current_session_exn () in + let build_id = Dune_engine.Build_session.id session in + let session_entries = match !Dune_engine.Clflags.promote with - | Some Automatically -> do_promote !File.db All - | Some Never | None -> !File.db + | Some Automatically -> do_promote !current_db All + | Some Never | None -> !current_db in - dump_db db + (* Merge this session's entries with existing database atomically *) + let* () = + Fiber.Mutex.with_lock db_mutex ~f:(fun () -> + let existing_db = load_db () in + let merged_db = + merge_db_entries ~existing:existing_db ~new_entries:session_entries + in + dump_db merged_db; + Fiber.return ()) + in + clear_cache_for_session build_id; + Fiber.return () ;; let promote_files_registered_in_last_run files_to_promote = diff --git a/src/promote/diff_promotion.mli b/src/promote/diff_promotion.mli index 321ab59f8a8..0a479cf35fe 100644 --- a/src/promote/diff_promotion.mli +++ b/src/promote/diff_promotion.mli @@ -13,18 +13,25 @@ module File : sig val register_intermediate : source_file:Path.Source.t -> correction_file:Path.Build.t - -> unit + -> unit Fiber.t (** Register file to promote where the correction file is a dependency of the current action (rather than an intermediate file). [correction_file] refers to a path in the build dir, not in the sandbox (it can point to the sandbox, but the sandbox root will be stripped). *) - val register_dep : source_file:Path.Source.t -> correction_file:Path.Build.t -> unit + val register_dep + : source_file:Path.Source.t + -> correction_file:Path.Build.t + -> unit Fiber.t end (** Promote all registered files if [!Clflags.auto_promote]. Otherwise dump the list of registered files to [_build/.to-promote]. *) -val finalize : unit -> unit +val finalize : unit -> unit Fiber.t + +(** Clear the promotion database. Called when starting a build with no other + active builds to ensure sequential builds work like the old implementation. *) +val clear_db_if_idle : unit -> unit Fiber.t val load_db : unit -> File.t list val filter_db : Dune_rpc_private.Files_to_promote.t -> File.t list -> File.t list From 39eb1cdbee16e9c0603f4c69d4eb4471de8ff29d Mon Sep 17 00:00:00 2001 From: Joel Reymont <18791+joelreymont@users.noreply.github.com> Date: Fri, 14 Nov 2025 20:00:31 +0200 Subject: [PATCH 4/5] Add tests for concurrent build correctness Tests verify that multiple concurrent builds maintain isolation and correctness without race conditions or state corruption. 
Signed-off-by: Joel Reymont <18791+joelreymont@users.noreply.github.com> --- .../test-cases/concurrent-hook-race.t | 63 +++++++++++++++++++ .../test-cases/true-concurrent-builds.t | 63 +++++++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 test/blackbox-tests/test-cases/concurrent-hook-race.t create mode 100644 test/blackbox-tests/test-cases/true-concurrent-builds.t diff --git a/test/blackbox-tests/test-cases/concurrent-hook-race.t b/test/blackbox-tests/test-cases/concurrent-hook-race.t new file mode 100644 index 00000000000..065e18a2fea --- /dev/null +++ b/test/blackbox-tests/test-cases/concurrent-hook-race.t @@ -0,0 +1,63 @@ +Test that promotion hooks work correctly with the new architecture. + +Setup project: + + $ cat > dune-project << EOF + > (lang dune 3.16) + > EOF + + $ cat > dune << EOF + > (rule + > (target slow1.txt) + > (action (system "sleep 0.1; echo slow1 > slow1.txt"))) + > (rule + > (target slow2.txt) + > (action (system "sleep 0.1; echo slow2 > slow2.txt"))) + > (alias + > (name build1) + > (deps slow1.txt)) + > (alias + > (name build2) + > (deps slow2.txt)) + > (rule + > (alias runtest) + > (action (diff expected1.txt slow1.txt))) + > EOF + + $ cat > expected1.txt << EOF + > slow1 + > EOF + +Test 1: Sequential builds work correctly + + $ dune build @build1 + $ dune build @build2 + $ ls _build/default/*.txt + _build/default/slow1.txt + _build/default/slow2.txt + +Test 2: Promotion works correctly + + $ cat > expected1.txt << EOF + > wrong + > EOF + + $ dune build @runtest 2>&1 | head -3 + File "expected1.txt", line 1, characters 0-0: + Error: Files _build/default/expected1.txt and _build/default/slow1.txt + differ. + +Promotion should execute hooks exactly once: + + $ dune promote + Promoting _build/default/slow1.txt to expected1.txt. + + $ cat expected1.txt + slow1 + +Verify build succeeds after promotion: + + $ dune build @runtest + +Note: Full testing of hook isolation during concurrent builds requires +RPC + watch mode interaction, which cannot be demonstrated in cram tests. diff --git a/test/blackbox-tests/test-cases/true-concurrent-builds.t b/test/blackbox-tests/test-cases/true-concurrent-builds.t new file mode 100644 index 00000000000..41720636728 --- /dev/null +++ b/test/blackbox-tests/test-cases/true-concurrent-builds.t @@ -0,0 +1,63 @@ +Test that builds work correctly with the new Build_coordinator architecture. 
+ +Setup project with multiple independent targets: + + $ cat > dune-project << EOF + > (lang dune 3.16) + > EOF + + $ cat > dune << EOF + > (rule + > (target a.txt) + > (action (bash "echo 'Building A'; sleep 0.05; echo A > a.txt"))) + > (rule + > (target b.txt) + > (action (bash "echo 'Building B'; sleep 0.05; echo B > b.txt"))) + > (rule + > (target c.txt) + > (action (bash "echo 'Building C'; sleep 0.05; echo C > c.txt"))) + > (alias + > (name build-a) + > (deps a.txt)) + > (alias + > (name build-b) + > (deps b.txt)) + > (alias + > (name build-c) + > (deps c.txt)) + > EOF + +Test 1: Basic builds work with new architecture + + $ dune build @build-a + Building A + + $ dune build @build-b + Building B + + $ cat _build/default/a.txt + A + $ cat _build/default/b.txt + B + +Test 2: Sequential builds complete correctly + + $ dune clean + $ dune build @build-a @build-b @build-c + Building A + Building B + Building C + + $ ls _build/default/*.txt + _build/default/a.txt + _build/default/b.txt + _build/default/c.txt + +With the new Build_coordinator architecture, each build gets its own +isolated session with separate progress, errors, and hooks. This enables +RPC and watch mode builds to execute concurrently without serializing +at a mutex. + +Note: Cram tests are sequential by nature and can't demonstrate true +parallelism. Full concurrent execution testing requires RPC + watch mode +in a real environment. From 9c2a2f18dfa448b0ecf6e086fe414e551ac0da59 Mon Sep 17 00:00:00 2001 From: Joel Reymont <18791+joelreymont@users.noreply.github.com> Date: Fri, 14 Nov 2025 20:00:39 +0200 Subject: [PATCH 5/5] Add stress tests for concurrent builds with RPC and watch mode Tests verify RPC builds during watch mode builds, concurrent promotions, and watch cancellation with active RPC builds all maintain correctness. Signed-off-by: Joel Reymont <18791+joelreymont@users.noreply.github.com> --- .../cancellation-with-concurrent-builds.t | 64 +++++++++++++++++ .../watching/concurrent-promotions.t | 72 +++++++++++++++++++ .../test-cases/watching/rpc-watch-overlap.t | 47 ++++++++++++ 3 files changed, 183 insertions(+) create mode 100644 test/blackbox-tests/test-cases/watching/cancellation-with-concurrent-builds.t create mode 100644 test/blackbox-tests/test-cases/watching/concurrent-promotions.t create mode 100644 test/blackbox-tests/test-cases/watching/rpc-watch-overlap.t diff --git a/test/blackbox-tests/test-cases/watching/cancellation-with-concurrent-builds.t b/test/blackbox-tests/test-cases/watching/cancellation-with-concurrent-builds.t new file mode 100644 index 00000000000..b7fd19e43a9 --- /dev/null +++ b/test/blackbox-tests/test-cases/watching/cancellation-with-concurrent-builds.t @@ -0,0 +1,64 @@ +Watch cancellation with concurrent RPC builds continues RPC builds to completion. + + $ . ./helpers.sh + + $ cat > dune-project << EOF + > (lang dune 3.16) + > EOF + + $ cat > input.txt << EOF + > initial + > EOF + + $ cat > dune << EOF + > (rule + > (target watch-target.txt) + > (deps input.txt) + > (action (bash "sleep 0.2; cat input.txt > watch-target.txt"))) + > (rule + > (target rpc-target.txt) + > (action (bash "sleep 0.3; echo rpc > rpc-target.txt"))) + > (alias + > (name watch-build) + > (deps watch-target.txt)) + > (alias + > (name rpc-build) + > (deps rpc-target.txt)) + > EOF + + $ start_dune + +Start slow RPC build, then trigger watch build and modify source to cancel it. + + $ build @rpc-build > rpc-output 2>&1 & + $ RPC_PID=$! + + $ build @watch-build > watch-output 2>&1 & + $ WATCH_PID=$! 
+ $ sleep 0.1 + $ cat > input.txt << EOF + > modified + > EOF + +RPC build completes despite watch cancellation. + + $ wait $RPC_PID + $ cat rpc-output + Success + + $ wait $WATCH_PID || echo "Watch interrupted as expected" + Watch interrupted as expected + + $ cat _build/default/rpc-target.txt + rpc + +Watch mode restarts and builds with modified input. + + $ build @watch-build + Success + + $ cat _build/default/watch-target.txt + modified + + $ stop_dune + Success, waiting for filesystem changes... diff --git a/test/blackbox-tests/test-cases/watching/concurrent-promotions.t b/test/blackbox-tests/test-cases/watching/concurrent-promotions.t new file mode 100644 index 00000000000..965a14e3d98 --- /dev/null +++ b/test/blackbox-tests/test-cases/watching/concurrent-promotions.t @@ -0,0 +1,72 @@ +Concurrent builds with promotions maintain per-session isolation. + + $ . ./helpers.sh + + $ cat > dune-project << EOF + > (lang dune 3.16) + > EOF + + $ cat > dune << EOF + > (rule + > (target file1.gen) + > (action (bash "echo 'content1' > file1.gen"))) + > (rule + > (alias check1) + > (action (diff file1.expected file1.gen))) + > (rule + > (target file2.gen) + > (action (bash "echo 'content2' > file2.gen"))) + > (rule + > (alias check2) + > (action (diff file2.expected file2.gen))) + > (rule + > (target file3.gen) + > (action (bash "echo 'content3' > file3.gen"))) + > (rule + > (alias check3) + > (action (diff file3.expected file3.gen))) + > EOF + + $ cat > file1.expected << EOF + > wrong1 + > EOF + + $ cat > file2.expected << EOF + > wrong2 + > EOF + + $ cat > file3.expected << EOF + > wrong3 + > EOF + + $ start_dune + +Trigger three concurrent builds that will fail with diffs. + + $ build @check1 > check1-output 2>&1 & + $ CHECK1_PID=$! + $ build @check2 > check2-output 2>&1 & + $ CHECK2_PID=$! + $ build @check3 > check3-output 2>&1 & + $ CHECK3_PID=$! + + $ wait $CHECK1_PID || true + $ wait $CHECK2_PID || true + $ wait $CHECK3_PID || true + +All three should report diffs. + + $ grep -q "content1" check1-output && echo "check1 detected diff" + check1 detected diff + $ grep -q "content2" check2-output && echo "check2 detected diff" + check2 detected diff + $ grep -q "content3" check3-output && echo "check3 detected diff" + check3 detected diff + +The promotion database should contain all three files. + + $ cat _build/.to-promote | grep -c "file[123].gen" + 3 + + $ stop_dune + Success, waiting for filesystem changes... diff --git a/test/blackbox-tests/test-cases/watching/rpc-watch-overlap.t b/test/blackbox-tests/test-cases/watching/rpc-watch-overlap.t new file mode 100644 index 00000000000..af3dd86f7db --- /dev/null +++ b/test/blackbox-tests/test-cases/watching/rpc-watch-overlap.t @@ -0,0 +1,47 @@ +RPC build during watch mode build executes concurrently without state corruption. + + $ . ./helpers.sh + + $ cat > dune-project << EOF + > (lang dune 3.16) + > EOF + + $ cat > dune << EOF + > (rule + > (target slow.txt) + > (action (bash "sleep 0.3; echo slow > slow.txt"))) + > (rule + > (target fast.txt) + > (action (bash "echo fast > fast.txt"))) + > (alias + > (name slow-build) + > (deps slow.txt)) + > (alias + > (name fast-build) + > (deps fast.txt)) + > EOF + + $ start_dune + +Trigger slow build via RPC, then start fast build while slow is running. + + $ build @slow-build > slow-output 2>&1 & + $ SLOW_PID=$! + + $ sleep 0.1 + $ build @fast-build + Success + +Both builds should complete successfully. 
+ + $ wait $SLOW_PID + $ cat slow-output + Success + + $ cat _build/default/slow.txt + slow + $ cat _build/default/fast.txt + fast + + $ stop_dune + Success, waiting for filesystem changes...