diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index bd3e99a14c4b3..7cef90bef8bdb 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -831,7 +831,6 @@ impl TurboTasksBackendInner { return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0)); } let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled)); - let is_scheduled = matches!(in_progress, Some(InProgressState::Scheduled { .. })); // Check cell index range (cell might not exist at all) let max_id = get!( @@ -875,14 +874,12 @@ impl TurboTasksBackendInner { // Schedule the task, if not already scheduled if is_cancelled { bail!("{} was canceled", ctx.get_task_description(task_id)); - } else if !is_scheduled - && task.add(CachedDataItem::new_scheduled( - TaskExecutionReason::CellNotAvailable, - || self.get_task_desc_fn(task_id), - )) - { - ctx.schedule_task(task); } + task.add_new(CachedDataItem::new_scheduled( + TaskExecutionReason::CellNotAvailable, + || self.get_task_desc_fn(task_id), + )); + ctx.schedule_task(task); Ok(Err(listener)) } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs index af1bac10149bc..70dee0010a772 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs @@ -2,7 +2,7 @@ use std::mem::take; use serde::{Deserialize, Serialize}; use smallvec::SmallVec; -use turbo_tasks::{CellId, TaskId, backend::CellContent}; +use turbo_tasks::{CellId, TaskId, TypedSharedReference, backend::CellContent}; #[cfg(feature = "trace_task_dirty")] use crate::backend::operation::invalidate::TaskDirtyCause; @@ -24,6 +24,12 @@ pub enum UpdateCellOperation { InvalidateWhenCellDependency { cell_ref: CellRef, dependent_tasks: SmallVec<[TaskId; 4]>, + content: Option, + queue: AggregationUpdateQueue, + }, + FinalCellChange { + cell_ref: CellRef, + content: Option, queue: AggregationUpdateQueue, }, AggregationUpdate { @@ -36,51 +42,82 @@ pub enum UpdateCellOperation { impl UpdateCellOperation { pub fn run(task_id: TaskId, cell: CellId, content: CellContent, mut ctx: impl ExecuteContext) { let mut task = ctx.task(task_id, TaskDataCategory::All); - let old_content = if let CellContent(Some(new_content)) = content { - task.insert(CachedDataItem::CellData { - cell, - value: new_content.into_typed(cell.type_id), - }) - } else { - task.remove(&CachedDataItemKey::CellData { cell }) - }; - - if let Some(in_progress) = remove!(task, InProgressCell { cell }) { - in_progress.event.notify(usize::MAX); - } // We need to detect recomputation, because here the content has not actually changed (even // if it's not equal to the old content, as not all values implement Eq). We have to // assume that tasks are deterministic and pure. + let should_invalidate = ctx.should_track_dependencies() + && (task.has_key(&CachedDataItemKey::Dirty {}) || + // This is a hack for the streaming hack. Stateful tasks are never recomputed, so this forces invalidation for them in case of this hack. + task.has_key(&CachedDataItemKey::Stateful {})); - if ctx.should_track_dependencies() - && (task.has_key(&CachedDataItemKey::Dirty {}) - || - // This is a hack for the streaming hack. Stateful tasks are never recomputed, so this forces invalidation for them in case of this hack. - task.has_key(&CachedDataItemKey::Stateful {})) - { - let dependent_tasks = get_many!( + if should_invalidate { + let dependent_tasks: SmallVec<[TaskId; 4]> = get_many!( task, CellDependent { cell: dependent_cell, task } if dependent_cell == cell => task ); - drop(task); - drop(old_content); + if !dependent_tasks.is_empty() { + // Slow path: We need to invalidate tasks depending on this cell. + // To avoid a race condition, we need to remove the old content first, + // then invalidate dependent tasks and only then update the cell content. + + // The reason behind this is that we consider tasks that haven't the dirty flag set + // as "recomputing" tasks. Recomputing tasks won't invalidate + // dependent tasks, when a cell is changed. This would cause missing invalidating if + // a task is recomputing while a dependency is in the middle of a cell update (where + // the value has been changed, but the dependent tasks have not be flagged dirty + // yet). So to avoid that we first remove the cell content, invalidate all dependent + // tasks and after that set the new cell content. When the cell content is unset, + // readers will wait for it to be set via InProgressCell. + + let old_content = task.remove(&CachedDataItemKey::CellData { cell }); + + drop(task); + drop(old_content); - UpdateCellOperation::InvalidateWhenCellDependency { - cell_ref: CellRef { - task: task_id, - cell, - }, - dependent_tasks, - queue: AggregationUpdateQueue::new(), + let content = if let CellContent(Some(new_content)) = content { + Some(new_content.into_typed(cell.type_id)) + } else { + None + }; + + UpdateCellOperation::InvalidateWhenCellDependency { + cell_ref: CellRef { + task: task_id, + cell, + }, + dependent_tasks, + content, + queue: AggregationUpdateQueue::new(), + } + .execute(&mut ctx); + return; } - .execute(&mut ctx); + } + + // Fast path: We don't need to invalidate anything. + // So we can just update the cell content. + + let old_content = if let CellContent(Some(new_content)) = content { + let new_content = new_content.into_typed(cell.type_id); + task.insert(CachedDataItem::CellData { + cell, + value: new_content, + }) } else { - drop(task); - drop(old_content); + task.remove(&CachedDataItemKey::CellData { cell }) + }; + + let in_progress_cell = remove!(task, InProgressCell { cell }); + + drop(task); + drop(old_content); + + if let Some(in_progress) = in_progress_cell { + in_progress.event.notify(usize::MAX); } } } @@ -93,6 +130,7 @@ impl Operation for UpdateCellOperation { UpdateCellOperation::InvalidateWhenCellDependency { cell_ref, ref mut dependent_tasks, + ref mut content, ref mut queue, } => { if let Some(dependent_task_id) = dependent_tasks.pop() { @@ -129,9 +167,37 @@ impl Operation for UpdateCellOperation { ); } if dependent_tasks.is_empty() { - self = UpdateCellOperation::AggregationUpdate { queue: take(queue) }; + self = UpdateCellOperation::FinalCellChange { + cell_ref, + content: take(content), + queue: take(queue), + }; } } + UpdateCellOperation::FinalCellChange { + cell_ref: CellRef { task, cell }, + content, + ref mut queue, + } => { + let mut task = ctx.task(task, TaskDataCategory::Data); + + if let Some(content) = content { + task.add_new(CachedDataItem::CellData { + cell, + value: content, + }) + } + + let in_progress_cell = remove!(task, InProgressCell { cell }); + + drop(task); + + if let Some(in_progress) = in_progress_cell { + in_progress.event.notify(usize::MAX); + } + + self = UpdateCellOperation::AggregationUpdate { queue: take(queue) }; + } UpdateCellOperation::AggregationUpdate { ref mut queue } => { if queue.process(ctx) { self = UpdateCellOperation::Done