diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 63efa010aab4a..a05e0823a0dff 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -1532,7 +1532,10 @@ impl Catalog { CatalogError, > { let updates = self.storage().await.sync_to_current_updates().await?; - let (builtin_table_updates, catalog_updates) = self.state.apply_updates(updates)?; + let (builtin_table_updates, catalog_updates) = self + .state + .apply_updates(updates, &mut state::LocalExpressionCache::Closed) + .await; Ok((builtin_table_updates, catalog_updates)) } } diff --git a/src/adapter/src/catalog/apply.rs b/src/adapter/src/catalog/apply.rs index ff868c4606694..239f92ed8e9ba 100644 --- a/src/adapter/src/catalog/apply.rs +++ b/src/adapter/src/catalog/apply.rs @@ -94,12 +94,9 @@ impl CatalogState { /// Update in-memory catalog state from a list of updates made to the durable catalog state. /// /// Returns builtin table updates corresponding to the changes to catalog state. - /// - /// This is meant specifically for bootstrapping because it batches and applies builtin view - /// additions separately from other update types. #[must_use] #[instrument] - pub(crate) async fn apply_updates_for_bootstrap( + pub(crate) async fn apply_updates( &mut self, updates: Vec, local_expression_cache: &mut LocalExpressionCache, @@ -109,21 +106,31 @@ impl CatalogState { ) { let mut builtin_table_updates = Vec::with_capacity(updates.len()); let mut catalog_updates = Vec::with_capacity(updates.len()); - let updates = sort_updates(updates); + // First, consolidate updates. The code that applies parsed state + // updates _requires_ that the given updates are consolidated. There + // must be at most one addition and/or one retraction for a given item, + // as identified by that item's ID type. + let updates = Self::consolidate_updates(updates); + + // Apply updates in groups, according to their timestamps. 
let mut groups: Vec> = Vec::new(); for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) { - groups.push(updates.collect()); + // Bring the updates into the pseudo-topological order that we need + // for updating our in-memory state and generating builtin table + // updates. + let updates = sort_updates(updates.collect()); + groups.push(updates); } + for updates in groups { - let mut apply_state = BootstrapApplyState::Updates(Vec::new()); + let mut apply_state = ApplyState::Updates(Vec::new()); let mut retractions = InProgressRetractions::default(); for update in updates { - let next_apply_state = BootstrapApplyState::new(update); let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state .step( - next_apply_state, + ApplyState::new(update), self, &mut retractions, local_expression_cache, @@ -145,47 +152,6 @@ impl CatalogState { (builtin_table_updates, catalog_updates) } - /// Update in-memory catalog state from a list of updates made to the durable catalog state. - /// - /// Returns builtin table updates corresponding to the changes to catalog state. - #[instrument] - pub(crate) fn apply_updates( - &mut self, - updates: Vec, - ) -> Result< - ( - Vec>, - Vec, - ), - CatalogError, - > { - let mut builtin_table_updates = Vec::with_capacity(updates.len()); - let mut catalog_updates = Vec::with_capacity(updates.len()); - - // First, consolidate updates. The code that applies parsed state - // updates _requires_ that the given updates are consolidated. There - // must be at most one addition and/or one retraction for a given item, - // as identified by that items ID type. - let updates = Self::consolidate_updates(updates); - - // Then bring it into the pseudo-topological order that we need for - // updating our in-memory state and generating builtin table updates. 
- let updates = sort_updates(updates); - - for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) { - let mut retractions = InProgressRetractions::default(); - let (builtin_table_update, catalog_updates_op) = self.apply_updates_inner( - updates.collect(), - &mut retractions, - &mut LocalExpressionCache::Closed, - )?; - builtin_table_updates.extend(builtin_table_update); - catalog_updates.extend(catalog_updates_op); - } - - Ok((builtin_table_updates, catalog_updates)) - } - /// It can happen that the sequencing logic creates "fluctuating" updates /// for a given catalog ID. For example, when doing a `DROP OWNED BY ...`, /// for a table, there will be a retraction of the original table state, @@ -203,7 +169,6 @@ impl CatalogState { updates .into_iter() - .filter(|(_kind, _ts, diff)| *diff != 0.into()) .map(|(kind, ts, diff)| StateUpdate { kind, ts, @@ -1956,21 +1921,13 @@ impl CatalogState { } } -/// Sort [`StateUpdate`]s in timestamp then dependency order -fn sort_updates(mut updates: Vec) -> Vec { - let mut sorted_updates = Vec::with_capacity(updates.len()); - - updates.sort_by_key(|update| update.ts); - for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) { - let sorted_ts_updates = sort_updates_inner(updates.collect()); - sorted_updates.extend(sorted_ts_updates); - } - - sorted_updates -} - -/// Sort [`StateUpdate`]s in dependency order for a single timestamp. -fn sort_updates_inner(updates: Vec) -> Vec { +/// Sort [`StateUpdate`]s in dependency order. +/// +/// # Panics +/// +/// This function assumes that all provided `updates` have the same timestamp +/// and will panic otherwise. +fn sort_updates(updates: Vec) -> Vec { fn push_update( update: T, diff: StateDiff, @@ -2387,46 +2344,54 @@ fn sort_updates_inner(updates: Vec) -> Vec { .collect() } -/// Most updates are applied one at a time, but during bootstrap, certain types are applied -/// separately in a batch for performance reasons. 
A constraint is that updates must be applied in -/// order. This process is modeled as a state machine that batches then applies groups of updates. -enum BootstrapApplyState { +/// Groups of updates of certain types are applied in batches to improve +/// performance. A constraint is that updates must be applied in order. This +/// process is modeled as a state machine that batches then applies groups of +/// updates. +enum ApplyState { /// Additions of builtin views. BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>), /// Item updates that aren't builtin view additions. + /// + /// This contains all updates whose application requires calling + /// `parse_item` and thus toggling the `enable_for_item_parsing` feature + /// flags. Items(Vec), /// All other updates. Updates(Vec), } -impl BootstrapApplyState { - fn new(update: StateUpdate) -> BootstrapApplyState { - match update { - StateUpdate { - kind: StateUpdateKind::SystemObjectMapping(system_object_mapping), - diff: StateDiff::Addition, - .. - } if matches!( - system_object_mapping.description.object_type, - CatalogItemType::View - ) => +impl ApplyState { + fn new(update: StateUpdate) -> Self { + use StateUpdateKind::*; + match &update.kind { + SystemObjectMapping(som) + if som.description.object_type == CatalogItemType::View + && update.diff == StateDiff::Addition => { - let view_addition = lookup_builtin_view_addition(system_object_mapping); - BootstrapApplyState::BuiltinViewAdditions(vec![view_addition]) - } - StateUpdate { - kind: StateUpdateKind::IntrospectionSourceIndex(_), - .. - } - | StateUpdate { - kind: StateUpdateKind::SystemObjectMapping(_), - .. - } - | StateUpdate { - kind: StateUpdateKind::Item(_), - .. 
- } => BootstrapApplyState::Items(vec![update]), - update => BootstrapApplyState::Updates(vec![update]), + let view_addition = lookup_builtin_view_addition(som.clone()); + Self::BuiltinViewAdditions(vec![view_addition]) + } + + IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => { + Self::Items(vec![update]) + } + + Role(_) + | RoleAuth(_) + | Database(_) + | Schema(_) + | DefaultPrivilege(_) + | SystemPrivilege(_) + | SystemConfiguration(_) + | Cluster(_) + | NetworkPolicy(_) + | ClusterReplica(_) + | SourceReferences(_) + | Comment(_) + | AuditLog(_) + | StorageCollectionMetadata(_) + | UnfinalizedShard(_) => Self::Updates(vec![update]), } } @@ -2445,7 +2410,7 @@ impl BootstrapApplyState { Vec, ) { match self { - BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions) => { + Self::BuiltinViewAdditions(builtin_view_additions) => { let restore = state.system_configuration.clone(); state.system_configuration.enable_for_item_parsing(); let builtin_table_updates = CatalogState::parse_builtin_views( @@ -2458,12 +2423,12 @@ impl BootstrapApplyState { state.system_configuration = restore; (builtin_table_updates, Vec::new()) } - BootstrapApplyState::Items(updates) => state.with_enable_for_item_parsing(|state| { + Self::Items(updates) => state.with_enable_for_item_parsing(|state| { state .apply_updates_inner(updates, retractions, local_expression_cache) .expect("corrupt catalog") }), - BootstrapApplyState::Updates(updates) => state + Self::Updates(updates) => state .apply_updates_inner(updates, retractions, local_expression_cache) .expect("corrupt catalog"), } @@ -2471,12 +2436,12 @@ impl BootstrapApplyState { async fn step( self, - next: BootstrapApplyState, + next: Self, state: &mut CatalogState, retractions: &mut InProgressRetractions, local_expression_cache: &mut LocalExpressionCache, ) -> ( - BootstrapApplyState, + Self, ( Vec>, Vec, @@ -2484,34 +2449,25 @@ impl BootstrapApplyState { ) { match (self, next) { ( - 
BootstrapApplyState::BuiltinViewAdditions(mut builtin_view_additions), - BootstrapApplyState::BuiltinViewAdditions(next_builtin_view_additions), + Self::BuiltinViewAdditions(mut builtin_view_additions), + Self::BuiltinViewAdditions(next_builtin_view_additions), ) => { // Continue batching builtin view additions. builtin_view_additions.extend(next_builtin_view_additions); ( - BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions), + Self::BuiltinViewAdditions(builtin_view_additions), (Vec::new(), Vec::new()), ) } - (BootstrapApplyState::Items(mut updates), BootstrapApplyState::Items(next_updates)) => { + (Self::Items(mut updates), Self::Items(next_updates)) => { // Continue batching item updates. updates.extend(next_updates); - ( - BootstrapApplyState::Items(updates), - (Vec::new(), Vec::new()), - ) + (Self::Items(updates), (Vec::new(), Vec::new())) } - ( - BootstrapApplyState::Updates(mut updates), - BootstrapApplyState::Updates(next_updates), - ) => { + (Self::Updates(mut updates), Self::Updates(next_updates)) => { // Continue batching updates. updates.extend(next_updates); - ( - BootstrapApplyState::Updates(updates), - (Vec::new(), Vec::new()), - ) + (Self::Updates(updates), (Vec::new(), Vec::new())) } (apply_state, next_apply_state) => { // Apply the current batch and start batching new apply state. 
diff --git a/src/adapter/src/catalog/migrate.rs b/src/adapter/src/catalog/migrate.rs index 78e890c06da04..e663eb3d6e9ca 100644 --- a/src/adapter/src/catalog/migrate.rs +++ b/src/adapter/src/catalog/migrate.rs @@ -194,9 +194,8 @@ pub(crate) async fn migrate( .expect("known parameter"); } - let (mut ast_builtin_table_updates, mut ast_catalog_updates) = state - .apply_updates_for_bootstrap(item_updates, local_expr_cache) - .await; + let (mut ast_builtin_table_updates, mut ast_catalog_updates) = + state.apply_updates(item_updates, local_expr_cache).await; info!("migrating from catalog version {:?}", catalog_version); @@ -240,9 +239,8 @@ pub(crate) async fn migrate( // input and stages arbitrary transformations to the catalog on `tx`. let op_item_updates = tx.get_and_commit_op_updates(); - let (item_builtin_table_updates, item_catalog_updates) = state - .apply_updates_for_bootstrap(op_item_updates, local_expr_cache) - .await; + let (item_builtin_table_updates, item_catalog_updates) = + state.apply_updates(op_item_updates, local_expr_cache).await; ast_builtin_table_updates.extend(item_builtin_table_updates); ast_catalog_updates.extend(item_catalog_updates); diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index 6ff4f1ba3ada7..1550a29788025 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -314,7 +314,7 @@ impl Catalog { } let (builtin_table_update, _catalog_updates) = state - .apply_updates_for_bootstrap(pre_item_updates, &mut LocalExpressionCache::Closed) + .apply_updates(pre_item_updates, &mut LocalExpressionCache::Closed) .await; builtin_table_updates.extend(builtin_table_update); @@ -416,7 +416,7 @@ impl Catalog { // and return and use the updates from here. But that's at the very // least future work. 
let (builtin_table_update, _catalog_updates) = state - .apply_updates_for_bootstrap(system_item_updates, &mut local_expr_cache) + .apply_updates(system_item_updates, &mut local_expr_cache) .await; builtin_table_updates.extend(builtin_table_update); @@ -467,7 +467,7 @@ impl Catalog { ) } else { state - .apply_updates_for_bootstrap(item_updates, &mut local_expr_cache) + .apply_updates(item_updates, &mut local_expr_cache) .await }; builtin_table_updates.extend(builtin_table_update); @@ -481,7 +481,7 @@ impl Catalog { }) .collect(); let (builtin_table_update, _catalog_updates) = state - .apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache) + .apply_updates(post_item_updates, &mut local_expr_cache) .await; builtin_table_updates.extend(builtin_table_update); @@ -511,7 +511,7 @@ impl Catalog { // and return and use the updates from here. But that's at the very // least future work. let (table_updates, _catalog_updates) = state - .apply_updates_for_bootstrap(state_updates, &mut local_expr_cache) + .apply_updates(state_updates, &mut local_expr_cache) .await; builtin_table_updates.extend(table_updates); let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates); @@ -656,7 +656,9 @@ impl Catalog { .map_err(mz_catalog::durable::DurableCatalogError::from)?; let updates = txn.get_and_commit_op_updates(); - let (builtin_updates, catalog_updates) = state.apply_updates(updates)?; + let (builtin_updates, catalog_updates) = state + .apply_updates(updates, &mut LocalExpressionCache::Closed) + .await; assert!( builtin_updates.is_empty(), "storage is not allowed to generate catalog changes that would cause changes to builtin tables" diff --git a/src/adapter/src/catalog/transact.rs b/src/adapter/src/catalog/transact.rs index 49ed6deb60c57..cdace571f0d27 100644 --- a/src/adapter/src/catalog/transact.rs +++ b/src/adapter/src/catalog/transact.rs @@ -63,6 +63,7 @@ use mz_storage_client::storage_collections::StorageCollections; use tracing::{info, 
trace}; use crate::AdapterError; +use crate::catalog::state::LocalExpressionCache; use crate::catalog::{ BuiltinTableUpdate, Catalog, CatalogState, UpdatePrivilegeVariant, catalog_type_to_audit_object_type, comment_id_to_audit_object_type, is_reserved_name, @@ -598,17 +599,19 @@ impl Catalog { let mut op_updates: Vec<_> = tx.get_and_commit_op_updates(); op_updates.extend(temporary_item_updates); if !op_updates.is_empty() { - let (_op_builtin_table_updates, _op_catalog_updates) = - preliminary_state - .to_mut() - .apply_updates(op_updates.clone())?; + let (_op_builtin_table_updates, _op_catalog_updates) = preliminary_state + .to_mut() + .apply_updates(op_updates.clone(), &mut LocalExpressionCache::Closed) + .await; } updates.append(&mut op_updates); } if !updates.is_empty() { - let (op_builtin_table_updates, op_catalog_updates) = - state.to_mut().apply_updates(updates.clone())?; + let (op_builtin_table_updates, op_catalog_updates) = state + .to_mut() + .apply_updates(updates.clone(), &mut LocalExpressionCache::Closed) + .await; let op_builtin_table_updates = state .to_mut() .resolve_builtin_table_updates(op_builtin_table_updates); @@ -630,8 +633,10 @@ impl Catalog { let updates = tx.get_and_commit_op_updates(); if !updates.is_empty() { - let (op_builtin_table_updates, op_catalog_updates) = - state.to_mut().apply_updates(updates.clone())?; + let (op_builtin_table_updates, op_catalog_updates) = state + .to_mut() + .apply_updates(updates.clone(), &mut LocalExpressionCache::Closed) + .await; let op_builtin_table_updates = state .to_mut() .resolve_builtin_table_updates(op_builtin_table_updates); diff --git a/test/sqllogictest/swap.slt b/test/sqllogictest/swap.slt index dd194e778979e..00c9d0de9dfb0 100644 --- a/test/sqllogictest/swap.slt +++ b/test/sqllogictest/swap.slt @@ -154,6 +154,28 @@ SELECT name FROM mz_clusters JOIN ( SELECT cluster_id FROM mz_indexes WHERE name ---- bar + +# Ensure item parsing during applying a schema swap gracefully handles persisted +# SQL 
that depends on disabled feature flags. +# Regression test for database-issues#9971. + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_load_generator_counter TO true; +---- +COMPLETE 0 + +statement ok +CREATE SOURCE blue.ctr FROM LOAD GENERATOR COUNTER; + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_load_generator_counter TO false; +---- +COMPLETE 0 + +statement ok +ALTER SCHEMA blue SWAP WITH green; + + # Disable the feature. simple conn=mz_system,user=mz_system ALTER SYSTEM SET enable_alter_swap TO false;