Skip to content

Commit a7d3aec

Browse files
authored
Turbopack: avoid race condition when updating cells (#84598)
### What? The update cell operation has a race condition. There is a window of time between changing the value and invalidating the dependent tasks. That sounds fine, as tasks are eventually invalidated, but it is not, because of recomputing tasks. A task is considered to be "recomputing" when it is not flagged as dirty while it updates a cell/output. A recomputing task will not invalidate dependent tasks, as a recomputation is considered deterministic and must yield the same results. The race happens when a recomputation of a dependent task happens while a cell is being updated. During that time the cell value is already updated, but the dependent task is not yet marked as dirty. So the recomputing task incorrectly doesn't trigger an invalidation for its later cell change, even though that change depends on the already modified data.
1 parent e0f676b commit a7d3aec

File tree

2 files changed

+104
-41
lines changed

2 files changed

+104
-41
lines changed

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -831,7 +831,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
831831
return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
832832
}
833833
let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
834-
let is_scheduled = matches!(in_progress, Some(InProgressState::Scheduled { .. }));
835834

836835
// Check cell index range (cell might not exist at all)
837836
let max_id = get!(
@@ -875,14 +874,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
875874
// Schedule the task, if not already scheduled
876875
if is_cancelled {
877876
bail!("{} was canceled", ctx.get_task_description(task_id));
878-
} else if !is_scheduled
879-
&& task.add(CachedDataItem::new_scheduled(
880-
TaskExecutionReason::CellNotAvailable,
881-
|| self.get_task_desc_fn(task_id),
882-
))
883-
{
884-
ctx.schedule_task(task);
885877
}
878+
task.add_new(CachedDataItem::new_scheduled(
879+
TaskExecutionReason::CellNotAvailable,
880+
|| self.get_task_desc_fn(task_id),
881+
));
882+
ctx.schedule_task(task);
886883

887884
Ok(Err(listener))
888885
}

turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs

Lines changed: 99 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::mem::take;
22

33
use serde::{Deserialize, Serialize};
44
use smallvec::SmallVec;
5-
use turbo_tasks::{CellId, TaskId, backend::CellContent};
5+
use turbo_tasks::{CellId, TaskId, TypedSharedReference, backend::CellContent};
66

