diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 1bc2aad5f6349..fac0fbdf175ea 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -52,7 +52,7 @@ use crate::{ AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue, CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number, - get_uppers, is_root_node, prepare_new_children, + get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children, }, storage::{ InnerStorageSnapshot, Storage, count, get, get_many, get_mut, get_mut_or_insert_with, @@ -420,6 +420,8 @@ struct TaskExecutionCompletePrepareResult { pub new_children: FxHashSet, pub removed_data: Vec, pub is_now_immutable: bool, + pub new_output: Option, + pub output_dependent_tasks: SmallVec<[TaskId; 4]>, } // Operations @@ -493,16 +495,8 @@ impl TurboTasksBackendInner { Some(Ok(Err(listen_to_done_event(this, reader, done_event)))) } Some(InProgressState::InProgress(box InProgressStateInner { - done, - done_event, - .. - })) => { - if !*done { - Some(Ok(Err(listen_to_done_event(this, reader, done_event)))) - } else { - None - } - } + done_event, .. + })) => Some(Ok(Err(listen_to_done_event(this, reader, done_event)))), Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!( "{} was canceled", ctx.get_task_description(task.id()) @@ -1596,7 +1590,6 @@ impl TurboTasksBackendInner { done_event, session_dependent: false, marked_as_completed: false, - done: false, new_children: Default::default(), })), }); @@ -1695,20 +1688,12 @@ impl TurboTasksBackendInner { Some(TaskExecutionSpec { future, span }) } - fn task_execution_result( - &self, - task_id: TaskId, - result: Result, - turbo_tasks: &dyn TurboTasksBackendApi>, - ) { - operation::UpdateOutputOperation::run(task_id, result, self.execute_context(turbo_tasks)); - } - fn task_execution_completed( &self, task_id: TaskId, duration: Duration, _memory_usage: usize, + result: Result, cell_counters: &AutoMap, 8>, stateful: bool, has_invalidator: bool, @@ -1737,10 +1722,13 @@ impl TurboTasksBackendInner { new_children, mut removed_data, is_now_immutable, + new_output, + output_dependent_tasks, }) = self.task_execution_completed_prepare( &mut ctx, &span, task_id, + result, cell_counters, stateful, has_invalidator, @@ -1754,7 +1742,20 @@ impl TurboTasksBackendInner { // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and // would be executed again. + if !output_dependent_tasks.is_empty() { + self.task_execution_completed_invalidate_output_dependent( + &mut ctx, + task_id, + output_dependent_tasks, + ); + } + let has_new_children = !new_children.is_empty(); + + if has_new_children { + self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children) + } + if has_new_children && self.task_execution_completed_connect(&mut ctx, task_id, new_children) { @@ -1765,6 +1766,7 @@ impl TurboTasksBackendInner { if self.task_execution_completed_finish( &mut ctx, task_id, + new_output, &mut removed_data, is_now_immutable, ) { @@ -1784,6 +1786,7 @@ impl TurboTasksBackendInner { ctx: &mut impl ExecuteContext<'_>, span: &Span, task_id: TaskId, + result: Result, cell_counters: &AutoMap, 8>, stateful: bool, has_invalidator: bool, @@ -1797,12 +1800,12 @@ impl TurboTasksBackendInner { new_children: Default::default(), removed_data: Default::default(), is_now_immutable: false, + new_output: None, + output_dependent_tasks: Default::default(), }); } let &mut InProgressState::InProgress(box InProgressStateInner { stale, - ref mut done, - ref done_event, ref mut new_children, session_dependent, .. @@ -1846,12 +1849,6 @@ impl TurboTasksBackendInner { return None; } - if cfg!(not(feature = "no_fast_stale")) || !stale { - // mark the task as completed, so dependent tasks can continue working - *done = true; - done_event.notify(usize::MAX); - } - // take the children from the task to process them let mut new_children = take(new_children); @@ -1979,6 +1976,53 @@ impl TurboTasksBackendInner { ); } + // Check if output need to be updated + let current_output = get!(task, Output); + let new_output = match result { + Ok(RawVc::TaskOutput(output_task_id)) => { + if let Some(OutputValue::Output(current_task_id)) = current_output + && *current_task_id == output_task_id + { + None + } else { + Some(OutputValue::Output(output_task_id)) + } + } + Ok(RawVc::TaskCell(output_task_id, cell)) => { + if let Some(OutputValue::Cell(CellRef { + task: current_task_id, + cell: current_cell, + })) = current_output + && *current_task_id == output_task_id + && *current_cell == cell + { + None + } else { + Some(OutputValue::Cell(CellRef { + task: output_task_id, + cell, + })) + } + } + Ok(RawVc::LocalOutput(..)) => { + panic!("Non-local tasks must not return a local Vc"); + } + Err(err) => { + if let Some(OutputValue::Error(old_error)) = current_output + && old_error == &err + { + None + } else { + Some(OutputValue::Error(err)) + } + } + }; + let mut output_dependent_tasks = SmallVec::<[_; 4]>::new(); + // When output has changed, grab the dependent tasks + if new_output.is_some() && ctx.should_track_dependencies() { + output_dependent_tasks = get_many!(task, OutputDependent { task } => task); + } + drop(task); // Check if the task can be marked as immutable @@ -2005,9 +2049,76 @@ impl TurboTasksBackendInner { new_children, removed_data, is_now_immutable, + new_output, + output_dependent_tasks, }) } + fn task_execution_completed_invalidate_output_dependent( + &self, + ctx: &mut impl ExecuteContext<'_>, + task_id: TaskId, + output_dependent_tasks: SmallVec<[TaskId; 4]>, + ) { + debug_assert!(!output_dependent_tasks.is_empty()); + + let mut queue = AggregationUpdateQueue::new(); + for dependent_task_id in output_dependent_tasks { + if ctx.is_once_task(dependent_task_id) { + // once tasks are never invalidated + continue; + } + let dependent = ctx.task(dependent_task_id, TaskDataCategory::All); + if dependent.has_key(&CachedDataItemKey::OutdatedOutputDependency { target: task_id }) { + // output dependency is outdated, so it hasn't read the output yet + // and doesn't need to be invalidated + continue; + } + if !dependent.has_key(&CachedDataItemKey::OutputDependency { target: task_id }) { + // output dependency has been removed, so the task doesn't depend on the + // output anymore and doesn't need to be invalidated + continue; + } + make_task_dirty_internal( + dependent, + dependent_task_id, + true, + #[cfg(feature = "trace_task_dirty")] + TaskDirtyCause::OutputChange { task_id }, + &mut queue, + ctx, + ); + } + + queue.execute(ctx); + } + + fn task_execution_completed_unfinished_children_dirty( + &self, + ctx: &mut impl ExecuteContext<'_>, + new_children: &FxHashSet, + ) { + debug_assert!(!new_children.is_empty()); + + let mut queue = AggregationUpdateQueue::new(); + for &child_id in new_children { + let child_task = ctx.task(child_id, TaskDataCategory::Meta); + if !child_task.has_key(&CachedDataItemKey::Output {}) { + make_task_dirty_internal( + child_task, + child_id, + false, + #[cfg(feature = "trace_task_dirty")] + TaskDirtyCause::InitialDirty, + &mut queue, + ctx, + ); + } + } + + queue.execute(ctx); + } + fn task_execution_completed_connect( &self, ctx: &mut impl ExecuteContext<'_>, @@ -2078,6 +2189,7 @@ impl TurboTasksBackendInner { &self, ctx: &mut impl ExecuteContext<'_>, task_id: TaskId, + new_output: Option, removed_data: &mut Vec, is_now_immutable: bool, ) -> bool { @@ -2094,7 +2206,6 @@ impl TurboTasksBackendInner { once_task: _, stale, session_dependent, - done: _, marked_as_completed: _, new_children, }) = in_progress @@ -2114,6 +2225,12 @@ impl TurboTasksBackendInner { return true; } + // Set the output if it has changed + let mut old_content = None; + if let Some(value) = new_output { + old_content = task.insert(CachedDataItem::Output { value }); + } + // If the task is not stateful and has no mutable children, it does not have a way to be // invalidated and we can mark it as immutable. if is_now_immutable { @@ -2194,6 +2311,10 @@ impl TurboTasksBackendInner { }; drop(task); + drop(old_content); + + // Notify dependent tasks that are waiting for this task to finish + done_event.notify(usize::MAX); if let Some(data_update) = data_update { AggregationUpdateQueue::run(data_update, ctx); @@ -2981,20 +3102,12 @@ impl Backend for TurboTasksBackend { self.0.try_start_task_execution(task_id, turbo_tasks) } - fn task_execution_result( - &self, - task_id: TaskId, - result: Result, - turbo_tasks: &dyn TurboTasksBackendApi, - ) { - self.0.task_execution_result(task_id, result, turbo_tasks); - } - fn task_execution_completed( &self, task_id: TaskId, _duration: Duration, _memory_usage: usize, + result: Result, cell_counters: &AutoMap, 8>, stateful: bool, has_invalidator: bool, @@ -3004,6 +3117,7 @@ impl Backend for TurboTasksBackend { task_id, _duration, _memory_usage, + result, cell_counters, stateful, has_invalidator, diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 7927c5273e132..135553d0fc617 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -6,7 +6,6 @@ mod invalidate; mod prepare_new_children; mod update_cell; mod update_collectible; -mod update_output; use std::{ fmt::{Debug, Formatter}, @@ -600,7 +599,6 @@ macro_rules! impl_operation { pub enum AnyOperation { ConnectChild(connect_child::ConnectChildOperation), Invalidate(invalidate::InvalidateOperation), - UpdateOutput(update_output::UpdateOutputOperation), UpdateCell(update_cell::UpdateCellOperation), CleanupOldEdges(cleanup_old_edges::CleanupOldEdgesOperation), AggregationUpdate(aggregation_update::AggregationUpdateQueue), @@ -612,7 +610,6 @@ impl AnyOperation { match self { AnyOperation::ConnectChild(op) => op.execute(ctx), AnyOperation::Invalidate(op) => op.execute(ctx), - AnyOperation::UpdateOutput(op) => op.execute(ctx), AnyOperation::UpdateCell(op) => op.execute(ctx), AnyOperation::CleanupOldEdges(op) => op.execute(ctx), AnyOperation::AggregationUpdate(op) => op.execute(ctx), @@ -627,7 +624,6 @@ impl AnyOperation { impl_operation!(ConnectChild connect_child::ConnectChildOperation); impl_operation!(Invalidate invalidate::InvalidateOperation); -impl_operation!(UpdateOutput update_output::UpdateOutputOperation); impl_operation!(UpdateCell update_cell::UpdateCellOperation); impl_operation!(CleanupOldEdges cleanup_old_edges::CleanupOldEdgesOperation); impl_operation!(AggregationUpdate aggregation_update::AggregationUpdateQueue); @@ -641,6 +637,7 @@ pub use self::{ }, cleanup_old_edges::OutdatedEdge, connect_children::connect_children, + invalidate::make_task_dirty_internal, prepare_new_children::prepare_new_children, update_collectible::UpdateCollectibleOperation, }; diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_output.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_output.rs deleted file mode 100644 index 9aea8f099fc25..0000000000000 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_output.rs +++ /dev/null @@ -1,224 +0,0 @@ -use std::mem::take; - -use anyhow::Result; -use serde::{Deserialize, Serialize}; -use smallvec::SmallVec; -use turbo_tasks::{RawVc, TaskId, backend::TurboTasksExecutionError}; - -#[cfg(feature = "trace_task_dirty")] -use crate::backend::operation::invalidate::TaskDirtyCause; -use crate::{ - backend::{ - TaskDataCategory, - operation::{ - AggregationUpdateQueue, ExecuteContext, Operation, TaskGuard, - invalidate::make_task_dirty_internal, - }, - storage::{get, get_many}, - }, - data::{ - CachedDataItem, CachedDataItemKey, CellRef, InProgressState, InProgressStateInner, - OutputValue, - }, -}; - -#[derive(Serialize, Deserialize, Clone, Default)] -pub enum UpdateOutputOperation { - MakeDependentTasksDirty { - task_id: TaskId, - dependent_tasks: SmallVec<[TaskId; 4]>, - children: SmallVec<[TaskId; 4]>, - queue: AggregationUpdateQueue, - }, - EnsureUnfinishedChildrenDirty { - children: SmallVec<[TaskId; 4]>, - queue: AggregationUpdateQueue, - }, - AggregationUpdate { - queue: AggregationUpdateQueue, - }, - #[default] - Done, -} - -impl UpdateOutputOperation { - pub fn run( - task_id: TaskId, - output: Result, - mut ctx: impl ExecuteContext<'_>, - ) { - let mut dependent_tasks = Default::default(); - let mut children = Default::default(); - let mut queue = AggregationUpdateQueue::new(); - - 'output: { - let mut task = ctx.task(task_id, TaskDataCategory::All); - let in_progress_state = get!(task, InProgress); - if matches!(in_progress_state, Some(InProgressState::Canceled)) { - // Skip updating the output when the task was canceled - break 'output; - } - let Some(InProgressState::InProgress(box InProgressStateInner { - stale, - new_children, - .. - })) = in_progress_state - else { - panic!("Task is not in progress while updating the output"); - }; - if *stale { - // Skip updating the output when the task is stale - break 'output; - } - children = new_children.iter().copied().collect(); - - let current_output = get!(task, Output); - let output_value = match output { - Ok(RawVc::TaskOutput(output_task_id)) => { - if let Some(OutputValue::Output(current_task_id)) = current_output - && *current_task_id == output_task_id - { - break 'output; - } - OutputValue::Output(output_task_id) - } - Ok(RawVc::TaskCell(output_task_id, cell)) => { - if let Some(OutputValue::Cell(CellRef { - task: current_task_id, - cell: current_cell, - })) = current_output - && *current_task_id == output_task_id - && *current_cell == cell - { - break 'output; - } - OutputValue::Cell(CellRef { - task: output_task_id, - cell, - }) - } - Ok(RawVc::LocalOutput(..)) => { - panic!("Non-local tasks must not return a local Vc"); - } - Err(err) => { - if let Some(OutputValue::Error(old_error)) = current_output - && old_error == &err - { - break 'output; - } - OutputValue::Error(err) - } - }; - let old_content = task.insert(CachedDataItem::Output { - value: output_value, - }); - - if ctx.should_track_dependencies() { - dependent_tasks = get_many!(task, OutputDependent { task } => task); - } - - make_task_dirty_internal( - task, - task_id, - false, - #[cfg(feature = "trace_task_dirty")] - TaskDirtyCause::InitialDirty, - &mut queue, - &mut ctx, - ); - - drop(old_content); - } - - UpdateOutputOperation::MakeDependentTasksDirty { - task_id, - dependent_tasks, - children, - queue, - } - .execute(&mut ctx); - } -} - -impl Operation for UpdateOutputOperation { - fn execute(mut self, ctx: &mut impl ExecuteContext) { - loop { - ctx.operation_suspend_point(&self); - match self { - UpdateOutputOperation::MakeDependentTasksDirty { - task_id, - ref mut dependent_tasks, - ref mut children, - ref mut queue, - } => { - if let Some(dependent_task_id) = dependent_tasks.pop() { - if ctx.is_once_task(dependent_task_id) { - // once tasks are never invalidated - continue; - } - let dependent = ctx.task(dependent_task_id, TaskDataCategory::All); - if dependent.has_key(&CachedDataItemKey::OutdatedOutputDependency { - target: task_id, - }) { - // output dependency is outdated, so it hasn't read the output yet - // and doesn't need to be invalidated - continue; - } - if !dependent - .has_key(&CachedDataItemKey::OutputDependency { target: task_id }) - { - // output dependency has been removed, so the task doesn't depend on the - // output anymore and doesn't need to be invalidated - continue; - } - make_task_dirty_internal( - dependent, - dependent_task_id, - true, - #[cfg(feature = "trace_task_dirty")] - TaskDirtyCause::OutputChange { task_id }, - queue, - ctx, - ); - } - if dependent_tasks.is_empty() { - self = UpdateOutputOperation::EnsureUnfinishedChildrenDirty { - children: take(children), - queue: take(queue), - }; - } - } - UpdateOutputOperation::EnsureUnfinishedChildrenDirty { - ref mut children, - ref mut queue, - } => { - if let Some(child_id) = children.pop() { - let child_task = ctx.task(child_id, TaskDataCategory::Meta); - if !child_task.has_key(&CachedDataItemKey::Output {}) { - make_task_dirty_internal( - child_task, - child_id, - false, - #[cfg(feature = "trace_task_dirty")] - TaskDirtyCause::InitialDirty, - queue, - ctx, - ); - } - } - if children.is_empty() { - self = UpdateOutputOperation::AggregationUpdate { queue: take(queue) }; - } - } - UpdateOutputOperation::AggregationUpdate { ref mut queue } => { - if queue.process(ctx) { - self = UpdateOutputOperation::Done; - } - } - UpdateOutputOperation::Done => { - return; - } - } - } - } -} diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index fd0cd7e55baab..2afaab53df280 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -499,8 +499,6 @@ pub struct InProgressStateInner { /// Early marking as completed. This is set before the output is available and will ignore full /// task completion of the task for strongly consistent reads. pub marked_as_completed: bool, - /// Task execution has completed and the output is available. - pub done: bool, /// Event that is triggered when the task output is available (completed flag set). /// This is used to wait for completion when reading the task output before it's available. pub done_event: Event, diff --git a/turbopack/crates/turbo-tasks/src/backend.rs b/turbopack/crates/turbo-tasks/src/backend.rs index b5171d4f59db9..018fc1c579b90 100644 --- a/turbopack/crates/turbo-tasks/src/backend.rs +++ b/turbopack/crates/turbo-tasks/src/backend.rs @@ -538,18 +538,12 @@ pub trait Backend: Sync + Send { fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi); - fn task_execution_result( - &self, - task_id: TaskId, - result: Result, - turbo_tasks: &dyn TurboTasksBackendApi, - ); - fn task_execution_completed( &self, task: TaskId, duration: Duration, memory_usage: usize, + result: Result, cell_counters: &AutoMap, 8>, stateful: bool, has_invalidator: bool, diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index b3616ccd40b72..9ac93282ccc61 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -722,7 +722,6 @@ impl TurboTasks { Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))), }; - this.backend.task_execution_result(task_id, result, &*this); let FinishedTaskState { stateful, has_invalidator, @@ -733,6 +732,7 @@ impl TurboTasks { task_id, duration, alloc_info.memory_usage(), + result, &cell_counters, stateful, has_invalidator,