Skip to content

Commit 0005b59

Browse files
committed
adapter,sql: plan and sequence replacement MV application
1 parent 45a9e27 commit 0005b59

File tree

14 files changed

+239
-21
lines changed

14 files changed

+239
-21
lines changed

src/adapter/src/catalog/transact.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ pub enum Op {
100100
typ: SqlColumnType,
101101
sql: RawDataType,
102102
},
103+
AlterMaterializedViewApplyReplacement {
104+
id: CatalogItemId,
105+
replacement_id: CatalogItemId,
106+
},
103107
CreateDatabase {
104108
name: String,
105109
owner_id: RoleId,
@@ -769,6 +773,30 @@ impl Catalog {
769773
tx.update_item(id, new_entry.into())?;
770774
storage_collections_to_register.insert(new_global_id, shard_id);
771775
}
776+
Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
777+
let mut new_entry = state.get_entry(&id).clone();
778+
let replacement = state.get_entry(&replacement_id);
779+
780+
let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
781+
return Err(AdapterError::internal(
782+
"ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
783+
"id must refer to a materialized view",
784+
));
785+
};
786+
let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
787+
return Err(AdapterError::internal(
788+
"ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
789+
"replacement_id must refer to a materialized view",
790+
));
791+
};
792+
793+
mv.apply_replacement(replacement_mv.clone());
794+
795+
tx.remove_item(replacement_id)?;
796+
tx.update_item(id, new_entry.into())?;
797+
798+
// TODO(alter-mv): audit logging
799+
}
772800
Op::CreateDatabase { name, owner_id } => {
773801
let database_owner_privileges = vec![rbac::owner_privilege(
774802
mz_sql::catalog::ObjectType::Database,

src/adapter/src/command.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,7 @@ impl ExecuteResponse {
718718
| AlterSource
719719
| AlterSink
720720
| AlterTableAddColumn
721+
| AlterMaterializedViewApplyReplacement
721722
| AlterNetworkPolicy => &[AlteredObject],
722723
AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
723724
AlterSetCluster => &[AlteredObject],

src/adapter/src/coord/appends.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,6 +1006,7 @@ pub(crate) fn waiting_on_startup_appends(
10061006
| Plan::AlterRole(_)
10071007
| Plan::AlterOwner(_)
10081008
| Plan::AlterTableAddColumn(_)
1009+
| Plan::AlterMaterializedViewApplyReplacement(_)
10091010
| Plan::Declare(_)
10101011
| Plan::Fetch(_)
10111012
| Plan::Close(_)

src/adapter/src/coord/catalog_serving.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>(
128128
| Plan::AlterRole(_)
129129
| Plan::AlterOwner(_)
130130
| Plan::AlterTableAddColumn(_)
131+
| Plan::AlterMaterializedViewApplyReplacement(_)
131132
| Plan::Declare(_)
132133
| Plan::Fetch(_)
133134
| Plan::Close(_)

src/adapter/src/coord/ddl.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1480,6 +1480,7 @@ impl Coordinator {
14801480
| Op::AlterRetainHistory { .. }
14811481
| Op::AlterNetworkPolicy { .. }
14821482
| Op::AlterAddColumn { .. }
1483+
| Op::AlterMaterializedViewApplyReplacement { .. }
14831484
| Op::UpdatePrivilege { .. }
14841485
| Op::UpdateDefaultPrivilege { .. }
14851486
| Op::GrantRole { .. }

src/adapter/src/coord/peek.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -859,7 +859,7 @@ impl crate::coord::Coordinator {
859859
// If a dataflow was created, drop it once the peek command is sent.
860860
if let Some(index_id) = drop_dataflow {
861861
self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
862-
self.drop_indexes(vec![(compute_instance, index_id)]);
862+
self.drop_compute_collections(vec![(compute_instance, index_id)]);
863863
}
864864

865865
let persist_client = self.persist_client.clone();

src/adapter/src/coord/sequencer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,12 @@ impl Coordinator {
537537
let result = self.sequence_alter_table(&mut ctx, plan).await;
538538
ctx.retire(result);
539539
}
540+
Plan::AlterMaterializedViewApplyReplacement(plan) => {
541+
let result = self
542+
.sequence_alter_materialized_view_apply_replacement(&mut ctx, plan)
543+
.await;
544+
ctx.retire(result);
545+
}
540546
Plan::AlterNetworkPolicy(plan) => {
541547
let res = self
542548
.sequence_alter_network_policy(ctx.session(), plan)

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ use mz_sql::names::{
5858
Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
5959
SchemaSpecifier, SystemObjectId,
6060
};
61-
use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
61+
use mz_sql::plan::{
62+
AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
63+
StatementContext,
64+
};
6265
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
6366
use mz_storage_types::sinks::StorageSinkDesc;
6467
use mz_storage_types::sources::{GenericSourceConnection, IngestionDescription, SourceExport};
@@ -5105,6 +5108,57 @@ impl Coordinator {
51055108
Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
51065109
}
51075110

5111+
#[instrument]
5112+
pub(super) async fn sequence_alter_materialized_view_apply_replacement(
5113+
&mut self,
5114+
ctx: &mut ExecuteContext,
5115+
plan: AlterMaterializedViewApplyReplacementPlan,
5116+
) -> Result<ExecuteResponse, AdapterError> {
5117+
const ERROR_CONTEXT: &str = "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT";
5118+
5119+
let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan;
5120+
5121+
// TODO(alter-mv): Wait until there is overlap between the old MV's write frontier and the
5122+
// new MV's as-of, to ensure no times are skipped.
5123+
5124+
let Some(old) = self.catalog().get_entry(&id).materialized_view() else {
5125+
return Err(AdapterError::internal(
5126+
ERROR_CONTEXT,
5127+
"id must refer to a materialized view",
5128+
));
5129+
};
5130+
let Some(new) = self
5131+
.catalog()
5132+
.get_entry(&replacement_id)
5133+
.materialized_view()
5134+
else {
5135+
return Err(AdapterError::internal(
5136+
ERROR_CONTEXT,
5137+
"replacement_id must refer to a materialized view",
5138+
));
5139+
};
5140+
5141+
let old_cluster_id = old.cluster_id;
5142+
let new_cluster_id = new.cluster_id;
5143+
5144+
let old_gid = old.global_id_writes();
5145+
let new_gid = new.global_id_writes();
5146+
5147+
let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
5148+
5149+
self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
5150+
Box::pin(async move {
5151+
// Cut over the MV computation, by shutting down the old dataflow and allowing the
5152+
// new dataflow to start writing.
5153+
coord.drop_compute_collections(vec![(old_cluster_id, old_gid)]);
5154+
coord.allow_writes(new_cluster_id, new_gid);
5155+
})
5156+
})
5157+
.await?;
5158+
5159+
Ok(ExecuteResponse::AlteredObject(ObjectType::MaterializedView))
5160+
}
5161+
51085162
pub(super) async fn statistics_oracle(
51095163
&self,
51105164
session: &Session,

src/catalog/src/memory/objects.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,10 @@ impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
752752
self.entry.writable_table_details()
753753
}
754754

755+
fn replacement_target(&self) -> Option<CatalogItemId> {
756+
self.entry.replacement_target()
757+
}
758+
755759
fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
756760
self.entry.type_details()
757761
}
@@ -1446,6 +1450,67 @@ impl MaterializedView {
14461450
self.desc
14471451
.at_version(RelationVersionSelector::Specific(*version))
14481452
}
1453+
1454+
/// Apply the given replacement materialized view to this [`MaterializedView`].
1455+
pub fn apply_replacement(&mut self, replacement: Self) {
1456+
let target_id = replacement
1457+
.replacement_target
1458+
.expect("replacement has target");
1459+
1460+
fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
1461+
let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
1462+
panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
1463+
});
1464+
if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
1465+
cmvs
1466+
} else {
1467+
panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
1468+
}
1469+
}
1470+
1471+
let old_stmt = parse(&self.create_sql);
1472+
let rpl_stmt = parse(&replacement.create_sql);
1473+
let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
1474+
if_exists: old_stmt.if_exists,
1475+
name: old_stmt.name,
1476+
columns: rpl_stmt.columns,
1477+
replacing: None,
1478+
in_cluster: rpl_stmt.in_cluster,
1479+
query: rpl_stmt.query,
1480+
as_of: rpl_stmt.as_of,
1481+
with_options: rpl_stmt.with_options,
1482+
};
1483+
let create_sql = new_stmt.to_ast_string_stable();
1484+
1485+
let mut collections = std::mem::take(&mut self.collections);
1486+
// Note: We can't use `self.desc.latest_version` here because a replacement doesn't
1487+
// necessary evolve the relation schema, so that version might be lower than the actual
1488+
// latest version.
1489+
let latest_version = collections.keys().max().expect("at least one version");
1490+
let new_version = latest_version.bump();
1491+
collections.insert(new_version, replacement.global_id_writes());
1492+
1493+
let mut resolved_ids = replacement.resolved_ids;
1494+
resolved_ids.remove_item(&target_id);
1495+
let mut dependencies = replacement.dependencies;
1496+
dependencies.0.remove(&target_id);
1497+
1498+
*self = Self {
1499+
create_sql,
1500+
collections,
1501+
raw_expr: replacement.raw_expr,
1502+
optimized_expr: replacement.optimized_expr,
1503+
desc: replacement.desc,
1504+
resolved_ids,
1505+
dependencies,
1506+
replacement_target: None,
1507+
cluster_id: replacement.cluster_id,
1508+
non_null_assertions: replacement.non_null_assertions,
1509+
custom_logical_compaction_window: replacement.custom_logical_compaction_window,
1510+
refresh_schedule: replacement.refresh_schedule,
1511+
initial_as_of: replacement.initial_as_of,
1512+
};
1513+
}
14491514
}
14501515

14511516
#[derive(Debug, Clone, Serialize)]
@@ -3369,6 +3434,14 @@ impl mz_sql::catalog::CatalogItem for CatalogEntry {
33693434
}
33703435
}
33713436

3437+
fn replacement_target(&self) -> Option<CatalogItemId> {
3438+
if let CatalogItem::MaterializedView(mv) = self.item() {
3439+
mv.replacement_target
3440+
} else {
3441+
None
3442+
}
3443+
}
3444+
33723445
fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
33733446
if let CatalogItem::Type(Type { details, .. }) = self.item() {
33743447
Some(details)

src/repr/src/relation.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1355,8 +1355,6 @@ impl VersionedRelationDesc {
13551355
/// Returns this [`RelationDesc`] at the specified version.
13561356
pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
13571357
// Get all of the changes from the start, up to whatever version was requested.
1358-
//
1359-
// TODO(parkmycar): We should probably panic on unknown verisons?
13601358
let up_to_version = match version {
13611359
RelationVersionSelector::Latest => RelationVersion(u64::MAX),
13621360
RelationVersionSelector::Specific(v) => v,

0 commit comments

Comments
 (0)