77
#[cfg(feature = "trace_task_dirty")]
88
use crate::backend::operation::invalidate::TaskDirtyCause;
@@ -24,6 +24,12 @@ pub enum UpdateCellOperation {
2424
InvalidateWhenCellDependency {
2525
cell_ref: CellRef,
2626
dependent_tasks: SmallVec<[TaskId; 4]>,
27+
content: Option<TypedSharedReference>,
28+
queue: AggregationUpdateQueue,
29+
},
30+
FinalCellChange {
31+
cell_ref: CellRef,
32+
content: Option<TypedSharedReference>,
2733
queue: AggregationUpdateQueue,
2834
},
2935
AggregationUpdate {
@@ -36,51 +42,82 @@ pub enum UpdateCellOperation {
3642
impl UpdateCellOperation {
3743
pub fn run(task_id: TaskId, cell: CellId, content: CellContent, mut ctx: impl ExecuteContext) {
3844
let mut task = ctx.task(task_id, TaskDataCategory::All);
39-
let old_content = if let CellContent(Some(new_content)) = content {
40-
task.insert(CachedDataItem::CellData {
41-
cell,
42-
value: new_content.into_typed(cell.type_id),
43-
})
44-
} else {
45-
task.remove(&CachedDataItemKey::CellData { cell })
46-
};
47-
48-
if let Some(in_progress) = remove!(task, InProgressCell { cell }) {
49-
in_progress.event.notify(usize::MAX);
50-
}
5145

5246
// We need to detect recomputation, because here the content has not actually changed (even
5347
// if it's not equal to the old content, as not all values implement Eq). We have to
5448
// assume that tasks are deterministic and pure.
49+
let should_invalidate = ctx.should_track_dependencies()
50+
&& (task.has_key(&CachedDataItemKey::Dirty {}) ||
51+
// This is a hack for the streaming hack. Stateful tasks are never recomputed, so this forces invalidation for them in case of this hack.
52+
task.has_key(&CachedDataItemKey::Stateful {}));
5553

56-
if ctx.should_track_dependencies()
57-
&& (task.has_key(&CachedDataItemKey::Dirty {})
58-
||
59-
// This is a hack for the streaming hack. Stateful tasks are never recomputed, so this forces invalidation for them in case of this hack.
60-
task.has_key(&CachedDataItemKey::Stateful {}))
61-
{
62-
let dependent_tasks = get_many!(
54+
if should_invalidate {
55+
let dependent_tasks: SmallVec<[TaskId; 4]> = get_many!(
6356
task,
6457
CellDependent { cell: dependent_cell, task }
6558
if dependent_cell == cell
6659
=> task
6760
);
6861

69-
drop(task);
70-
drop(old_content);
62+
if !dependent_tasks.is_empty() {
63+
// Slow path: We need to invalidate tasks depending on this cell.
64+
// To avoid a race condition, we need to remove the old content first,
65+
// then invalidate dependent tasks and only then update the cell content.
66+
67+
// The reason behind this is that we consider tasks that haven't the dirty flag set
68+
// as "recomputing" tasks. Recomputing tasks won't invalidate
69+
// dependent tasks, when a cell is changed. This would cause missing invalidating if
70+
// a task is recomputing while a dependency is in the middle of a cell update (where
71+
// the value has been changed, but the dependent tasks have not be flagged dirty
72+
// yet). So to avoid that we first remove the cell content, invalidate all dependent
73+
// tasks and after that set the new cell content. When the cell content is unset,
74+
// readers will wait for it to be set via InProgressCell.
75+
76+
let old_content = task.remove(&CachedDataItemKey::CellData { cell });
77+
78+
drop(task);
79+
drop(old_content);
7180

72-
UpdateCellOperation::InvalidateWhenCellDependency {
73-
cell_ref: CellRef {
74-
task: task_id,
75-
cell,
76-
},
77-
dependent_tasks,
78-
queue: AggregationUpdateQueue::new(),
81+
let content = if let CellContent(Some(new_content)) = content {
82+
Some(new_content.into_typed(cell.type_id))
83+
} else {
84+
None
85+
};
86+
87+
UpdateCellOperation::InvalidateWhenCellDependency {
88+
cell_ref: CellRef {
89+
task: task_id,
90+
cell,
91+
},
92+
dependent_tasks,
93+
content,
94+
queue: AggregationUpdateQueue::new(),
95+
}
96+
.execute(&mut ctx);
97+
return;
7998
}
80-
.execute(&mut ctx);
99+
}
100+
101+
// Fast path: We don't need to invalidate anything.
102+
// So we can just update the cell content.
103+
104+
let old_content = if let CellContent(Some(new_content)) = content {
105+
let new_content = new_content.into_typed(cell.type_id);
106+
task.insert(CachedDataItem::CellData {
107+
cell,
108+
value: new_content,
109+
})
81110
} else {
82-
drop(task);
83-
drop(old_content);
111+
task.remove(&CachedDataItemKey::CellData { cell })
112+
};
113+
114+
let in_progress_cell = remove!(task, InProgressCell { cell });
115+
116+
drop(task);
117+
drop(old_content);
118+
119+
if let Some(in_progress) = in_progress_cell {
120+
in_progress.event.notify(usize::MAX);
84121
}
85122
}
86123
}
@@ -93,6 +130,7 @@ impl Operation for UpdateCellOperation {
93130
UpdateCellOperation::InvalidateWhenCellDependency {
94131
cell_ref,
95132
ref mut dependent_tasks,
133+
ref mut content,
96134
ref mut queue,
97135
} => {
98136
if let Some(dependent_task_id) = dependent_tasks.pop() {
@@ -129,9 +167,37 @@ impl Operation for UpdateCellOperation {
129167
);
130168
}
131169
if dependent_tasks.is_empty() {
132-
self = UpdateCellOperation::AggregationUpdate { queue: take(queue) };
170+
self = UpdateCellOperation::FinalCellChange {
171+
cell_ref,
172+
content: take(content),
173+
queue: take(queue),
174+
};
133175
}
134176
}
177+
UpdateCellOperation::FinalCellChange {
178+
cell_ref: CellRef { task, cell },
179+
content,
180+
ref mut queue,
181+
} => {
182+
let mut task = ctx.task(task, TaskDataCategory::Data);
183+
184+
if let Some(content) = content {
185+
task.add_new(CachedDataItem::CellData {
186+
cell,
187+
value: content,
188+
})
189+
}
190+
191+
let in_progress_cell = remove!(task, InProgressCell { cell });
192+
193+
drop(task);
194+
195+
if let Some(in_progress) = in_progress_cell {
196+
in_progress.event.notify(usize::MAX);
197+
}
198+
199+
self = UpdateCellOperation::AggregationUpdate { queue: take(queue) };
200+
}
135201
UpdateCellOperation::AggregationUpdate { ref mut queue } => {
136202
if queue.process(ctx) {
137203
self = UpdateCellOperation::Done

0 commit comments

Comments
 (0)