Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 153 additions & 39 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use crate::{
AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
get_uppers, is_root_node, prepare_new_children,
get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children,
},
storage::{
InnerStorageSnapshot, Storage, count, get, get_many, get_mut, get_mut_or_insert_with,
Expand Down Expand Up @@ -420,6 +420,8 @@ struct TaskExecutionCompletePrepareResult {
pub new_children: FxHashSet<TaskId>,
pub removed_data: Vec<CachedDataItem>,
pub is_now_immutable: bool,
pub new_output: Option<OutputValue>,
pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
}

// Operations
Expand Down Expand Up @@ -493,16 +495,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
}
Some(InProgressState::InProgress(box InProgressStateInner {
done,
done_event,
..
})) => {
if !*done {
Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
} else {
None
}
}
done_event, ..
})) => Some(Ok(Err(listen_to_done_event(this, reader, done_event)))),
Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
"{} was canceled",
ctx.get_task_description(task.id())
Expand Down Expand Up @@ -1596,7 +1590,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
done_event,
session_dependent: false,
marked_as_completed: false,
done: false,
new_children: Default::default(),
})),
});
Expand Down Expand Up @@ -1695,20 +1688,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
Some(TaskExecutionSpec { future, span })
}

fn task_execution_result(
&self,
task_id: TaskId,
result: Result<RawVc, TurboTasksExecutionError>,
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) {
operation::UpdateOutputOperation::run(task_id, result, self.execute_context(turbo_tasks));
}

fn task_execution_completed(
&self,
task_id: TaskId,
duration: Duration,
_memory_usage: usize,
result: Result<RawVc, TurboTasksExecutionError>,
cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
stateful: bool,
has_invalidator: bool,
Expand Down Expand Up @@ -1737,10 +1722,13 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
new_children,
mut removed_data,
is_now_immutable,
new_output,
output_dependent_tasks,
}) = self.task_execution_completed_prepare(
&mut ctx,
&span,
task_id,
result,
cell_counters,
stateful,
has_invalidator,
Expand All @@ -1754,7 +1742,20 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
// suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
// would be executed again.

if !output_dependent_tasks.is_empty() {
self.task_execution_completed_invalidate_output_dependent(
&mut ctx,
task_id,
output_dependent_tasks,
);
}

let has_new_children = !new_children.is_empty();

if has_new_children {
self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
}

if has_new_children
&& self.task_execution_completed_connect(&mut ctx, task_id, new_children)
{
Expand All @@ -1765,6 +1766,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
if self.task_execution_completed_finish(
&mut ctx,
task_id,
new_output,
&mut removed_data,
is_now_immutable,
) {
Expand All @@ -1784,6 +1786,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
ctx: &mut impl ExecuteContext<'_>,
span: &Span,
task_id: TaskId,
result: Result<RawVc, TurboTasksExecutionError>,
cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
stateful: bool,
has_invalidator: bool,
Expand All @@ -1797,12 +1800,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
new_children: Default::default(),
removed_data: Default::default(),
is_now_immutable: false,
new_output: None,
output_dependent_tasks: Default::default(),
});
}
let &mut InProgressState::InProgress(box InProgressStateInner {
stale,
ref mut done,
ref done_event,
ref mut new_children,
session_dependent,
..
Expand Down Expand Up @@ -1846,12 +1849,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
return None;
}

if cfg!(not(feature = "no_fast_stale")) || !stale {
// mark the task as completed, so dependent tasks can continue working
*done = true;
done_event.notify(usize::MAX);
}

// take the children from the task to process them
let mut new_children = take(new_children);

Expand Down Expand Up @@ -1979,6 +1976,53 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
);
}

// Check if output need to be updated
let current_output = get!(task, Output);
let new_output = match result {
Ok(RawVc::TaskOutput(output_task_id)) => {
if let Some(OutputValue::Output(current_task_id)) = current_output
&& *current_task_id == output_task_id
{
None
} else {
Some(OutputValue::Output(output_task_id))
}
}
Ok(RawVc::TaskCell(output_task_id, cell)) => {
if let Some(OutputValue::Cell(CellRef {
task: current_task_id,
cell: current_cell,
})) = current_output
&& *current_task_id == output_task_id
&& *current_cell == cell
{
None
} else {
Some(OutputValue::Cell(CellRef {
task: output_task_id,
cell,
}))
}
}
Ok(RawVc::LocalOutput(..)) => {
panic!("Non-local tasks must not return a local Vc");
}
Err(err) => {
if let Some(OutputValue::Error(old_error)) = current_output
&& old_error == &err
{
None
} else {
Some(OutputValue::Error(err))
}
}
};
let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
// When output has changed, grab the dependent tasks
if new_output.is_some() && ctx.should_track_dependencies() {
output_dependent_tasks = get_many!(task, OutputDependent { task } => task);
}

drop(task);

// Check if the task can be marked as immutable
Expand All @@ -2005,9 +2049,76 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
new_children,
removed_data,
is_now_immutable,
new_output,
output_dependent_tasks,
})
}

