Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 78 additions & 66 deletions src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1926,8 +1926,9 @@ impl CatalogState {
///
/// # Panics
///
/// This function assumes that all provided `updates` have the same timestamp
/// and will panic otherwise.
/// This function assumes that all provided `updates` have the same timestamp and will panic
/// otherwise. It also requires that the provided `updates` are consolidated, i.e. all contained
/// `StateUpdateKind`s are unique.
fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
fn push_update<T>(
update: T,
Expand All @@ -1945,6 +1946,13 @@ fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
updates.iter().map(|update| update.ts).all_equal(),
"all timestamps should be equal: {updates:?}"
);
soft_assert_no_log!(
{
let mut dedup = BTreeSet::new();
updates.iter().all(|update| dedup.insert(&update.kind))
},
"updates should be consolidated: {updates:?}"
);

// Partition updates by type so that we can weave different update types into the right spots.
let mut pre_cluster_retractions = Vec::new();
Expand Down Expand Up @@ -2061,78 +2069,82 @@ fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
}
}

/// Sort `CONNECTION` items.
/// Sort items by their dependencies using topological sort.
///
/// `CONNECTION`s can depend on one another, e.g. a `KAFKA CONNECTION` contains
/// a list of `BROKERS` and these broker definitions can themselves reference other
/// connections. This `BROKERS` list can also be `ALTER`ed and thus it's possible
/// for a connection to depend on another with an ID greater than its own.
fn sort_connections(connections: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
let mut topo: BTreeMap<
(mz_catalog::durable::Item, Timestamp, StateDiff),
BTreeSet<CatalogItemId>,
> = BTreeMap::default();
let existing_connections: BTreeSet<_> = connections.iter().map(|item| item.0.id).collect();

// Initialize our set of topological sort.
tracing::debug!(?connections, "sorting connections");
for (connection, ts, diff) in connections.drain(..) {
let statement = mz_sql::parse::parse(&connection.create_sql)
.expect("valid CONNECTION create_sql")
/// # Panics
///
/// This function requires that all provided items have unique item IDs.
fn sort_items_topological(items: &mut Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>) {
tracing::debug!(?items, "sorting items by dependencies");

let all_item_ids: BTreeSet<_> = items.iter().map(|item| item.0.id).collect();

// For each item, the update that contains it.
let mut updates_by_id =
BTreeMap::<CatalogItemId, (mz_catalog::durable::Item, Timestamp, StateDiff)>::new();
// For each item, the number of unprocessed dependencies.
let mut in_degree = BTreeMap::<CatalogItemId, usize>::new();
// For each item, the IDs of items depending on it.
let mut dependents = BTreeMap::<CatalogItemId, Vec<CatalogItemId>>::new();
// Items that have no unprocessed dependencies.
let mut ready = Vec::new();

// Build the graph.
for (item, ts, diff) in items.drain(..) {
let id = item.id;
let statement = mz_sql::parse::parse(&item.create_sql)
.expect("valid create_sql")
.into_element()
.ast;

let mut dependencies = mz_sql::names::dependencies(&statement)
.expect("failed to find dependencies of CONNECTION");
// Be defensive and remove any possible self references.
dependencies.remove(&connection.id);
// It's possible we're applying updates to a connection where the
// dependency already exists and thus it's not in `connections`.
dependencies.retain(|dep| existing_connections.contains(dep));

// Be defensive and ensure we're not clobbering any items.
assert_none!(topo.insert((connection, ts, diff), dependencies));
}
tracing::debug!(?topo, ?existing_connections, "built topological sort");

// Do a topological sort, pushing back into the provided Vec.
while !topo.is_empty() {
// Get all of the connections with no dependencies.
let no_deps: Vec<_> = topo
.iter()
.filter_map(|(item, deps)| {
if deps.is_empty() {
Some(item.clone())
} else {
None
}
})
.collect();
.expect("failed to find dependencies of item");
// Remove any dependencies not contained in `items`.
// As a defensive measure, also remove any self-references.
dependencies.retain(|dep| all_item_ids.contains(dep) && *dep != id);

let prev = updates_by_id.insert(id, (item, ts, diff));
assert_none!(prev);

// Cycle in our graph!
if no_deps.is_empty() {
panic!("programming error, cycle in Connections");
in_degree.insert(id, dependencies.len());

for dep_id in &dependencies {
dependents.entry(*dep_id).or_default().push(id);
}

// Process all of the items with no dependencies.
for item in no_deps {
// Remove the item from our topological sort.
topo.remove(&item);
// Remove this item from anything that depends on it.
topo.values_mut().for_each(|deps| {
deps.remove(&item.0.id);
});
// Push it back into our list as "completed".
connections.push(item);
if dependencies.is_empty() {
ready.push(id);
}
}

// Process items in topological order, pushing back into the provided Vec.
while let Some(id) = ready.pop() {
let update = updates_by_id.remove(&id).expect("must exist");
items.push(update);

if let Some(depts) = dependents.get(&id) {
for dept_id in depts {
let deg = in_degree.get_mut(dept_id).expect("must exist");
*deg -= 1;
if *deg == 0 {
ready.push(*dept_id);
}
}
}
}

// Cycle detection: if we didn't process all items, there's a cycle.
if !updates_by_id.is_empty() {
panic!("programming error, cycle in item dependencies");
}
}

/// Sort item updates by dependency.
///
/// First we group items into groups that are totally ordered by dependency. For example, when
/// sorting all items by dependency we know that all tables can come after all sources, because
/// a source can never depend on a table. Within these groups, the ID order matches the
/// dependency order.
/// a source can never depend on a table. Second, we sort the items in each group in
/// topological order, or by ID, depending on the type.
///
/// It used to be the case that the ID order of ALL items matched the dependency order. However,
/// certain migrations shuffled item IDs around s.t. this was no longer true. A much better
Expand Down Expand Up @@ -2175,25 +2187,25 @@ fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
}
}

// Within each group, sort by ID.
// For some groups, the items in them can depend on each other and can be `ALTER`ed so that
// an item ends up depending on an item with a greater ID. Thus we need to perform a
// topological sort for these groups.
sort_items_topological(&mut connections);
sort_items_topological(&mut derived_items);

// Other groups we can simply sort by ID.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this bite us again at some point? 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be! Also the grouping might become wrong in the future. As the comment at the top says, ideally we would just throw everything into a single topo sort. The comment says we don't know how to find the dependencies, and I'm not quite sure what that means. But we'd also have to parse the SQL of each object for that, which makes me hesitant to try doing more than we need here.

(Random thought: Would be nice if we didn't store the SQL but a parsed format that we don't have to re-parse all the time...)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've had that random thought as well, but then again you're now stuck with a format that you have to stick to or have an evolvability problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd imagine this would be encoded as part of mz-catalog-protos, for which we already have a migration framework in place to evolve that format.

for group in [
&mut types,
&mut funcs,
&mut secrets,
&mut sources,
&mut tables,
&mut derived_items,
&mut sinks,
&mut continual_tasks,
] {
group.sort_by_key(|(item, _, _)| item.id);
}

// HACK(parkmycar): Connections are special and can depend on one another. Additionally
// connections can be `ALTER`ed and thus a `CONNECTION` can depend on another whose ID
// is greater than its own.
sort_connections(&mut connections);

iter::empty()
.chain(types)
.chain(funcs)
Expand Down
30 changes: 30 additions & 0 deletions test/cluster/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -6153,3 +6153,33 @@ def workflow_github_9961(c: Composition):

# database-issues#9961 causes this command to crash envd.
c.sql("CREATE REPLACEMENT MATERIALIZED VIEW rpl FOR mv AS SELECT * FROM t")


def workflow_github_10018(c: Composition):
    """Regression test for database-issues#10018.

    Creates a materialized view whose definition references an item with a
    higher catalog ID (via an applied replacement), restarts `materialized`,
    and checks that the view can still be queried afterwards.
    """

    # Begin from a fresh deployment with no leftover volumes.
    c.down(destroy_volumes=True)
    c.up("materialized")

    # After the replacement is applied, mv1 reads from mv2, which was created
    # later and therefore has the higher catalog ID.
    # The statement text is kept flush-left so the string is passed verbatim.
    setup_sql = """
CREATE TABLE t (a int);
CREATE MATERIALIZED VIEW mv1 AS SELECT a FROM t;
CREATE MATERIALIZED VIEW mv2 AS SELECT a FROM t;
CREATE REPLACEMENT MATERIALIZED VIEW rp FOR mv1 AS SELECT a FROM mv2;
ALTER MATERIALIZED VIEW mv1 APPLY REPLACEMENT rp;
"""
    c.sql(setup_sql)

    # Confirm the replacement took effect before restarting.
    show_create = c.sql_query("SHOW CREATE MATERIALIZED VIEW mv1")
    assert "mv2" in show_create[0][1], show_create

    # Restart envd. This used to panic due to incorrect item ordering during bootstrap.
    c.kill("materialized")
    c.up("materialized")

    # Verify the system is functional after restart.
    c.sql("INSERT INTO t VALUES (1)")
    rows = c.sql_query("SELECT * FROM mv1")
    assert rows[0][0] == 1, rows