Skip to content

Commit 2f2d94d

Browse files
committed
Turbopack: process task result as part of the task completion
Set the task output when finishing the task Remove setting task as initial dirty in the normal execution Fix race condition when setting output and invalidating dependents
1 parent 1dd0434 commit 2f2d94d

File tree

6 files changed

+156
-277
lines changed

6 files changed

+156
-277
lines changed

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 153 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use crate::{
5252
AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
5353
CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
5454
Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number,
55-
get_uppers, is_root_node, prepare_new_children,
55+
get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children,
5656
},
5757
storage::{
5858
InnerStorageSnapshot, Storage, count, get, get_many, get_mut, get_mut_or_insert_with,
@@ -420,6 +420,8 @@ struct TaskExecutionCompletePrepareResult {
420420
pub new_children: FxHashSet<TaskId>,
421421
pub removed_data: Vec<CachedDataItem>,
422422
pub is_now_immutable: bool,
423+
pub new_output: Option<OutputValue>,
424+
pub output_dependent_tasks: SmallVec<[TaskId; 4]>,
423425
}
424426

425427
// Operations
@@ -493,16 +495,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
493495
Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
494496
}
495497
Some(InProgressState::InProgress(box InProgressStateInner {
496-
done,
497-
done_event,
498-
..
499-
})) => {
500-
if !*done {
501-
Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
502-
} else {
503-
None
504-
}
505-
}
498+
done_event, ..
499+
})) => Some(Ok(Err(listen_to_done_event(this, reader, done_event)))),
506500
Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
507501
"{} was canceled",
508502
ctx.get_task_description(task.id())
@@ -1593,7 +1587,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
15931587
done_event,
15941588
session_dependent: false,
15951589
marked_as_completed: false,
1596-
done: false,
15971590
new_children: Default::default(),
15981591
})),
15991592
});
@@ -1692,20 +1685,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
16921685
Some(TaskExecutionSpec { future, span })
16931686
}
16941687

1695-
fn task_execution_result(
1696-
&self,
1697-
task_id: TaskId,
1698-
result: Result<RawVc, TurboTasksExecutionError>,
1699-
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1700-
) {
1701-
operation::UpdateOutputOperation::run(task_id, result, self.execute_context(turbo_tasks));
1702-
}
1703-
17041688
fn task_execution_completed(
17051689
&self,
17061690
task_id: TaskId,
17071691
duration: Duration,
17081692
_memory_usage: usize,
1693+
result: Result<RawVc, TurboTasksExecutionError>,
17091694
cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
17101695
stateful: bool,
17111696
has_invalidator: bool,
@@ -1734,10 +1719,13 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
17341719
new_children,
17351720
mut removed_data,
17361721
is_now_immutable,
1722+
new_output,
1723+
output_dependent_tasks,
17371724
}) = self.task_execution_completed_prepare(
17381725
&mut ctx,
17391726
&span,
17401727
task_id,
1728+
result,
17411729
cell_counters,
17421730
stateful,
17431731
has_invalidator,
@@ -1751,7 +1739,20 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
17511739
// suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
17521740
// would be executed again.
17531741

1742+
if !output_dependent_tasks.is_empty() {
1743+
self.task_execution_completed_invalidate_output_dependent(
1744+
&mut ctx,
1745+
task_id,
1746+
output_dependent_tasks,
1747+
);
1748+
}
1749+
17541750
let has_new_children = !new_children.is_empty();
1751+
1752+
if has_new_children {
1753+
self.task_execution_completed_unfinished_children_dirty(&mut ctx, &new_children)
1754+
}
1755+
17551756
if has_new_children
17561757
&& self.task_execution_completed_connect(&mut ctx, task_id, new_children)
17571758
{
@@ -1762,6 +1763,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
17621763
if self.task_execution_completed_finish(
17631764
&mut ctx,
17641765
task_id,
1766+
new_output,
17651767
&mut removed_data,
17661768
is_now_immutable,
17671769
) {
@@ -1781,6 +1783,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
17811783
ctx: &mut impl ExecuteContext<'_>,
17821784
span: &Span,
17831785
task_id: TaskId,
1786+
result: Result<RawVc, TurboTasksExecutionError>,
17841787
cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
17851788
stateful: bool,
17861789
has_invalidator: bool,
@@ -1794,12 +1797,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
17941797
new_children: Default::default(),
17951798
removed_data: Default::default(),
17961799
is_now_immutable: false,
1800+
new_output: None,
1801+
output_dependent_tasks: Default::default(),
17971802
});
17981803
}
17991804
let &mut InProgressState::InProgress(box InProgressStateInner {
18001805
stale,
1801-
ref mut done,
1802-
ref done_event,
18031806
ref mut new_children,
18041807
session_dependent,
18051808
..
@@ -1843,12 +1846,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
18431846
return None;
18441847
}
18451848

1846-
if cfg!(not(feature = "no_fast_stale")) || !stale {
1847-
// mark the task as completed, so dependent tasks can continue working
1848-
*done = true;
1849-
done_event.notify(usize::MAX);
1850-
}
1851-
18521849
// take the children from the task to process them
18531850
let mut new_children = take(new_children);
18541851

@@ -1976,6 +1973,53 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
19761973
);
19771974
}
19781975

