Skip to content

Commit aff3263

Browse files
committed
adapter: fix implication application for ALTER MV
When a replacement MV is applied, the catalog is updated by dropping the replacement and updating the target MV with a new version. This transfers both the replacement's compute and storage collection to the target MV, so they should not be dropped. The implication application logic must thus be extended to account for that.
1 parent a0bbb0b commit aff3263

File tree

2 files changed

+49
-46
lines changed

2 files changed

+49
-46
lines changed

src/adapter/src/coord/catalog_implications.rs

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,11 @@ impl Coordinator {
180180
let mut storage_policies_to_initialize = BTreeMap::new();
181181
let mut execution_timestamps_to_set = BTreeSet::new();
182182

183+
// Replacing a materialized view causes the replacement's catalog entry
184+
// to be dropped. Its compute and storage collections are transferred to
185+
// the target MV though, so we must make sure not to drop those.
186+
let mut replacement_gids = vec![];
187+
183188
// We're incrementally migrating the code that manipulates the
184189
// controller from closures in the sequencer. For some types of catalog
185190
// changes we haven't done this migration yet, so there you will see
@@ -313,11 +318,13 @@ impl Coordinator {
313318
prev: prev_mv,
314319
new: new_mv,
315320
}) => {
316-
tracing::debug!(
317-
?prev_mv,
318-
?new_mv,
319-
"not handling AlterMaterializedView in here yet"
320-
);
321+
let old_gid = prev_mv.global_id_writes();
322+
let new_gid = new_mv.global_id_writes();
323+
if new_gid != old_gid {
324+
replacement_gids.push((new_mv.cluster_id, new_gid));
325+
}
326+
327+
self.handle_alter_materialized_view(prev_mv, new_mv)?;
321328
}
322329
CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
323330
mv,
@@ -500,6 +507,12 @@ impl Coordinator {
500507
}
501508
}
502509

510+
// Cancel out drops against replacements.
511+
for (cluster_id, gid) in replacement_gids {
512+
sources_to_drop.retain(|(_, id)| *id != gid);
513+
compute_gids_to_drop.retain(|id| *id != (cluster_id, gid));
514+
}
515+
503516
if !source_collections_to_create.is_empty() {
504517
self.create_source_collections(source_collections_to_create)
505518
.await?;
@@ -1104,6 +1117,36 @@ impl Coordinator {
11041117
Ok(())
11051118
}
11061119

1120+
#[instrument(level = "debug")]
1121+
fn handle_alter_materialized_view(
1122+
&mut self,
1123+
prev_mv: MaterializedView,
1124+
new_mv: MaterializedView,
1125+
) -> Result<(), AdapterError> {
1126+
let old_gid = prev_mv.global_id_writes();
1127+
let new_gid = new_mv.global_id_writes();
1128+
1129+
if old_gid == new_gid {
1130+
// It's not an ALTER MATERIALIZED VIEW as far as the controller is
1131+
// concerned, because we still have the same GlobalId. This is
1132+
// likely a change from an ALTER SWAP.
1133+
return Ok(());
1134+
}
1135+
1136+
// Cut over the MV computation, by shutting down the old dataflow and allowing the
1137+
// new dataflow to start writing.
1138+
//
1139+
// Both commands are applied to their respective clusters asynchronously and they
1140+
// race, so it is possible that we allow writes by the replacement before having
1141+
// dropped the old dataflow. Thus there might be a period where the two dataflows
1142+
// are competing in trying to commit conflicting outputs. Eventually the old
1143+
// dataflow will be dropped and the replacement takes over.
1144+
self.drop_compute_collections(vec![(prev_mv.cluster_id, old_gid)]);
1145+
self.allow_writes(new_mv.cluster_id, new_gid);
1146+
1147+
Ok(())
1148+
}
1149+
11071150
#[instrument(level = "debug")]
11081151
async fn handle_create_source(
11091152
&mut self,

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 1 addition & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4706,53 +4706,13 @@ impl Coordinator {
47064706
ctx: &mut ExecuteContext,
47074707
plan: AlterMaterializedViewApplyReplacementPlan,
47084708
) -> Result<ExecuteResponse, AdapterError> {
4709-
const ERROR_CONTEXT: &str = "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT";
4710-
47114709
let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan;
47124710

47134711
// TODO(alter-mv): Wait until there is overlap between the old MV's write frontier and the
47144712
// new MV's as-of, to ensure no times are skipped.
47154713

4716-
let Some(old) = self.catalog().get_entry(&id).materialized_view() else {
4717-
return Err(AdapterError::internal(
4718-
ERROR_CONTEXT,
4719-
"id must refer to a materialized view",
4720-
));
4721-
};
4722-
let Some(new) = self
4723-
.catalog()
4724-
.get_entry(&replacement_id)
4725-
.materialized_view()
4726-
else {
4727-
return Err(AdapterError::internal(
4728-
ERROR_CONTEXT,
4729-
"replacement_id must refer to a materialized view",
4730-
));
4731-
};
4732-
4733-
let old_cluster_id = old.cluster_id;
4734-
let new_cluster_id = new.cluster_id;
4735-
4736-
let old_gid = old.global_id_writes();
4737-
let new_gid = new.global_id_writes();
4738-
47394714
let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4740-
4741-
self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
4742-
Box::pin(async move {
4743-
// Cut over the MV computation, by shutting down the old dataflow and allowing the
4744-
// new dataflow to start writing.
4745-
//
4746-
// Both commands are applied to their respective clusters asynchronously and they
4747-
// race, so it is possible that we allow writes by the replacement before having
4748-
// dropped the old dataflow. Thus there might be a period where the two dataflows
4749-
// are competing in trying to commit conflicting outputs. Eventually the old
4750-
// dataflow will be dropped and the replacement takes over.
4751-
coord.drop_compute_collections(vec![(old_cluster_id, old_gid)]);
4752-
coord.allow_writes(new_cluster_id, new_gid);
4753-
})
4754-
})
4755-
.await?;
4715+
self.catalog_transact(Some(ctx.session()), ops).await?;
47564716

47574717
Ok(ExecuteResponse::AlteredObject(ObjectType::MaterializedView))
47584718
}

0 commit comments

Comments
 (0)