Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
}
let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
let is_scheduled = matches!(in_progress, Some(InProgressState::Scheduled { .. }));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It already early returns for this case, so is_scheduled is always false


// Check cell index range (cell might not exist at all)
let max_id = get!(
Expand Down Expand Up @@ -875,14 +874,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
// Schedule the task, if not already scheduled
if is_cancelled {
bail!("{} was canceled", ctx.get_task_description(task_id));
} else if !is_scheduled
&& task.add(CachedDataItem::new_scheduled(
TaskExecutionReason::CellNotAvailable,
|| self.get_task_desc_fn(task_id),
))
{
ctx.schedule_task(task);
}
task.add_new(CachedDataItem::new_scheduled(
TaskExecutionReason::CellNotAvailable,
|| self.get_task_desc_fn(task_id),
));
ctx.schedule_task(task);

Ok(Err(listener))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::mem::take;

use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use turbo_tasks::{CellId, TaskId, backend::CellContent};
use turbo_tasks::{CellId, TaskId, TypedSharedReference, backend::CellContent};

#[cfg(feature = "trace_task_dirty")]
use crate::backend::operation::invalidate::TaskDirtyCause;
Expand All @@ -24,6 +24,12 @@ pub enum UpdateCellOperation {
InvalidateWhenCellDependency {
cell_ref: CellRef,
dependent_tasks: SmallVec<[TaskId; 4]>,
content: Option<TypedSharedReference>,
queue: AggregationUpdateQueue,
},
FinalCellChange {
cell_ref: CellRef,
content: Option<TypedSharedReference>,
queue: AggregationUpdateQueue,
},
AggregationUpdate {
Expand All @@ -36,51 +42,82 @@ pub enum UpdateCellOperation {
impl UpdateCellOperation {
pub fn run(task_id: TaskId, cell: CellId, content: CellContent, mut ctx: impl ExecuteContext) {
let mut task = ctx.task(task_id, TaskDataCategory::All);
let old_content = if let CellContent(Some(new_content)) = content {
task.insert(CachedDataItem::CellData {
cell,
value: new_content.into_typed(cell.type_id),
})
} else {
task.remove(&CachedDataItemKey::CellData { cell })
};

if let Some(in_progress) = remove!(task, InProgressCell { cell }) {
in_progress.event.notify(usize::MAX);
}

// We need to detect recomputation, because here the content has not actually changed (even
// if it's not equal to the old content, as not all values implement Eq). We have to
// assume that tasks are deterministic and pure.
let should_invalidate = ctx.should_track_dependencies()
&& (task.has_key(&CachedDataItemKey::Dirty {}) ||
// This is a hack for the streaming hack. Stateful tasks are never recomputed, so this forces invalidation for them in case of this hack.
task.has_key(&CachedDataItemKey::Stateful {}));

if ctx.should_track_dependencies()
&& (task.has_key(&CachedDataItemKey::Dirty {})
||
// This is a hack for the streaming hack. Stateful tasks are never recomputed, so this forces invalidation for them in case of this hack.
task.has_key(&CachedDataItemKey::Stateful {}))
{
let dependent_tasks = get_many!(
if should_invalidate {
let dependent_tasks: SmallVec<[TaskId; 4]> = get_many!(
task,
CellDependent { cell: dependent_cell, task }
if dependent_cell == cell
=> task
);

drop(task);
drop(old_content);
if !dependent_tasks.is_empty() {
// Slow path: We need to invalidate tasks depending on this cell.
// To avoid a race condition, we need to remove the old content first,
// then invalidate dependent tasks and only then update the cell content.

// The reason behind this is that we consider tasks that haven't the dirty flag set
// as "recomputing" tasks. Recomputing tasks won't invalidate
// dependent tasks, when a cell is changed. This would cause missing invalidating if
// a task is recomputing while a dependency is in the middle of a cell update (where
// the value has been changed, but the dependent tasks have not be flagged dirty
// yet). So to avoid that we first remove the cell content, invalidate all dependent
// tasks and after that set the new cell content. When the cell content is unset,
// readers will wait for it to be set via InProgressCell.

let old_content = task.remove(&CachedDataItemKey::CellData { cell });

drop(task);
drop(old_content);

UpdateCellOperation::InvalidateWhenCellDependency {
cell_ref: CellRef {
task: task_id,
cell,
},
dependent_tasks,
queue: AggregationUpdateQueue::new(),
let content = if let CellContent(Some(new_content)) = content {
Some(new_content.into_typed(cell.type_id))
} else {
None
};

UpdateCellOperation::InvalidateWhenCellDependency {
cell_ref: CellRef {
task: task_id,
cell,
},
dependent_tasks,
content,
queue: AggregationUpdateQueue::new(),
}
.execute(&mut ctx);
return;
}
.execute(&mut ctx);
}

// Fast path: We don't need to invalidate anything.
// So we can just update the cell content.

let old_content = if let CellContent(Some(new_content)) = content {
let new_content = new_content.into_typed(cell.type_id);
task.insert(CachedDataItem::CellData {
cell,
value: new_content,
})
} else {
drop(task);
drop(old_content);
task.remove(&CachedDataItemKey::CellData { cell })
};

let in_progress_cell = remove!(task, InProgressCell { cell });

drop(task);
drop(old_content);

if let Some(in_progress) = in_progress_cell {
in_progress.event.notify(usize::MAX);
}
}
}
Expand All @@ -93,6 +130,7 @@ impl Operation for UpdateCellOperation {
UpdateCellOperation::InvalidateWhenCellDependency {
cell_ref,
ref mut dependent_tasks,
ref mut content,
ref mut queue,
} => {
if let Some(dependent_task_id) = dependent_tasks.pop() {
Expand Down Expand Up @@ -129,9 +167,37 @@ impl Operation for UpdateCellOperation {
);
}
if dependent_tasks.is_empty() {
self = UpdateCellOperation::AggregationUpdate { queue: take(queue) };
self = UpdateCellOperation::FinalCellChange {
cell_ref,
content: take(content),
queue: take(queue),
};
}
}
UpdateCellOperation::FinalCellChange {
cell_ref: CellRef { task, cell },
content,
ref mut queue,
} => {
let mut task = ctx.task(task, TaskDataCategory::Data);

if let Some(content) = content {
task.add_new(CachedDataItem::CellData {
cell,
value: content,
})
}

let in_progress_cell = remove!(task, InProgressCell { cell });

drop(task);

if let Some(in_progress) = in_progress_cell {
in_progress.event.notify(usize::MAX);
}

self = UpdateCellOperation::AggregationUpdate { queue: take(queue) };
}
UpdateCellOperation::AggregationUpdate { ref mut queue } => {
if queue.process(ctx) {
self = UpdateCellOperation::Done
Expand Down
Loading