1976+
// Check if output need to be updated
1977+
let current_output = get!(task, Output);
1978+
let new_output = match result {
1979+
Ok(RawVc::TaskOutput(output_task_id)) => {
1980+
if let Some(OutputValue::Output(current_task_id)) = current_output
1981+
&& *current_task_id == output_task_id
1982+
{
1983+
None
1984+
} else {
1985+
Some(OutputValue::Output(output_task_id))
1986+
}
1987+
}
1988+
Ok(RawVc::TaskCell(output_task_id, cell)) => {
1989+
if let Some(OutputValue::Cell(CellRef {
1990+
task: current_task_id,
1991+
cell: current_cell,
1992+
})) = current_output
1993+
&& *current_task_id == output_task_id
1994+
&& *current_cell == cell
1995+
{
1996+
None
1997+
} else {
1998+
Some(OutputValue::Cell(CellRef {
1999+
task: output_task_id,
2000+
cell,
2001+
}))
2002+
}
2003+
}
2004+
Ok(RawVc::LocalOutput(..)) => {
2005+
panic!("Non-local tasks must not return a local Vc");
2006+
}
2007+
Err(err) => {
2008+
if let Some(OutputValue::Error(old_error)) = current_output
2009+
&& old_error == &err
2010+
{
2011+
None
2012+
} else {
2013+
Some(OutputValue::Error(err))
2014+
}
2015+
}
2016+
};
2017+
let mut output_dependent_tasks = SmallVec::<[_; 4]>::new();
2018+
// When output has changed, grab the dependent tasks
2019+
if new_output.is_some() && ctx.should_track_dependencies() {
2020+
output_dependent_tasks = get_many!(task, OutputDependent { task } => task);
2021+
}
2022+
19792023
drop(task);
19802024

19812025
// Check if the task can be marked as immutable
@@ -2002,9 +2046,76 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
20022046
new_children,
20032047
removed_data,
20042048
is_now_immutable,
2049+
new_output,
2050+
output_dependent_tasks,
20052051
})
20062052
}
20072053

2054+
fn task_execution_completed_invalidate_output_dependent(
2055+
&self,
2056+
ctx: &mut impl ExecuteContext<'_>,
2057+
task_id: TaskId,
2058+
output_dependent_tasks: SmallVec<[TaskId; 4]>,
2059+
) {
2060+
debug_assert!(!output_dependent_tasks.is_empty());
2061+
2062+
let mut queue = AggregationUpdateQueue::new();
2063+
for dependent_task_id in output_dependent_tasks {
2064+
if ctx.is_once_task(dependent_task_id) {
2065+
// once tasks are never invalidated
2066+
continue;
2067+
}
2068+
let dependent = ctx.task(dependent_task_id, TaskDataCategory::All);
2069+
if dependent.has_key(&CachedDataItemKey::OutdatedOutputDependency { target: task_id }) {
2070+
// output dependency is outdated, so it hasn't read the output yet
2071+
// and doesn't need to be invalidated
2072+
continue;
2073+
}
2074+
if !dependent.has_key(&CachedDataItemKey::OutputDependency { target: task_id }) {
2075+
// output dependency has been removed, so the task doesn't depend on the
2076+
// output anymore and doesn't need to be invalidated
2077+
continue;
2078+
}
2079+
make_task_dirty_internal(
2080+
dependent,
2081+
dependent_task_id,
2082+
true,
2083+
#[cfg(feature = "trace_task_dirty")]
2084+
TaskDirtyCause::OutputChange { task_id },
2085+
&mut queue,
2086+
ctx,
2087+
);
2088+
}
2089+
2090+
queue.execute(ctx);
2091+
}
2092+
2093+
fn task_execution_completed_unfinished_children_dirty(
2094+
&self,
2095+
ctx: &mut impl ExecuteContext<'_>,
2096+
new_children: &FxHashSet<TaskId>,
2097+
) {
2098+
debug_assert!(!new_children.is_empty());
2099+
2100+
let mut queue = AggregationUpdateQueue::new();
2101+
for &child_id in new_children {
2102+
let child_task = ctx.task(child_id, TaskDataCategory::Meta);
2103+
if !child_task.has_key(&CachedDataItemKey::Output {}) {
2104+
make_task_dirty_internal(
2105+
child_task,
2106+
child_id,
2107+
false,
2108+
#[cfg(feature = "trace_task_dirty")]
2109+
TaskDirtyCause::InitialDirty,
2110+
&mut queue,
2111+
ctx,
2112+
);
2113+
}
2114+
}
2115+
2116+
queue.execute(ctx);
2117+
}
2118+
20082119
fn task_execution_completed_connect(
20092120
&self,
20102121
ctx: &mut impl ExecuteContext<'_>,
@@ -2075,6 +2186,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
20752186
&self,
20762187
ctx: &mut impl ExecuteContext<'_>,
20772188
task_id: TaskId,
2189+
new_output: Option<OutputValue>,
20782190
removed_data: &mut Vec<CachedDataItem>,
20792191
is_now_immutable: bool,
20802192
) -> bool {
@@ -2091,7 +2203,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
20912203
once_task: _,
20922204
stale,
20932205
session_dependent,
2094-
done: _,
20952206
marked_as_completed: _,
20962207
new_children,
20972208
}) = in_progress
@@ -2111,6 +2222,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
21112222
return true;
21122223
}
21132224

