Skip to content

Commit eab89da

Browse files
committed
adapter: optimize topological sort performance
The topological sorting in `apply` was using an inefficient quadratic algorithm. That was fine when we were only sorting connection items, but it is no longer fine now that most catalog items need to be sorted topologically. This commit changes the topo sort implementation to use Kahn's algorithm (https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm), which does the job in time linear in the number of items and dependency edges.
1 parent cccd4f3 commit eab89da

File tree

1 file changed

+63
-46
lines changed

1 file changed

+63
-46
lines changed

src/adapter/src/catalog/apply.rs

Lines changed: 63 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1926,8 +1926,9 @@ impl CatalogState {
19261926
///
19271927
/// # Panics
19281928
///
1929-
/// This function assumes that all provided `updates` have the same timestamp
1930-
/// and will panic otherwise.
1929+
/// This function assumes that all provided `updates` have the same timestamp and will panic
1930+
/// otherwise. It also requires that the provided `updates` are consolidated, i.e. all contained
1931+
/// `StateUpdateKinds` are unique.
19311932
fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
19321933
fn push_update<T>(
19331934
update: T,
@@ -1945,6 +1946,13 @@ fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
19451946
updates.iter().map(|update| update.ts).all_equal(),
19461947
"all timestamps should be equal: {updates:?}"
19471948
);
1949+
soft_assert_no_log!(
1950+
{
1951+
let mut dedup = BTreeSet::new();
1952+
updates.iter().all(|update| dedup.insert(&update.kind))
1953+
},
1954+
"updates should be consolidated: {updates:?}"
1955+
);
19481956

19491957
// Partition updates by type so that we can weave different update types into the right spots.
19501958
let mut pre_cluster_retractions = Vec::new();
@@ -2062,64 +2070,73 @@ fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
20622070
}
20632071

20642072
/// Sort items by their dependencies using topological sort.
2073+
///
2074+
/// # Panics
2075+
///
2076+
/// This function requires that all provided items have unique item IDs.
20652077
fn sort_items_topological(items: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
2066-
let mut topo: BTreeMap<
2067-
(mz_catalog::durable::Item, Timestamp, StateDiff),
2068-
BTreeSet<CatalogItemId>,
2069-
> = BTreeMap::default();
2070-
let existing: BTreeSet<_> = items.iter().map(|item| item.0.id).collect();
2071-
2072-
// Initialize our set of topological sort.
20732078
tracing::debug!(?items, "sorting items by dependencies");
2079+
2080+
let all_item_ids: BTreeSet<_> = items.iter().map(|item| item.0.id).collect();
2081+
2082+
// For each item, the update that contains it.
2083+
let mut updates_by_id =
2084+
BTreeMap::<CatalogItemId, (mz_catalog::durable::Item, Timestamp, StateDiff)>::new();
2085+
// For each item, the number of unprocessed dependencies.
2086+
let mut in_degree = BTreeMap::<CatalogItemId, usize>::new();
2087+
// For each item, the IDs of items depending on it.
2088+
let mut dependents = BTreeMap::<CatalogItemId, Vec<CatalogItemId>>::new();
2089+
// Items that have no unprocessed dependencies.
2090+
let mut ready = Vec::new();
2091+
2092+
// Build the graph.
20742093
for (item, ts, diff) in items.drain(..) {
2094+
let id = item.id;
20752095
let statement = mz_sql::parse::parse(&item.create_sql)
20762096
.expect("valid create_sql")
20772097
.into_element()
20782098
.ast;
2099+
20792100
let mut dependencies = mz_sql::names::dependencies(&statement)
20802101
.expect("failed to find dependencies of item");
2081-
// Be defensive and remove any possible self references.
2082-
dependencies.remove(&item.id);
2083-
// It's possible we're applying updates to an item where the
2084-
// dependency already exists and thus it's not in `items`.
2085-
dependencies.retain(|dep| existing.contains(dep));
2086-
2087-
// Be defensive and ensure we're not clobbering any items.
2088-
assert_none!(topo.insert((item, ts, diff), dependencies));
2089-
}
2090-
tracing::debug!(?topo, ?existing, "built topological sort",);
2091-
2092-
// Do a topological sort, pushing back into the provided Vec.
2093-
while !topo.is_empty() {
2094-
// Get all of the items with no dependencies.
2095-
let no_deps: Vec<_> = topo
2096-
.iter()
2097-
.filter_map(|(item, deps)| {
2098-
if deps.is_empty() {
2099-
Some(item.clone())
2100-
} else {
2101-
None
2102-
}
2103-
})
2104-
.collect();
2102+
// Remove any dependencies not contained in `items`.
2103+
// As a defensive measure, also remove any self-references.
2104+
dependencies.retain(|dep| all_item_ids.contains(dep) && *dep != id);
2105+
2106+
let prev = updates_by_id.insert(id, (item, ts, diff));
2107+
assert_none!(prev);
2108+
2109+
in_degree.insert(id, dependencies.len());
21052110

2106-
// Cycle in our graph!
2107-
if no_deps.is_empty() {
2108-
panic!("programming error, cycle in item dependencies");
2111+
for dep_id in &dependencies {
2112+
dependents.entry(*dep_id).or_default().push(id);
21092113
}
21102114

2111-
// Process all of the items with no dependencies.
2112-
for item in no_deps {
2113-
// Remove the item from our topological sort.
2114-
topo.remove(&item);
2115-
// Remove this item from anything that depends on it.
2116-
topo.values_mut().for_each(|deps| {
2117-
deps.remove(&item.0.id);
2118-
});
2119-
// Push it back into our list as "completed".
2120-
items.push(item);
2115+
if dependencies.is_empty() {
2116+
ready.push(id);
21212117
}
21222118
}
2119+
2120+
// Process items in topological order, pushing back into the provided Vec.
2121+
while let Some(id) = ready.pop() {
2122+
let update = updates_by_id.remove(&id).expect("must exist");
2123+
items.push(update);
2124+
2125+
if let Some(depts) = dependents.get(&id) {
2126+
for dept_id in depts {
2127+
let deg = in_degree.get_mut(dept_id).expect("must exist");
2128+
*deg -= 1;
2129+
if *deg == 0 {
2130+
ready.push(*dept_id);
2131+
}
2132+
}
2133+
}
2134+
}
2135+
2136+
// Cycle detection: if we didn't process all items, there's a cycle.
2137+
if !updates_by_id.is_empty() {
2138+
panic!("programming error, cycle in item dependencies");
2139+
}
21232140
}
21242141

21252142
/// Sort item updates by dependency.

0 commit comments

Comments
 (0)