Skip to content

Commit f9e2d1a

Browse files
committed
Turbopack: process task result as part of the task completion
Set the task output when finishing the task Remove setting task as initial dirty in the normal execution Fix race condition when setting output and invalidating dependents
1 parent 79e7838 commit f9e2d1a

File tree

6 files changed

+156
-277
lines changed

6 files changed

+156
-277
lines changed

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 153 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use crate::{
5252
AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
5353
CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
5454
Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
55-
get_uppers, is_root_node, prepare_new_children,
55+
get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children,
5656
},
5757
storage::{
5858
InnerStorageSnapshot, Storage, count, get, get_many, get_mut, get_mut_or_insert_with,
@@ -420,6 +420,8 @@ struct TaskExecutionCompletePrepareResult {
420420
pub new_children: FxHashSet<TaskId>,
421421
pub removed_data: Vec<CachedDataItem>,
422422
pub is_now_immutable: bool,
423+
pub new_output: Option<OutputValue>,
424+
pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
423425
}
424426

425427
// Operations
@@ -493,16 +495,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
493495
Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
494496
}
495497
Some(InProgressState::InProgress(box InProgressStateInner {
496-
done,
497-
done_event,
498-
..
499-
})) => {
500-
if !*done {
501-
Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
502-
} else {
503-
None
504-
}
505-
}
498+
done_event, ..
499+
})) => Some(Ok(Err(listen_to_done_event(this, reader, done_event)))),
506500
Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
507501
"{} was canceled",
508502
ctx.get_task_description(task.id())
@@ -1596,7 +1590,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
15961590
done_event,
15971591
session_dependent: false,
15981592
marked_as_completed: false,
1599-
done: false,
16001593
new_children: Default::default(),
16011594
})),
16021595
});
@@ -1695,20 +1688,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
16951688
Some(TaskExecutionSpec { future, span })
16961689
}
16971690

1698-
fn task_execution_result(
1699-
&self,
1700-
task_id: TaskId,
1701-
result: Result<RawVc, TurboTasksExecutionError>,
1702-
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1703-
) {
1704-
operation::UpdateOutputOperation::run(task_id, result, self.execute_context(turbo_tasks));
1705-
}
1706-
17071691
fn task_execution_completed(
17081692
&self,
17091693
task_id: TaskId,
17101694
duration: Duration,
17111695
_memory_usage: usize,
1696+
result: Result<RawVc, TurboTasksExecutionError>,
17121697
cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
17131698
stateful: bool,
17141699
has_invalidator: bool,
@@ -1737,10 +1722,13 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
17371722
new_children,
17381723
mut removed_data,
17391724
is_now_immutable,
1725+
new_output,
1726+
output_dependent_tasks,
17401727
}) = self.task_execution_completed_prepare(
17411728
&mut ctx,
17421729
&span,
17431730
task_id,
1731+
result,
17441732
cell_counters,
17451733
stateful,
17461734
has_invalidator,
@@ -1754,7 +1742,20 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
17541742
// suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
17551743
// would be executed again.
17561744

1745+
if !output_dependent_tasks.is_empty() {
1746+
self.task_execution_completed_invalidate_output_dependent(
1747+
&mut ctx,
1748+
task_id,
1749+
output_dependent_tasks,
1750+
);
1751+
}
1752+
17571753
let has_new_children = !new_children.is_empty();
1754+
1755+
if has_new_children {
1756+
self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
1757+
}
1758+
17581759
if has_new_children
17591760
&& self.task_execution_completed_connect(&mut ctx, task_id, new_children)
17601761
{
@@ -1765,6 +1766,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
17651766
if self.task_execution_completed_finish(
17661767
&mut ctx,
17671768
task_id,
1769+
new_output,
17681770
&mut removed_data,
17691771
is_now_immutable,
17701772
) {
@@ -1784,6 +1786,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
17841786
ctx: &mut impl ExecuteContext<'_>,
17851787
span: &Span,
17861788
task_id: TaskId,
1789+
result: Result<RawVc, TurboTasksExecutionError>,
17871790
cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
17881791
stateful: bool,
17891792
has_invalidator: bool,
@@ -1797,12 +1800,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
17971800
new_children: Default::default(),
17981801
removed_data: Default::default(),
17991802
is_now_immutable: false,
1803+
new_output: None,
1804+
output_dependent_tasks: Default::default(),
18001805
});
18011806
}
18021807
let &mut InProgressState::InProgress(box InProgressStateInner {
18031808
stale,
1804-
ref mut done,
1805-
ref done_event,
18061809
ref mut new_children,
18071810
session_dependent,
18081811
..
@@ -1846,12 +1849,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
18461849
return None;
18471850
}
18481851

1849-
if cfg!(not(feature = "no_fast_stale")) || !stale {
1850-
// mark the task as completed, so dependent tasks can continue working
1851-
*done = true;
1852-
done_event.notify(usize::MAX);
1853-
}
1854-
18551852
// take the children from the task to process them
18561853
let mut new_children = take(new_children);
18571854

@@ -1979,6 +1976,53 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
19791976
);
19801977
}
19811978

