Skip to content

Commit 417c7f7

Browse files
committed
adapter,sql: plan and sequence replacement MV creation
1 parent 70408a5 commit 417c7f7

File tree

9 files changed

+122
-13
lines changed

9 files changed

+122
-13
lines changed

src/adapter/src/catalog/state.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1375,6 +1375,7 @@ impl CatalogState {
13751375
desc,
13761376
resolved_ids,
13771377
dependencies,
1378+
replacement_target: materialized_view.replacement_target,
13781379
cluster_id: materialized_view.cluster_id,
13791380
non_null_assertions: materialized_view.non_null_assertions,
13801381
custom_logical_compaction_window: materialized_view.compaction_window,

src/adapter/src/catalog/transact.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1130,7 +1130,15 @@ impl Catalog {
11301130
storage_collections_to_create.insert(source.global_id());
11311131
}
11321132
CatalogItem::MaterializedView(mv) => {
1133-
storage_collections_to_create.insert(mv.global_id_writes());
1133+
let mv_gid = mv.global_id_writes();
1134+
if let Some(target_id) = mv.replacement_target {
1135+
let target_gid = state.get_entry(&target_id).latest_global_id();
1136+
let shard_id =
1137+
state.storage_metadata().get_collection_shard(target_gid)?;
1138+
storage_collections_to_register.insert(mv_gid, shard_id);
1139+
} else {
1140+
storage_collections_to_create.insert(mv_gid);
1141+
}
11341142
}
11351143
CatalogItem::ContinualTask(ct) => {
11361144
storage_collections_to_create.insert(ct.global_id());

src/adapter/src/continual_task.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub fn ct_item_from_plan(
3939
expr: mut raw_expr,
4040
dependencies,
4141
column_names: _,
42+
replacement_target: _,
4243
non_null_assertions: _,
4344
compaction_window: _,
4445
refresh_schedule: _,

src/adapter/src/coord.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2117,7 +2117,12 @@ impl Coordinator {
21172117
}
21182118

21192119
self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2120-
self.allow_writes(mview.cluster_id, mview.global_id_writes());
2120+
2121+
// If this is a replacement MV, it must remain read-only until the replacement
2122+
// gets applied.
2123+
if mview.replacement_target.is_none() {
2124+
self.allow_writes(mview.cluster_id, mview.global_id_writes());
2125+
}
21212126
}
21222127
CatalogItem::Sink(sink) => {
21232128
policies_to_set

src/adapter/src/coord/sequencer/inner/create_materialized_view.rs

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,7 @@ impl Coordinator {
561561
mut create_sql,
562562
expr: raw_expr,
563563
dependencies,
564+
replacement_target,
564565
cluster_id,
565566
non_null_assertions,
566567
compaction_window,
@@ -577,6 +578,23 @@ impl Coordinator {
577578
global_lir_plan,
578579
..
579580
} = stage;
581+
582+
// Validate the replacement target, if one is given.
583+
// TODO(alter-mv): Could we do this already in planning?
584+
if let Some(target_id) = replacement_target {
585+
let Some(target) = self.catalog().get_entry(&target_id).materialized_view() else {
586+
return Err(AdapterError::internal(
587+
"create materialized view",
588+
"replacement target not a materialized view",
589+
));
590+
};
591+
592+
// For now, we don't support schema evolution for materialized views.
593+
if &target.desc.latest() != global_lir_plan.desc() {
594+
return Err(AdapterError::Unstructured(anyhow!("incompatible schemas")));
595+
}
596+
}
597+
580598
// Timestamp selection
581599
let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);
582600

@@ -593,8 +611,12 @@ impl Coordinator {
593611
&read_holds_owned
594612
};
595613

596-
let (dataflow_as_of, storage_as_of, until) =
597-
self.select_timestamps(id_bundle, refresh_schedule.as_ref(), read_holds)?;
614+
let (dataflow_as_of, storage_as_of, until) = self.select_timestamps(
615+
id_bundle,
616+
refresh_schedule.as_ref(),
617+
read_holds,
618+
replacement_target,
619+
)?;
598620

599621
tracing::info!(
600622
dataflow_as_of = ?dataflow_as_of,
@@ -647,6 +669,7 @@ impl Coordinator {
647669
collections,
648670
resolved_ids,
649671
dependencies,
672+
replacement_target,
650673
cluster_id,
651674
non_null_assertions,
652675
custom_logical_compaction_window: compaction_window,
@@ -687,17 +710,26 @@ impl Coordinator {
687710

688711
let storage_metadata = coord.catalog.state().storage_metadata();
689712

713+
let mut collection_desc =
714+
CollectionDescription::for_other(output_desc, Some(storage_as_of));
715+
let mut allow_writes = true;
716+
717+
// If this MV is intended to replace another one, we need to start it in
718+
// read-only mode, targeting the shard of the replacement target.
719+
if let Some(target_id) = replacement_target {
720+
let target_gid = coord.catalog.get_entry(&target_id).latest_global_id();
721+
collection_desc.primary = Some(target_gid);
722+
allow_writes = false;
723+
}
724+
690725
// Announce the creation of the materialized view source.
691726
coord
692727
.controller
693728
.storage
694729
.create_collections(
695730
storage_metadata,
696731
None,
697-
vec![(
698-
global_id,
699-
CollectionDescription::for_other(output_desc, Some(storage_as_of)),
700-
)],
732+
vec![(global_id, collection_desc)],
701733
)
702734
.await
703735
.unwrap_or_terminate("cannot fail to append");
@@ -716,7 +748,10 @@ impl Coordinator {
716748
notice_builtin_updates_fut,
717749
)
718750
.await;
719-
coord.allow_writes(cluster_id, global_id);
751+
752+
if allow_writes {
753+
coord.allow_writes(cluster_id, global_id);
754+
}
720755
})
721756
})
722757
.await;
@@ -746,6 +781,7 @@ impl Coordinator {
746781
id_bundle: CollectionIdBundle,
747782
refresh_schedule: Option<&RefreshSchedule>,
748783
read_holds: &ReadHolds<mz_repr::Timestamp>,
784+
replacement_target: Option<CatalogItemId>,
749785
) -> Result<
750786
(
751787
Antichain<mz_repr::Timestamp>,
@@ -809,6 +845,24 @@ impl Coordinator {
809845
.and_then(|r| r.try_step_forward());
810846
let until = Antichain::from_iter(until_ts);
811847

848+
// If this is a replacement MV, ensure that `storage_as_of` > the `since` of the target
849+
// storage collection. The storage controller requires the `since` of a storage collection
850+
// to always be greater than the `since`s of the collections it depends on.
851+
if let Some(target_id) = replacement_target {
852+
let target_gid = self.catalog().get_entry(&target_id).latest_global_id();
853+
let frontiers = self
854+
.controller
855+
.storage_collections
856+
.collection_frontiers(target_gid)
857+
.expect("replacement target exists");
858+
let lower_bound = frontiers
859+
.read_capabilities
860+
.iter()
861+
.map(|t| t.step_forward())
862+
.collect();
863+
storage_as_of.join_assign(&lower_bound);
864+
}
865+
812866
Ok((dataflow_as_of, storage_as_of, until))
813867
}
814868

src/catalog/src/memory/objects.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1390,6 +1390,8 @@ pub struct MaterializedView {
13901390
pub resolved_ids: ResolvedIds,
13911391
/// All of the catalog objects that are referenced by this view.
13921392
pub dependencies: DependencyIds,
1393+
/// ID of the materialized view this materialized view is intended to replace.
1394+
pub replacement_target: Option<CatalogItemId>,
13931395
/// Cluster that this materialized view runs on.
13941396
pub cluster_id: ClusterId,
13951397
/// Column indexes that we assert are not `NULL`.

src/sql/src/plan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1816,6 +1816,7 @@ pub struct MaterializedView {
18161816
pub dependencies: DependencyIds,
18171817
/// Columns of this view.
18181818
pub column_names: Vec<ColumnName>,
1819+
pub replacement_target: Option<CatalogItemId>,
18191820
/// Cluster this materialized view will get installed on.
18201821
pub cluster_id: ClusterId,
18211822
pub non_null_assertions: Vec<usize>,

src/sql/src/plan/error.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,12 @@ pub enum PlanError {
305305
},
306306
/// AS OF or UP TO should be an expression that is castable and simplifiable to a non-null mz_timestamp value.
307307
InvalidAsOfUpTo,
308+
InvalidReplacement {
309+
item_type: CatalogItemType,
310+
item_name: PartialItemName,
311+
replacement_type: CatalogItemType,
312+
replacement_name: PartialItemName,
313+
},
308314
// TODO(benesch): eventually all errors should be structured.
309315
Unstructured(String),
310316
}
@@ -845,7 +851,10 @@ impl fmt::Display for PlanError {
845851
write!(f, "cursor {} does not exist", name.quoted())
846852
}
847853
Self::CopyFromTargetTableDropped { target_name: name } => write!(f, "COPY FROM's target table {} was dropped", name.quoted()),
848-
Self::InvalidAsOfUpTo => write!(f, "AS OF or UP TO should be castable to a (non-null) mz_timestamp value")
854+
Self::InvalidAsOfUpTo => write!(f, "AS OF or UP TO should be castable to a (non-null) mz_timestamp value"),
855+
Self::InvalidReplacement { item_type, item_name, replacement_type, replacement_name } => {
856+
write!(f, "cannot replace {item_type} {item_name} with {replacement_type} {replacement_name}")
857+
}
849858
}
850859
}
851860
}

src/sql/src/plan/statement/ddl.rs

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ use crate::catalog::{
130130
use crate::iceberg::IcebergSinkConfigOptionExtracted;
131131
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
132132
use crate::names::{
133-
Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
133+
Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
134134
ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
135135
ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
136136
};
@@ -2733,6 +2733,28 @@ pub fn plan_create_materialized_view(
27332733
let partial_name = normalize::unresolved_item_name(stmt.name)?;
27342734
let name = scx.allocate_qualified_name(partial_name.clone())?;
27352735

2736+
// Validate the replacement target, if one is given.
2737+
let replacement_target = if let Some(target_name) = &stmt.replacing {
2738+
let target = scx.get_item_by_resolved_name(target_name)?;
2739+
if target.item_type() != CatalogItemType::MaterializedView {
2740+
return Err(PlanError::InvalidReplacement {
2741+
item_type: target.item_type(),
2742+
item_name: scx.catalog.minimal_qualification(target.name()),
2743+
replacement_type: CatalogItemType::MaterializedView,
2744+
replacement_name: partial_name,
2745+
});
2746+
}
2747+
if target.id().is_system() {
2748+
sql_bail!(
2749+
"cannot replace {} because it is required by the database system",
2750+
scx.catalog.minimal_qualification(target.name()),
2751+
);
2752+
}
2753+
Some(target.id())
2754+
} else {
2755+
None
2756+
};
2757+
27362758
let query::PlannedRootQuery {
27372759
expr,
27382760
mut desc,
@@ -2962,12 +2984,16 @@ pub fn plan_create_materialized_view(
29622984
.collect()
29632985
})
29642986
.unwrap_or_default();
2965-
let dependencies = expr
2987+
let mut dependencies: BTreeSet<_> = expr
29662988
.depends_on()
29672989
.into_iter()
29682990
.map(|gid| scx.catalog.resolve_item_id(&gid))
29692991
.collect();
29702992

2993+
if let Some(id) = replacement_target {
2994+
dependencies.insert(id);
2995+
}
2996+
29712997
// Check for an object in the catalog with this same name
29722998
let full_name = scx.catalog.resolve_full_name(&name);
29732999
let partial_name = PartialItemName::from(full_name.clone());
@@ -2987,8 +3013,9 @@ pub fn plan_create_materialized_view(
29873013
materialized_view: MaterializedView {
29883014
create_sql,
29893015
expr,
2990-
dependencies,
3016+
dependencies: DependencyIds(dependencies),
29913017
column_names,
3018+
replacement_target,
29923019
cluster_id,
29933020
non_null_assertions,
29943021
compaction_window,
@@ -3238,6 +3265,7 @@ pub fn plan_create_continual_task(
32383265
expr,
32393266
dependencies,
32403267
column_names,
3268+
replacement_target: None,
32413269
cluster_id,
32423270
non_null_assertions: Vec::new(),
32433271
compaction_window: None,

0 commit comments

Comments
 (0)