Skip to content

Commit 3cd5592

Browse files
authored
Merge pull request #34234 from teskje/replacement-mvs
adapter: replacement materialized views
2 parents: e53caaa + 4d63825 · commit 3cd5592

File tree

36 files changed: +813 −182 lines changed

36 files changed: +813 −182 lines changed

misc/python/materialize/mzcompose/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -106,6 +106,7 @@ def get_minimal_system_parameters(
106106
"enable_rbac_checks": "true",
107107
"enable_reduce_mfp_fusion": "true",
108108
"enable_refresh_every_mvs": "true",
109+
"enable_replacement_materialized_views": "true",
109110
"enable_repr_typecheck": "true",
110111
"enable_cluster_schedule_refresh": "true",
111112
"enable_sql_server_source": "true",

src/adapter/src/catalog/state.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1375,6 +1375,7 @@ impl CatalogState {
13751375
desc,
13761376
resolved_ids,
13771377
dependencies,
1378+
replacement_target: materialized_view.replacement_target,
13781379
cluster_id: materialized_view.cluster_id,
13791380
non_null_assertions: materialized_view.non_null_assertions,
13801381
custom_logical_compaction_window: materialized_view.compaction_window,

src/adapter/src/catalog/transact.rs

Lines changed: 37 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -101,6 +101,10 @@ pub enum Op {
101101
typ: SqlColumnType,
102102
sql: RawDataType,
103103
},
104+
AlterMaterializedViewApplyReplacement {
105+
id: CatalogItemId,
106+
replacement_id: CatalogItemId,
107+
},
104108
CreateDatabase {
105109
name: String,
106110
owner_id: RoleId,
@@ -819,6 +823,30 @@ impl Catalog {
819823
tx.update_item(id, new_entry.into())?;
820824
storage_collections_to_register.insert(new_global_id, shard_id);
821825
}
826+
Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
827+
let mut new_entry = state.get_entry(&id).clone();
828+
let replacement = state.get_entry(&replacement_id);
829+
830+
let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
831+
return Err(AdapterError::internal(
832+
"ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
833+
"id must refer to a materialized view",
834+
));
835+
};
836+
let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
837+
return Err(AdapterError::internal(
838+
"ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
839+
"replacement_id must refer to a materialized view",
840+
));
841+
};
842+
843+
mv.apply_replacement(replacement_mv.clone());
844+
845+
tx.remove_item(replacement_id)?;
846+
tx.update_item(id, new_entry.into())?;
847+
848+
// TODO(alter-mv): audit logging
849+
}
822850
Op::CreateDatabase { name, owner_id } => {
823851
let database_owner_privileges = vec![rbac::owner_privilege(
824852
mz_sql::catalog::ObjectType::Database,
@@ -1130,7 +1158,15 @@ impl Catalog {
11301158
storage_collections_to_create.insert(source.global_id());
11311159
}
11321160
CatalogItem::MaterializedView(mv) => {
1133-
storage_collections_to_create.insert(mv.global_id_writes());
1161+
let mv_gid = mv.global_id_writes();
1162+
if let Some(target_id) = mv.replacement_target {
1163+
let target_gid = state.get_entry(&target_id).latest_global_id();
1164+
let shard_id =
1165+
state.storage_metadata().get_collection_shard(target_gid)?;
1166+
storage_collections_to_register.insert(mv_gid, shard_id);
1167+
} else {
1168+
storage_collections_to_create.insert(mv_gid);
1169+
}
11341170
}
11351171
CatalogItem::ContinualTask(ct) => {
11361172
storage_collections_to_create.insert(ct.global_id());

src/adapter/src/command.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -735,6 +735,7 @@ impl ExecuteResponse {
735735
| AlterSource
736736
| AlterSink
737737
| AlterTableAddColumn
738+
| AlterMaterializedViewApplyReplacement
738739
| AlterNetworkPolicy => &[AlteredObject],
739740
AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
740741
AlterSetCluster => &[AlteredObject],

src/adapter/src/continual_task.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@ pub fn ct_item_from_plan(
3939
expr: mut raw_expr,
4040
dependencies,
4141
column_names: _,
42+
replacement_target: _,
4243
non_null_assertions: _,
4344
compaction_window: _,
4445
refresh_schedule: _,

src/adapter/src/coord.rs

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2117,7 +2117,12 @@ impl Coordinator {
21172117
}
21182118

21192119
self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2120-
self.allow_writes(mview.cluster_id, mview.global_id_writes());
2120+
2121+
// If this is a replacement MV, it must remain read-only until the replacement
2122+
// gets applied.
2123+
if mview.replacement_target.is_none() {
2124+
self.allow_writes(mview.cluster_id, mview.global_id_writes());
2125+
}
21212126
}
21222127
CatalogItem::Sink(sink) => {
21232128
policies_to_set

src/adapter/src/coord/appends.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1006,6 +1006,7 @@ pub(crate) fn waiting_on_startup_appends(
10061006
| Plan::AlterRole(_)
10071007
| Plan::AlterOwner(_)
10081008
| Plan::AlterTableAddColumn(_)
1009+
| Plan::AlterMaterializedViewApplyReplacement(_)
10091010
| Plan::Declare(_)
10101011
| Plan::Fetch(_)
10111012
| Plan::Close(_)

src/adapter/src/coord/catalog_implications.rs

Lines changed: 60 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -165,9 +165,7 @@ impl Coordinator {
165165
let mut sources_to_drop = vec![];
166166
let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
167167
let mut storage_sink_gids_to_drop = vec![];
168-
let mut indexes_to_drop = vec![];
169-
let mut materialized_views_to_drop = vec![];
170-
let mut continual_tasks_to_drop = vec![];
168+
let mut compute_gids_to_drop = vec![];
171169
let mut view_gids_to_drop = vec![];
172170
let mut secrets_to_drop = vec![];
173171
let mut vpc_endpoints_to_drop = vec![];
@@ -188,6 +186,11 @@ impl Coordinator {
188186
let mut storage_policies_to_initialize = BTreeMap::new();
189187
let mut execution_timestamps_to_set = BTreeSet::new();
190188

189+
// Replacing a materialized view causes the replacement's catalog entry
190+
// to be dropped. Its compute and storage collections are transferred to
191+
// the target MV though, so we must make sure not to drop those.
192+
let mut replacement_gids = vec![];
193+
191194
// We're incrementally migrating the code that manipulates the
192195
// controller from closures in the sequencer. For some types of catalog
193196
// changes we haven't done this migration yet, so there you will see
@@ -311,7 +314,7 @@ impl Coordinator {
311314
);
312315
}
313316
CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
314-
indexes_to_drop.push((index.cluster_id, index.global_id()));
317+
compute_gids_to_drop.push((index.cluster_id, index.global_id()));
315318
dropped_item_names.insert(index.global_id(), full_name);
316319
}
317320
CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
@@ -321,17 +324,20 @@ impl Coordinator {
321324
prev: prev_mv,
322325
new: new_mv,
323326
}) => {
324-
tracing::debug!(
325-
?prev_mv,
326-
?new_mv,
327-
"not handling AlterMaterializedView in here yet"
328-
);
327+
let old_gid = prev_mv.global_id_writes();
328+
let new_gid = new_mv.global_id_writes();
329+
if new_gid != old_gid {
330+
replacement_gids.push((new_mv.cluster_id, new_gid));
331+
}
332+
333+
self.handle_alter_materialized_view(prev_mv, new_mv)?;
329334
}
330335
CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
331336
mv,
332337
full_name,
333338
)) => {
334-
materialized_views_to_drop.push((mv.cluster_id, mv.global_id_writes()));
339+
compute_gids_to_drop.push((mv.cluster_id, mv.global_id_writes()));
340+
sources_to_drop.extend(mv.global_ids().map(|gid| (catalog_id, gid)));
335341
dropped_item_names.insert(mv.global_id_writes(), full_name);
336342
}
337343
CatalogImplication::View(CatalogImplicationKind::Added(view)) => {
@@ -364,7 +370,8 @@ impl Coordinator {
364370
ct,
365371
_full_name,
366372
)) => {
367-
continual_tasks_to_drop.push((catalog_id, ct.cluster_id, ct.global_id()));
373+
compute_gids_to_drop.push((ct.cluster_id, ct.global_id()));
374+
sources_to_drop.push((catalog_id, ct.global_id()));
368375
}
369376
CatalogImplication::Secret(CatalogImplicationKind::Added(secret)) => {
370377
tracing::debug!(?secret, "not handling AddSecret in here yet");
@@ -506,6 +513,12 @@ impl Coordinator {
506513
}
507514
}
508515

516+
// Cancel out drops against replacements.
517+
for (cluster_id, gid) in replacement_gids {
518+
sources_to_drop.retain(|(_, id)| *id != gid);
519+
compute_gids_to_drop.retain(|id| *id != (cluster_id, gid));
520+
}
521+
509522
if !source_collections_to_create.is_empty() {
510523
self.create_source_collections(source_collections_to_create)
511524
.await?;
@@ -530,9 +543,7 @@ impl Coordinator {
530543
.map(|(_, gid)| *gid)
531544
.chain(tables_to_drop.iter().map(|(_, gid)| *gid))
532545
.chain(storage_sink_gids_to_drop.iter().copied())
533-
.chain(indexes_to_drop.iter().map(|(_, id)| *id))
534-
.chain(materialized_views_to_drop.iter().map(|(_, id)| *id))
535-
.chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid))
546+
.chain(compute_gids_to_drop.iter().map(|(_, gid)| *gid))
536547
.chain(view_gids_to_drop.iter().copied())
537548
.collect();
538549

@@ -598,27 +609,14 @@ impl Coordinator {
598609
}
599610
}
600611

601-
let storage_ids_to_drop: BTreeSet<_> = sources_to_drop
612+
let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
602613
.iter()
603614
.map(|(_id, gid)| gid)
604615
.chain(storage_sink_gids_to_drop.iter())
605616
.chain(tables_to_drop.iter().map(|(_id, gid)| gid))
606-
.chain(materialized_views_to_drop.iter().map(|(_, id)| id))
607-
.chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| gid))
608617
.copied()
609618
.collect();
610619

611-
let compute_ids_to_drop: BTreeSet<_> = indexes_to_drop
612-
.iter()
613-
.copied()
614-
.chain(materialized_views_to_drop.iter().copied())
615-
.chain(
616-
continual_tasks_to_drop
617-
.iter()
618-
.map(|(_, cluster_id, gid)| (*cluster_id, *gid)),
619-
)
620-
.collect();
621-
622620
// Gather resources that we have to remove from timeline state and
623621
// pre-check if any Timelines become empty, when we drop the specified
624622
// storage and compute resources.
@@ -630,13 +628,13 @@ impl Coordinator {
630628
let mut id_bundle = CollectionIdBundle::default();
631629

632630
for storage_id in read_holds.storage_ids() {
633-
if storage_ids_to_drop.contains(&storage_id) {
631+
if storage_gids_to_drop.contains(&storage_id) {
634632
id_bundle.storage_ids.insert(storage_id);
635633
}
636634
}
637635

638636
for (instance_id, id) in read_holds.compute_ids() {
639-
if compute_ids_to_drop.contains(&(instance_id, id))
637+
if compute_gids_to_drop.contains(&(instance_id, id))
640638
|| clusters_to_drop.contains(&instance_id)
641639
{
642640
id_bundle
@@ -719,16 +717,8 @@ impl Coordinator {
719717
}
720718
}
721719

722-
if !indexes_to_drop.is_empty() {
723-
self.drop_indexes(indexes_to_drop);
724-
}
725-
726-
if !materialized_views_to_drop.is_empty() {
727-
self.drop_materialized_views(materialized_views_to_drop);
728-
}
729-
730-
if !continual_tasks_to_drop.is_empty() {
731-
self.drop_continual_tasks(continual_tasks_to_drop);
720+
if !compute_gids_to_drop.is_empty() {
721+
self.drop_compute_collections(compute_gids_to_drop);
732722
}
733723

734724
if !vpc_endpoints_to_drop.is_empty() {
@@ -1133,6 +1123,36 @@ impl Coordinator {
11331123
Ok(())
11341124
}
11351125

1126+
#[instrument(level = "debug")]
1127+
fn handle_alter_materialized_view(
1128+
&mut self,
1129+
prev_mv: MaterializedView,
1130+
new_mv: MaterializedView,
1131+
) -> Result<(), AdapterError> {
1132+
let old_gid = prev_mv.global_id_writes();
1133+
let new_gid = new_mv.global_id_writes();
1134+
1135+
if old_gid == new_gid {
1136+
// It's not an ALTER MATERIALIZED VIEW as far as the controller is
1137+
// concerned, because we still have the same GlobalId. This is
1138+
// likely a change from an ALTER SWAP.
1139+
return Ok(());
1140+
}
1141+
1142+
// Cut over the MV computation, by shutting down the old dataflow and allowing the
1143+
// new dataflow to start writing.
1144+
//
1145+
// Both commands are applied to their respective clusters asynchronously and they
1146+
// race, so it is possible that we allow writes by the replacement before having
1147+
// dropped the old dataflow. Thus there might be a period where the two dataflows
1148+
// are competing in trying to commit conflicting outputs. Eventually the old
1149+
// dataflow will be dropped and the replacement takes over.
1150+
self.drop_compute_collections(vec![(prev_mv.cluster_id, old_gid)]);
1151+
self.allow_writes(new_mv.cluster_id, new_gid);
1152+
1153+
Ok(())
1154+
}
1155+
11361156
#[instrument(level = "debug")]
11371157
async fn handle_create_source(
11381158
&mut self,

src/adapter/src/coord/catalog_serving.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -128,6 +128,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>(
128128
| Plan::AlterRole(_)
129129
| Plan::AlterOwner(_)
130130
| Plan::AlterTableAddColumn(_)
131+
| Plan::AlterMaterializedViewApplyReplacement(_)
131132
| Plan::Declare(_)
132133
| Plan::Fetch(_)
133134
| Plan::Close(_)

src/adapter/src/coord/command_handler.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1045,6 +1045,7 @@ impl Coordinator {
10451045
| Statement::AlterConnection(_)
10461046
| Statement::AlterDefaultPrivileges(_)
10471047
| Statement::AlterIndex(_)
1048+
| Statement::AlterMaterializedViewApplyReplacement(_)
10481049
| Statement::AlterSetCluster(_)
10491050
| Statement::AlterOwner(_)
10501051
| Statement::AlterRetainHistory(_)
@@ -1286,6 +1287,7 @@ impl Coordinator {
12861287
if_exists: cmvs.if_exists,
12871288
name: cmvs.name,
12881289
columns: cmvs.columns,
1290+
replacing: cmvs.replacing,
12891291
in_cluster: cmvs.in_cluster,
12901292
query: cmvs.query,
12911293
with_options: cmvs.with_options,

0 commit comments

Comments
 (0)