Skip to content

Commit 8b09f9b

Browse files
committed
adapter,sql: plan and sequence replacement MV application
1 parent e68d38e commit 8b09f9b

File tree

13 files changed

+244
-20
lines changed

13 files changed

+244
-20
lines changed

src/adapter/src/catalog/transact.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ pub enum Op {
101101
typ: SqlColumnType,
102102
sql: RawDataType,
103103
},
104+
AlterMaterializedViewApplyReplacement {
105+
id: CatalogItemId,
106+
replacement_id: CatalogItemId,
107+
},
104108
CreateDatabase {
105109
name: String,
106110
owner_id: RoleId,
@@ -819,6 +823,30 @@ impl Catalog {
819823
tx.update_item(id, new_entry.into())?;
820824
storage_collections_to_register.insert(new_global_id, shard_id);
821825
}
826+
Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
827+
let mut new_entry = state.get_entry(&id).clone();
828+
let replacement = state.get_entry(&replacement_id);
829+
830+
let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
831+
return Err(AdapterError::internal(
832+
"ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
833+
"id must refer to a materialized view",
834+
));
835+
};
836+
let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
837+
return Err(AdapterError::internal(
838+
"ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
839+
"replacement_id must refer to a materialized view",
840+
));
841+
};
842+
843+
mv.apply_replacement(replacement_mv.clone());
844+
845+
tx.remove_item(replacement_id)?;
846+
tx.update_item(id, new_entry.into())?;
847+
848+
// TODO(alter-mv): audit logging
849+
}
822850
Op::CreateDatabase { name, owner_id } => {
823851
let database_owner_privileges = vec![rbac::owner_privilege(
824852
mz_sql::catalog::ObjectType::Database,

src/adapter/src/command.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -735,6 +735,7 @@ impl ExecuteResponse {
735735
| AlterSource
736736
| AlterSink
737737
| AlterTableAddColumn
738+
| AlterMaterializedViewApplyReplacement
738739
| AlterNetworkPolicy => &[AlteredObject],
739740
AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
740741
AlterSetCluster => &[AlteredObject],

src/adapter/src/coord/appends.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,6 +1006,7 @@ pub(crate) fn waiting_on_startup_appends(
10061006
| Plan::AlterRole(_)
10071007
| Plan::AlterOwner(_)
10081008
| Plan::AlterTableAddColumn(_)
1009+
| Plan::AlterMaterializedViewApplyReplacement(_)
10091010
| Plan::Declare(_)
10101011
| Plan::Fetch(_)
10111012
| Plan::Close(_)

src/adapter/src/coord/catalog_serving.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>(
128128
| Plan::AlterRole(_)
129129
| Plan::AlterOwner(_)
130130
| Plan::AlterTableAddColumn(_)
131+
| Plan::AlterMaterializedViewApplyReplacement(_)
131132
| Plan::Declare(_)
132133
| Plan::Fetch(_)
133134
| Plan::Close(_)

src/adapter/src/coord/ddl.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,6 +1210,7 @@ impl Coordinator {
12101210
| Op::AlterRetainHistory { .. }
12111211
| Op::AlterNetworkPolicy { .. }
12121212
| Op::AlterAddColumn { .. }
1213+
| Op::AlterMaterializedViewApplyReplacement { .. }
12131214
| Op::UpdatePrivilege { .. }
12141215
| Op::UpdateDefaultPrivilege { .. }
12151216
| Op::GrantRole { .. }

src/adapter/src/coord/sequencer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,12 @@ impl Coordinator {
540540
let result = self.sequence_alter_table(&mut ctx, plan).await;
541541
ctx.retire(result);
542542
}
543+
Plan::AlterMaterializedViewApplyReplacement(plan) => {
544+
let result = self
545+
.sequence_alter_materialized_view_apply_replacement(&mut ctx, plan)
546+
.await;
547+
ctx.retire(result);
548+
}
543549
Plan::AlterNetworkPolicy(plan) => {
544550
let res = self
545551
.sequence_alter_network_policy(ctx.session(), plan)

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,10 @@ use mz_sql::names::{
5757
Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
5858
SchemaSpecifier, SystemObjectId,
5959
};
60-
use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
60+
use mz_sql::plan::{
61+
AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
62+
StatementContext,
63+
};
6164
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
6265
use mz_storage_types::sinks::StorageSinkDesc;
6366
use mz_storage_types::sources::GenericSourceConnection;
@@ -4697,6 +4700,63 @@ impl Coordinator {
46974700
Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
46984701
}
46994702

4703+
#[instrument]
4704+
pub(super) async fn sequence_alter_materialized_view_apply_replacement(
4705+
&mut self,
4706+
ctx: &mut ExecuteContext,
4707+
plan: AlterMaterializedViewApplyReplacementPlan,
4708+
) -> Result<ExecuteResponse, AdapterError> {
4709+
const ERROR_CONTEXT: &str = "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT";
4710+
4711+
let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan;
4712+
4713+
// TODO(alter-mv): Wait until there is overlap between the old MV's write frontier and the
4714+
// new MV's as-of, to ensure no times are skipped.
4715+
4716+
let Some(old) = self.catalog().get_entry(&id).materialized_view() else {
4717+
return Err(AdapterError::internal(
4718+
ERROR_CONTEXT,
4719+
"id must refer to a materialized view",
4720+
));
4721+
};
4722+
let Some(new) = self
4723+
.catalog()
4724+
.get_entry(&replacement_id)
4725+
.materialized_view()
4726+
else {
4727+
return Err(AdapterError::internal(
4728+
ERROR_CONTEXT,
4729+
"replacement_id must refer to a materialized view",
4730+
));
4731+
};
4732+
4733+
let old_cluster_id = old.cluster_id;
4734+
let new_cluster_id = new.cluster_id;
4735+
4736+
let old_gid = old.global_id_writes();
4737+
let new_gid = new.global_id_writes();
4738+
4739+
let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
4740+
4741+
self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
4742+
Box::pin(async move {
4743+
// Cut over the MV computation, by shutting down the old dataflow and allowing the
4744+
// new dataflow to start writing.
4745+
//
4746+
// Both commands are applied to their respective clusters asynchronously and they
4747+
// race, so it is possible that we allow writes by the replacement before having
4748+
// dropped the old dataflow. Thus there might be a period where the two dataflows
4749+
// are competing in trying to commit conflicting outputs. Eventually the old
4750+
// dataflow will be dropped and the replacement takes over.
4751+
coord.drop_compute_collections(vec![(old_cluster_id, old_gid)]);
4752+
coord.allow_writes(new_cluster_id, new_gid);
4753+
})
4754+
})
4755+
.await?;
4756+
4757+
Ok(ExecuteResponse::AlteredObject(ObjectType::MaterializedView))
4758+
}
4759+
47004760
pub(super) async fn statistics_oracle(
47014761
&self,
47024762
session: &Session,

src/catalog/src/memory/objects.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,10 @@ impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
753753
self.entry.writable_table_details()
754754
}
755755

756+
fn replacement_target(&self) -> Option<CatalogItemId> {
757+
self.entry.replacement_target()
758+
}
759+
756760
fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
757761
self.entry.type_details()
758762
}
@@ -1447,6 +1451,67 @@ impl MaterializedView {
14471451
self.desc
14481452
.at_version(RelationVersionSelector::Specific(*version))
14491453
}
1454+
1455+
/// Apply the given replacement materialized view to this [`MaterializedView`].
1456+
pub fn apply_replacement(&mut self, replacement: Self) {
1457+
let target_id = replacement
1458+
.replacement_target
1459+
.expect("replacement has target");
1460+
1461+
fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
1462+
let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
1463+
panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
1464+
});
1465+
if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
1466+
cmvs
1467+
} else {
1468+
panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
1469+
}
1470+
}
1471+
1472+
let old_stmt = parse(&self.create_sql);
1473+
let rpl_stmt = parse(&replacement.create_sql);
1474+
let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
1475+
if_exists: old_stmt.if_exists,
1476+
name: old_stmt.name,
1477+
columns: rpl_stmt.columns,
1478+
replacing: None,
1479+
in_cluster: rpl_stmt.in_cluster,
1480+
query: rpl_stmt.query,
1481+
as_of: rpl_stmt.as_of,
1482+
with_options: rpl_stmt.with_options,
1483+
};
1484+
let create_sql = new_stmt.to_ast_string_stable();
1485+
1486+
let mut collections = std::mem::take(&mut self.collections);
1487+
// Note: We can't use `self.desc.latest_version` here because a replacement doesn't
1488+
// necessary evolve the relation schema, so that version might be lower than the actual
1489+
// latest version.
1490+
let latest_version = collections.keys().max().expect("at least one version");
1491+
let new_version = latest_version.bump();
1492+
collections.insert(new_version, replacement.global_id_writes());
1493+
1494+
let mut resolved_ids = replacement.resolved_ids;
1495+
resolved_ids.remove_item(&target_id);
1496+
let mut dependencies = replacement.dependencies;
1497+
dependencies.0.remove(&target_id);
1498+
1499+
*self = Self {
1500+
create_sql,
1501+
collections,
1502+
raw_expr: replacement.raw_expr,
1503+
optimized_expr: replacement.optimized_expr,
1504+
desc: replacement.desc,
1505+
resolved_ids,
1506+
dependencies,
1507+
replacement_target: None,
1508+
cluster_id: replacement.cluster_id,
1509+
non_null_assertions: replacement.non_null_assertions,
1510+
custom_logical_compaction_window: replacement.custom_logical_compaction_window,
1511+
refresh_schedule: replacement.refresh_schedule,
1512+
initial_as_of: replacement.initial_as_of,
1513+
};
1514+
}
14501515
}
14511516

