diff --git a/Cargo.lock b/Cargo.lock index f83b5604d86cc..bda57d08a8ec0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7979,7 +7979,6 @@ dependencies = [ "tracing", "turbopath", "turborepo-cache", - "turborepo-daemon", "turborepo-hash", "turborepo-repository", "turborepo-scm", diff --git a/crates/turborepo-lib/src/cli/mod.rs b/crates/turborepo-lib/src/cli/mod.rs index 017e17b0e6d26..1de41258541ab 100644 --- a/crates/turborepo-lib/src/cli/mod.rs +++ b/crates/turborepo-lib/src/cli/mod.rs @@ -1727,12 +1727,18 @@ pub async fn run( let mut client = WatchClient::new(base, *experimental_write_cache, event, query_server.clone()) .await?; - if let Err(e) = client.start().await { - client.shutdown().await; - return Err(e.into()); + match client.start().await { + Ok(()) => {} + Err(crate::run::watch::Error::SignalInterrupt) => { + // Normal shutdown via Ctrl+C — not an error. + } + Err(e) => { + client.shutdown().await; + return Err(e.into()); + } } - // We only exit if we get a signal, so we return a non-zero exit code - return Ok(1); + client.shutdown().await; + return Ok(0); } Command::Prune { scope, diff --git a/crates/turborepo-lib/src/package_changes_watcher.rs b/crates/turborepo-lib/src/package_changes_watcher.rs index 58561f72c531e..2bec5004e806f 100644 --- a/crates/turborepo-lib/src/package_changes_watcher.rs +++ b/crates/turborepo-lib/src/package_changes_watcher.rs @@ -441,9 +441,27 @@ impl Subscriber { else { return; }; - // We store the hash of the package's files. If the hash is already - // in here, we don't need to recompute it + // Pre-populate hash baselines for all known packages. Without + // this, the first file change for each package would always be + // treated as "new" (no old hash to compare against), causing + // spurious rebuilds from build output writes on the initial run. let mut package_file_hashes = HashMap::new(); + for (name, info) in repo_state.pkg_dep_graph.packages() { + let pkg = WorkspacePackage { + name: name.clone(), + path: info.package_path().to_owned(), + }; + if let Ok(hash) = self + .hash_watcher + .get_file_hashes(HashSpec { + package_path: pkg.path.clone(), + inputs: InputGlobs::Default, + }) + .await + { + package_file_hashes.insert(pkg.path, hash); + } + } let mut change_mapper = match repo_state.get_change_mapper() { Some(change_mapper) => change_mapper, diff --git a/crates/turborepo-lib/src/run/builder.rs b/crates/turborepo-lib/src/run/builder.rs index f3e5c9d1aea7b..78361bbccf299 100644 --- a/crates/turborepo-lib/src/run/builder.rs +++ b/crates/turborepo-lib/src/run/builder.rs @@ -11,7 +11,6 @@ use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf}; use turborepo_analytics::{start_analytics, AnalyticsHandle}; use turborepo_api_client::{APIAuth, APIClient}; use turborepo_cache::AsyncCache; -use turborepo_daemon::{DaemonClient, DaemonConnector}; use turborepo_env::EnvironmentVariableMap; use turborepo_errors::Spanned; use turborepo_process::ProcessManager; @@ -61,11 +60,11 @@ pub struct RunBuilder { should_validate_engine: bool, // If true, we will add all tasks to the graph, even if they are not specified add_all_tasks: bool, - // When running under `turbo watch`, a daemon client is needed so that + // When running under `turbo watch`, an output watcher is needed so that // the run cache can register output globs and skip restoring outputs // that are already on disk. Without this, cache restores write files // that trigger the file watcher, causing an infinite rebuild loop. - daemon_client: Option>, + output_watcher: Option>, query_server: Option>, } @@ -108,7 +107,7 @@ impl RunBuilder { should_print_prelude_override: None, should_validate_engine: true, add_all_tasks: false, - daemon_client: None, + output_watcher: None, query_server: None, }) } @@ -118,8 +117,11 @@ impl RunBuilder { self } - pub fn with_daemon_client(mut self, client: DaemonClient) -> Self { - self.daemon_client = Some(client); + pub fn with_output_watcher( + mut self, + watcher: Arc, + ) -> Self { + self.output_watcher = Some(watcher); self } @@ -471,7 +473,7 @@ impl RunBuilder { &self.repo_root, self.opts.runcache_opts, &self.opts.cache_opts, - self.daemon_client, + self.output_watcher, self.color_config, self.opts.run_opts.dry_run.is_some(), )); diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index 09ce6c51d643c..aacec87caf015 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -2,21 +2,25 @@ use std::{ collections::HashSet, ops::DerefMut as _, sync::{Arc, Mutex}, + time::Duration, }; -use futures::{future::join_all, StreamExt}; use miette::{Diagnostic, SourceSpan}; use thiserror::Error; use tokio::{ select, - sync::{oneshot, Notify}, + sync::{broadcast, oneshot, Notify}, task::JoinHandle, }; use tracing::{instrument, trace}; -use turborepo_daemon::{ - proto, DaemonClient, DaemonConnector, DaemonConnectorError, DaemonError, Paths, +use turborepo_daemon::{PackageChangeEvent, PackageChangesWatcher as PackageChangesWatcherTrait}; +use turborepo_filewatch::{ + cookies::CookieWriter, globwatcher::GlobWatcher, hash_watcher::HashWatcher, + package_watcher::PackageWatcher, FileSystemWatcher, }; use turborepo_repository::package_graph::PackageName; +use turborepo_run_cache::{OutputWatcher, OutputWatcherError}; +use turborepo_scm::SCM; use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::command::CommandEventBuilder; use turborepo_ui::sender::UISender; @@ -26,6 +30,7 @@ use crate::{ config::resolve_turbo_config_path, engine::{EngineExt, TaskNode}, get_version, opts, + package_changes_watcher::PackageChangesWatcher, run::{self, builder::RunBuilder, scope::target_selector::InvalidSelectorError, Run}, }; @@ -59,16 +64,77 @@ impl ChangedPackages { } } +/// In-process file watching infrastructure that replaces the daemon. +/// All components are standalone structs from `turborepo-filewatch` +/// and `turborepo-lib` — no gRPC or IPC involved. +struct FileWatching { + // Kept alive so the OS-level watcher keeps running. + _watcher: Arc, + glob_watcher: Arc, + // Kept alive so its background tasks continue providing package + // discovery data to the HashWatcher. + _package_watcher: Arc, + // Kept alive to maintain the watcher background task. + _package_changes_watcher: PackageChangesWatcher, +} + +/// Adapts `GlobWatcher` to the `OutputWatcher` trait so it can be passed +/// to `RunCache`/`TaskCache` for output change tracking. +struct InProcessOutputWatcher { + glob_watcher: Arc, +} + +impl OutputWatcher for InProcessOutputWatcher { + fn get_changed_outputs( + &self, + hash: String, + output_globs: Vec, + ) -> std::pin::Pin< + Box, OutputWatcherError>> + Send>, + > { + let glob_watcher = self.glob_watcher.clone(); + let candidates: HashSet = output_globs.into_iter().collect(); + Box::pin(async move { + glob_watcher + .get_changed_globs(hash, candidates, Duration::from_millis(100)) + .await + .map_err(|e| OutputWatcherError(Box::new(e))) + }) + } + + fn notify_outputs_written( + &self, + hash: String, + output_globs: Vec, + output_exclusion_globs: Vec, + _time_saved: u64, + ) -> std::pin::Pin> + Send>> + { + let glob_watcher = self.glob_watcher.clone(); + Box::pin(async move { + let glob_set = turborepo_filewatch::globwatcher::GlobSet::from_raw( + output_globs, + output_exclusion_globs, + ) + .map_err(|e| OutputWatcherError(Box::new(e)))?; + glob_watcher + .watch_globs(hash, glob_set, Duration::from_millis(100)) + .await + .map_err(|e| OutputWatcherError(Box::new(e))) + }) + } +} + pub struct WatchClient { run: Arc, watched_packages: HashSet, persistent_tasks_handle: Option, active_runs: Vec, - connector: DaemonConnector, - // A daemon client used by the run cache to register output globs and check - // whether outputs have changed. This prevents cache restores from writing - // files that trigger the file watcher and cause infinite rebuild loops. - daemon_client: DaemonClient, + _watching: FileWatching, + output_watcher: Arc, + // Subscribed eagerly (before building the Run) so we don't miss the + // initial Rediscover event from the PackageChangesWatcher. + package_change_events: broadcast::Receiver, base: CommandBase, telemetry: CommandEventBuilder, handler: SignalHandler, @@ -85,13 +151,10 @@ struct RunHandle { #[derive(Debug, Error, Diagnostic)] pub enum Error { - #[error("Failed to connect to daemon.")] - #[diagnostic(transparent)] - Daemon(#[from] DaemonError), - #[error("Failed to connect to daemon.")] - DaemonConnector(#[from] DaemonConnectorError), - #[error("Failed to decode message from daemon.")] - Decode(#[from] prost::DecodeError), + #[error("File watcher error: {0}")] + FileWatcher(#[from] turborepo_filewatch::WatchError), + #[error("Package watcher error: {0}")] + PackageWatcher(String), #[error("Could not get current executable.")] CurrentExe(std::io::Error), #[error("Could not start `turbo`.")] @@ -112,23 +175,21 @@ pub enum Error { #[label] span: SourceSpan, }, - #[error("Daemon connection closed.")] - ConnectionClosed, #[error( - "Timed out waiting for the daemon's file watcher to become ready. The daemon may be \ - having trouble watching your repository. Try running `turbo daemon clean` and retrying." + "Timed out waiting for the file watcher to become ready. Try running `turbo daemon clean` \ + and retrying." )] - DaemonFileWatchingTimeout, + FileWatchingTimeout, #[error("Failed to subscribe to signal handler. Shutting down.")] NoSignalHandler, #[error("Watch interrupted due to signal.")] SignalInterrupt, - #[error("Package change error.")] - PackageChange(#[from] tonic::Status), + #[error("Package change channel closed.")] + PackageChangeClosed, + #[error("Package change channel lagged.")] + PackageChangeLagged, #[error(transparent)] UI(#[from] turborepo_ui::Error), - #[error("Could not connect to UI thread: {0}")] - UISend(String), #[error("Invalid config: {0}")] Config(#[from] crate::config::Error), #[error(transparent)] @@ -147,7 +208,6 @@ impl WatchClient { let standard_config_path = resolve_turbo_config_path(&base.repo_root)?; - // Determine if we're using a custom turbo.json path let custom_turbo_json_path = if base.opts.repo_opts.root_turbo_json_path != standard_config_path { tracing::info!( @@ -160,22 +220,56 @@ impl WatchClient { None }; - let connector = DaemonConnector { - can_start_server: true, - can_kill_server: true, - paths: Paths::from_repo_root(&base.repo_root), + // Build the in-process file watching stack (replaces the daemon). + let watcher = Arc::new(FileSystemWatcher::new_with_default_cookie_dir( + &base.repo_root, + )?); + let recv = watcher.watch(); + let cookie_writer = CookieWriter::new( + watcher.cookie_dir(), + Duration::from_millis(100), + recv.clone(), + ); + let glob_watcher = Arc::new(GlobWatcher::new( + base.repo_root.clone(), + cookie_writer.clone(), + recv.clone(), + )); + let package_watcher = Arc::new( + PackageWatcher::new(base.repo_root.clone(), recv.clone(), cookie_writer) + .map_err(|e| Error::PackageWatcher(format!("{e:?}")))?, + ); + let scm = SCM::new(&base.repo_root); + let hash_watcher = Arc::new(HashWatcher::new( + base.repo_root.clone(), + package_watcher.watch_discovery(), + recv.clone(), + scm, + )); + let package_changes_watcher = PackageChangesWatcher::new( + base.repo_root.clone(), + recv, + hash_watcher, custom_turbo_json_path, + ); + + // Subscribe before building the Run so we don't miss the initial + // Rediscover event that PackageChangesWatcher emits on startup. + let package_change_events = package_changes_watcher.package_changes().await; + + let watching = FileWatching { + _watcher: watcher, + glob_watcher: glob_watcher.clone(), + _package_watcher: package_watcher, + _package_changes_watcher: package_changes_watcher, }; - // Connect a daemon client for the run cache. This allows the cache to - // register output globs with the daemon's GlobWatcher and skip restoring - // outputs that are already on disk, preventing the file watcher from - // seeing restored files as changes and causing an infinite rebuild loop. - let daemon_client = connector.clone().connect().await?; + let output_watcher: Arc = + Arc::new(InProcessOutputWatcher { glob_watcher }); let new_base = base.clone(); let mut run_builder = - RunBuilder::new(new_base, None)?.with_daemon_client(daemon_client.clone()); + RunBuilder::new(new_base, None)?.with_output_watcher(output_watcher.clone()); if let Some(ref qs) = query_server { run_builder = run_builder.with_query_server(qs.clone()); } @@ -190,8 +284,9 @@ impl WatchClient { base, run, watched_packages, - connector, - daemon_client, + _watching: watching, + output_watcher, + package_change_events, handler, telemetry, experimental_write_cache, @@ -204,63 +299,65 @@ impl WatchClient { } pub async fn start(&mut self) -> Result<(), Error> { - let connector = self.connector.clone(); - let mut client = connector.connect().await?; - - let mut events = client.package_changes().await?; + let mut events = std::mem::replace( + &mut self.package_change_events, + // Replace with a dummy receiver. The real one is consumed above. + broadcast::channel(1).1, + ); - // Wait for the initial event from the daemon with a timeout. - // The daemon sends a Rediscover event immediately when the stream opens, - // but the stream won't produce anything until the daemon's file watcher - // is ready. If it never becomes ready, we'd hang here forever. - let initial_event = tokio::time::timeout(std::time::Duration::from_secs(10), events.next()) + // Wait for the initial Rediscover event, which signals that the file + // watcher is ready. The PackageChangesWatcher emits this on startup. + let initial_event = tokio::time::timeout(std::time::Duration::from_secs(10), events.recv()) .await - .map_err(|_| Error::DaemonFileWatchingTimeout)? - .ok_or(Error::ConnectionClosed)?; - let initial_event = initial_event?; + .map_err(|_| Error::FileWatchingTimeout)?; + let initial_event = match initial_event { + Ok(event) => event, + Err(broadcast::error::RecvError::Closed) => return Err(Error::PackageChangeClosed), + Err(broadcast::error::RecvError::Lagged(_)) => return Err(Error::PackageChangeLagged), + }; let signal_subscriber = self.handler.subscribe().ok_or(Error::NoSignalHandler)?; - // We explicitly use a tokio::sync::Mutex here to avoid deadlocks. - // If we used a std::sync::Mutex, we could deadlock by spinning the lock - // and not yielding back to the tokio runtime. - let changed_packages = Mutex::new(ChangedPackages::default()); + let pending_changes = Mutex::new(ChangedPackages::default()); let notify_run = Arc::new(Notify::new()); let notify_event = notify_run.clone(); - // Process the initial event - Self::handle_change_event(&changed_packages, initial_event.event.unwrap())?; + Self::handle_change_event(&pending_changes, initial_event); notify_event.notify_one(); let event_fut = async { - while let Some(event) = events.next().await { - let event = event?; - Self::handle_change_event(&changed_packages, event.event.unwrap())?; - notify_event.notify_one(); + loop { + match events.recv().await { + Ok(event) => { + Self::handle_change_event(&pending_changes, event); + notify_event.notify_one(); + } + Err(broadcast::error::RecvError::Closed) => { + return Err(Error::PackageChangeClosed); + } + Err(broadcast::error::RecvError::Lagged(_)) => { + Self::handle_change_event(&pending_changes, PackageChangeEvent::Rediscover); + notify_event.notify_one(); + } + } } - - Err(Error::ConnectionClosed) }; let run_fut = async { loop { notify_run.notified().await; let some_changed_packages = { - let mut changed_packages_guard = - changed_packages.lock().expect("poisoned lock"); - (!changed_packages_guard.is_empty()) - .then(|| std::mem::take(changed_packages_guard.deref_mut())) + let mut guard = pending_changes.lock().expect("poisoned lock"); + (!guard.is_empty()).then(|| std::mem::take(guard.deref_mut())) }; if let Some(mut changed_packages) = some_changed_packages { - // Clean up currently running tasks - self.active_runs.retain(|h| !h.run_task.is_finished()); - - // Safe to filter early: the engine only contains tasks from - // watched_packages, so unwatched packages can't impact any - // running tasks. - changed_packages.filter_to_watched(&self.watched_packages); - + // Stop impacted tasks and wait for prior runs to finish + // before starting new ones. This prevents: + // - Concurrent builds of the same package (Next.js lock conflicts, duplicated + // output) + // - A cache-hit run incorrectly signaling persistent task readiness when the + // real build failed match changed_packages { ChangedPackages::Some(ref pkgs) => { let impacted = self.stop_impacted_tasks(pkgs).await; @@ -273,8 +370,20 @@ impl WatchClient { } } } + + changed_packages.filter_to_watched(&self.watched_packages); + let new_run = self.execute_run(changed_packages).await?; self.active_runs.push(new_run); + + // Wait for runs to complete before processing more events. + // Combined with the hash baseline pre-population in + // PackageChangesWatcher, this ensures build output writes + // don't trigger spurious rebuilds. + for handle in &mut self.active_runs { + let _ = (&mut handle.run_task).await; + } + self.active_runs.retain(|h| !h.run_task.is_finished()); } } }; @@ -295,35 +404,36 @@ impl WatchClient { } #[instrument(skip(changed_packages))] - fn handle_change_event( - changed_packages: &Mutex, - event: proto::package_change_event::Event, - ) -> Result<(), Error> { - // Should we recover here? + fn handle_change_event(changed_packages: &Mutex, event: PackageChangeEvent) { match event { - proto::package_change_event::Event::PackageChanged(proto::PackageChanged { - package_name, - }) => { - let package_name = PackageName::from(package_name); - + PackageChangeEvent::Package { name } => { match changed_packages.lock().expect("poisoned lock").deref_mut() { ChangedPackages::All => { - // If we've already changed all packages, ignore + // Already rediscovering everything, ignore } ChangedPackages::Some(ref mut pkgs) => { - pkgs.insert(package_name); + pkgs.insert(name); } } } - proto::package_change_event::Event::RediscoverPackages(_) => { + PackageChangeEvent::Rediscover => { *changed_packages.lock().expect("poisoned lock") = ChangedPackages::All; } - proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => { - return Err(DaemonError::Unavailable(message).into()); - } } + } - Ok(()) + pub async fn shutdown(&mut self) { + if let Some(sender) = &self.ui_sender { + sender.stop().await; + } + for handle in self.active_runs.drain(..) { + handle.stopper.stop().await; + let _ = handle.run_task.await; + } + if let Some(RunHandle { stopper, run_task }) = self.persistent_tasks_handle.take() { + stopper.stop().await; + let _ = run_task.await; + } } async fn stop_impacted_tasks(&self, pkgs: &HashSet) -> HashSet { @@ -331,7 +441,6 @@ impl WatchClient { let impacted_nodes = engine.tasks_impacted_by_packages(pkgs); - // Extract task IDs from task nodes (filtering out Root nodes) let task_ids: Vec<_> = impacted_nodes .iter() .filter_map(|node| match node { @@ -340,49 +449,30 @@ impl WatchClient { }) .collect(); - // Collect unique impacted packages - let impacted_packages: HashSet<_> = task_ids + let impacted_packages: HashSet = task_ids .iter() .map(|task_id| PackageName::from(task_id.package())) .collect(); - join_all( - self.active_runs - .iter() - .map(|handle| handle.stopper.stop_tasks(&task_ids)), - ) - .await; + for handle in &self.active_runs { + handle.stopper.stop_tasks(&task_ids).await; + } impacted_packages } - /// Shut down any resources that run as part of watch. - pub async fn shutdown(&mut self) { - if let Some(sender) = &self.ui_sender { - sender.stop().await; - } - for handle in self.active_runs.drain(..) { - handle.stopper.stop().await; - let _ = handle.run_task.await; - } - if let Some(RunHandle { stopper, run_task }) = self.persistent_tasks_handle.take() { - // Shut down the tasks for the run - stopper.stop().await; - // Run should exit shortly after we stop all child tasks, wait for it to finish - // to ensure all messages are flushed. - let _ = run_task.await; - } - } - - /// Executes a run with the given changed packages. Splits the run into two - /// parts: - /// 1. The persistent tasks that are not allowed to be interrupted + /// Start executing tasks. + /// + /// If `changed_packages` is `Some(set)`, only tasks in those packages run. + /// If `All`, we rebuild the entire Run struct and re-run everything. + /// + /// Persistent (non-interruptible) tasks are split into a separate handle: + /// 1. First we run non-persistent + interruptible tasks /// 2. The non-persistent tasks and the persistent tasks that are allowed to /// be interrupted /// /// Returns a handle to the task running (2) async fn execute_run(&mut self, changed_packages: ChangedPackages) -> Result { - // Should we recover here? trace!("handling run with changed packages: {changed_packages:?}"); match changed_packages { ChangedPackages::Some(packages) => { @@ -403,7 +493,7 @@ impl WatchClient { let telemetry = self.telemetry.clone(); let mut run_builder = RunBuilder::new(new_base, None)? - .with_daemon_client(self.daemon_client.clone()) + .with_output_watcher(self.output_watcher.clone()) .with_entrypoint_packages(packages) .hide_prelude(); if let Some(ref qs) = self.query_server { @@ -413,9 +503,9 @@ impl WatchClient { if let Some(sender) = &self.ui_sender { let task_names = run.engine.tasks_with_command(&run.pkg_dep_graph); - sender - .restart_tasks(task_names) - .map_err(|err| Error::UISend(format!("some packages changed: {err}")))?; + if let Err(err) = sender.restart_tasks(task_names) { + tracing::warn!("failed to notify UI of restarted tasks: {err}"); + } } let ui_sender = self.ui_sender.clone(); @@ -438,9 +528,8 @@ impl WatchClient { self.base.color_config, ); - // rebuild run struct let mut run_builder = RunBuilder::new(base.clone(), None)? - .with_daemon_client(self.daemon_client.clone()) + .with_output_watcher(self.output_watcher.clone()) .hide_prelude(); if let Some(ref qs) = self.query_server { run_builder = run_builder.with_query_server(qs.clone()); @@ -452,19 +541,15 @@ impl WatchClient { self.watched_packages = self.run.get_relevant_packages(); - // Clean up currently running persistent tasks if let Some(RunHandle { stopper, run_task }) = self.persistent_tasks_handle.take() { - // Shut down the tasks for the run stopper.stop().await; - // Run should exit shortly after we stop all child tasks, wait for it to finish - // to ensure all messages are flushed. let _ = run_task.await; } if let Some(sender) = &self.ui_sender { let task_names = self.run.engine.tasks_with_command(&self.run.pkg_dep_graph); - sender - .update_tasks(task_names) - .map_err(|err| Error::UISend(format!("all packages changed {err}")))?; + if let Err(err) = sender.update_tasks(task_names) { + tracing::warn!("failed to notify UI of updated tasks: {err}"); + } } if self.run.has_non_interruptible_tasks() { @@ -481,12 +566,8 @@ impl WatchClient { let non_persistent_ui_sender = self.ui_sender.clone(); let persistent_ui_sender = self.ui_sender.clone(); - // Signal from non-persistent run to persistent run: non-persistent - // tasks finished successfully, so it's safe to start persistent ones. let (ready_tx, ready_rx) = oneshot::channel::<()>(); - // If we have persistent tasks, we run them on a separate thread - // since persistent tasks don't finish let persistent_task = tokio::spawn(async move { match ready_rx.await { Ok(()) => persistent_run.run(persistent_ui_sender, true).await, @@ -529,25 +610,19 @@ mod test { use std::{collections::HashSet, sync::Mutex}; use tokio::sync::oneshot; - use turborepo_daemon::proto; + use turborepo_daemon::PackageChangeEvent; use turborepo_repository::package_graph::PackageName; use super::{ChangedPackages, WatchClient}; - fn make_package_changed(name: &str) -> proto::package_change_event::Event { - proto::package_change_event::Event::PackageChanged(proto::PackageChanged { - package_name: name.to_string(), - }) - } - - fn make_rediscover() -> proto::package_change_event::Event { - proto::package_change_event::Event::RediscoverPackages(proto::RediscoverPackages {}) + fn make_package_changed(name: &str) -> PackageChangeEvent { + PackageChangeEvent::Package { + name: PackageName::from(name), + } } - fn make_error(msg: &str) -> proto::package_change_event::Event { - proto::package_change_event::Event::Error(proto::PackageChangeError { - message: msg.to_string(), - }) + fn make_rediscover() -> PackageChangeEvent { + PackageChangeEvent::Rediscover } #[test] @@ -572,7 +647,7 @@ mod test { #[test] fn handle_change_event_package_changed_inserts() { let changed = Mutex::new(ChangedPackages::default()); - WatchClient::handle_change_event(&changed, make_package_changed("web")).unwrap(); + WatchClient::handle_change_event(&changed, make_package_changed("web")); let guard = changed.lock().unwrap(); match &*guard { @@ -587,9 +662,9 @@ mod test { #[test] fn handle_change_event_multiple_packages_accumulate() { let changed = Mutex::new(ChangedPackages::default()); - WatchClient::handle_change_event(&changed, make_package_changed("web")).unwrap(); - WatchClient::handle_change_event(&changed, make_package_changed("ui")).unwrap(); - WatchClient::handle_change_event(&changed, make_package_changed("utils")).unwrap(); + WatchClient::handle_change_event(&changed, make_package_changed("web")); + WatchClient::handle_change_event(&changed, make_package_changed("ui")); + WatchClient::handle_change_event(&changed, make_package_changed("utils")); let guard = changed.lock().unwrap(); match &*guard { @@ -606,8 +681,8 @@ mod test { #[test] fn handle_change_event_duplicate_package_deduplicates() { let changed = Mutex::new(ChangedPackages::default()); - WatchClient::handle_change_event(&changed, make_package_changed("web")).unwrap(); - WatchClient::handle_change_event(&changed, make_package_changed("web")).unwrap(); + WatchClient::handle_change_event(&changed, make_package_changed("web")); + WatchClient::handle_change_event(&changed, make_package_changed("web")); let guard = changed.lock().unwrap(); match &*guard { @@ -619,8 +694,8 @@ mod test { #[test] fn handle_change_event_rediscover_sets_all() { let changed = Mutex::new(ChangedPackages::default()); - WatchClient::handle_change_event(&changed, make_package_changed("web")).unwrap(); - WatchClient::handle_change_event(&changed, make_rediscover()).unwrap(); + WatchClient::handle_change_event(&changed, make_package_changed("web")); + WatchClient::handle_change_event(&changed, make_rediscover()); let guard = changed.lock().unwrap(); assert!(matches!(*guard, ChangedPackages::All)); @@ -629,25 +704,17 @@ mod test { #[test] fn handle_change_event_package_changed_after_all_is_noop() { let changed = Mutex::new(ChangedPackages::All); - WatchClient::handle_change_event(&changed, make_package_changed("web")).unwrap(); + WatchClient::handle_change_event(&changed, make_package_changed("web")); let guard = changed.lock().unwrap(); assert!(matches!(*guard, ChangedPackages::All)); } - #[test] - fn handle_change_event_error_returns_err() { - let changed = Mutex::new(ChangedPackages::default()); - let result = - WatchClient::handle_change_event(&changed, make_error("daemon is unavailable")); - assert!(result.is_err()); - } - #[test] fn handle_change_event_rediscover_then_rediscover_stays_all() { let changed = Mutex::new(ChangedPackages::default()); - WatchClient::handle_change_event(&changed, make_rediscover()).unwrap(); - WatchClient::handle_change_event(&changed, make_rediscover()).unwrap(); + WatchClient::handle_change_event(&changed, make_rediscover()); + WatchClient::handle_change_event(&changed, make_rediscover()); let guard = changed.lock().unwrap(); assert!(matches!(*guard, ChangedPackages::All)); @@ -727,7 +794,7 @@ mod test { #[test] fn changed_packages_take_resets_to_default() { let changed = Mutex::new(ChangedPackages::default()); - WatchClient::handle_change_event(&changed, make_package_changed("web")).unwrap(); + WatchClient::handle_change_event(&changed, make_package_changed("web")); let taken = { let mut guard = changed.lock().unwrap(); @@ -735,11 +802,9 @@ mod test { std::mem::take(&mut *guard) }; - // After take, the mutex should hold an empty Some let guard = changed.lock().unwrap(); assert!(guard.is_empty()); - // The taken value should have the package match taken { ChangedPackages::Some(pkgs) => { assert!(pkgs.contains(&PackageName::from("web"))); @@ -750,16 +815,8 @@ mod test { // ----------------------------------------------------------------------- // Oneshot coordination pattern tests - // - // These test the contract used in execute_run to gate persistent tasks - // behind non-persistent task completion. The pattern: - // - Non-persistent run sends on ready_tx only when result is Ok(0) - // - Persistent run waits on ready_rx before starting - // - If ready_tx is dropped (failure/cancellation), persistent run exits // ----------------------------------------------------------------------- - /// Simulates the non-persistent side of the coordination: only sends the - /// ready signal when the run result is Ok(0). fn simulate_non_persistent( ready_tx: oneshot::Sender<()>, result: Result, @@ -834,7 +891,6 @@ mod test { } }); - // Simulate non-persistent task being cancelled: sender dropped drop(ready_tx); let outcome = persistent.await.unwrap(); diff --git a/crates/turborepo-run-cache/Cargo.toml b/crates/turborepo-run-cache/Cargo.toml index 495989084de1f..abbeb0fecb219 100644 --- a/crates/turborepo-run-cache/Cargo.toml +++ b/crates/turborepo-run-cache/Cargo.toml @@ -17,7 +17,6 @@ tokio = { workspace = true, features = ["sync"] } tracing = { workspace = true } turbopath = { workspace = true } turborepo-cache = { workspace = true } -turborepo-daemon = { workspace = true } turborepo-hash = { workspace = true } turborepo-repository = { path = "../turborepo-repository" } turborepo-scm = { workspace = true } diff --git a/crates/turborepo-run-cache/src/lib.rs b/crates/turborepo-run-cache/src/lib.rs index fb44ee76165ad..6f3f2be6814e4 100644 --- a/crates/turborepo-run-cache/src/lib.rs +++ b/crates/turborepo-run-cache/src/lib.rs @@ -23,7 +23,6 @@ use turbopath::{ use turborepo_cache::{ AsyncCache, CacheError, CacheHitMetadata, CacheOpts, CacheSource, http::UploadMap, }; -use turborepo_daemon::{DaemonClient, DaemonConnector}; use turborepo_hash::{FileHashes, TurboHash}; use turborepo_repository::package_graph::PackageInfo; use turborepo_scm::SCM; @@ -47,20 +46,14 @@ pub enum Error { Globwalk(#[from] globwalk::WalkError), #[error("Invalid globwalk pattern: {0}")] Glob(#[from] globwalk::GlobError), - #[error("Error with daemon: {0}")] - Daemon(Box), + #[error("Error with output watcher: {0}")] + OutputWatcher(#[from] OutputWatcherError), #[error(transparent)] Scm(#[from] turborepo_scm::Error), #[error(transparent)] Path(#[from] turbopath::PathError), } -impl From for Error { - fn from(err: turborepo_daemon::DaemonError) -> Self { - Error::Daemon(Box::new(err)) - } -} - /// Abstraction over output change tracking. /// /// In watch mode, turbo needs to know which task outputs have changed since @@ -79,18 +72,20 @@ pub trait OutputWatcher: Send + Sync { fn get_changed_outputs( &self, hash: String, - output_globs: &[String], - ) -> impl std::future::Future, OutputWatcherError>> + Send; + output_globs: Vec, + ) -> std::pin::Pin< + Box, OutputWatcherError>> + Send>, + >; /// Register output globs for a task hash so that future changes to /// matching files can be detected. fn notify_outputs_written( &self, hash: String, - output_globs: &[String], - output_exclusion_globs: &[String], + output_globs: Vec, + output_exclusion_globs: Vec, time_saved: u64, - ) -> impl std::future::Future> + Send; + ) -> std::pin::Pin> + Send>>; } #[derive(Debug, thiserror::Error)] @@ -110,7 +105,7 @@ pub struct RunCache { reads_disabled: bool, writes_disabled: bool, repo_root: AbsoluteSystemPathBuf, - daemon_client: Option>, + output_watcher: Option>, ui: ColorConfig, /// When using `outputLogs: "errors-only"`, show task hashes when tasks /// complete successfully. Controlled by the `errorsOnlyShowHash` future @@ -131,7 +126,7 @@ impl RunCache { repo_root: &AbsoluteSystemPath, run_cache_opts: RunCacheOpts, cache_opts: &CacheOpts, - daemon_client: Option>, + output_watcher: Option>, ui: ColorConfig, is_dry_run: bool, ) -> Self { @@ -147,7 +142,7 @@ impl RunCache { reads_disabled: !cache_opts.cache.remote.read && !cache_opts.cache.local.read, writes_disabled: !cache_opts.cache.remote.write && !cache_opts.cache.local.write, repo_root: repo_root.to_owned(), - daemon_client, + output_watcher, ui, errors_only_show_hash: run_cache_opts.errors_only_show_hash, } @@ -184,7 +179,7 @@ impl RunCache { task_output_logs, caching_disabled, log_file_path, - daemon_client: self.daemon_client.clone(), + output_watcher: self.output_watcher.clone(), ui: self.ui, warnings: self.warnings.clone(), errors_only_show_hash: self.errors_only_show_hash, @@ -218,7 +213,7 @@ pub struct TaskCache { task_output_logs: OutputLogsMode, caching_disabled: bool, log_file_path: AbsoluteSystemPathBuf, - daemon_client: Option>, + output_watcher: Option>, ui: ColorConfig, task_id: TaskId<'static>, warnings: Arc>>, @@ -320,14 +315,18 @@ impl TaskCache { let validated_inclusions = self.repo_relative_globs.validated_inclusions()?; - // If the daemon is connected, check whether outputs have changed since - // they were last written. When outputs are already on disk and unchanged, - // we can skip the cache restore entirely — avoiding file writes that would - // otherwise trigger the file watcher and cause an infinite rebuild loop - // in `turbo watch`. - let changed_output_count = if let Some(daemon_client) = &mut self.daemon_client { - match daemon_client - .get_changed_outputs(self.hash.to_string(), &validated_inclusions) + // If an output watcher is connected, check whether outputs have changed + // since they were last written. When outputs are already on disk and + // unchanged, we can skip the cache restore entirely — avoiding file writes + // that would otherwise trigger the file watcher and cause an infinite + // rebuild loop in `turbo watch`. + let inclusion_strings: Vec = validated_inclusions + .iter() + .map(|g| g.as_ref().to_string()) + .collect(); + let changed_output_count = if let Some(output_watcher) = &self.output_watcher { + match output_watcher + .get_changed_outputs(self.hash.to_string(), inclusion_strings.clone()) .await { Ok(changed_output_globs) => changed_output_globs.len(), @@ -386,13 +385,18 @@ impl TaskCache { self.expanded_outputs = restored_files; - if let Some(daemon_client) = &mut self.daemon_client { - let validated_exclusions = self.repo_relative_globs.validated_exclusions()?; - if let Err(err) = daemon_client + if let Some(output_watcher) = &self.output_watcher { + let exclusion_strings: Vec = self + .repo_relative_globs + .validated_exclusions()? + .iter() + .map(|g| g.as_ref().to_string()) + .collect(); + if let Err(err) = output_watcher .notify_outputs_written( self.hash.clone(), - &validated_inclusions, - &validated_exclusions, + inclusion_strings.clone(), + exclusion_strings, cache_hit_metadata.time_saved, ) .await @@ -508,20 +512,28 @@ impl TaskCache { ) .await?; - if let Some(daemon_client) = self.daemon_client.as_mut() - && let Err(err) = daemon_client + if let Some(output_watcher) = &self.output_watcher { + let inclusion_strings: Vec = validated_inclusions + .iter() + .map(|g| g.as_ref().to_string()) + .collect(); + let exclusion_strings: Vec = validated_exclusions + .iter() + .map(|g| g.as_ref().to_string()) + .collect(); + if let Err(err) = output_watcher .notify_outputs_written( self.hash.to_string(), - &validated_inclusions, - &validated_exclusions, + inclusion_strings, + exclusion_strings, duration.as_millis() as u64, ) .await - .map_err(Error::from) - { - telemetry.track_error(TrackedErrors::DaemonFailedToMarkOutputsAsCached); - let task_id = &self.task_id; - debug!("failed to mark outputs as cached for {task_id}: {err}"); + { + telemetry.track_error(TrackedErrors::DaemonFailedToMarkOutputsAsCached); + let task_id = &self.task_id; + debug!("failed to mark outputs as cached for {task_id}: {err}"); + } } self.expanded_outputs = relative_paths; @@ -691,32 +703,41 @@ mod test { } impl OutputWatcher for MockOutputWatcher { - async fn get_changed_outputs( + fn get_changed_outputs( &self, _hash: String, - _output_globs: &[String], - ) -> Result, OutputWatcherError> { + _output_globs: Vec, + ) -> std::pin::Pin< + Box< + dyn std::future::Future, OutputWatcherError>> + + Send, + >, + > { self.get_changed_call_count.fetch_add(1, Ordering::SeqCst); self.was_called.store(true, Ordering::SeqCst); - match &self.changed_outputs { + let result = match &self.changed_outputs { Ok(set) => Ok(set.clone()), Err(msg) => Err(OutputWatcherError(Box::new(std::io::Error::other(*msg)))), - } + }; + Box::pin(async move { result }) } - async fn notify_outputs_written( + fn notify_outputs_written( &self, _hash: String, - _output_globs: &[String], - _output_exclusion_globs: &[String], + _output_globs: Vec, + _output_exclusion_globs: Vec, _time_saved: u64, - ) -> Result<(), OutputWatcherError> { + ) -> std::pin::Pin< + Box> + Send>, + > { self.notify_call_count.fetch_add(1, Ordering::SeqCst); self.was_called.store(true, Ordering::SeqCst); - match &self.notify_result { + let result = match &self.notify_result { Ok(()) => Ok(()), Err(msg) => Err(OutputWatcherError(Box::new(std::io::Error::other(*msg)))), - } + }; + Box::pin(async move { result }) } } @@ -728,7 +749,7 @@ mod test { async fn output_watcher_no_changes_returns_empty_set() { let watcher = MockOutputWatcher::returning_no_changes(); let result = watcher - .get_changed_outputs("abc123".into(), &["dist/**".into()]) + .get_changed_outputs("abc123".into(), vec!["dist/**".into()]) .await; assert!(result.is_ok()); assert!(result.unwrap().is_empty()); @@ -741,7 +762,7 @@ mod test { let result = watcher .get_changed_outputs( "abc123".into(), - &["dist/**".into(), ".next/**".into(), "build/**".into()], + vec!["dist/**".into(), ".next/**".into(), "build/**".into()], ) .await; let changed = result.unwrap(); @@ -756,7 +777,7 @@ mod test { // treating all outputs as changed (normal cache restore path). let watcher = MockOutputWatcher::returning_get_error(); let result = watcher - .get_changed_outputs("abc123".into(), &["dist/**".into()]) + .get_changed_outputs("abc123".into(), vec!["dist/**".into()]) .await; assert!(result.is_err()); // The error should be displayable for logging @@ -771,8 +792,8 @@ mod test { let result = watcher .notify_outputs_written( "abc123".into(), - &["dist/**".into()], - &["dist/cache/**".into()], + vec!["dist/**".into()], + vec!["dist/cache/**".into()], 1500, ) .await; @@ -786,7 +807,7 @@ mod test { // continue — it's not a fatal error. let watcher = MockOutputWatcher::returning_notify_error(); let result = watcher - .notify_outputs_written("abc123".into(), &["dist/**".into()], &[], 0) + .notify_outputs_written("abc123".into(), vec!["dist/**".into()], vec![], 0) .await; assert!(result.is_err()); } @@ -800,7 +821,7 @@ mod test { // First check: nothing changed let result = watcher - .get_changed_outputs("hash1".into(), &["dist/**".into()]) + .get_changed_outputs("hash1".into(), vec!["dist/**".into()]) .await .unwrap(); assert!(result.is_empty()); @@ -808,7 +829,7 @@ mod test { // Notify after restore watcher - .notify_outputs_written("hash1".into(), &["dist/**".into()], &[], 500) + .notify_outputs_written("hash1".into(), vec!["dist/**".into()], vec![], 500) .await .unwrap(); assert_eq!(watcher.notify_calls(), 1); @@ -816,7 +837,7 @@ mod test { // Second check: still unchanged in this mock (real GlobWatcher would // track actual file changes between calls) let result = watcher - .get_changed_outputs("hash1".into(), &["dist/**".into()]) + .get_changed_outputs("hash1".into(), vec!["dist/**".into()]) .await .unwrap(); assert!(result.is_empty()); @@ -830,11 +851,11 @@ mod test { let watcher = MockOutputWatcher::returning_all_changed(vec!["dist/**".into()]); let result1 = watcher - .get_changed_outputs("hash-a".into(), &["dist/**".into()]) + .get_changed_outputs("hash-a".into(), vec!["dist/**".into()]) .await .unwrap(); let result2 = watcher - .get_changed_outputs("hash-b".into(), &["dist/**".into()]) + .get_changed_outputs("hash-b".into(), vec!["dist/**".into()]) .await .unwrap(); diff --git a/crates/turborepo-ui/src/tui/app.rs b/crates/turborepo-ui/src/tui/app.rs index c7ac7d07e9c07..c9f3fe256eaec 100644 --- a/crates/turborepo-ui/src/tui/app.rs +++ b/crates/turborepo-ui/src/tui/app.rs @@ -435,7 +435,34 @@ impl App { } if !found_task { - return Err(Error::TaskNotFound { name: task.into() }); + // Task might already be running or finished (e.g., from a concurrent + // watch run that hasn't fully stopped yet, or from in-flight events + // of a prior run). Handle gracefully instead of crashing the TUI. + let in_running = self + .tasks_by_status + .running + .iter() + .any(|t| t.name() == task); + let in_finished = self + .tasks_by_status + .finished + .iter() + .any(|t| t.name() == task); + if !in_running && !in_finished { + return Err(Error::TaskNotFound { name: task.into() }); + } + if in_finished + && let Some(idx) = self + .tasks_by_status + .finished + .iter() + .position(|t| t.name() == task) + { + let finished = self.tasks_by_status.finished.remove(idx); + self.tasks_by_status + .running + .push(finished.restart().start()); + } } self.tasks .get_mut(task) @@ -2380,4 +2407,74 @@ mod test { Ok(()) } + + #[test] + fn test_start_task_idempotent_when_already_running() -> Result<(), Error> { + let repo_root_tmp = tempdir()?; + let repo_root = AbsoluteSystemPathBuf::try_from(repo_root_tmp.path()) + .expect("Failed to create AbsoluteSystemPathBuf"); + + let mut app: App = App::new( + 100, + 100, + vec!["a".to_string(), "b".to_string()], + PreferenceLoader::new(&repo_root), + 2048, + ); + + // Start task a normally (planned -> running) + app.start_task("a", OutputLogs::Full)?; + assert_eq!(app.tasks_by_status.running.len(), 1); + assert_eq!(app.tasks_by_status.planned.len(), 1); + + // Call start_task again while a is already running. This can happen + // when a concurrent watch run sends StartTask for a task that hasn't + // been fully stopped yet. Should not error. + app.start_task("a", OutputLogs::Full)?; + assert_eq!( + app.tasks_by_status.running.len(), + 1, + "task should still be in running exactly once" + ); + + Ok(()) + } + + #[test] + fn test_start_task_restarts_finished_task() -> Result<(), Error> { + let repo_root_tmp = tempdir()?; + let repo_root = AbsoluteSystemPathBuf::try_from(repo_root_tmp.path()) + .expect("Failed to create AbsoluteSystemPathBuf"); + + let mut app: App = App::new( + 100, + 100, + vec!["a".to_string(), "b".to_string()], + PreferenceLoader::new(&repo_root), + 2048, + ); + + // Run task a to completion + app.start_task("a", OutputLogs::Full)?; + app.finish_task("a", TaskResult::Success)?; + assert_eq!(app.tasks_by_status.finished.len(), 1); + assert_eq!(app.tasks_by_status.running.len(), 0); + + // Call start_task on the finished task. This happens when a watch + // rebuild starts a task that completed in a prior run but wasn't + // moved back to planned via restart_tasks. Should move it to running. + app.start_task("a", OutputLogs::Full)?; + assert_eq!( + app.tasks_by_status.running.len(), + 1, + "finished task should move to running" + ); + assert_eq!( + app.tasks_by_status.finished.len(), + 0, + "task should no longer be in finished" + ); + + Ok(()) + } } diff --git a/crates/turborepo/tests/watch_test.rs b/crates/turborepo/tests/watch_test.rs index 01737ed4ae700..fa495e8a6f755 100644 --- a/crates/turborepo/tests/watch_test.rs +++ b/crates/turborepo/tests/watch_test.rs @@ -304,9 +304,12 @@ fn watch_clean_shutdown_on_sigint() { let start = Instant::now(); loop { match child.try_wait() { - Ok(Some(_status)) => { - // Process exited — turbo watch exits with non-zero on - // signal interrupt, which is expected. + Ok(Some(status)) => { + // Process exited cleanly. SIGINT is a normal shutdown. + assert!( + status.success(), + "turbo watch should exit cleanly on SIGINT, got: {status}" + ); return; } Ok(None) => { @@ -546,3 +549,223 @@ fn watch_same_content_write_does_not_rebuild() { {a_after}" ); } + +// --------------------------------------------------------------------------- +// Regression tests for in-process watcher behavior +// --------------------------------------------------------------------------- + +/// Create a fixture where package a's build takes ~3 seconds, giving us +/// time to edit files while the build is running. +fn setup_slow_build_test() -> (tempfile::TempDir, PathBuf) { + let tempdir = tempfile::tempdir().expect("failed to create tempdir"); + let test_dir = tempdir.path().to_path_buf(); + + setup::copy_fixture("watch_test", &test_dir).unwrap(); + setup::setup_git(&test_dir).unwrap(); + + let gitignore = test_dir.join(".gitignore"); + let mut gi = fs::read_to_string(&gitignore).unwrap_or_default(); + gi.push_str(".markers/\n"); + fs::write(&gitignore, gi).unwrap(); + + // Make package a's build slow (3 seconds) + let build_a = test_dir.join("packages/a/build.js"); + fs::write( + &build_a, + "const fs = require('fs');\nconst path = require('path');\nconst markerDir = \ + path.join(__dirname, '.markers');\nfs.mkdirSync(markerDir, { recursive: true });\nconst \ + count = fs.readdirSync(markerDir).length;\nfs.writeFileSync(path.join(markerDir, \ + `run-${count}`), `${Date.now()}\\n`);\nconsole.log(`pkg-a slow build #${count}`);\n// \ + Simulate a slow build\nconst start = Date.now();\nwhile (Date.now() - start < 3000) {}\n", + ) + .unwrap(); + + common::git(&test_dir, &["add", "."]); + common::git(&test_dir, &["commit", "-m", "slow build setup", "--quiet"]); + + (tempdir, test_dir) +} + +/// Editing a file while a build is running should trigger a rebuild after +/// the current build finishes. The in-process watcher must not discard +/// events that arrive during a run. +#[test] +fn watch_edit_during_build_triggers_rebuild() { + let (_tempdir, test_dir) = setup_slow_build_test(); + let guard = WatchGuard::new(spawn_turbo_watch(&test_dir)); + + // Wait for the initial build to start (marker appears) + wait_for_markers(&test_dir, "a", 1, Duration::from_secs(30)); + + // While the slow build is still running (3s), edit package b's source. + // Package b's build is fast, but the watch coordinator waits for all + // active runs to finish before processing new events. + let b_before = marker_count(&test_dir, "b"); + let src_file = test_dir.join("packages/b/src.js"); + fs::write( + &src_file, + "module.exports = { b: 'edited-during-build' };\n", + ) + .unwrap(); + common::git(&test_dir, &["add", "."]); + common::git( + &test_dir, + &["commit", "-m", "edit b during slow build", "--quiet"], + ); + + // Wait for b to rebuild. The edit should not be lost — after the slow + // build completes and the system processes accumulated events, b should + // rebuild. + let b_after = wait_for_markers(&test_dir, "b", b_before + 1, Duration::from_secs(30)); + + drop(guard); + + assert!( + b_after > b_before, + "package b should have rebuilt after edit during build. before: {b_before}, after: \ + {b_after}" + ); +} + +/// Rapid successive edits to the same file should be coalesced by the hash +/// watcher's debouncer and the PackageChangesWatcher's 100ms batching, +/// producing at most 2 rebuilds rather than one per edit. +#[test] +fn watch_rapid_edits_produce_single_rebuild() { + let (_tempdir, test_dir) = setup_watch_test(); + let guard = WatchGuard::new(spawn_turbo_watch(&test_dir)); + + // Wait for initial build and settle + wait_for_markers(&test_dir, "a", 1, Duration::from_secs(30)); + wait_for_markers(&test_dir, "b", 1, Duration::from_secs(30)); + std::thread::sleep(Duration::from_secs(3)); + + let a_before = marker_count(&test_dir, "a"); + + // Rapidly edit the same file 5 times, committing each time. + let src_file = test_dir.join("packages/a/src.js"); + for i in 0..5 { + fs::write( + &src_file, + format!("module.exports = {{ a: 'rapid-{i}' }};\n"), + ) + .unwrap(); + common::git(&test_dir, &["add", "."]); + common::git( + &test_dir, + &[ + "commit", + "-m", + &format!("rapid edit {i}"), + "--quiet", + "--allow-empty", + ], + ); + std::thread::sleep(Duration::from_millis(50)); + } + + // Wait for at least one rebuild, then let the system fully settle. + wait_for_markers(&test_dir, "a", a_before + 1, Duration::from_secs(30)); + std::thread::sleep(Duration::from_secs(5)); + + let a_after = marker_count(&test_dir, "a"); + + drop(guard); + + let rebuilds = a_after - a_before; + assert!( + (1..=3).contains(&rebuilds), + "5 rapid edits should be debounced to at most 3 rebuilds, but got {rebuilds}" + ); +} + +/// Verify that builds of the same package don't run concurrently. The watch +/// coordinator waits for active runs to finish before starting new ones. +/// We verify this by checking that marker timestamps are sequential (no +/// overlap). +#[test] +fn watch_no_concurrent_builds_of_same_package() { + let (_tempdir, test_dir) = setup_watch_test(); + let guard = WatchGuard::new(spawn_turbo_watch(&test_dir)); + + // Wait for initial build and settle + wait_for_markers(&test_dir, "a", 1, Duration::from_secs(30)); + std::thread::sleep(Duration::from_secs(3)); + + // Trigger two sequential rebuilds by editing twice with a gap + let src_file = test_dir.join("packages/a/src.js"); + + fs::write(&src_file, "module.exports = { a: 'first-edit' };\n").unwrap(); + common::git(&test_dir, &["add", "."]); + common::git(&test_dir, &["commit", "-m", "first edit", "--quiet"]); + wait_for_markers(&test_dir, "a", 2, Duration::from_secs(30)); + std::thread::sleep(Duration::from_secs(2)); + + fs::write(&src_file, "module.exports = { a: 'second-edit' };\n").unwrap(); + common::git(&test_dir, &["add", "."]); + common::git(&test_dir, &["commit", "-m", "second edit", "--quiet"]); + wait_for_markers(&test_dir, "a", 3, Duration::from_secs(30)); + + drop(guard); + + // Read timestamps from markers. Each marker file contains a Date.now() + // timestamp. Sequential builds mean each timestamp is >= the previous. + let marker_dir = test_dir.join("packages/a/.markers"); + let mut timestamps: Vec = fs::read_dir(&marker_dir) + .unwrap() + .filter_map(|e| e.ok()) + .filter_map(|e| fs::read_to_string(e.path()).ok()) + .filter_map(|content| content.trim().parse::().ok()) + .collect(); + timestamps.sort(); + + for window in timestamps.windows(2) { + assert!( + window[1] >= window[0], + "marker timestamps should be sequential (no concurrent builds). got: {timestamps:?}" + ); + } +} + +/// Verify that Ctrl+C (SIGINT) produces a clean exit with code 0, not an +/// error message. +#[cfg(unix)] +#[test] +fn watch_sigint_exits_with_zero() { + use nix::{ + sys::signal::{self, Signal}, + unistd::Pid, + }; + + let (_tempdir, test_dir) = setup_watch_test(); + let guard = WatchGuard::new(spawn_turbo_watch(&test_dir)); + + // Wait for the initial build to complete + wait_for_markers(&test_dir, "a", 1, Duration::from_secs(30)); + std::thread::sleep(Duration::from_secs(2)); + + let mut child = guard.take(); + let pid = Pid::from_raw(child.id() as i32); + signal::kill(pid, Signal::SIGINT).expect("failed to send SIGINT"); + + let start = Instant::now(); + loop { + match child.try_wait() { + Ok(Some(status)) => { + assert!( + status.success(), + "turbo watch should exit with code 0 on SIGINT, got: {status}" + ); + return; + } + Ok(None) => { + if start.elapsed() > Duration::from_secs(10) { + child.kill().unwrap(); + panic!("turbo watch did not exit within 10s after SIGINT"); + } + std::thread::sleep(Duration::from_millis(100)); + } + Err(e) => panic!("error waiting for turbo watch: {e}"), + } + } +}