Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def get_minimal_system_parameters(
"enable_rbac_checks": "true",
"enable_reduce_mfp_fusion": "true",
"enable_refresh_every_mvs": "true",
"enable_replacement_materialized_views": "true",
"enable_repr_typecheck": "true",
"enable_cluster_schedule_refresh": "true",
"enable_sql_server_source": "true",
Expand Down
27 changes: 12 additions & 15 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,7 @@ use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES,
MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS,
MZ_OPERATORS, MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
MZ_PSEUDO_TYPES, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS,
MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
MZ_WEBHOOKS_SOURCES,
BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS, MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTERS, MZ_CLUSTER_REPLICAS, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_COLUMNS, MZ_COMMENTS, MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS, MZ_INDEXES, MZ_INDEX_COLUMNS, MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCES, MZ_KAFKA_SOURCE_TABLES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES, MZ_MATERIALIZED_VIEWS, MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES, MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCES, MZ_POSTGRES_SOURCE_TABLES, MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLES, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCES, MZ_SOURCE_REFERENCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPES, MZ_TYPE_PG_METADATA, MZ_VIEWS, MZ_WEBHOOKS_SOURCES
};
use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
Expand Down Expand Up @@ -1538,6 +1524,17 @@ impl CatalogState {
));
}

if let Some(target_id) = mview.replacement_target {
updates.push(BuiltinTableUpdate::row(
&*MZ_REPLACEMENTS,
Row::pack_slice(&[
Datum::String(&id.to_string()),
Datum::String(&target_id.to_string()),
]),
diff,
));
}

updates
}

Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1375,6 +1375,7 @@ impl CatalogState {
desc,
resolved_ids,
dependencies,
replacement_target: materialized_view.replacement_target,
cluster_id: materialized_view.cluster_id,
non_null_assertions: materialized_view.non_null_assertions,
custom_logical_compaction_window: materialized_view.compaction_window,
Expand Down
38 changes: 37 additions & 1 deletion src/adapter/src/catalog/transact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ pub enum Op {
typ: SqlColumnType,
sql: RawDataType,
},
AlterMaterializedViewApplyReplacement {
id: CatalogItemId,
replacement_id: CatalogItemId,
},
CreateDatabase {
name: String,
owner_id: RoleId,
Expand Down Expand Up @@ -819,6 +823,30 @@ impl Catalog {
tx.update_item(id, new_entry.into())?;
storage_collections_to_register.insert(new_global_id, shard_id);
}
Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
let mut new_entry = state.get_entry(&id).clone();
let replacement = state.get_entry(&replacement_id);

let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
return Err(AdapterError::internal(
"ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
"id must refer to a materialized view",
));
};
let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
return Err(AdapterError::internal(
"ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
"replacement_id must refer to a materialized view",
));
};

mv.apply_replacement(replacement_mv.clone());

tx.remove_item(replacement_id)?;
tx.update_item(id, new_entry.into())?;

