Skip to content

Commit b1db073

Browse files
authored
Merge pull request #30998 from teskje/compute-controller-fix-race
compute: fix a race condition in collecting dependency frontiers
2 parents af56601 + 42a22b7 commit b1db073

File tree

1 file changed

+5
-12
lines changed

1 file changed

+5
-12
lines changed

src/compute-client/src/controller/instance.rs

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2035,21 +2035,14 @@ where
20352035
continue;
20362036
}
20372037

2038-
let compute_frontiers = collection.compute_dependency_ids().flat_map(|dep_id| {
2038+
let compute_frontiers = collection.compute_dependency_ids().filter_map(|dep_id| {
20392039
let collection = self.collections.get(&dep_id);
20402040
collection.map(|c| c.write_frontier())
20412041
});
2042-
2043-
let existing_storage_dependencies = collection
2044-
.storage_dependency_ids()
2045-
.filter(|id| self.storage_collections.check_exists(*id).is_ok())
2046-
.collect::<Vec<_>>();
2047-
let storage_frontiers = self
2048-
.storage_collections
2049-
.collections_frontiers(existing_storage_dependencies)
2050-
.expect("missing storage collections")
2051-
.into_iter()
2052-
.map(|f| f.write_frontier);
2042+
let storage_frontiers = collection.storage_dependency_ids().filter_map(|dep_id| {
2043+
let frontiers = self.storage_collections.collection_frontiers(dep_id).ok();
2044+
frontiers.map(|f| f.write_frontier)
2045+
});
20532046

20542047
let mut new_capability = Antichain::new();
20552048
for frontier in compute_frontiers.chain(storage_frontiers) {

0 commit comments

Comments
 (0)