2225+
// Set the output if it has changed
2226+
let mut old_content = None;
2227+
if let Some(value) = new_output {
2228+
old_content = task.insert(CachedDataItem::Output { value });
2229+
}
2230+
21142231
// If the task is not stateful and has no mutable children, it does not have a way to be
21152232
// invalidated and we can mark it as immutable.
21162233
if is_now_immutable {
@@ -2191,6 +2308,10 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
21912308
};
21922309

21932310
drop(task);
2311+
drop(old_content);
2312+
2313+
// Notify dependent tasks that are waiting for this task to finish
2314+
done_event.notify(usize::MAX);
21942315

21952316
if let Some(data_update) = data_update {
21962317
AggregationUpdateQueue::run(data_update, ctx);
@@ -2978,20 +3099,12 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
29783099
self.0.try_start_task_execution(task_id, turbo_tasks)
29793100
}
29803101

2981-
fn task_execution_result(
2982-
&self,
2983-
task_id: TaskId,
2984-
result: Result<RawVc, TurboTasksExecutionError>,
2985-
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
2986-
) {
2987-
self.0.task_execution_result(task_id, result, turbo_tasks);
2988-
}
2989-
29903102
fn task_execution_completed(
29913103
&self,
29923104
task_id: TaskId,
29933105
_duration: Duration,
29943106
_memory_usage: usize,
3107+
result: Result<RawVc, TurboTasksExecutionError>,
29953108
cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
29963109
stateful: bool,
29973110
has_invalidator: bool,
@@ -3001,6 +3114,7 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
30013114
task_id,
30023115
_duration,
30033116
_memory_usage,
3117+
result,
30043118
cell_counters,
30053119
stateful,
30063120
has_invalidator,

turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ mod invalidate;
66
mod prepare_new_children;
77
mod update_cell;
88
mod update_collectible;
9-
mod update_output;
109

1110
use std::{
1211
fmt::{Debug, Formatter},
@@ -600,7 +599,6 @@ macro_rules! impl_operation {
600599
pub enum AnyOperation {
601600
ConnectChild(connect_child::ConnectChildOperation),
602601
Invalidate(invalidate::InvalidateOperation),
603-
UpdateOutput(update_output::UpdateOutputOperation),
604602
UpdateCell(update_cell::UpdateCellOperation),
605603
CleanupOldEdges(cleanup_old_edges::CleanupOldEdgesOperation),
606604
AggregationUpdate(aggregation_update::AggregationUpdateQueue),
@@ -612,7 +610,6 @@ impl AnyOperation {
612610
match self {
613611
AnyOperation::ConnectChild(op) => op.execute(ctx),
614612
AnyOperation::Invalidate(op) => op.execute(ctx),
615-
AnyOperation::UpdateOutput(op) => op.execute(ctx),
616613
AnyOperation::UpdateCell(op) => op.execute(ctx),
617614
AnyOperation::CleanupOldEdges(op) => op.execute(ctx),
618615
AnyOperation::AggregationUpdate(op) => op.execute(ctx),
@@ -627,7 +624,6 @@ impl AnyOperation {
627624

628625
impl_operation!(ConnectChild connect_child::ConnectChildOperation);
629626
impl_operation!(Invalidate invalidate::InvalidateOperation);
630-
impl_operation!(UpdateOutput update_output::UpdateOutputOperation);
631627
impl_operation!(UpdateCell update_cell::UpdateCellOperation);
632628
impl_operation!(CleanupOldEdges cleanup_old_edges::CleanupOldEdgesOperation);
633629
impl_operation!(AggregationUpdate aggregation_update::AggregationUpdateQueue);
@@ -641,6 +637,7 @@ pub use self::{
641637
},
642638
cleanup_old_edges::OutdatedEdge,
643639
connect_children::connect_children,
640+
invalidate::make_task_dirty_internal,
644641
prepare_new_children::prepare_new_children,
645642
update_collectible::UpdateCollectibleOperation,
646643
};

0 commit comments

Comments
 (0)