1979+
// Check if output need to be updated
1980+
let current_output = get!(task, Output);
1981+
let new_output = match result {
1982+
Ok(RawVc::TaskOutput(output_task_id)) => {
1983+
if let Some(OutputValue::Output(current_task_id)) = current_output
1984+
&& *current_task_id == output_task_id
1985+
{
1986+
None
1987+
} else {
1988+
Some(OutputValue::Output(output_task_id))
1989+
}
1990+
}
1991+
Ok(RawVc::TaskCell(output_task_id, cell)) => {
1992+
if let Some(OutputValue::Cell(CellRef {
1993+
task: current_task_id,
1994+
cell: current_cell,
1995+
})) = current_output
1996+
&& *current_task_id == output_task_id
1997+
&& *current_cell == cell
1998+
{
1999+
None
2000+
} else {
2001+
Some(OutputValue::Cell(CellRef {
2002+
task: output_task_id,
2003+
cell,
2004+
}))
2005+
}
2006+
}
2007+
Ok(RawVc::LocalOutput(..)) => {
2008+
panic!("Non-local tasks must not return a local Vc");
2009+
}
2010+
Err(err) => {
2011+
if let Some(OutputValue::Error(old_error)) = current_output
2012+
&& old_error == &err
2013+
{
2014+
None
2015+
} else {
2016+
Some(OutputValue::Error(err))
2017+
}
2018+
}
2019+
};
2020+
let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2021+
// When output has changed, grab the dependent tasks
2022+
if new_output.is_some() && ctx.should_track_dependencies() {
2023+
output_dependent_tasks = get_many!(task, OutputDependent { task } => task);
2024+
}
2025+
19822026
drop(task);
19832027

19842028
// Check if the task can be marked as immutable
@@ -2005,9 +2049,76 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
20052049
new_children,
20062050
removed_data,
20072051
is_now_immutable,
2052+
new_output,
2053+
output_dependent_tasks,
20082054
})
20092055
}
20102056

2057+
fn task_execution_completed_invalidate_output_dependent(
2058+
&self,
2059+
ctx: &mut impl ExecuteContext<'_>,
2060+
task_id: TaskId,
2061+
output_dependent_tasks: SmallVec<[TaskId; 4]>,
2062+
) {
2063+
debug_assert!(!output_dependent_tasks.is_empty());
2064+
2065+
let mut queue = AggregationUpdateQueue::new();
2066+
for dependent_task_id in output_dependent_tasks {
2067+
if ctx.is_once_task(dependent_task_id) {
2068+
// once tasks are never invalidated
2069+
continue;
2070+
}
2071+
let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2072+
if dependent.has_key(&CachedDataItemKey::OutdatedOutputDependency { target: task_id }) {
2073+
// output dependency is outdated, so it hasn't read the output yet
2074+
// and doesn't need to be invalidated
2075+
continue;
2076+
}
2077+
if !dependent.has_key(&CachedDataItemKey::OutputDependency { target: task_id }) {
2078+
// output dependency has been removed, so the task doesn't depend on the
2079+
// output anymore and doesn't need to be invalidated
2080+
continue;
2081+
}
2082+
make_task_dirty_internal(
2083+
dependent,
2084+
dependent_task_id,
2085+
true,
2086+
#[cfg(feature = "trace_task_dirty")]
2087+
TaskDirtyCause::OutputChange { task_id },
2088+
&mut queue,
2089+
ctx,
2090+
);
2091+
}
2092+
2093+
queue.execute(ctx);
2094+
}
2095+
2096+
fn task_execution_completed_unfinished_children_dirty(
2097+
&self,
2098+
ctx: &mut impl ExecuteContext<'_>,
2099+
new_children: &FxHashSet<TaskId>,
2100+
) {
2101+
debug_assert!(!new_children.is_empty());
2102+
2103+
let mut queue = AggregationUpdateQueue::new();
2104+
for &child_id in new_children {
2105+
let child_task = ctx.task(child_id, TaskDataCategory::Meta);
2106+
if !child_task.has_key(&CachedDataItemKey::Output {}) {
2107+
make_task_dirty_internal(
2108+
child_task,
2109+
child_id,
2110+
false,
2111+
#[cfg(feature = "trace_task_dirty")]
2112+
TaskDirtyCause::InitialDirty,
2113+
&mut queue,
2114+
ctx,
2115+
);
2116+
}
2117+
}
2118+
2119+
queue.execute(ctx);
2120+
}
2121+
20112122
fn task_execution_completed_connect(
20122123
&self,
20132124
ctx: &mut impl ExecuteContext<'_>,
@@ -2078,6 +2189,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
20782189
&self,
20792190
ctx: &mut impl ExecuteContext<'_>,
20802191
task_id: TaskId,
2192+
new_output: Option<OutputValue>,
20812193
removed_data: &mut Vec<CachedDataItem>,
20822194
is_now_immutable: bool,
20832195
) -> bool {
@@ -2094,7 +2206,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
20942206
once_task: _,
20952207
stale,
20962208
session_dependent,
2097-
done: _,
20982209
marked_as_completed: _,
20992210
new_children,
21002211
}) = in_progress
@@ -2114,6 +2225,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
21142225
return true;
21152226
}
21162227