fn task_execution_completed_invalidate_output_dependent(
&self,
ctx: &mut impl ExecuteContext<'_>,
task_id: TaskId,
output_dependent_tasks: SmallVec<[TaskId; 4]>,
) {
debug_assert!(!output_dependent_tasks.is_empty());

let mut queue = AggregationUpdateQueue::new();
for dependent_task_id in output_dependent_tasks {
if ctx.is_once_task(dependent_task_id) {
// once tasks are never invalidated
continue;
}
let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
if dependent.has_key(&CachedDataItemKey::OutdatedOutputDependency { target: task_id }) {
// output dependency is outdated, so it hasn't read the output yet
// and doesn't need to be invalidated
continue;
}
if !dependent.has_key(&CachedDataItemKey::OutputDependency { target: task_id }) {
// output dependency has been removed, so the task doesn't depend on the
// output anymore and doesn't need to be invalidated
continue;
}
make_task_dirty_internal(
dependent,
dependent_task_id,
true,
#[cfg(feature = "trace_task_dirty")]
TaskDirtyCause::OutputChange { task_id },
&mut queue,
ctx,
);
}

queue.execute(ctx);
}

fn task_execution_completed_unfinished_children_dirty(
&self,
ctx: &mut impl ExecuteContext<'_>,
new_children: &FxHashSet<TaskId>,
) {
debug_assert!(!new_children.is_empty());

let mut queue = AggregationUpdateQueue::new();
for &child_id in new_children {
let child_task = ctx.task(child_id, TaskDataCategory::Meta);
if !child_task.has_key(&CachedDataItemKey::Output {}) {
make_task_dirty_internal(
child_task,
child_id,
false,
#[cfg(feature = "trace_task_dirty")]
TaskDirtyCause::InitialDirty,
&mut queue,
ctx,
);
}
}

queue.execute(ctx);
}

fn task_execution_completed_connect(
&self,
ctx: &mut impl ExecuteContext<'_>,
Expand Down Expand Up @@ -2078,6 +2189,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
&self,
ctx: &mut impl ExecuteContext<'_>,
task_id: TaskId,
new_output: Option<OutputValue>,
removed_data: &mut Vec<CachedDataItem>,
is_now_immutable: bool,
) -> bool {
Expand All @@ -2094,7 +2206,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
once_task: _,
stale,
session_dependent,
done: _,
marked_as_completed: _,
new_children,
}) = in_progress
Expand All @@ -2114,6 +2225,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
return true;
}

// Set the output if it has changed
let mut old_content = None;
if let Some(value) = new_output {
old_content = task.insert(CachedDataItem::Output { value });
}

// If the task is not stateful and has no mutable children, it does not have a way to be
// invalidated and we can mark it as immutable.
if is_now_immutable {
Expand Down Expand Up @@ -2194,6 +2311,10 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
};

drop(task);
drop(old_content);

// Notify dependent tasks that are waiting for this task to finish
done_event.notify(usize::MAX);

if let Some(data_update) = data_update {
AggregationUpdateQueue::run(data_update, ctx);
Expand Down Expand Up @@ -2981,20 +3102,12 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
self.0.try_start_task_execution(task_id, turbo_tasks)
}

fn task_execution_result(
&self,
task_id: TaskId,
result: Result<RawVc, TurboTasksExecutionError>,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) {
self.0.task_execution_result(task_id, result, turbo_tasks);
}

fn task_execution_completed(
&self,
task_id: TaskId,
_duration: Duration,
_memory_usage: usize,
result: Result<RawVc, TurboTasksExecutionError>,
cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
stateful: bool,
has_invalidator: bool,
Expand All @@ -3004,6 +3117,7 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
task_id,
_duration,
_memory_usage,
result,
cell_counters,
stateful,
has_invalidator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ mod invalidate;
mod prepare_new_children;
mod update_cell;
mod update_collectible;
mod update_output;

use std::{
fmt::{Debug, Formatter},
Expand Down Expand Up @@ -600,7 +599,6 @@ macro_rules! impl_operation {
pub enum AnyOperation {
ConnectChild(connect_child::ConnectChildOperation),
Invalidate(invalidate::InvalidateOperation),
UpdateOutput(update_output::UpdateOutputOperation),
UpdateCell(update_cell::UpdateCellOperation),
CleanupOldEdges(cleanup_old_edges::CleanupOldEdgesOperation),
AggregationUpdate(aggregation_update::AggregationUpdateQueue),
Expand All @@ -612,7 +610,6 @@ impl AnyOperation {
match self {
AnyOperation::ConnectChild(op) => op.execute(ctx),
AnyOperation::Invalidate(op) => op.execute(ctx),
AnyOperation::UpdateOutput(op) => op.execute(ctx),
AnyOperation::UpdateCell(op) => op.execute(ctx),
AnyOperation::CleanupOldEdges(op) => op.execute(ctx),
AnyOperation::AggregationUpdate(op) => op.execute(ctx),
Expand All @@ -627,7 +624,6 @@ impl AnyOperation {

impl_operation!(ConnectChild connect_child::ConnectChildOperation);
impl_operation!(Invalidate invalidate::InvalidateOperation);
impl_operation!(UpdateOutput update_output::UpdateOutputOperation);
impl_operation!(UpdateCell update_cell::UpdateCellOperation);
impl_operation!(CleanupOldEdges cleanup_old_edges::CleanupOldEdgesOperation);
impl_operation!(AggregationUpdate aggregation_update::AggregationUpdateQueue);
Expand All @@ -641,6 +637,7 @@ pub use self::{
},
cleanup_old_edges::OutdatedEdge,
connect_children::connect_children,
invalidate::make_task_dirty_internal,
prepare_new_children::prepare_new_children,
update_collectible::UpdateCollectibleOperation,
};
Loading
Loading