// TODO(alter-mv): audit logging
}
Op::CreateDatabase { name, owner_id } => {
let database_owner_privileges = vec![rbac::owner_privilege(
mz_sql::catalog::ObjectType::Database,
Expand Down Expand Up @@ -1130,7 +1158,15 @@ impl Catalog {
storage_collections_to_create.insert(source.global_id());
}
CatalogItem::MaterializedView(mv) => {
storage_collections_to_create.insert(mv.global_id_writes());
let mv_gid = mv.global_id_writes();
if let Some(target_id) = mv.replacement_target {
let target_gid = state.get_entry(&target_id).latest_global_id();
let shard_id =
state.storage_metadata().get_collection_shard(target_gid)?;
storage_collections_to_register.insert(mv_gid, shard_id);
} else {
storage_collections_to_create.insert(mv_gid);
}
}
CatalogItem::ContinualTask(ct) => {
storage_collections_to_create.insert(ct.global_id());
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ impl ExecuteResponse {
| AlterSource
| AlterSink
| AlterTableAddColumn
| AlterMaterializedViewApplyReplacement
| AlterNetworkPolicy => &[AlteredObject],
AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
AlterSetCluster => &[AlteredObject],
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/continual_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub fn ct_item_from_plan(
expr: mut raw_expr,
dependencies,
column_names: _,
replacement_target: _,
non_null_assertions: _,
compaction_window: _,
refresh_schedule: _,
Expand Down
7 changes: 6 additions & 1 deletion src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2117,7 +2117,12 @@ impl Coordinator {
}

self.ship_dataflow(df_desc, mview.cluster_id, None).await;
self.allow_writes(mview.cluster_id, mview.global_id_writes());

// If this is a replacement MV, it must remain read-only until the replacement
// gets applied.
if mview.replacement_target.is_none() {
self.allow_writes(mview.cluster_id, mview.global_id_writes());
}
}
CatalogItem::Sink(sink) => {
policies_to_set
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/appends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,7 @@ pub(crate) fn waiting_on_startup_appends(
| Plan::AlterRole(_)
| Plan::AlterOwner(_)
| Plan::AlterTableAddColumn(_)
| Plan::AlterMaterializedViewApplyReplacement(_)
| Plan::Declare(_)
| Plan::Fetch(_)
| Plan::Close(_)
Expand Down
100 changes: 60 additions & 40 deletions src/adapter/src/coord/catalog_implications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,7 @@ impl Coordinator {
let mut sources_to_drop = vec![];
let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
let mut storage_sink_gids_to_drop = vec![];
let mut indexes_to_drop = vec![];
let mut materialized_views_to_drop = vec![];
let mut continual_tasks_to_drop = vec![];
let mut compute_gids_to_drop = vec![];
let mut view_gids_to_drop = vec![];
let mut secrets_to_drop = vec![];
let mut vpc_endpoints_to_drop = vec![];
Expand All @@ -182,6 +180,11 @@ impl Coordinator {
let mut storage_policies_to_initialize = BTreeMap::new();
let mut execution_timestamps_to_set = BTreeSet::new();

// Replacing a materialized view causes the replacement's catalog entry
// to be dropped. Its compute and storage collections are transferred to
// the target MV though, so we must make sure not to drop those.
let mut replacement_gids = vec![];

// We're incrementally migrating the code that manipulates the
// controller from closures in the sequencer. For some types of catalog
// changes we haven't done this migration yet, so there you will see
Expand Down Expand Up @@ -305,7 +308,7 @@ impl Coordinator {
);
}
CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
indexes_to_drop.push((index.cluster_id, index.global_id()));
compute_gids_to_drop.push((index.cluster_id, index.global_id()));
dropped_item_names.insert(index.global_id(), full_name);
}
CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
Expand All @@ -315,17 +318,20 @@ impl Coordinator {
prev: prev_mv,
new: new_mv,
}) => {
tracing::debug!(
?prev_mv,
?new_mv,
"not handling AlterMaterializedView in here yet"
);
let old_gid = prev_mv.global_id_writes();
let new_gid = new_mv.global_id_writes();
if new_gid != old_gid {
replacement_gids.push((new_mv.cluster_id, new_gid));
}

self.handle_alter_materialized_view(prev_mv, new_mv)?;
}
CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
mv,
full_name,
)) => {
materialized_views_to_drop.push((mv.cluster_id, mv.global_id_writes()));
compute_gids_to_drop.push((mv.cluster_id, mv.global_id_writes()));
sources_to_drop.extend(mv.global_ids().map(|gid| (catalog_id, gid)));
dropped_item_names.insert(mv.global_id_writes(), full_name);
}
CatalogImplication::View(CatalogImplicationKind::Added(view)) => {
Expand Down Expand Up @@ -358,7 +364,8 @@ impl Coordinator {
ct,
_full_name,
)) => {
continual_tasks_to_drop.push((catalog_id, ct.cluster_id, ct.global_id()));
compute_gids_to_drop.push((ct.cluster_id, ct.global_id()));
sources_to_drop.push((catalog_id, ct.global_id()));
}
CatalogImplication::Secret(CatalogImplicationKind::Added(secret)) => {
tracing::debug!(?secret, "not handling AddSecret in here yet");
Expand Down Expand Up @@ -500,6 +507,12 @@ impl Coordinator {
}
}

// Cancel out drops against replacements.
for (cluster_id, gid) in replacement_gids {
sources_to_drop.retain(|(_, id)| *id != gid);
compute_gids_to_drop.retain(|id| *id != (cluster_id, gid));
}

if !source_collections_to_create.is_empty() {
self.create_source_collections(source_collections_to_create)
.await?;
Expand All @@ -524,9 +537,7 @@ impl Coordinator {
.map(|(_, gid)| *gid)
.chain(tables_to_drop.iter().map(|(_, gid)| *gid))
.chain(storage_sink_gids_to_drop.iter().copied())
.chain(indexes_to_drop.iter().map(|(_, id)| *id))
.chain(materialized_views_to_drop.iter().map(|(_, id)| *id))
.chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid))
.chain(compute_gids_to_drop.iter().map(|(_, gid)| *gid))
.chain(view_gids_to_drop.iter().copied())
.collect();

Expand Down Expand Up @@ -592,27 +603,14 @@ impl Coordinator {
}
}

let storage_ids_to_drop: BTreeSet<_> = sources_to_drop
let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
.iter()
.map(|(_id, gid)| gid)
.chain(storage_sink_gids_to_drop.iter())
.chain(tables_to_drop.iter().map(|(_id, gid)| gid))
.chain(materialized_views_to_drop.iter().map(|(_, id)| id))
.chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| gid))
.copied()
.collect();

let compute_ids_to_drop: BTreeSet<_> = indexes_to_drop
.iter()
.copied()
.chain(materialized_views_to_drop.iter().copied())
.chain(
continual_tasks_to_drop
.iter()
.map(|(_, cluster_id, gid)| (*cluster_id, *gid)),
)
.collect();

// Gather resources that we have to remove from timeline state and
// pre-check if any Timelines become empty, when we drop the specified
// storage and compute resources.
Expand All @@ -624,13 +622,13 @@ impl Coordinator {
let mut id_bundle = CollectionIdBundle::default();

for storage_id in read_holds.storage_ids() {
if storage_ids_to_drop.contains(&storage_id) {
if storage_gids_to_drop.contains(&storage_id) {
id_bundle.storage_ids.insert(storage_id);
}
}

for (instance_id, id) in read_holds.compute_ids() {
if compute_ids_to_drop.contains(&(instance_id, id))
if compute_gids_to_drop.contains(&(instance_id, id))
|| clusters_to_drop.contains(&instance_id)
{
id_bundle
Expand Down Expand Up @@ -713,16 +711,8 @@ impl Coordinator {
}
}

if !indexes_to_drop.is_empty() {
self.drop_indexes(indexes_to_drop);
}

if !materialized_views_to_drop.is_empty() {
self.drop_materialized_views(materialized_views_to_drop);
}

if !continual_tasks_to_drop.is_empty() {
self.drop_continual_tasks(continual_tasks_to_drop);
if !compute_gids_to_drop.is_empty() {
self.drop_compute_collections(compute_gids_to_drop);
}

if !vpc_endpoints_to_drop.is_empty() {
Expand Down Expand Up @@ -1127,6 +1117,36 @@ impl Coordinator {
Ok(())
}

#[instrument(level = "debug")]
fn handle_alter_materialized_view(
&mut self,
prev_mv: MaterializedView,
new_mv: MaterializedView,
) -> Result<(), AdapterError> {
let old_gid = prev_mv.global_id_writes();
let new_gid = new_mv.global_id_writes();

if old_gid == new_gid {
// It's not an ALTER MATERIALIZED VIEW as far as the controller is
// concerned, because we still have the same GlobalId. This is
// likely a change from an ALTER SWAP.
return Ok(());
}

// Cut over the MV computation, by shutting down the old dataflow and allowing the
// new dataflow to start writing.
//
// Both commands are applied to their respective clusters asynchronously and they
// race, so it is possible that we allow writes by the replacement before having
// dropped the old dataflow. Thus there might be a period where the two dataflows
// are competing in trying to commit conflicting outputs. Eventually the old
// dataflow will be dropped and the replacement takes over.
self.drop_compute_collections(vec![(prev_mv.cluster_id, old_gid)]);
self.allow_writes(new_mv.cluster_id, new_gid);

Ok(())
}

#[instrument(level = "debug")]
async fn handle_create_source(
&mut self,
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/catalog_serving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>(
| Plan::AlterRole(_)
| Plan::AlterOwner(_)
| Plan::AlterTableAddColumn(_)
| Plan::AlterMaterializedViewApplyReplacement(_)
| Plan::Declare(_)
| Plan::Fetch(_)
| Plan::Close(_)
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,7 @@ impl Coordinator {
| Statement::AlterConnection(_)
| Statement::AlterDefaultPrivileges(_)
| Statement::AlterIndex(_)
| Statement::AlterMaterializedViewApplyReplacement(_)
| Statement::AlterSetCluster(_)
| Statement::AlterOwner(_)
| Statement::AlterRetainHistory(_)
Expand Down Expand Up @@ -1286,6 +1287,7 @@ impl Coordinator {
if_exists: cmvs.if_exists,
name: cmvs.name,
columns: cmvs.columns,
replacing: cmvs.replacing,
in_cluster: cmvs.in_cluster,
query: cmvs.query,
with_options: cmvs.with_options,
Expand Down
Loading