2228+
// Set the output if it has changed
2229+
let mut old_content = None;
2230+
if let Some(value) = new_output {
2231+
old_content = task.insert(CachedDataItem::Output { value });
2232+
}
2233+
21172234
// If the task is not stateful and has no mutable children, it does not have a way to be
21182235
// invalidated and we can mark it as immutable.
21192236
if is_now_immutable {
@@ -2194,6 +2311,10 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
21942311
};
21952312

21962313
drop(task);
2314+
drop(old_content);
2315+
2316+
// Notify dependent tasks that are waiting for this task to finish
2317+
done_event.notify(usize::MAX);
21972318

21982319
if let Some(data_update) = data_update {
21992320
AggregationUpdateQueue::run(data_update, ctx);
@@ -2981,20 +3102,12 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
29813102
self.0.try_start_task_execution(task_id, turbo_tasks)
29823103
}
29833104

2984-
fn task_execution_result(
2985-
&self,
2986-
task_id: TaskId,
2987-
result: Result<RawVc, TurboTasksExecutionError>,
2988-
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2989-
) {
2990-
self.0.task_execution_result(task_id, result, turbo_tasks);
2991-
}
2992-
29933105
fn task_execution_completed(
29943106
&self,
29953107
task_id: TaskId,
29963108
_duration: Duration,
29973109
_memory_usage: usize,
3110+
result: Result<RawVc, TurboTasksExecutionError>,
29983111
cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
29993112
stateful: bool,
30003113
has_invalidator: bool,
@@ -3004,6 +3117,7 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
30043117
task_id,
30053118
_duration,
30063119
_memory_usage,
3120+
result,
30073121
cell_counters,
30083122
stateful,
30093123
has_invalidator,

turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ mod invalidate;
66
mod prepare_new_children;
77
mod update_cell;
88
mod update_collectible;
9-
mod update_output;
109

1110
use std::{
1211
fmt::{Debug, Formatter},
@@ -600,7 +599,6 @@ macro_rules! impl_operation {
600599
pub enum AnyOperation {
601600
ConnectChild(connect_child::ConnectChildOperation),
602601
Invalidate(invalidate::InvalidateOperation),
603-
UpdateOutput(update_output::UpdateOutputOperation),
604602
UpdateCell(update_cell::UpdateCellOperation),
605603
CleanupOldEdges(cleanup_old_edges::CleanupOldEdgesOperation),
606604
AggregationUpdate(aggregation_update::AggregationUpdateQueue),
@@ -612,7 +610,6 @@ impl AnyOperation {
612610
match self {
613611
AnyOperation::ConnectChild(op) => op.execute(ctx),
614612
AnyOperation::Invalidate(op) => op.execute(ctx),
615-
AnyOperation::UpdateOutput(op) => op.execute(ctx),
616613
AnyOperation::UpdateCell(op) => op.execute(ctx),
617614
AnyOperation::CleanupOldEdges(op) => op.execute(ctx),
618615
AnyOperation::AggregationUpdate(op) => op.execute(ctx),
@@ -627,7 +624,6 @@ impl AnyOperation {
627624

628625
impl_operation!(ConnectChild connect_child::ConnectChildOperation);
629626
impl_operation!(Invalidate invalidate::InvalidateOperation);
630-
impl_operation!(UpdateOutput update_output::UpdateOutputOperation);
631627
impl_operation!(UpdateCell update_cell::UpdateCellOperation);
632628
impl_operation!(CleanupOldEdges cleanup_old_edges::CleanupOldEdgesOperation);
633629
impl_operation!(AggregationUpdate aggregation_update::AggregationUpdateQueue);
@@ -641,6 +637,7 @@ pub use self::{
641637
},
642638
cleanup_old_edges::OutdatedEdge,
643639
connect_children::connect_children,
640+
invalidate::make_task_dirty_internal,
644641
prepare_new_children::prepare_new_children,
645642
update_collectible::UpdateCollectibleOperation,
646643
};

0 commit comments

Comments
 (0)