14521517
#[derive(Debug, Clone, Serialize)]
@@ -3401,6 +3466,14 @@ impl mz_sql::catalog::CatalogItem for CatalogEntry {
34013466
}
34023467
}
34033468

3469+
fn replacement_target(&self) -> Option<CatalogItemId> {
3470+
if let CatalogItem::MaterializedView(mv) = self.item() {
3471+
mv.replacement_target
3472+
} else {
3473+
None
3474+
}
3475+
}
3476+
34043477
fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
34053478
if let CatalogItem::Type(Type { details, .. }) = self.item() {
34063479
Some(details)

src/repr/src/relation.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1355,8 +1355,6 @@ impl VersionedRelationDesc {
13551355
/// Returns this [`RelationDesc`] at the specified version.
13561356
pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
13571357
// Get all of the changes from the start, up to whatever version was requested.
1358-
//
1359-
// TODO(parkmycar): We should probably panic on unknown verisons?
13601358
let up_to_version = match version {
13611359
RelationVersionSelector::Latest => RelationVersion(u64::MAX),
13621360
RelationVersionSelector::Specific(v) => v,

src/sql/src/catalog.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -836,6 +836,9 @@ pub trait CatalogItem {
836836
/// catalog item is a table that accepts writes.
837837
fn writable_table_details(&self) -> Option<&[Expr<Aug>]>;
838838

839+
/// The item this catalog item replaces, if any.
840+
fn replacement_target(&self) -> Option<CatalogItemId>;
841+
839842
/// Returns the type information associated with the catalog item, if the
840843
/// catalog item is a type.
841844
fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>>;

0 commit comments

Comments
 (0)