@@ -2,7 +2,10 @@ use std::mem::take;
2
2
3
3
use serde:: { Deserialize , Serialize } ;
4
4
use smallvec:: SmallVec ;
5
- use turbo_tasks:: { CellId , TaskId , backend:: CellContent } ;
5
+ use turbo_tasks:: {
6
+ CellId , TaskId , TypedSharedReference ,
7
+ backend:: { CellContent , TypedCellContent } ,
8
+ } ;
6
9
7
10
#[ cfg( feature = "trace_task_dirty" ) ]
8
11
use crate :: backend:: operation:: invalidate:: TaskDirtyCause ;
@@ -24,6 +27,12 @@ pub enum UpdateCellOperation {
24
27
InvalidateWhenCellDependency {
25
28
cell_ref : CellRef ,
26
29
dependent_tasks : SmallVec < [ TaskId ; 4 ] > ,
30
+ content : Option < TypedSharedReference > ,
31
+ queue : AggregationUpdateQueue ,
32
+ } ,
33
+ FinalCellChange {
34
+ cell_ref : CellRef ,
35
+ content : Option < TypedSharedReference > ,
27
36
queue : AggregationUpdateQueue ,
28
37
} ,
29
38
AggregationUpdate {
@@ -36,29 +45,22 @@ pub enum UpdateCellOperation {
36
45
impl UpdateCellOperation {
37
46
pub fn run ( task_id : TaskId , cell : CellId , content : CellContent , mut ctx : impl ExecuteContext ) {
38
47
let mut task = ctx. task ( task_id, TaskDataCategory :: All ) ;
39
- let old_content = if let CellContent ( Some ( new_content) ) = content {
40
- task. insert ( CachedDataItem :: CellData {
41
- cell,
42
- value : new_content. into_typed ( cell. type_id ) ,
43
- } )
44
- } else {
45
- task. remove ( & CachedDataItemKey :: CellData { cell } )
46
- } ;
47
-
48
- if let Some ( in_progress) = remove ! ( task, InProgressCell { cell } ) {
49
- in_progress. event . notify ( usize:: MAX ) ;
50
- }
51
48
52
49
// We need to detect recomputation, because here the content has not actually changed (even
53
50
// if it's not equal to the old content, as not all values implement Eq). We have to
54
51
// assume that tasks are deterministic and pure.
52
+ let should_invalidate = ctx. should_track_dependencies ( )
53
+ && ( task. has_key ( & CachedDataItemKey :: Dirty { } ) ||
54
+ // This is a hack for the streaming hack. Stateful tasks are never recomputed, so this forces invalidation for them in case of this hack.
55
+ task. has_key ( & CachedDataItemKey :: Stateful { } ) ) ;
56
+
57
+ if should_invalidate {
58
+ // Slow path: We need to invalidate tasks depending on this cell.
59
+ // To avoid a race condition, we need to remove the old content first,
60
+ // then invalidate dependent tasks and only then update the cell content.
61
+
62
+ let old_content = task. remove ( & CachedDataItemKey :: CellData { cell } ) ;
55
63
56
- if ctx. should_track_dependencies ( )
57
- && ( task. has_key ( & CachedDataItemKey :: Dirty { } )
58
- ||
59
- // This is a hack for the streaming hack. Stateful tasks are never recomputed, so this forces invalidation for them in case of this hack.
60
- task. has_key ( & CachedDataItemKey :: Stateful { } ) )
61
- {
62
64
let dependent_tasks = get_many ! (
63
65
task,
64
66
CellDependent { cell: dependent_cell, task }
@@ -69,18 +71,44 @@ impl UpdateCellOperation {
69
71
drop ( task) ;
70
72
drop ( old_content) ;
71
73
74
+ let content = if let CellContent ( Some ( new_content) ) = content {
75
+ Some ( new_content. into_typed ( cell. type_id ) )
76
+ } else {
77
+ None
78
+ } ;
79
+
72
80
UpdateCellOperation :: InvalidateWhenCellDependency {
73
81
cell_ref : CellRef {
74
82
task : task_id,
75
83
cell,
76
84
} ,
77
85
dependent_tasks,
86
+ content,
78
87
queue : AggregationUpdateQueue :: new ( ) ,
79
88
}
80
89
. execute ( & mut ctx) ;
81
90
} else {
91
+ // Fast path: We don't need to invalidate anything.
92
+ // So we can just update the cell content.
93
+
94
+ let old_content = if let CellContent ( Some ( new_content) ) = content {
95
+ let new_content = new_content. into_typed ( cell. type_id ) ;
96
+ task. insert ( CachedDataItem :: CellData {
97
+ cell,
98
+ value : new_content,
99
+ } )
100
+ } else {
101
+ task. remove ( & CachedDataItemKey :: CellData { cell } )
102
+ } ;
103
+
104
+ let in_progress_cell = remove ! ( task, InProgressCell { cell } ) ;
105
+
82
106
drop ( task) ;
83
107
drop ( old_content) ;
108
+
109
+ if let Some ( in_progress) = in_progress_cell {
110
+ in_progress. event . notify ( usize:: MAX ) ;
111
+ }
84
112
}
85
113
}
86
114
}
@@ -93,6 +121,7 @@ impl Operation for UpdateCellOperation {
93
121
UpdateCellOperation :: InvalidateWhenCellDependency {
94
122
cell_ref,
95
123
ref mut dependent_tasks,
124
+ ref mut content,
96
125
ref mut queue,
97
126
} => {
98
127
if let Some ( dependent_task_id) = dependent_tasks. pop ( ) {
@@ -129,9 +158,37 @@ impl Operation for UpdateCellOperation {
129
158
) ;
130
159
}
131
160
if dependent_tasks. is_empty ( ) {
132
- self = UpdateCellOperation :: AggregationUpdate { queue : take ( queue) } ;
161
+ self = UpdateCellOperation :: FinalCellChange {
162
+ cell_ref,
163
+ content : take ( content) ,
164
+ queue : take ( queue) ,
165
+ } ;
133
166
}
134
167
}
168
+ UpdateCellOperation :: FinalCellChange {
169
+ cell_ref : CellRef { task, cell } ,
170
+ content,
171
+ ref mut queue,
172
+ } => {
173
+ let mut task = ctx. task ( task, TaskDataCategory :: Data ) ;
174
+
175
+ if let Some ( content) = content {
176
+ task. add_new ( CachedDataItem :: CellData {
177
+ cell,
178
+ value : content,
179
+ } )
180
+ }
181
+
182
+ let in_progress_cell = remove ! ( task, InProgressCell { cell } ) ;
183
+
184
+ drop ( task) ;
185
+
186
+ if let Some ( in_progress) = in_progress_cell {
187
+ in_progress. event . notify ( usize:: MAX ) ;
188
+ }
189
+
190
+ self = UpdateCellOperation :: AggregationUpdate { queue : take ( queue) } ;
191
+ }
135
192
UpdateCellOperation :: AggregationUpdate { ref mut queue } => {
136
193
if queue. process ( ctx) {
137
194
self = UpdateCellOperation :: Done
0 commit comments