Skip to content

Commit 7183271

Browse files
committed
Turbopack: avoid race condition when updating cells
1 parent db63b09 commit 7183271

File tree

1 file changed

+77
-20
lines changed
  • turbopack/crates/turbo-tasks-backend/src/backend/operation

1 file changed

+77
-20
lines changed

turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs

Lines changed: 77 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ use std::mem::take;
22

33
use serde::{Deserialize, Serialize};
44
use smallvec::SmallVec;
5-
use turbo_tasks::{CellId, TaskId, backend::CellContent};
5+
use turbo_tasks::{
6+
CellId, TaskId, TypedSharedReference,
7+
backend::{CellContent, TypedCellContent},
8+
};
69

710
#[cfg(feature = "trace_task_dirty")]
811
use crate::backend::operation::invalidate::TaskDirtyCause;
@@ -24,6 +27,12 @@ pub enum UpdateCellOperation {
2427
InvalidateWhenCellDependency {
2528
cell_ref: CellRef,
2629
dependent_tasks: SmallVec<[TaskId; 4]>,
30+
content: Option<TypedSharedReference>,
31+
queue: AggregationUpdateQueue,
32+
},
33+
FinalCellChange {
34+
cell_ref: CellRef,
35+
content: Option<TypedSharedReference>,
2736
queue: AggregationUpdateQueue,
2837
},
2938
AggregationUpdate {
@@ -36,29 +45,22 @@ pub enum UpdateCellOperation {
3645
impl UpdateCellOperation {
3746
pub fn run(task_id: TaskId, cell: CellId, content: CellContent, mut ctx: impl ExecuteContext) {
3847
let mut task = ctx.task(task_id, TaskDataCategory::All);
39-
let old_content = if let CellContent(Some(new_content)) = content {
40-
task.insert(CachedDataItem::CellData {
41-
cell,
42-
value: new_content.into_typed(cell.type_id),
43-
})
44-
} else {
45-
task.remove(&CachedDataItemKey::CellData { cell })
46-
};
47-
48-
if let Some(in_progress) = remove!(task, InProgressCell { cell }) {
49-
in_progress.event.notify(usize::MAX);
50-
}
5148

5249
// We need to detect recomputation, because here the content has not actually changed (even
5350
// if it's not equal to the old content, as not all values implement Eq). We have to
5451
// assume that tasks are deterministic and pure.
52+
let should_invalidate = ctx.should_track_dependencies()
53+
&& (task.has_key(&CachedDataItemKey::Dirty {}) ||
54+
// This is a hack for the streaming hack. Stateful tasks are never recomputed, so this forces invalidation for them in case of this hack.
55+
task.has_key(&CachedDataItemKey::Stateful {}));
56+
57+
if should_invalidate {
58+
// Slow path: We need to invalidate tasks depending on this cell.
59+
// To avoid a race condition, we need to remove the old content first,
60+
// then invalidate dependent tasks and only then update the cell content.
61+
62+
let old_content = task.remove(&CachedDataItemKey::CellData { cell });
5563

56-
if ctx.should_track_dependencies()
57-
&& (task.has_key(&CachedDataItemKey::Dirty {})
58-
||
59-
// This is a hack for the streaming hack. Stateful tasks are never recomputed, so this forces invalidation for them in case of this hack.
60-
task.has_key(&CachedDataItemKey::Stateful {}))
61-
{
6264
let dependent_tasks = get_many!(
6365
task,
6466
CellDependent { cell: dependent_cell, task }
@@ -69,18 +71,44 @@ impl UpdateCellOperation {
6971
drop(task);
7072
drop(old_content);
7173

74+
let content = if let CellContent(Some(new_content)) = content {
75+
Some(new_content.into_typed(cell.type_id))
76+
} else {
77+
None
78+
};
79+
7280
UpdateCellOperation::InvalidateWhenCellDependency {
7381
cell_ref: CellRef {
7482
task: task_id,
7583
cell,
7684
},
7785
dependent_tasks,
86+
content,
7887
queue: AggregationUpdateQueue::new(),
7988
}
8089
.execute(&mut ctx);
8190
} else {
91+
// Fast path: We don't need to invalidate anything.
92+
// So we can just update the cell content.
93+
94+
let old_content = if let CellContent(Some(new_content)) = content {
95+
let new_content = new_content.into_typed(cell.type_id);
96+
task.insert(CachedDataItem::CellData {
97+
cell,
98+
value: new_content,
99+
})
100+
} else {
101+
task.remove(&CachedDataItemKey::CellData { cell })
102+
};
103+
104+
let in_progress_cell = remove!(task, InProgressCell { cell });
105+
82106
drop(task);
83107
drop(old_content);
108+
109+
if let Some(in_progress) = in_progress_cell {
110+
in_progress.event.notify(usize::MAX);
111+
}
84112
}
85113
}
86114
}
@@ -93,6 +121,7 @@ impl Operation for UpdateCellOperation {
93121
UpdateCellOperation::InvalidateWhenCellDependency {
94122
cell_ref,
95123
ref mut dependent_tasks,
124+
ref mut content,
96125
ref mut queue,
97126
} => {
98127
if let Some(dependent_task_id) = dependent_tasks.pop() {
@@ -129,9 +158,37 @@ impl Operation for UpdateCellOperation {
129158
);
130159
}
131160
if dependent_tasks.is_empty() {
132-
self = UpdateCellOperation::AggregationUpdate { queue: take(queue) };
161+
self = UpdateCellOperation::FinalCellChange {
162+
cell_ref,
163+
content: take(content),
164+
queue: take(queue),
165+
};
133166
}
134167
}
168+
UpdateCellOperation::FinalCellChange {
169+
cell_ref: CellRef { task, cell },
170+
content,
171+
ref mut queue,
172+
} => {
173+
let mut task = ctx.task(task, TaskDataCategory::Data);
174+
175+
if let Some(content) = content {
176+
task.add_new(CachedDataItem::CellData {
177+
cell,
178+
value: content,
179+
})
180+
}
181+
182+
let in_progress_cell = remove!(task, InProgressCell { cell });
183+
184+
drop(task);
185+
186+
if let Some(in_progress) = in_progress_cell {
187+
in_progress.event.notify(usize::MAX);
188+
}
189+
190+
self = UpdateCellOperation::AggregationUpdate { queue: take(queue) };
191+
}
135192
UpdateCellOperation::AggregationUpdate { ref mut queue } => {
136193
if queue.process(ctx) {
137194
self = UpdateCellOperation::Done

0 commit comments

Comments
 (0)