diff --git a/bin/import.ml b/bin/import.ml index ba78f7e9fe0..b7e49d779ff 100644 --- a/bin/import.ml +++ b/bin/import.ml @@ -155,17 +155,50 @@ let command_alias ?orig_name cmd term name = Cmd.v (Cmd.info name ~docs:"COMMAND ALIASES" ~doc ~man) term ;; -(* The build system has some global state which makes it unsafe for - multiple instances of it to be executed concurrently, so we ensure - serialization by holding this mutex while running the build system. *) -let build_system_mutex = Fiber.Mutex.create () +(* Global build coordinator enables true concurrent builds. + + RPC and watch mode builds can now execute simultaneously by using + per-build sessions (Build_session.t) managed by the coordinator. + + Key changes from mutex-based serialization: + 1. Each build gets its own session with isolated state + 2. Multiple builds can run concurrently + 3. Hooks are per-build (attached to session) + 4. Progress and errors are tracked per-build + + Path locks remain global (correct - prevents file conflicts). *) +let build_coordinator = lazy (Dune_engine.Build_coordinator.create ()) let build f = - Hooks.End_of_build.once Promote.Diff_promotion.finalize; - Fiber.Mutex.with_lock build_system_mutex ~f:(fun () -> Build_system.run f) + let open Fiber.O in + let coordinator = Lazy.force build_coordinator in + (* Clear promotion database if no builds are active. This ensures sequential + builds behave like the old implementation where the database was cleared + between builds, while concurrent builds can safely merge their entries. *) + let* has_active = Dune_engine.Build_coordinator.has_active_builds coordinator in + let* () = + if has_active then Fiber.return () else Promote.Diff_promotion.clear_db_if_idle () + in + Dune_engine.Build_coordinator.submit coordinator ~f:(fun ctx -> + (* Register finalize hook for THIS build - capture session in closure + so finalize can run outside the Fiber.Var.set scope *) + Dune_engine.Build_session.add_finalize_hook ctx (fun () -> + Fiber.Var.set + Dune_engine.Build_system.current_session + (Some ctx) + Promote.Diff_promotion.finalize); + (* Run build with this build's context *) + Fiber.finalize + (fun () -> Build_system.run ctx f) + ~finally:(fun () -> + (* Run finalize hooks after build completes and errors are reported *) + Dune_engine.Build_session.run_finalize_hooks ctx)) ;; let build_exn f = - Hooks.End_of_build.once Promote.Diff_promotion.finalize; - Fiber.Mutex.with_lock build_system_mutex ~f:(fun () -> Build_system.run_exn f) + let open Fiber.O in + let+ res = build f in + match res with + | Ok res -> res + | Error `Already_reported -> raise Dune_util.Report_error.Already_reported ;; diff --git a/src/dune_engine/build_coordinator.ml b/src/dune_engine/build_coordinator.ml new file mode 100644 index 00000000000..6ab0555c911 --- /dev/null +++ b/src/dune_engine/build_coordinator.ml @@ -0,0 +1,90 @@ +open Import +open Fiber.O + +(* Coordinates multiple concurrent builds. Allows RPC and watch mode builds + to execute simultaneously while maintaining correctness guarantees. 
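+   Each call to [submit] creates a fresh [Build_session.t], registers it
+   under its [Build_id.t] for the duration of the build, and unregisters it
+   (filling its completion ivar) once the build finishes, whether it
+   succeeded or raised.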
*) + +type active_build = + { context : Build_session.t + ; completion : unit Fiber.Ivar.t + } + +type t = + { mutable active_builds : active_build Build_id.Map.t + ; mutex : Fiber.Mutex.t + } + +let create () = { active_builds = Build_id.Map.empty; mutex = Fiber.Mutex.create () } + +let active_build_ids t = + Fiber.Mutex.with_lock t.mutex ~f:(fun () -> + Fiber.return (Build_id.Map.keys t.active_builds)) +;; + +let get_build_context t build_id = + Fiber.Mutex.with_lock t.mutex ~f:(fun () -> + Fiber.return + (match Build_id.Map.find t.active_builds build_id with + | Some build -> Some build.context + | None -> None)) +;; + +let get_all_contexts t = + Fiber.Mutex.with_lock t.mutex ~f:(fun () -> + let contexts = + Build_id.Map.to_list t.active_builds + |> List.map ~f:(fun (id, build) -> id, build.context) + in + Fiber.return contexts) +;; + +let get_all_progress t = + let* contexts = get_all_contexts t in + Fiber.parallel_map contexts ~f:(fun (id, ctx) -> + let+ progress = Build_session.get_progress ctx in + id, progress) +;; + +let get_all_errors t = + let* contexts = get_all_contexts t in + Fiber.parallel_map contexts ~f:(fun (id, ctx) -> + let+ errors = Build_session.get_errors ctx in + id, errors) +;; + +let has_active_builds t = + Fiber.Mutex.with_lock t.mutex ~f:(fun () -> + Fiber.return (not (Build_id.Map.is_empty t.active_builds))) +;; + +let register_build t context = + Fiber.Mutex.with_lock t.mutex ~f:(fun () -> + let build_id = Build_session.id context in + let completion = Fiber.Ivar.create () in + let active_build = { context; completion } in + t.active_builds <- Build_id.Map.add_exn t.active_builds build_id active_build; + Fiber.return completion) +;; + +let unregister_build t build_id = + Fiber.Mutex.with_lock t.mutex ~f:(fun () -> + let* () = + match Build_id.Map.find t.active_builds build_id with + | Some build -> Fiber.Ivar.fill build.completion () + | None -> Fiber.return () + in + t.active_builds <- Build_id.Map.remove t.active_builds build_id; + Fiber.return ()) +;; + +let submit t ~f = + let context = Build_session.create () in + let build_id = Build_session.id context in + let* _completion = register_build t context in + Fiber.finalize (fun () -> f context) ~finally:(fun () -> unregister_build t build_id) +;; + +let to_dyn t = + let active_ids = Build_id.Map.keys t.active_builds in + Dyn.record [ "active_builds", Dyn.list Build_id.to_dyn active_ids ] +;; diff --git a/src/dune_engine/build_coordinator.mli b/src/dune_engine/build_coordinator.mli new file mode 100644 index 00000000000..c9af603728d --- /dev/null +++ b/src/dune_engine/build_coordinator.mli @@ -0,0 +1,34 @@ +(** Coordinates multiple concurrent builds. Allows RPC and watch mode builds + to execute simultaneously while maintaining correctness. *) + +type t + +(** Create a new build coordinator *) +val create : unit -> t + +(** Submit a build request. The function receives a fresh Build_session.t + and should execute the build using that context. Multiple calls to submit + can run concurrently. 
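+    Typical usage, as an illustrative sketch (omitting the finalize-hook
+    registration done in [bin/import.ml]):
+    {[
+      Build_coordinator.submit coordinator ~f:(fun session ->
+        Build_system.run session (fun () -> request))
+    ]}
+    where [request] stands for whatever [Memo.t] computation the caller wants
+    to build.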
*) +val submit : t -> f:(Build_session.t -> 'a Fiber.t) -> 'a Fiber.t + +(** Get list of currently active build IDs *) +val active_build_ids : t -> Build_id.t list Fiber.t + +(** Get the context for a specific build (if still active) *) +val get_build_context : t -> Build_id.t -> Build_session.t option Fiber.t + +(** Get all active build contexts *) +val get_all_contexts : t -> (Build_id.t * Build_session.t) list Fiber.t + +(** Get progress for all active builds *) +val get_all_progress : t -> (Build_id.t * Build_session.Progress.t) list Fiber.t + +(** Get errors for all active builds *) +val get_all_errors : t -> (Build_id.t * Build_system_error.Set.t) list Fiber.t + +(** Check if there are any active builds *) +val has_active_builds : t -> bool Fiber.t + +(** {1 Debugging} *) + +val to_dyn : t -> Dyn.t diff --git a/src/dune_engine/build_id.ml b/src/dune_engine/build_id.ml new file mode 100644 index 00000000000..a4464436a7f --- /dev/null +++ b/src/dune_engine/build_id.ml @@ -0,0 +1,29 @@ +open Import + +(* Unique identifier for a build execution. + Allows tracking multiple concurrent builds. *) + +type t = int + +let counter = ref 0 + +let create () = + let id = !counter in + counter := id + 1; + id +;; + +let equal = Int.equal +let compare = Int.compare +let to_dyn t = Dyn.int t +let hash = Int.hash + +module T = struct + type nonrec t = t + + let compare = compare + let to_dyn = to_dyn +end + +module Map = Map.Make (T) +module Set = Set.Make (T) (Map) diff --git a/src/dune_engine/build_id.mli b/src/dune_engine/build_id.mli new file mode 100644 index 00000000000..6839c1dc747 --- /dev/null +++ b/src/dune_engine/build_id.mli @@ -0,0 +1,18 @@ +open Import + +(** Unique identifier for a build execution. Allows tracking multiple + concurrent builds and associating progress, errors, and hooks with + specific build instances. *) + +type t + +(** Create a new unique build ID. Thread-safe via sequential counter. *) +val create : unit -> t + +val equal : t -> t -> bool +val compare : t -> t -> Ordering.t +val to_dyn : t -> Dyn.t +val hash : t -> int + +module Map : Map.S with type key = t +module Set : Set.S with type elt = t diff --git a/src/dune_engine/build_session.ml b/src/dune_engine/build_session.ml new file mode 100644 index 00000000000..29cd174a633 --- /dev/null +++ b/src/dune_engine/build_session.ml @@ -0,0 +1,128 @@ +open Import +open Fiber.O + +(* Per-build context that holds state for a single build execution. + This allows multiple builds to run concurrently without interfering + with each other. 
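+   A session bundles the build's [Build_id.t], progress counters, error set,
+   pending targets and finalize hooks, so none of this state is shared between
+   concurrent builds.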
*) + +module Progress = struct + type t = + { number_of_rules_discovered : int + ; number_of_rules_executed : int + ; number_of_rules_failed : int + } + + let init = + { number_of_rules_discovered = 0 + ; number_of_rules_executed = 0 + ; number_of_rules_failed = 0 + } + ;; +end + +type t = + { id : Build_id.t + ; progress : Progress.t Fiber.Svar.t + ; errors : Build_system_error.Set.t Fiber.Svar.t + ; pending_targets : Targets.t ref + ; finalize_hooks : (unit -> unit Fiber.t) Queue.t + } + +let create () = + { id = Build_id.create () + ; progress = Fiber.Svar.create Progress.init + ; errors = Fiber.Svar.create Build_system_error.Set.empty + ; pending_targets = ref Targets.empty + ; finalize_hooks = Queue.create () + } +;; + +let id t = t.id + +(* Progress tracking *) + +let get_progress t = Fiber.return (Fiber.Svar.read t.progress) +let set_progress t progress = Fiber.Svar.write t.progress progress + +let update_progress t ~f = + let* current = get_progress t in + set_progress t (f current) +;; + +let incr_rules_discovered t = + update_progress t ~f:(fun p -> + { p with number_of_rules_discovered = p.number_of_rules_discovered + 1 }) +;; + +let incr_rules_executed t = + update_progress t ~f:(fun p -> + { p with number_of_rules_executed = p.number_of_rules_executed + 1 }) +;; + +let incr_rules_failed t = + update_progress t ~f:(fun p -> + { p with number_of_rules_failed = p.number_of_rules_failed + 1 }) +;; + +(* Error tracking *) + +let get_errors t = Fiber.return (Fiber.Svar.read t.errors) + +let add_errors t error_list = + let* current = get_errors t in + let updated = List.fold_left error_list ~init:current ~f:Build_system_error.Set.add in + Fiber.Svar.write t.errors updated +;; + +let reset_errors t = Fiber.Svar.write t.errors Build_system_error.Set.empty + +(* Hooks *) + +let add_finalize_hook t hook = Queue.push t.finalize_hooks hook + +let run_finalize_hooks t = + let open Fiber.O in + let hooks = Queue.to_list t.finalize_hooks in + Queue.clear t.finalize_hooks; + let* results = + Fiber.parallel_map hooks ~f:(fun hook -> Fiber.collect_errors (fun () -> hook ())) + in + let exns = + List.filter_map results ~f:(function + | Ok () -> None + | Error exns -> Some exns) + |> List.concat + in + match exns with + | [] -> Fiber.return () + | [ { Exn_with_backtrace.exn = User_error.E _ as e; backtrace = _ } ] -> raise e + | exns -> + Code_error.raise + "finalize hooks failed" + [ "build_id", Build_id.to_dyn t.id + ; "exns", Dyn.list Exn_with_backtrace.to_dyn exns + ] +;; + +(* Pending targets *) + +let add_pending_targets t targets = + t.pending_targets := Targets.combine !(t.pending_targets) targets +;; + +let remove_pending_targets t targets = + t.pending_targets := Targets.diff !(t.pending_targets) targets +;; + +let cleanup_pending_targets t = + let targets = !(t.pending_targets) in + t.pending_targets := Targets.empty; + Targets.iter targets ~file:Path.Build.unlink_no_err ~dir:(fun p -> + Path.rm_rf (Path.build p)) +;; + +let to_dyn t = + let open Dyn in + record + [ "id", Build_id.to_dyn t.id; "pending_targets", Targets.to_dyn !(t.pending_targets) ] +;; diff --git a/src/dune_engine/build_session.mli b/src/dune_engine/build_session.mli new file mode 100644 index 00000000000..d66b600a162 --- /dev/null +++ b/src/dune_engine/build_session.mli @@ -0,0 +1,55 @@ +open Import + +(** Per-build context that holds state for a single build execution. + This allows multiple builds to run concurrently without interfering. 
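+    Sessions are normally created by [Build_coordinator.submit] rather than by
+    calling [create] directly.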
*) + +module Progress : sig + type t = + { number_of_rules_discovered : int + ; number_of_rules_executed : int + ; number_of_rules_failed : int + } + + val init : t +end + +type t + +(** Create a new build context with fresh state *) +val create : unit -> t + +(** Get the unique ID for this build *) +val id : t -> Build_id.t + +(** {1 Progress tracking} *) + +val get_progress : t -> Progress.t Fiber.t +val set_progress : t -> Progress.t -> unit Fiber.t +val update_progress : t -> f:(Progress.t -> Progress.t) -> unit Fiber.t +val incr_rules_discovered : t -> unit Fiber.t +val incr_rules_executed : t -> unit Fiber.t +val incr_rules_failed : t -> unit Fiber.t + +(** {1 Error tracking} *) + +val get_errors : t -> Build_system_error.Set.t Fiber.t +val add_errors : t -> Build_system_error.t list -> unit Fiber.t +val reset_errors : t -> unit Fiber.t + +(** {1 Finalize hooks} *) + +(** Add a hook to run when this build completes *) +val add_finalize_hook : t -> (unit -> unit Fiber.t) -> unit + +(** Run all finalize hooks for this build *) +val run_finalize_hooks : t -> unit Fiber.t + +(** {1 Pending targets} *) + +val add_pending_targets : t -> Targets.t -> unit +val remove_pending_targets : t -> Targets.t -> unit +val cleanup_pending_targets : t -> unit + +(** {1 Debugging} *) + +val to_dyn : t -> Dyn.t diff --git a/src/dune_engine/build_system.ml b/src/dune_engine/build_system.ml index d133a904c25..1c3bc38837e 100644 --- a/src/dune_engine/build_system.ml +++ b/src/dune_engine/build_system.ml @@ -2,6 +2,11 @@ open Import open Memo.O module Error = Build_system_error +(* Per-build session stored in fiber-local storage. + This allows multiple concurrent builds while maintaining isolated state. *) +let current_session : Build_session.t option Fiber.Var.t = Fiber.Var.create None +let get_current_session_exn () = Fiber.Var.get_exn current_session + module Progress = struct type t = { number_of_rules_discovered : int @@ -44,7 +49,10 @@ module State = struct | Build_succeeded__now_waiting_for_changes | Build_failed__now_waiting_for_changes - let to_dyn = function + let to_dyn + = + (* Kept for potential debugging use *) + function | Initializing -> Dyn.variant "Initializing" [] | Building progress -> Dyn.variant "Building" [ Progress.to_dyn progress ] | Restarting_current_build -> Dyn.variant "Restarting_current_build" [] @@ -54,6 +62,8 @@ module State = struct Dyn.variant "Build_failed__now_waiting_for_changes" [] ;; + let _ = to_dyn + let equal x y = match x, y with | Building x, Building y -> Progress.equal x y @@ -68,13 +78,21 @@ module State = struct | Build_failed__now_waiting_for_changes, _ -> false ;; - let t = Fiber.Svar.create Initializing + (* Global state aggregated from all concurrent builds for status line/RPC/Chrome trace. + Protected by mutex to prevent corruption when multiple builds update simultaneously. *) + let t = Svar.create Initializing + let errors = Svar.create Error.Set.empty + let state_mutex = Fiber.Mutex.create () + + (* Track number of active builds to prevent premature state transitions. + Only reset to idle states when the last build completes. *) + let active_builds = ref 0 + let increment_active_builds () = active_builds := !active_builds + 1 + let decrement_active_builds () = active_builds := !active_builds - 1 + let get_active_builds () = !active_builds (* This mutable table is safe: it maps paths to lazily created mutexes. 
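+     Path locks intentionally stay global rather than per-session, so that two
+     concurrent builds can never run actions on the same locked path at once.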
*) let locks : (Path.t, Fiber.Mutex.t) Table.t = Table.create (module Path) 32 - - (* This mutex ensures that at most one [run] is running in parallel. *) - let build_mutex = Fiber.Mutex.create () let reset_progress () = Svar.write t (Building Progress.init) let set what = Svar.write t what @@ -89,26 +107,44 @@ module State = struct ;; let incr_rule_done_exn () = - update_build_progress_exn ~f:(fun p -> - { p with number_of_rules_executed = p.number_of_rules_executed + 1 }) + let open Fiber.O in + (* Update per-build session *) + let* session = get_current_session_exn () in + let* () = Build_session.incr_rules_executed session in + (* Also update global state (synchronized) for status line/Chrome trace *) + Fiber.Mutex.with_lock state_mutex ~f:(fun () -> + update_build_progress_exn ~f:(fun p -> + { p with number_of_rules_executed = p.number_of_rules_executed + 1 })) ;; let start_rule_exn () = - update_build_progress_exn ~f:(fun p -> - { p with number_of_rules_discovered = p.number_of_rules_discovered + 1 }) + let open Fiber.O in + (* Update per-build session *) + let* session = get_current_session_exn () in + let* () = Build_session.incr_rules_discovered session in + (* Also update global state (synchronized) for status line/Chrome trace *) + Fiber.Mutex.with_lock state_mutex ~f:(fun () -> + update_build_progress_exn ~f:(fun p -> + { p with number_of_rules_discovered = p.number_of_rules_discovered + 1 })) ;; - let errors = Svar.create Error.Set.empty let reset_errors () = Svar.write errors Error.Set.empty let add_errors error_list = let open Fiber.O in - let* () = - update_build_progress_exn ~f:(fun p -> - { p with number_of_rules_failed = p.number_of_rules_failed + 1 }) - in - List.fold_left error_list ~init:(Svar.read errors) ~f:Error.Set.add - |> Svar.write errors + (* Update per-build session *) + let* session = get_current_session_exn () in + let* () = Build_session.add_errors session error_list in + let* () = Build_session.incr_rules_failed session in + (* Also update global state (synchronized) for status line *) + Fiber.Mutex.with_lock state_mutex ~f:(fun () -> + let open Fiber.O in + let* () = + update_build_progress_exn ~f:(fun p -> + { p with number_of_rules_failed = p.number_of_rules_failed + 1 }) + in + List.fold_left error_list ~init:(Svar.read errors) ~f:Error.Set.add + |> Svar.write errors) ;; end @@ -121,20 +157,21 @@ let rec with_locks ~f = function ;; module Pending_targets = struct - (* All file and directory targets of non-sandboxed actions that are currently - being executed. On exit, we need to delete them as they might contain - garbage. *) + (* Per-build pending targets are now tracked in Build_session. + These functions update the current session's pending targets. 
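+     Cleanup also moved: instead of the old [Hooks.End_of_build] hook, [run]
+     calls [Build_session.cleanup_pending_targets] on its own session once the
+     build finishes.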
*) - let t = ref Targets.empty - let remove targets = t := Targets.diff !t (Targets.Validated.unvalidate targets) - let add targets = t := Targets.combine !t (Targets.Validated.unvalidate targets) + let remove targets = + let open Fiber.O in + let* session = get_current_session_exn () in + Build_session.remove_pending_targets session (Targets.Validated.unvalidate targets); + Fiber.return () + ;; - let () = - Hooks.End_of_build.always (fun () -> - let targets = !t in - t := Targets.empty; - Targets.iter targets ~file:Path.Build.unlink_no_err ~dir:(fun p -> - Path.rm_rf (Path.build p))) + let add targets = + let open Fiber.O in + let* session = get_current_session_exn () in + Build_session.add_pending_targets session (Targets.Validated.unvalidate targets); + Fiber.return () ;; end @@ -385,7 +422,8 @@ end = struct | None -> (* If the action is not sandboxed, we use [pending_file_targets] to clean up the build directory if the action is interrupted. *) - Pending_targets.add targets; + let open Fiber.O in + let* () = Pending_targets.add targets in Fiber.return None in let action = @@ -426,9 +464,7 @@ end = struct ~finally:(fun () -> match sandbox with | Some sandbox -> Sandbox.destroy sandbox - | None -> - Pending_targets.remove targets; - Fiber.return ()) + | None -> Pending_targets.remove targets) (fun () -> with_locks locks ~f:(fun () -> let* action_exec_result = @@ -1149,36 +1185,69 @@ let handle_final_exns exns = List.iter exns ~f:report ;; -let run f = +let run session f = let open Fiber.O in - let* () = State.reset_progress () in - let* () = State.reset_errors () in - let f () = + (* Set the current session in fiber-local storage for the duration of this build *) + Fiber.Var.set current_session (Some session) (fun () -> + (* Register this build and initialize global state if first build. 
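+       Only the first build to start (the counter was 0) resets the aggregated
+       progress and error state; builds that join while another build is
+       already running leave it untouched.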
*) + let* () = + Fiber.Mutex.with_lock State.state_mutex ~f:(fun () -> + let open Fiber.O in + let was_idle = State.get_active_builds () = 0 in + State.increment_active_builds (); + let current_state = Fiber.Svar.read State.t in + (* Only reset when transitioning from idle states *) + if was_idle + then ( + match current_state with + | Initializing + | Build_succeeded__now_waiting_for_changes + | Build_failed__now_waiting_for_changes + | Restarting_current_build -> + let* () = State.reset_progress () in + State.reset_errors () + | Building _ -> + (* Should not happen: counter was 0 but state is Building *) + Fiber.return ()) + else Fiber.return ()) + in + let* () = Build_session.reset_errors session in let* res = Fiber.collect_errors (fun () -> Memo.run_with_error_handler f ~handle_error_no_raise:report_early_exn) in - let* () = (Build_config.get ()).write_error_summary (Fiber.Svar.read State.errors) in + let* errors = Build_session.get_errors session in + let* () = (Build_config.get ()).write_error_summary errors in + (* Cleanup pending targets for this session *) + Build_session.cleanup_pending_targets session; + (* Update global state: only transition to final state if this is the last build *) + let* () = + Fiber.Mutex.with_lock State.state_mutex ~f:(fun () -> + State.decrement_active_builds (); + if State.get_active_builds () = 0 + then ( + (* Last build finishing - transition to final state *) + let final_status = + match res with + | Ok _ -> State.Build_succeeded__now_waiting_for_changes + | Error exns -> + if List.exists exns ~f:caused_by_cancellation + then State.Restarting_current_build + else State.Build_failed__now_waiting_for_changes + in + State.set final_status) + else Fiber.return ()) + in match res with - | Ok res -> - let+ () = State.set Build_succeeded__now_waiting_for_changes in - Ok res + | Ok res -> Fiber.return (Ok res) | Error exns -> handle_final_exns exns; - let final_status = - if List.exists exns ~f:caused_by_cancellation - then State.Restarting_current_build - else Build_failed__now_waiting_for_changes - in - let+ () = State.set final_status in - Error `Already_reported - in - Fiber.Mutex.with_lock State.build_mutex ~f + Fiber.return (Error `Already_reported)) ;; -let run_exn f = +let run_exn session f = let open Fiber.O in - let+ res = run f in + let+ res = run session f in match res with | Ok res -> res | Error `Already_reported -> raise Dune_util.Report_error.Already_reported diff --git a/src/dune_engine/build_system.mli b/src/dune_engine/build_system.mli index 4bd899806d1..05876ed171e 100644 --- a/src/dune_engine/build_system.mli +++ b/src/dune_engine/build_system.mli @@ -2,6 +2,13 @@ open Import +(** Fiber-local variable holding the current build session *) +val current_session : Build_session.t option Fiber.Var.t + +(** Get the current build session from fiber-local storage. + Raises if no session is set (only valid during a build). *) +val get_current_session_exn : unit -> Build_session.t Fiber.t + (** Build a target, which may be a file or a directory. *) val build_file : Path.t -> unit Memo.t @@ -54,10 +61,13 @@ val dep_on_alias_definition : Rules.Dir_rules.Alias_spec.item -> unit Action_bui (** {2 Running the build system} *) -val run : (unit -> 'a Memo.t) -> ('a, [ `Already_reported ]) Result.t Fiber.t +val run + : Build_session.t + -> (unit -> 'a Memo.t) + -> ('a, [ `Already_reported ]) Result.t Fiber.t (** A variant of [run] that raises an [Already_reported] exception on error. 
*) -val run_exn : (unit -> 'a Memo.t) -> 'a Fiber.t +val run_exn : Build_session.t -> (unit -> 'a Memo.t) -> 'a Fiber.t (** {2 Misc} *) diff --git a/src/dune_engine/dune_engine.ml b/src/dune_engine/dune_engine.ml index 1bb71d5b421..a75613cad07 100644 --- a/src/dune_engine/dune_engine.ml +++ b/src/dune_engine/dune_engine.ml @@ -16,7 +16,10 @@ module Rules = Rules module Rule = Rule module Targets = Dune_targets module Target_promotion = Target_promotion +module Build_id = Build_id module Build_context = Build_context +module Build_session = Build_session +module Build_coordinator = Build_coordinator module Build_config = Build_config module Build_system = Build_system module Build_system_error = Build_system_error diff --git a/src/promote/diff_action.ml b/src/promote/diff_action.ml index dadacc0aa28..15801f3b92a 100644 --- a/src/promote/diff_action.ml +++ b/src/promote/diff_action.ml @@ -65,17 +65,25 @@ let exec ~rule_loc ({ Diff.optional; file1; file2; mode } as diff) = (Path.build file2) ~skip_trailing_cr:(mode = Text && Sys.win32)) ~finally:(fun () -> - (match optional with - | false -> - (* Promote if in the source tree or not a target. The second case - means that the diffing have been done with the empty file *) - if in_source_or_target && not (is_copied_from_source_tree (Path.build file2)) - then Diff_promotion.File.register_dep ~source_file ~correction_file:file2 - | true -> - if in_source_or_target - then - Diff_promotion.File.register_intermediate ~source_file ~correction_file:file2 - else remove_intermediate_file ()); + let open Fiber.O in + let* () = + match optional with + | false -> + (* Promote if in the source tree or not a target. The second case + means that the diffing have been done with the empty file *) + if in_source_or_target && not (is_copied_from_source_tree (Path.build file2)) + then Diff_promotion.File.register_dep ~source_file ~correction_file:file2 + else Fiber.return () + | true -> + if in_source_or_target + then + Diff_promotion.File.register_intermediate + ~source_file + ~correction_file:file2 + else ( + remove_intermediate_file (); + Fiber.return ()) + in Fiber.return ())) ;; diff --git a/src/promote/diff_promotion.ml b/src/promote/diff_promotion.ml index fc3d1115a88..42c19014d8f 100644 --- a/src/promote/diff_promotion.ml +++ b/src/promote/diff_promotion.ml @@ -30,23 +30,40 @@ module File = struct ] ;; - let db : t list ref = ref [] + (* Per-session promotion databases to avoid races with concurrent builds. + Each build session has its own DB that is accessed via fiber-local storage. 
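+     A session's entries are merged into the persistent database by [finalize],
+     and its table entry is dropped again by [clear_cache_for_session].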
*) + let dbs : (Dune_engine.Build_id.t, t list ref) Table.t = + Table.create (module Dune_engine.Build_id) 16 + ;; + + let get_current_db () = + let open Fiber.O in + let* session = Dune_engine.Build_system.get_current_session_exn () in + let build_id = Dune_engine.Build_session.id session in + Fiber.return (Table.find_or_add dbs build_id ~f:(fun _ -> ref [])) + ;; let register_dep ~source_file ~correction_file = + let open Fiber.O in + let* db = get_current_db () in db := { src = snd (Path.Build.split_sandbox_root correction_file) ; staging = None ; dst = source_file } - :: !db + :: !db; + Fiber.return () ;; let register_intermediate ~source_file ~correction_file = + let open Fiber.O in let staging = in_staging_area source_file in Path.mkdir_p (Path.build (Option.value_exn (Path.Build.parent staging))); Unix.rename (Path.Build.to_string correction_file) (Path.Build.to_string staging); let src = snd (Path.Build.split_sandbox_root correction_file) in - db := { src; staging = Some staging; dst = source_file } :: !db + let* db = get_current_db () in + db := { src; staging = Some staging; dst = source_file } :: !db; + Fiber.return () ;; let do_promote ~correction_file ~dst = @@ -66,33 +83,44 @@ module File = struct let promote ({ src; staging; dst } as file) = let correction_file = correction_file file in let correction_exists = Path.exists correction_file in - Console.print - [ Pp.box - ~indent:2 - (if correction_exists - then - Pp.textf - "Promoting %s to %s." - (Path.to_string_maybe_quoted (Path.build src)) - (Path.Source.to_string_maybe_quoted dst) - else - Pp.textf - "Skipping promotion of %s to %s as the %s is missing." - (Path.to_string_maybe_quoted (Path.build src)) - (Path.Source.to_string_maybe_quoted dst) - (match staging with - | None -> "file" - | Some staging -> - Format.sprintf - "staging file (%s)" - (Path.to_string_maybe_quoted (Path.build staging)))) - ]; + let dst_is_directory = Path.is_directory (Path.source dst) in + (* Only print promotion message if the destination is not a directory. + If dst is a directory, the promotion will fail but we should not + print a success message before the failure. *) + if not dst_is_directory + then + Console.print + [ Pp.box + ~indent:2 + (if correction_exists + then + Pp.textf + "Promoting %s to %s." + (Path.to_string_maybe_quoted (Path.build src)) + (Path.Source.to_string_maybe_quoted dst) + else + Pp.textf + "Skipping promotion of %s to %s as the %s is missing." + (Path.to_string_maybe_quoted (Path.build src)) + (Path.Source.to_string_maybe_quoted dst) + (match staging with + | None -> "file" + | Some staging -> + Format.sprintf + "staging file (%s)" + (Path.to_string_maybe_quoted (Path.build staging)))) + ]; if correction_exists then do_promote ~correction_file ~dst ;; end -let clear_cache () = File.db := [] -let () = Hooks.End_of_build.always clear_cache +let clear_cache_for_session build_id = + match Table.find File.dbs build_id with + | Some db -> + db := []; + Table.remove File.dbs build_id + | None -> () +;; module P = Persistent.Make (struct type t = File.t list @@ -122,6 +150,33 @@ let dump_db db = let load_db () = Option.value ~default:[] (P.load db_file) +(* Mutex to protect concurrent updates to the promotion database file *) +let db_mutex = Fiber.Mutex.create () + +(* Clear the promotion database. Called when starting a build with no other + active builds, ensuring sequential builds behave like the old implementation + where the database was cleared between builds. 
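+   The caller is [bin/import.ml], which checks
+   [Build_coordinator.has_active_builds] before submitting a new build.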
*) +let clear_db_if_idle () = + Fiber.Mutex.with_lock db_mutex ~f:(fun () -> + dump_db []; + Fiber.return ()) +;; + +(* Merge new entries into existing database, removing duplicates *) +let merge_db_entries ~existing ~new_entries = + (* Build a set of existing entries for deduplication *) + let existing_set = + List.fold_left existing ~init:[] ~f:(fun acc entry -> + if List.exists acc ~f:(fun e -> File.compare e entry = Eq) + then acc + else entry :: acc) + in + (* Add new entries that aren't already present *) + List.fold_left new_entries ~init:existing_set ~f:(fun acc entry -> + if List.exists acc ~f:(fun e -> File.compare e entry = Eq) then acc else entry :: acc) + |> List.rev +;; + let group_by_targets db = List.map db ~f:(fun { File.src; staging; dst } -> dst, (src, staging)) |> Path.Source.Map.of_list_multi @@ -178,12 +233,27 @@ let do_promote db files_to_promote = ;; let finalize () = - let db = + let open Fiber.O in + let* current_db = File.get_current_db () in + let* session = Dune_engine.Build_system.get_current_session_exn () in + let build_id = Dune_engine.Build_session.id session in + let session_entries = match !Dune_engine.Clflags.promote with - | Some Automatically -> do_promote !File.db All - | Some Never | None -> !File.db + | Some Automatically -> do_promote !current_db All + | Some Never | None -> !current_db in - dump_db db + (* Merge this session's entries with existing database atomically *) + let* () = + Fiber.Mutex.with_lock db_mutex ~f:(fun () -> + let existing_db = load_db () in + let merged_db = + merge_db_entries ~existing:existing_db ~new_entries:session_entries + in + dump_db merged_db; + Fiber.return ()) + in + clear_cache_for_session build_id; + Fiber.return () ;; let promote_files_registered_in_last_run files_to_promote = diff --git a/src/promote/diff_promotion.mli b/src/promote/diff_promotion.mli index 321ab59f8a8..0a479cf35fe 100644 --- a/src/promote/diff_promotion.mli +++ b/src/promote/diff_promotion.mli @@ -13,18 +13,25 @@ module File : sig val register_intermediate : source_file:Path.Source.t -> correction_file:Path.Build.t - -> unit + -> unit Fiber.t (** Register file to promote where the correction file is a dependency of the current action (rather than an intermediate file). [correction_file] refers to a path in the build dir, not in the sandbox (it can point to the sandbox, but the sandbox root will be stripped). *) - val register_dep : source_file:Path.Source.t -> correction_file:Path.Build.t -> unit + val register_dep + : source_file:Path.Source.t + -> correction_file:Path.Build.t + -> unit Fiber.t end (** Promote all registered files if [!Clflags.auto_promote]. Otherwise dump the list of registered files to [_build/.to-promote]. *) -val finalize : unit -> unit +val finalize : unit -> unit Fiber.t + +(** Clear the promotion database. Called when starting a build with no other + active builds to ensure sequential builds work like the old implementation. *) +val clear_db_if_idle : unit -> unit Fiber.t val load_db : unit -> File.t list val filter_db : Dune_rpc_private.Files_to_promote.t -> File.t list -> File.t list diff --git a/test/blackbox-tests/test-cases/concurrent-hook-race.t b/test/blackbox-tests/test-cases/concurrent-hook-race.t new file mode 100644 index 00000000000..065e18a2fea --- /dev/null +++ b/test/blackbox-tests/test-cases/concurrent-hook-race.t @@ -0,0 +1,63 @@ +Test that promotion hooks work correctly with the new architecture. 
+ +Setup project: + + $ cat > dune-project << EOF + > (lang dune 3.16) + > EOF + + $ cat > dune << EOF + > (rule + > (target slow1.txt) + > (action (system "sleep 0.1; echo slow1 > slow1.txt"))) + > (rule + > (target slow2.txt) + > (action (system "sleep 0.1; echo slow2 > slow2.txt"))) + > (alias + > (name build1) + > (deps slow1.txt)) + > (alias + > (name build2) + > (deps slow2.txt)) + > (rule + > (alias runtest) + > (action (diff expected1.txt slow1.txt))) + > EOF + + $ cat > expected1.txt << EOF + > slow1 + > EOF + +Test 1: Sequential builds work correctly + + $ dune build @build1 + $ dune build @build2 + $ ls _build/default/*.txt + _build/default/slow1.txt + _build/default/slow2.txt + +Test 2: Promotion works correctly + + $ cat > expected1.txt << EOF + > wrong + > EOF + + $ dune build @runtest 2>&1 | head -3 + File "expected1.txt", line 1, characters 0-0: + Error: Files _build/default/expected1.txt and _build/default/slow1.txt + differ. + +Promotion should execute hooks exactly once: + + $ dune promote + Promoting _build/default/slow1.txt to expected1.txt. + + $ cat expected1.txt + slow1 + +Verify build succeeds after promotion: + + $ dune build @runtest + +Note: Full testing of hook isolation during concurrent builds requires +RPC + watch mode interaction, which cannot be demonstrated in cram tests. diff --git a/test/blackbox-tests/test-cases/true-concurrent-builds.t b/test/blackbox-tests/test-cases/true-concurrent-builds.t new file mode 100644 index 00000000000..41720636728 --- /dev/null +++ b/test/blackbox-tests/test-cases/true-concurrent-builds.t @@ -0,0 +1,63 @@ +Test that builds work correctly with the new Build_coordinator architecture. + +Setup project with multiple independent targets: + + $ cat > dune-project << EOF + > (lang dune 3.16) + > EOF + + $ cat > dune << EOF + > (rule + > (target a.txt) + > (action (bash "echo 'Building A'; sleep 0.05; echo A > a.txt"))) + > (rule + > (target b.txt) + > (action (bash "echo 'Building B'; sleep 0.05; echo B > b.txt"))) + > (rule + > (target c.txt) + > (action (bash "echo 'Building C'; sleep 0.05; echo C > c.txt"))) + > (alias + > (name build-a) + > (deps a.txt)) + > (alias + > (name build-b) + > (deps b.txt)) + > (alias + > (name build-c) + > (deps c.txt)) + > EOF + +Test 1: Basic builds work with new architecture + + $ dune build @build-a + Building A + + $ dune build @build-b + Building B + + $ cat _build/default/a.txt + A + $ cat _build/default/b.txt + B + +Test 2: Sequential builds complete correctly + + $ dune clean + $ dune build @build-a @build-b @build-c + Building A + Building B + Building C + + $ ls _build/default/*.txt + _build/default/a.txt + _build/default/b.txt + _build/default/c.txt + +With the new Build_coordinator architecture, each build gets its own +isolated session with separate progress, errors, and hooks. This enables +RPC and watch mode builds to execute concurrently without serializing +at a mutex. + +Note: Cram tests are sequential by nature and can't demonstrate true +parallelism. Full concurrent execution testing requires RPC + watch mode +in a real environment. diff --git a/test/blackbox-tests/test-cases/watching/cancellation-with-concurrent-builds.t b/test/blackbox-tests/test-cases/watching/cancellation-with-concurrent-builds.t new file mode 100644 index 00000000000..b7fd19e43a9 --- /dev/null +++ b/test/blackbox-tests/test-cases/watching/cancellation-with-concurrent-builds.t @@ -0,0 +1,64 @@ +Watch cancellation with concurrent RPC builds continues RPC builds to completion. + + $ . 
./helpers.sh + + $ cat > dune-project << EOF + > (lang dune 3.16) + > EOF + + $ cat > input.txt << EOF + > initial + > EOF + + $ cat > dune << EOF + > (rule + > (target watch-target.txt) + > (deps input.txt) + > (action (bash "sleep 0.2; cat input.txt > watch-target.txt"))) + > (rule + > (target rpc-target.txt) + > (action (bash "sleep 0.3; echo rpc > rpc-target.txt"))) + > (alias + > (name watch-build) + > (deps watch-target.txt)) + > (alias + > (name rpc-build) + > (deps rpc-target.txt)) + > EOF + + $ start_dune + +Start slow RPC build, then trigger watch build and modify source to cancel it. + + $ build @rpc-build > rpc-output 2>&1 & + $ RPC_PID=$! + + $ build @watch-build > watch-output 2>&1 & + $ WATCH_PID=$! + $ sleep 0.1 + $ cat > input.txt << EOF + > modified + > EOF + +RPC build completes despite watch cancellation. + + $ wait $RPC_PID + $ cat rpc-output + Success + + $ wait $WATCH_PID || echo "Watch interrupted as expected" + Watch interrupted as expected + + $ cat _build/default/rpc-target.txt + rpc + +Watch mode restarts and builds with modified input. + + $ build @watch-build + Success + + $ cat _build/default/watch-target.txt + modified + + $ stop_dune + Success, waiting for filesystem changes... diff --git a/test/blackbox-tests/test-cases/watching/concurrent-promotions.t b/test/blackbox-tests/test-cases/watching/concurrent-promotions.t new file mode 100644 index 00000000000..965a14e3d98 --- /dev/null +++ b/test/blackbox-tests/test-cases/watching/concurrent-promotions.t @@ -0,0 +1,72 @@ +Concurrent builds with promotions maintain per-session isolation. + + $ . ./helpers.sh + + $ cat > dune-project << EOF + > (lang dune 3.16) + > EOF + + $ cat > dune << EOF + > (rule + > (target file1.gen) + > (action (bash "echo 'content1' > file1.gen"))) + > (rule + > (alias check1) + > (action (diff file1.expected file1.gen))) + > (rule + > (target file2.gen) + > (action (bash "echo 'content2' > file2.gen"))) + > (rule + > (alias check2) + > (action (diff file2.expected file2.gen))) + > (rule + > (target file3.gen) + > (action (bash "echo 'content3' > file3.gen"))) + > (rule + > (alias check3) + > (action (diff file3.expected file3.gen))) + > EOF + + $ cat > file1.expected << EOF + > wrong1 + > EOF + + $ cat > file2.expected << EOF + > wrong2 + > EOF + + $ cat > file3.expected << EOF + > wrong3 + > EOF + + $ start_dune + +Trigger three concurrent builds that will fail with diffs. + + $ build @check1 > check1-output 2>&1 & + $ CHECK1_PID=$! + $ build @check2 > check2-output 2>&1 & + $ CHECK2_PID=$! + $ build @check3 > check3-output 2>&1 & + $ CHECK3_PID=$! + + $ wait $CHECK1_PID || true + $ wait $CHECK2_PID || true + $ wait $CHECK3_PID || true + +All three should report diffs. + + $ grep -q "content1" check1-output && echo "check1 detected diff" + check1 detected diff + $ grep -q "content2" check2-output && echo "check2 detected diff" + check2 detected diff + $ grep -q "content3" check3-output && echo "check3 detected diff" + check3 detected diff + +The promotion database should contain all three files. + + $ cat _build/.to-promote | grep -c "file[123].gen" + 3 + + $ stop_dune + Success, waiting for filesystem changes... diff --git a/test/blackbox-tests/test-cases/watching/rpc-watch-overlap.t b/test/blackbox-tests/test-cases/watching/rpc-watch-overlap.t new file mode 100644 index 00000000000..af3dd86f7db --- /dev/null +++ b/test/blackbox-tests/test-cases/watching/rpc-watch-overlap.t @@ -0,0 +1,47 @@ +RPC build during watch mode build executes concurrently without state corruption. + + $ . 
./helpers.sh + + $ cat > dune-project << EOF + > (lang dune 3.16) + > EOF + + $ cat > dune << EOF + > (rule + > (target slow.txt) + > (action (bash "sleep 0.3; echo slow > slow.txt"))) + > (rule + > (target fast.txt) + > (action (bash "echo fast > fast.txt"))) + > (alias + > (name slow-build) + > (deps slow.txt)) + > (alias + > (name fast-build) + > (deps fast.txt)) + > EOF + + $ start_dune + +Trigger slow build via RPC, then start fast build while slow is running. + + $ build @slow-build > slow-output 2>&1 & + $ SLOW_PID=$! + + $ sleep 0.1 + $ build @fast-build + Success + +Both builds should complete successfully. + + $ wait $SLOW_PID + $ cat slow-output + Success + + $ cat _build/default/slow.txt + slow + $ cat _build/default/fast.txt + fast + + $ stop_dune + Success, waiting for filesystem changes...