diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 9aa37bc400501..e317ad29f5893 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -85,9 +85,7 @@ use uuid::Uuid; // DO NOT add any more imports from `crate` outside of `crate::catalog`. pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate; -pub use crate::catalog::open::{ - BuiltinMigrationMetadata, InitializeStateResult, OpenCatalogResult, -}; +pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult}; pub use crate::catalog::state::CatalogState; pub use crate::catalog::transact::{ DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult, @@ -671,9 +669,9 @@ impl Catalog { // debugging/testing. let previous_ts = now().into(); let replica_size = &bootstrap_args.default_cluster_replica_size; + let read_only = false; let OpenCatalogResult { catalog, - storage_collections_to_drop: _, migrated_storage_collections_0dt: _, new_builtin_collections: _, builtin_table_updates: _, @@ -687,7 +685,7 @@ impl Catalog { all_features: false, build_info: &DUMMY_BUILD_INFO, environment_id: environment_id.unwrap_or(EnvironmentId::for_tests()), - read_only: false, + read_only, now, boot_ts: previous_ts, skip_migrations: true, @@ -705,7 +703,10 @@ impl Catalog { aws_privatelink_availability_zones: None, http_host_name: None, connection_context: ConnectionContext::for_tests(secrets_reader), - builtin_item_migration_config: BuiltinItemMigrationConfig::Legacy, + builtin_item_migration_config: BuiltinItemMigrationConfig { + persist_client: persist_client.clone(), + read_only, + }, persist_client, enable_expression_cache_override, enable_0dt_deployment: true, @@ -2267,10 +2268,7 @@ mod tests { use tokio_postgres::NoTls; use uuid::Uuid; - use mz_catalog::builtin::{ - Builtin, BuiltinType, UnsafeBuiltinTableFingerprintWhitespace, BUILTINS, - UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE, - }; + use mz_catalog::builtin::{Builtin, BuiltinType, BUILTINS}; use mz_catalog::durable::{test_bootstrap_args, CatalogError, DurableCatalogError, FenceError}; use mz_catalog::SYSTEM_CONN_ID; use mz_controller_types::{ClusterId, ReplicaId}; @@ -2285,9 +2283,7 @@ mod tests { CatalogItemId, Datum, GlobalId, RelationType, RelationVersionSelector, RowArena, ScalarType, Timestamp, }; - use mz_sql::catalog::{ - BuiltinsConfig, CatalogDatabase, CatalogSchema, CatalogType, SessionCatalog, - }; + use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog}; use mz_sql::func::{Func, FuncImpl, Operation, OP_IMPLS}; use mz_sql::names::{ self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName, @@ -3507,119 +3503,6 @@ mod tests { .await } - #[mz_ore::test(tokio::test)] - #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` - async fn test_builtin_migrations() { - let persist_client = PersistClient::new_for_tests().await; - let bootstrap_args = test_bootstrap_args(); - let organization_id = Uuid::new_v4(); - let mv_name = "mv"; - let (mz_tables_id, mv_id) = { - let mut catalog = Catalog::open_debug_catalog( - persist_client.clone(), - organization_id.clone(), - &bootstrap_args, - ) - .await - .expect("unable to open debug catalog"); - - // Create a materialized view over `mz_tables`. 
- let database_id = DatabaseId::User(1); - let database = catalog.get_database(&database_id); - let database_name = database.name(); - let schemas = database.schemas(); - let schema = schemas.first().expect("must have at least one schema"); - let schema_spec = schema.id().clone(); - let schema_name = &schema.name().schema; - let database_spec = ResolvedDatabaseSpecifier::Id(database_id); - let id_ts = catalog.storage().await.current_upper().await; - let (mv_id, mv_gid) = catalog - .allocate_user_id(id_ts) - .await - .expect("unable to allocate id"); - let mv = catalog - .state() - .deserialize_item( - mv_gid, - &format!("CREATE MATERIALIZED VIEW {database_name}.{schema_name}.{mv_name} AS SELECT name FROM mz_tables"), - &BTreeMap::new(), - &mut LocalExpressionCache::Closed, - None, - ) - .expect("unable to deserialize item"); - let commit_ts = catalog.current_upper().await; - catalog - .transact( - None, - commit_ts, - None, - vec![Op::CreateItem { - id: mv_id, - name: QualifiedItemName { - qualifiers: ItemQualifiers { - database_spec, - schema_spec, - }, - item: mv_name.to_string(), - }, - item: mv, - owner_id: MZ_SYSTEM_ROLE_ID, - }], - ) - .await - .expect("unable to transact"); - - let mz_tables_id = catalog - .entries() - .find(|entry| &entry.name.item == "mz_tables" && entry.is_table()) - .expect("mz_tables doesn't exist") - .id(); - let check_mv_id = catalog - .entries() - .find(|entry| &entry.name.item == mv_name && entry.is_materialized_view()) - .unwrap_or_else(|| panic!("{mv_name} doesn't exist")) - .id(); - assert_eq!(check_mv_id, mv_id); - catalog.expire().await; - (mz_tables_id, mv_id) - }; - // Forcibly migrate all tables. - { - let mut guard = - UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE - .lock() - .expect("lock poisoned"); - *guard = Some(( - UnsafeBuiltinTableFingerprintWhitespace::All, - "\n".to_string(), - )); - } - { - let catalog = - Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args) - .await - .expect("unable to open debug catalog"); - - let new_mz_tables_id = catalog - .entries() - .find(|entry| &entry.name.item == "mz_tables" && entry.is_table()) - .expect("mz_tables doesn't exist") - .id(); - // Assert that the table was migrated and got a new ID. - assert_ne!(new_mz_tables_id, mz_tables_id); - - let new_mv_id = catalog - .entries() - .find(|entry| &entry.name.item == mv_name && entry.is_materialized_view()) - .unwrap_or_else(|| panic!("{mv_name} doesn't exist")) - .id(); - // Assert that the materialized view was migrated and got a new ID. 
- assert_ne!(new_mv_id, mv_id); - - catalog.expire().await; - } - } - #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_multi_subscriber_catalog() { diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index 1b66b119edd7a..ac200eba0a54f 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -17,12 +17,10 @@ use std::time::{Duration, Instant}; use futures::future::{BoxFuture, FutureExt}; use itertools::{Either, Itertools}; -use mz_adapter_types::compaction::CompactionWindow; use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE}; use mz_catalog::builtin::{ - Builtin, BuiltinTable, Fingerprint, BUILTINS, BUILTIN_CLUSTERS, BUILTIN_CLUSTER_REPLICAS, - BUILTIN_PREFIXES, BUILTIN_ROLES, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, - RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL, + Builtin, Fingerprint, BUILTINS, BUILTIN_CLUSTERS, BUILTIN_CLUSTER_REPLICAS, BUILTIN_PREFIXES, + BUILTIN_ROLES, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL, }; use mz_catalog::config::{ClusterReplicaSizeMap, StateConfig}; use mz_catalog::durable::objects::{ @@ -34,35 +32,30 @@ use mz_catalog::expr_cache::{ }; use mz_catalog::memory::error::{Error, ErrorKind}; use mz_catalog::memory::objects::{ - BootstrapStateUpdateKind, CatalogEntry, CatalogItem, CommentsMap, DefaultPrivileges, - StateUpdate, + BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, StateUpdate, }; use mz_catalog::SYSTEM_CONN_ID; -use mz_compute_client::logging::LogVariant; use mz_controller::clusters::{ReplicaAllocation, ReplicaLogging}; use mz_controller_types::ClusterId; use mz_ore::cast::usize_to_u64; -use mz_ore::collections::{CollectionExt, HashSet}; +use mz_ore::collections::HashSet; use mz_ore::now::to_datetime; use mz_ore::{instrument, soft_assert_no_log}; use mz_repr::adt::mz_acl_item::PrivilegeMap; use mz_repr::namespaces::is_unstable_schema; -use mz_repr::role_id::RoleId; -use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp}; +use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp}; use mz_sql::catalog::{ - BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, - CatalogItemType, RoleMembership, RoleVars, + BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars, }; use mz_sql::func::OP_IMPLS; -use mz_sql::names::SchemaId; use mz_sql::rbac; use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER}; use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput}; -use mz_sql_parser::ast::display::AstDisplay; use mz_storage_client::storage_collections::StorageCollections; use timely::Container; -use tracing::{error, info, warn, Instrument}; +use tracing::{info, warn, Instrument}; use uuid::Uuid; + // DO NOT add any more imports from `crate` outside of `crate::catalog`. use crate::catalog::open::builtin_item_migration::{ migrate_builtin_items, BuiltinItemMigrationResult, @@ -73,113 +66,9 @@ use crate::catalog::{ }; use crate::AdapterError; -#[derive(Debug)] -pub struct BuiltinMigrationMetadata { - /// Used to drop objects on STORAGE nodes. - /// - /// Note: These collections are only known by the storage controller, and not the - /// Catalog, thus we identify them by their [`GlobalId`]. 
- pub previous_storage_collection_ids: BTreeSet, - // Used to update persisted on disk catalog state - pub migrated_system_object_mappings: BTreeMap, - pub introspection_source_index_updates: - BTreeMap>, - pub user_item_drop_ops: Vec, - pub user_item_create_ops: Vec, -} - -#[derive(Debug)] -pub struct CreateOp { - id: CatalogItemId, - oid: u32, - global_id: GlobalId, - schema_id: SchemaId, - name: String, - owner_id: RoleId, - privileges: PrivilegeMap, - item_rebuilder: CatalogItemRebuilder, -} - -impl BuiltinMigrationMetadata { - fn new() -> BuiltinMigrationMetadata { - BuiltinMigrationMetadata { - previous_storage_collection_ids: BTreeSet::new(), - migrated_system_object_mappings: BTreeMap::new(), - introspection_source_index_updates: BTreeMap::new(), - user_item_drop_ops: Vec::new(), - user_item_create_ops: Vec::new(), - } - } -} - -#[derive(Debug)] -pub enum CatalogItemRebuilder { - SystemSource(CatalogItem), - Object { - sql: String, - is_retained_metrics_object: bool, - custom_logical_compaction_window: Option, - }, -} - -impl CatalogItemRebuilder { - fn new( - entry: &CatalogEntry, - id: CatalogItemId, - ancestor_ids: &BTreeMap, - ) -> Self { - if id.is_system() - && (entry.is_table() || entry.is_introspection_source() || entry.is_source()) - { - Self::SystemSource(entry.item().clone()) - } else { - let create_sql = entry.create_sql().to_string(); - let mut create_stmt = mz_sql::parse::parse(&create_sql) - .expect("invalid create sql persisted to catalog") - .into_element() - .ast; - mz_sql::ast::transform::create_stmt_replace_ids(&mut create_stmt, ancestor_ids); - Self::Object { - sql: create_stmt.to_ast_string_stable(), - is_retained_metrics_object: entry.item().is_retained_metrics_object(), - custom_logical_compaction_window: entry.item().custom_logical_compaction_window(), - } - } - } - - fn build( - self, - global_id: GlobalId, - state: &CatalogState, - versions: &BTreeMap, - ) -> CatalogItem { - match self { - Self::SystemSource(item) => item, - Self::Object { - sql, - is_retained_metrics_object, - custom_logical_compaction_window, - } => state - .parse_item( - global_id, - &sql, - versions, - None, - is_retained_metrics_object, - custom_logical_compaction_window, - &mut LocalExpressionCache::Closed, - None, - ) - .unwrap_or_else(|error| panic!("invalid persisted create sql ({error:?}): {sql}")), - } - } -} - pub struct InitializeStateResult { /// An initialized [`CatalogState`]. pub state: CatalogState, - /// A set of storage collections to drop (only used by legacy migrations). - pub storage_collections_to_drop: BTreeSet, /// A set of new shards that may need to be initialized (only used by 0dt migration). pub migrated_storage_collections_0dt: BTreeSet, /// A set of new builtin items. @@ -199,12 +88,7 @@ pub struct InitializeStateResult { pub struct OpenCatalogResult { /// An opened [`Catalog`]. pub catalog: Catalog, - /// A set of storage collections to drop (only used by legacy migrations). - /// - /// Note: These Collections will not be in the Catalog, and are only known about by - /// the storage controller, which is why we identify them by [`GlobalId`]. - pub storage_collections_to_drop: BTreeSet, - /// A set of new shards that may need to be initialized (only used by 0dt migration). + /// A set of new shards that may need to be initialized. pub migrated_storage_collections_0dt: BTreeSet, /// A set of new builtin items. pub new_builtin_collections: BTreeSet, @@ -540,7 +424,6 @@ impl Catalog { // Migrate builtin items. 
let BuiltinItemMigrationResult { builtin_table_updates: builtin_table_update, - storage_collections_to_drop, migrated_storage_collections_0dt, cleanup_action, } = migrate_builtin_items( @@ -560,7 +443,6 @@ impl Catalog { Ok(InitializeStateResult { state, - storage_collections_to_drop, migrated_storage_collections_0dt, new_builtin_collections: new_builtin_collections.into_iter().collect(), builtin_table_updates, @@ -588,7 +470,6 @@ impl Catalog { let InitializeStateResult { state, - storage_collections_to_drop, migrated_storage_collections_0dt, new_builtin_collections, mut builtin_table_updates, @@ -640,7 +521,6 @@ impl Catalog { Ok(OpenCatalogResult { catalog, - storage_collections_to_drop, migrated_storage_collections_0dt, new_builtin_collections, builtin_table_updates, @@ -663,7 +543,6 @@ impl Catalog { storage_collections: &Arc< dyn StorageCollections + Send + Sync, >, - storage_collections_to_drop: BTreeSet, ) -> Result<(), mz_catalog::durable::CatalogError> { let collections = self .entries() @@ -679,7 +558,7 @@ impl Catalog { let mut txn = storage.transaction().await?; storage_collections - .initialize_state(&mut txn, collections, storage_collections_to_drop) + .initialize_state(&mut txn, collections) .await .map_err(mz_catalog::durable::DurableCatalogError::from)?; @@ -702,7 +581,6 @@ impl Catalog { config: mz_controller::ControllerConfig, envd_epoch: core::num::NonZeroI64, read_only: bool, - storage_collections_to_drop: BTreeSet, ) -> Result, mz_catalog::durable::CatalogError> { let controller_start = Instant::now(); @@ -726,7 +604,7 @@ impl Catalog { mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await }; - self.initialize_storage_state(&controller.storage_collections, storage_collections_to_drop) + self.initialize_storage_state(&controller.storage_collections) .await?; info!( @@ -737,277 +615,6 @@ impl Catalog { Ok(controller) } - /// The objects in the catalog form one or more DAGs (directed acyclic graph) via object - /// dependencies. To migrate a builtin object we must drop that object along with all of its - /// descendants, and then recreate that object along with all of its descendants using new - /// [`CatalogItemId`]s. To achieve this we perform a DFS (depth first search) on the catalog - /// items starting with the nodes that correspond to builtin objects that have changed schemas. - /// - /// Objects need to be dropped starting from the leafs of the DAG going up towards the roots, - /// and they need to be recreated starting at the roots of the DAG and going towards the leafs. - fn generate_builtin_migration_metadata( - state: &CatalogState, - txn: &mut Transaction<'_>, - migrated_ids: Vec, - id_fingerprint_map: BTreeMap, - ) -> Result { - // First obtain a topological sorting of all migrated objects and their children. - let mut visited_set = BTreeSet::new(); - let mut sorted_entries = Vec::new(); - for item_id in migrated_ids { - if !visited_set.contains(&item_id) { - let migrated_topological_sort = - Catalog::topological_sort(state, item_id, &mut visited_set); - sorted_entries.extend(migrated_topological_sort); - } - } - sorted_entries.reverse(); - - // Then process all objects in sorted order. 
- let mut migration_metadata = BuiltinMigrationMetadata::new(); - let mut ancestor_ids = BTreeMap::new(); - let mut migrated_log_ids = BTreeMap::new(); - let log_name_map: BTreeMap<_, _> = BUILTINS::logs() - .map(|log| (log.variant.clone(), log.name)) - .collect(); - for entry in sorted_entries { - let id = entry.id(); - - let (new_item_id, new_global_id) = match id { - CatalogItemId::System(_) => txn.allocate_system_item_ids(1)?.into_element(), - CatalogItemId::IntrospectionSourceIndex(id) => ( - CatalogItemId::IntrospectionSourceIndex(id), - GlobalId::IntrospectionSourceIndex(id), - ), - CatalogItemId::User(_) => txn.allocate_user_item_ids(1)?.into_element(), - _ => unreachable!("can't migrate id: {id}"), - }; - - let name = state.resolve_full_name(entry.name(), None); - info!("migrating {name} from {id} to {new_item_id}"); - - // Generate value to update fingerprint and global ID persisted mapping for system objects. - // Not every system object has a fingerprint, like introspection source indexes. - if let Some(fingerprint) = id_fingerprint_map.get(&id) { - assert!( - id.is_system(), - "id_fingerprint_map should only contain builtin objects" - ); - let schema_name = state - .get_schema( - &entry.name().qualifiers.database_spec, - &entry.name().qualifiers.schema_spec, - entry.conn_id().unwrap_or(&SYSTEM_CONN_ID), - ) - .name - .schema - .as_str(); - migration_metadata.migrated_system_object_mappings.insert( - id, - SystemObjectMapping { - description: SystemObjectDescription { - schema_name: schema_name.to_string(), - object_type: entry.item_type(), - object_name: entry.name().item.clone(), - }, - unique_identifier: SystemObjectUniqueIdentifier { - catalog_id: new_item_id, - global_id: new_global_id, - fingerprint: fingerprint.clone(), - }, - }, - ); - } - - ancestor_ids.insert(id, new_item_id); - - if entry.item().is_storage_collection() { - migration_metadata - .previous_storage_collection_ids - .extend(entry.global_ids()); - } - - // Push drop commands. - match entry.item() { - CatalogItem::Log(log) => { - migrated_log_ids.insert(log.global_id(), log.variant.clone()); - } - CatalogItem::Index(index) => { - if id.is_system() { - if let Some(variant) = migrated_log_ids.get(&index.on) { - migration_metadata - .introspection_source_index_updates - .entry(index.cluster_id) - .or_default() - .push(( - variant.clone(), - log_name_map - .get(variant) - .expect("all variants have a name") - .to_string(), - new_item_id, - new_global_id, - entry.oid(), - )); - } - } - } - CatalogItem::Table(_) - | CatalogItem::Source(_) - | CatalogItem::MaterializedView(_) - | CatalogItem::ContinualTask(_) => { - // Storage objects don't have any external objects to drop. - } - CatalogItem::Sink(_) => { - // Sinks don't have any external objects to drop--however, - // this would change if we add a collections for sinks - // database-issues#5148. - } - CatalogItem::View(_) => { - // Views don't have any external objects to drop. - } - CatalogItem::Type(_) - | CatalogItem::Func(_) - | CatalogItem::Secret(_) - | CatalogItem::Connection(_) => unreachable!( - "impossible to migrate schema for builtin {}", - entry.item().typ() - ), - } - if id.is_user() { - migration_metadata.user_item_drop_ops.push(id); - } - - // Push create commands. 
- let name = entry.name().clone(); - if id.is_user() { - let schema_id = name.qualifiers.schema_spec.clone().into(); - let item_rebuilder = CatalogItemRebuilder::new(entry, new_item_id, &ancestor_ids); - migration_metadata.user_item_create_ops.push(CreateOp { - id: new_item_id, - oid: entry.oid(), - global_id: new_global_id, - schema_id, - name: name.item.clone(), - owner_id: entry.owner_id().clone(), - privileges: entry.privileges().clone(), - item_rebuilder, - }); - } - } - - // Reverse drop commands. - migration_metadata.user_item_drop_ops.reverse(); - - Ok(migration_metadata) - } - - fn topological_sort<'a, 'b>( - state: &'a CatalogState, - id: CatalogItemId, - visited_set: &'b mut BTreeSet, - ) -> Vec<&'a CatalogEntry> { - let mut sorted_entries = Vec::new(); - visited_set.insert(id); - let entry = state.get_entry(&id); - for dependant in entry.used_by() { - if !visited_set.contains(dependant) { - let child_topological_sort = - Catalog::topological_sort(state, *dependant, visited_set); - sorted_entries.extend(child_topological_sort); - } - } - sorted_entries.push(entry); - sorted_entries - } - - #[mz_ore::instrument] - async fn apply_builtin_migration( - state: &mut CatalogState, - txn: &mut Transaction<'_>, - migration_metadata: &mut BuiltinMigrationMetadata, - ) -> Result>, Error> { - for id in &migration_metadata.user_item_drop_ops { - let entry = state.get_entry(id); - if entry.is_sink() { - let full_name = state.resolve_full_name(entry.name(), None); - error!( - "user sink {full_name} will be recreated as part of a builtin migration which \ - can result in duplicate data being emitted. This is a known issue, \ - https://github.com/MaterializeInc/database-issues/issues/5553. Please inform the \ - customer that their sink may produce duplicate data." - ) - } - } - - let mut builtin_table_updates = Vec::new(); - txn.remove_items(&migration_metadata.user_item_drop_ops.drain(..).collect())?; - txn.update_system_object_mappings(std::mem::take( - &mut migration_metadata.migrated_system_object_mappings, - ))?; - txn.update_introspection_source_index_gids( - std::mem::take(&mut migration_metadata.introspection_source_index_updates) - .into_iter() - .map(|(cluster_id, updates)| { - ( - cluster_id, - updates - .into_iter() - .map(|(_variant, name, item_id, index_id, oid)| { - (name, item_id, index_id, oid) - }), - ) - }), - )?; - let updates = txn.get_and_commit_op_updates(); - let builtin_table_update = state - .apply_updates_for_bootstrap(updates, &mut LocalExpressionCache::Closed) - .await; - builtin_table_updates.extend(builtin_table_update); - for CreateOp { - id, - oid, - global_id, - schema_id, - name, - owner_id, - privileges, - item_rebuilder, - } in migration_metadata.user_item_create_ops.drain(..) - { - // Builtin Items can't be versioned. 
- let versions = BTreeMap::new(); - let item = item_rebuilder.build(global_id, state, &versions); - let (create_sql, expect_gid, expect_versions) = item.to_serialized(); - assert_eq!( - global_id, expect_gid, - "serializing a CatalogItem changed the GlobalId" - ); - assert_eq!( - versions, expect_versions, - "serializing a CatalogItem changed the Versions" - ); - - txn.insert_item( - id, - oid, - global_id, - schema_id, - &name, - create_sql, - owner_id.clone(), - privileges.all_values_owned().collect(), - versions, - )?; - let updates = txn.get_and_commit_op_updates(); - let builtin_table_update = state - .apply_updates_for_bootstrap(updates, &mut LocalExpressionCache::Closed) - .await; - builtin_table_updates.extend(builtin_table_update); - } - Ok(builtin_table_updates) - } - /// Politely releases all external resources that can only be released in an async context. pub async fn expire(self) { // If no one else holds a reference to storage, then clean up the storage resources. @@ -1570,705 +1177,3 @@ fn get_dyncfg_val_from_defaults_and_remote( } val } - -#[cfg(test)] -mod builtin_migration_tests { - use std::collections::{BTreeMap, BTreeSet}; - - use itertools::Itertools; - use mz_catalog::memory::objects::{ - CatalogItem, Index, MaterializedView, Table, TableDataSource, - }; - use mz_catalog::SYSTEM_CONN_ID; - use mz_controller_types::ClusterId; - use mz_expr::MirRelationExpr; - use mz_ore::id_gen::Gen; - use mz_repr::{ - CatalogItemId, GlobalId, RelationDesc, RelationType, RelationVersion, ScalarType, - VersionedRelationDesc, - }; - use mz_sql::catalog::CatalogDatabase; - use mz_sql::names::{ - DependencyIds, ItemQualifiers, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, - }; - use mz_sql::session::user::MZ_SYSTEM_ROLE_ID; - use mz_sql::DEFAULT_SCHEMA; - use mz_sql_parser::ast::Expr; - - use crate::catalog::{Catalog, Op, OptimizedMirRelationExpr}; - use crate::session::DEFAULT_DATABASE_NAME; - - enum ItemNamespace { - System, - User, - } - - enum SimplifiedItem { - Table, - MaterializedView { referenced_names: Vec }, - Index { on: String }, - } - - struct SimplifiedCatalogEntry { - name: String, - namespace: ItemNamespace, - item: SimplifiedItem, - } - - impl SimplifiedCatalogEntry { - // A lot of the fields here aren't actually used in the test so we can fill them in with dummy - // values. 
- fn to_catalog_item( - self, - item_id_mapping: &BTreeMap, - global_id_mapping: &BTreeMap, - global_id_gen: &mut Gen, - ) -> (String, ItemNamespace, CatalogItem, GlobalId) { - let global_id = GlobalId::User(global_id_gen.allocate_id()); - let desc = RelationDesc::builder() - .with_column("a", ScalarType::Int32.nullable(true)) - .with_key(vec![0]) - .finish(); - let item = match self.item { - SimplifiedItem::Table => CatalogItem::Table(Table { - create_sql: Some("CREATE TABLE materialize.public.t (a INT)".to_string()), - desc: VersionedRelationDesc::new(desc), - collections: [(RelationVersion::root(), global_id)].into_iter().collect(), - conn_id: None, - resolved_ids: ResolvedIds::empty(), - custom_logical_compaction_window: None, - is_retained_metrics_object: false, - data_source: TableDataSource::TableWrites { - defaults: vec![Expr::null(); 1], - }, - }), - SimplifiedItem::MaterializedView { referenced_names } => { - let table_list = referenced_names - .iter() - .map(|table| format!("materialize.public.{table}")) - .join(","); - let column_list = referenced_names - .iter() - .enumerate() - .map(|(idx, _)| format!("a{idx}")) - .join(","); - let resolved_ids = - convert_names_to_ids(referenced_names, item_id_mapping, global_id_mapping); - - CatalogItem::MaterializedView(MaterializedView { - global_id, - create_sql: format!( - "CREATE MATERIALIZED VIEW materialize.public.mv ({column_list}) AS SELECT * FROM {table_list}" - ), - raw_expr: mz_sql::plan::HirRelationExpr::constant( - Vec::new(), - RelationType { - column_types: Vec::new(), - keys: Vec::new(), - }, - ).into(), - dependencies: DependencyIds(Default::default()), - optimized_expr: OptimizedMirRelationExpr(MirRelationExpr::Constant { - rows: Ok(Vec::new()), - typ: RelationType { - column_types: Vec::new(), - keys: Vec::new(), - }, - }).into(), - desc: RelationDesc::builder() - .with_column("a", ScalarType::Int32.nullable(true)) - .with_key(vec![0]) - .finish(), - resolved_ids: resolved_ids.into_iter().collect(), - cluster_id: ClusterId::user(1).expect("1 is a valid ID"), - non_null_assertions: vec![], - custom_logical_compaction_window: None, - refresh_schedule: None, - initial_as_of: None, - }) - } - SimplifiedItem::Index { on } => { - let on_item_id = item_id_mapping[&on]; - let on_gid = global_id_mapping[&on]; - CatalogItem::Index(Index { - create_sql: format!("CREATE INDEX idx ON materialize.public.{on} (a)"), - global_id, - on: on_gid, - keys: Default::default(), - conn_id: None, - resolved_ids: [(on_item_id, on_gid)].into_iter().collect(), - cluster_id: ClusterId::user(1).expect("1 is a valid ID"), - custom_logical_compaction_window: None, - is_retained_metrics_object: false, - }) - } - }; - (self.name, self.namespace, item, global_id) - } - } - - struct BuiltinMigrationTestCase { - test_name: &'static str, - initial_state: Vec, - migrated_names: Vec, - expected_previous_storage_collection_names: Vec, - expected_migrated_system_object_mappings: Vec, - expected_user_item_drop_ops: Vec, - expected_user_item_create_ops: Vec, - } - - async fn add_item( - catalog: &mut Catalog, - name: String, - item: CatalogItem, - item_namespace: ItemNamespace, - ) -> CatalogItemId { - let id_ts = catalog.storage().await.current_upper().await; - let (item_id, _) = match item_namespace { - ItemNamespace::User => catalog - .allocate_user_id(id_ts) - .await - .expect("cannot fail to allocate user ids"), - ItemNamespace::System => catalog - .allocate_system_id(id_ts) - .await - .expect("cannot fail to allocate system ids"), - }; - let database_id = 
catalog - .resolve_database(DEFAULT_DATABASE_NAME) - .expect("failed to resolve default database") - .id(); - let database_spec = ResolvedDatabaseSpecifier::Id(database_id); - let schema_spec = catalog - .resolve_schema_in_database(&database_spec, DEFAULT_SCHEMA, &SYSTEM_CONN_ID) - .expect("failed to resolve default schemas") - .id - .clone(); - - let commit_ts = catalog.storage().await.current_upper().await; - catalog - .transact( - None, - commit_ts, - None, - vec![Op::CreateItem { - id: item_id, - name: QualifiedItemName { - qualifiers: ItemQualifiers { - database_spec, - schema_spec, - }, - item: name, - }, - item, - owner_id: MZ_SYSTEM_ROLE_ID, - }], - ) - .await - .expect("failed to transact"); - - item_id - } - - fn convert_names_to_ids( - name_vec: Vec, - item_id_lookup: &BTreeMap, - global_id_lookup: &BTreeMap, - ) -> BTreeMap { - name_vec - .into_iter() - .map(|name| { - let item_id = item_id_lookup[&name]; - let global_id = global_id_lookup[&name]; - (item_id, global_id) - }) - .collect() - } - - fn convert_ids_to_names>( - ids: I, - name_lookup: &BTreeMap, - ) -> BTreeSet { - ids.into_iter().map(|id| name_lookup[&id].clone()).collect() - } - - fn convert_global_ids_to_names>( - ids: I, - global_id_lookup: &BTreeMap, - ) -> BTreeSet { - ids.into_iter() - .flat_map(|id_a| { - global_id_lookup - .iter() - .filter_map(move |(name, id_b)| (id_a == *id_b).then_some(name)) - }) - .cloned() - .collect() - } - - async fn run_test_case(test_case: BuiltinMigrationTestCase) { - Catalog::with_debug_in_bootstrap(|mut catalog| async move { - let mut item_id_mapping = BTreeMap::new(); - let mut name_mapping = BTreeMap::new(); - - let mut global_id_gen = Gen::::default(); - let mut global_id_mapping = BTreeMap::new(); - - for entry in test_case.initial_state { - let (name, namespace, item, global_id) = - entry.to_catalog_item(&item_id_mapping, &global_id_mapping, &mut global_id_gen); - let item_id = add_item(&mut catalog, name.clone(), item, namespace).await; - - item_id_mapping.insert(name.clone(), item_id); - global_id_mapping.insert(name.clone(), global_id); - name_mapping.insert(item_id, name); - } - - let migrated_ids = test_case - .migrated_names - .into_iter() - .map(|name| item_id_mapping[&name]) - .collect(); - let id_fingerprint_map: BTreeMap = item_id_mapping - .iter() - .filter(|(_name, id)| id.is_system()) - // We don't use the new fingerprint in this test, so we can just hard code it - .map(|(_name, id)| (*id, "".to_string())) - .collect(); - - let migration_metadata = { - // This cloning is a hacky way to appease the borrow checker. It doesn't really - // matter because we never look at catalog again. We could probably rewrite this - // test to not even need a `Catalog` which would significantly speed it up. 
- let state = catalog.state.clone(); - let mut storage = catalog.storage().await; - let mut txn = storage - .transaction() - .await - .expect("failed to create transaction"); - Catalog::generate_builtin_migration_metadata( - &state, - &mut txn, - migrated_ids, - id_fingerprint_map, - ) - .expect("failed to generate builtin migration metadata") - }; - - assert_eq!( - convert_global_ids_to_names( - migration_metadata - .previous_storage_collection_ids - .into_iter(), - &global_id_mapping - ), - test_case - .expected_previous_storage_collection_names - .into_iter() - .collect(), - "{} test failed with wrong previous collection_names", - test_case.test_name - ); - assert_eq!( - migration_metadata - .migrated_system_object_mappings - .values() - .map(|mapping| mapping.description.object_name.clone()) - .collect::>(), - test_case - .expected_migrated_system_object_mappings - .into_iter() - .collect(), - "{} test failed with wrong migrated system object mappings", - test_case.test_name - ); - assert_eq!( - convert_ids_to_names( - migration_metadata.user_item_drop_ops.into_iter(), - &name_mapping - ), - test_case.expected_user_item_drop_ops.into_iter().collect(), - "{} test failed with wrong user drop ops", - test_case.test_name - ); - assert_eq!( - migration_metadata - .user_item_create_ops - .into_iter() - .map(|create_op| create_op.name) - .collect::>(), - test_case - .expected_user_item_create_ops - .into_iter() - .collect(), - "{} test failed with wrong user create ops", - test_case.test_name - ); - catalog.expire().await; - }) - .await - } - - #[mz_ore::test(tokio::test)] - #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` - async fn test_builtin_migration_no_migrations() { - let test_case = BuiltinMigrationTestCase { - test_name: "no_migrations", - initial_state: vec![SimplifiedCatalogEntry { - name: "s1".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::Table, - }], - migrated_names: vec![], - expected_previous_storage_collection_names: vec![], - expected_migrated_system_object_mappings: vec![], - expected_user_item_drop_ops: vec![], - expected_user_item_create_ops: vec![], - }; - run_test_case(test_case).await; - } - - #[mz_ore::test(tokio::test)] - #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` - async fn test_builtin_migration_single_migrations() { - let test_case = BuiltinMigrationTestCase { - test_name: "single_migrations", - initial_state: vec![SimplifiedCatalogEntry { - name: "s1".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::Table, - }], - migrated_names: vec!["s1".to_string()], - expected_previous_storage_collection_names: vec!["s1".to_string()], - expected_migrated_system_object_mappings: vec!["s1".to_string()], - expected_user_item_drop_ops: vec![], - expected_user_item_create_ops: vec![], - }; - run_test_case(test_case).await; - } - - #[mz_ore::test(tokio::test)] - #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` - async fn test_builtin_migration_child_migrations() { - let test_case = BuiltinMigrationTestCase { - test_name: "child_migrations", - initial_state: vec![ - SimplifiedCatalogEntry { - name: "s1".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::Table, - }, - SimplifiedCatalogEntry { - name: "u1".to_string(), - namespace: ItemNamespace::User, - item: SimplifiedItem::MaterializedView { - referenced_names: 
vec!["s1".to_string()], - }, - }, - ], - migrated_names: vec!["s1".to_string()], - expected_previous_storage_collection_names: vec!["u1".to_string(), "s1".to_string()], - expected_migrated_system_object_mappings: vec!["s1".to_string()], - expected_user_item_drop_ops: vec!["u1".to_string()], - expected_user_item_create_ops: vec!["u1".to_string()], - }; - run_test_case(test_case).await; - } - - #[mz_ore::test(tokio::test)] - #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` - async fn test_builtin_migration_multi_child_migrations() { - let test_case = BuiltinMigrationTestCase { - test_name: "multi_child_migrations", - initial_state: vec![ - SimplifiedCatalogEntry { - name: "s1".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::Table, - }, - SimplifiedCatalogEntry { - name: "u1".to_string(), - namespace: ItemNamespace::User, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s1".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "u2".to_string(), - namespace: ItemNamespace::User, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s1".to_string()], - }, - }, - ], - migrated_names: vec!["s1".to_string()], - expected_previous_storage_collection_names: vec![ - "u1".to_string(), - "u2".to_string(), - "s1".to_string(), - ], - expected_migrated_system_object_mappings: vec!["s1".to_string()], - expected_user_item_drop_ops: vec!["u1".to_string(), "u2".to_string()], - expected_user_item_create_ops: vec!["u2".to_string(), "u1".to_string()], - }; - run_test_case(test_case).await; - } - - #[mz_ore::test(tokio::test)] - #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` - async fn test_builtin_migration_topological_sort() { - let test_case = BuiltinMigrationTestCase { - test_name: "topological_sort", - initial_state: vec![ - SimplifiedCatalogEntry { - name: "s1".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::Table, - }, - SimplifiedCatalogEntry { - name: "s2".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::Table, - }, - SimplifiedCatalogEntry { - name: "u1".to_string(), - namespace: ItemNamespace::User, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s2".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "u2".to_string(), - namespace: ItemNamespace::User, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s1".to_string(), "u1".to_string()], - }, - }, - ], - migrated_names: vec!["s1".to_string(), "s2".to_string()], - expected_previous_storage_collection_names: vec![ - "u2".to_string(), - "u1".to_string(), - "s1".to_string(), - "s2".to_string(), - ], - expected_migrated_system_object_mappings: vec!["s1".to_string(), "s2".to_string()], - expected_user_item_drop_ops: vec!["u2".to_string(), "u1".to_string()], - expected_user_item_create_ops: vec!["u1".to_string(), "u2".to_string()], - }; - run_test_case(test_case).await; - } - - #[mz_ore::test(tokio::test)] - #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` - async fn test_builtin_migration_topological_sort_complex() { - let test_case = BuiltinMigrationTestCase { - test_name: "topological_sort_complex", - initial_state: vec![ - SimplifiedCatalogEntry { - name: "s273".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::Table, - }, - SimplifiedCatalogEntry { - name: "s322".to_string(), - 
namespace: ItemNamespace::System, - item: SimplifiedItem::Table, - }, - SimplifiedCatalogEntry { - name: "s317".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::Table, - }, - SimplifiedCatalogEntry { - name: "s349".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s273".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "s421".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s273".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "s295".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s273".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "s296".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s295".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "s320".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s295".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "s340".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s295".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "s318".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s295".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "s323".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s295".to_string(), "s322".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "s330".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s318".to_string(), "s317".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "s321".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s318".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "s315".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s296".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "s354".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s296".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "s327".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s296".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "s339".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s296".to_string()], - }, - }, - SimplifiedCatalogEntry { - name: "s355".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::MaterializedView { - referenced_names: vec!["s315".to_string()], - }, - }, - ], - migrated_names: vec![ - "s273".to_string(), - "s317".to_string(), - "s318".to_string(), - "s320".to_string(), - "s321".to_string(), - "s322".to_string(), - "s323".to_string(), - "s330".to_string(), - "s339".to_string(), - "s340".to_string(), - ], - expected_previous_storage_collection_names: vec![ - "s349".to_string(), - "s421".to_string(), - "s355".to_string(), - "s315".to_string(), - "s354".to_string(), - "s327".to_string(), - "s339".to_string(), - 
"s296".to_string(), - "s320".to_string(), - "s340".to_string(), - "s330".to_string(), - "s321".to_string(), - "s318".to_string(), - "s323".to_string(), - "s295".to_string(), - "s273".to_string(), - "s317".to_string(), - "s322".to_string(), - ], - expected_migrated_system_object_mappings: vec![ - "s322".to_string(), - "s317".to_string(), - "s273".to_string(), - "s295".to_string(), - "s323".to_string(), - "s318".to_string(), - "s321".to_string(), - "s330".to_string(), - "s340".to_string(), - "s320".to_string(), - "s296".to_string(), - "s339".to_string(), - "s327".to_string(), - "s354".to_string(), - "s315".to_string(), - "s355".to_string(), - "s421".to_string(), - "s349".to_string(), - ], - expected_user_item_drop_ops: vec![], - expected_user_item_create_ops: vec![], - }; - run_test_case(test_case).await; - } - - #[mz_ore::test(tokio::test)] - #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` - async fn test_builtin_migration_system_child_migrations() { - let test_case = BuiltinMigrationTestCase { - test_name: "system_child_migrations", - initial_state: vec![ - SimplifiedCatalogEntry { - name: "s1".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::Table, - }, - SimplifiedCatalogEntry { - name: "s2".to_string(), - namespace: ItemNamespace::System, - item: SimplifiedItem::Index { - on: "s1".to_string(), - }, - }, - ], - migrated_names: vec!["s1".to_string()], - expected_previous_storage_collection_names: vec!["s1".to_string()], - expected_migrated_system_object_mappings: vec!["s1".to_string(), "s2".to_string()], - expected_user_item_drop_ops: vec![], - expected_user_item_create_ops: vec![], - }; - run_test_case(test_case).await; - } -} diff --git a/src/adapter/src/catalog/open/builtin_item_migration.rs b/src/adapter/src/catalog/open/builtin_item_migration.rs index 077ffb1da93aa..50273314d7639 100644 --- a/src/adapter/src/catalog/open/builtin_item_migration.rs +++ b/src/adapter/src/catalog/open/builtin_item_migration.rs @@ -40,15 +40,13 @@ use tracing::{debug, error}; use crate::catalog::open::builtin_item_migration::persist_schema::{TableKey, TableKeySchema}; use crate::catalog::state::LocalExpressionCache; -use crate::catalog::{BuiltinTableUpdate, Catalog, CatalogState}; +use crate::catalog::{BuiltinTableUpdate, CatalogState}; /// The results of a builtin item migration. pub(crate) struct BuiltinItemMigrationResult { /// A vec of updates to apply to the builtin tables. pub(crate) builtin_table_updates: Vec>, - /// A set of storage collections to drop (only used by legacy migration). - pub(crate) storage_collections_to_drop: BTreeSet, - /// A set of new shards that may need to be initialized (only used by 0dt migration). + /// A set of new shards that may need to be initialized. pub(crate) migrated_storage_collections_0dt: BTreeSet, /// Some cleanup action to take once the migration has been made durable. 
pub(crate) cleanup_action: BoxFuture<'static, ()>, @@ -60,59 +58,20 @@ pub(crate) async fn migrate_builtin_items( txn: &mut Transaction<'_>, local_expr_cache: &mut LocalExpressionCache, migrated_builtins: Vec, - config: BuiltinItemMigrationConfig, -) -> Result { - match config { - BuiltinItemMigrationConfig::Legacy => { - migrate_builtin_items_legacy(state, txn, migrated_builtins).await - } - BuiltinItemMigrationConfig::ZeroDownTime { - persist_client, - read_only, - } => { - migrate_builtin_items_0dt( - state, - txn, - local_expr_cache, - persist_client, - migrated_builtins, - read_only, - ) - .await - } - } -} - -/// The legacy method for builtin migrations is to drop all migrated items and all of their -/// dependents and re-create them all with the new schema and new global IDs. -async fn migrate_builtin_items_legacy( - state: &mut CatalogState, - txn: &mut Transaction<'_>, - migrated_builtins: Vec, + BuiltinItemMigrationConfig { + persist_client, + read_only, + }: BuiltinItemMigrationConfig, ) -> Result { - let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg) - .map(|builtin| { - let id = state.resolve_builtin_object(builtin); - let fingerprint = builtin.fingerprint(); - (id, fingerprint) - }) - .collect(); - let mut builtin_migration_metadata = Catalog::generate_builtin_migration_metadata( + migrate_builtin_items_0dt( state, txn, + local_expr_cache, + persist_client, migrated_builtins, - id_fingerprint_map, - )?; - let builtin_table_updates = - Catalog::apply_builtin_migration(state, txn, &mut builtin_migration_metadata).await?; - - let cleanup_action = async {}.boxed(); - Ok(BuiltinItemMigrationResult { - builtin_table_updates, - storage_collections_to_drop: builtin_migration_metadata.previous_storage_collection_ids, - migrated_storage_collections_0dt: BTreeSet::new(), - cleanup_action, - }) + read_only, + ) + .await } /// An implementation of builtin item migrations that is compatible with zero down-time upgrades. @@ -472,7 +431,6 @@ async fn migrate_builtin_items_0dt( Ok(BuiltinItemMigrationResult { builtin_table_updates, - storage_collections_to_drop: BTreeSet::new(), migrated_storage_collections_0dt, cleanup_action, }) diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 0bd74636268a3..a7d19ab15870f 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -3983,17 +3983,14 @@ pub fn serve( .open(controller_config.persist_location.clone()) .await .context("opening persist client")?; - let builtin_item_migration_config = if enable_0dt_deployment { - BuiltinItemMigrationConfig::ZeroDownTime { + let builtin_item_migration_config = + BuiltinItemMigrationConfig { persist_client: persist_client.clone(), read_only: read_only_controllers, } - } else { - BuiltinItemMigrationConfig::Legacy - }; + ; let OpenCatalogResult { mut catalog, - storage_collections_to_drop, migrated_storage_collections_0dt, new_builtin_collections, builtin_table_updates, @@ -4158,7 +4155,6 @@ pub fn serve( controller_config, controller_envd_epoch, read_only_controllers, - storage_collections_to_drop, ) }) .unwrap_or_terminate("failed to initialize storage_controller"); diff --git a/src/catalog-debug/src/main.rs b/src/catalog-debug/src/main.rs index 0e04188e1ba9b..aaba9b8f7a365 100644 --- a/src/catalog-debug/src/main.rs +++ b/src/catalog-debug/src/main.rs @@ -566,12 +566,12 @@ async fn upgrade_check( .clone(); let boot_ts = now().into(); + let read_only = true; // BOXED FUTURE: As of Nov 2023 the returned Future from this function was 7.5KB. 
This would // get stored on the stack which is bad for runtime performance, and blow up our stack usage. // Because of that we purposefully move this Future onto the heap (i.e. Box it). let InitializeStateResult { state, - storage_collections_to_drop: _, migrated_storage_collections_0dt: _, new_builtin_collections: _, builtin_table_updates: _, @@ -585,7 +585,7 @@ async fn upgrade_check( all_features: false, build_info: &BUILD_INFO, environment_id: args.environment_id.clone(), - read_only: true, + read_only, now, boot_ts, skip_migrations: false, @@ -610,7 +610,12 @@ async fn upgrade_check( secrets_reader, None, ), - builtin_item_migration_config: BuiltinItemMigrationConfig::Legacy, + builtin_item_migration_config: BuiltinItemMigrationConfig { + // We don't actually want to write anything down, so use an in-memory persist + // client. + persist_client: PersistClient::new_for_tests().await, + read_only, + }, persist_client: persist_client.clone(), enable_expression_cache_override: None, enable_0dt_deployment: true, diff --git a/src/catalog/src/config.rs b/src/catalog/src/config.rs index f4a0acb81ae10..e5324b9176335 100644 --- a/src/catalog/src/config.rs +++ b/src/catalog/src/config.rs @@ -94,12 +94,9 @@ pub struct StateConfig { } #[derive(Debug)] -pub enum BuiltinItemMigrationConfig { - Legacy, - ZeroDownTime { - persist_client: PersistClient, - read_only: bool, - }, +pub struct BuiltinItemMigrationConfig { + pub persist_client: PersistClient, + pub read_only: bool, } #[derive(Debug, Clone, Deserialize, Serialize)] diff --git a/src/compute-client/src/as_of_selection.rs b/src/compute-client/src/as_of_selection.rs index 154db123b74d2..faf132518f351 100644 --- a/src/compute-client/src/as_of_selection.rs +++ b/src/compute-client/src/as_of_selection.rs @@ -812,7 +812,6 @@ mod tests { &self, _txn: &mut (dyn StorageTxn + Send), _init_ids: BTreeSet, - _drop_ids: BTreeSet, ) -> Result<(), StorageError> { unimplemented!() } diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs index 9f0460965fc14..a9115fd7668ad 100644 --- a/src/storage-client/src/storage_collections.rs +++ b/src/storage-client/src/storage_collections.rs @@ -93,14 +93,10 @@ pub trait StorageCollections: Debug { /// We get `init_ids`, which tells us about all collections that currently /// exist, so that we can record durable state for those that _we_ don't /// know yet about. - /// - /// We also get `drop_ids`, which tells us about all collections that we - /// might have known about before and have now been dropped. async fn initialize_state( &self, txn: &mut (dyn StorageTxn + Send), init_ids: BTreeSet, - drop_ids: BTreeSet, ) -> Result<(), StorageError>; /// Update storage configuration with new parameters. @@ -1318,7 +1314,6 @@ where &self, txn: &mut (dyn StorageTxn + Send), init_ids: BTreeSet, - drop_ids: BTreeSet, ) -> Result<(), StorageError> { let metadata = txn.get_collection_metadata(); let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect(); @@ -1327,12 +1322,16 @@ where let new_collections: BTreeSet = init_ids.difference(&existing_metadata).cloned().collect(); - self.prepare_state(txn, new_collections, drop_ids, BTreeMap::default()) - .await?; + self.prepare_state( + txn, + new_collections, + BTreeSet::default(), + BTreeMap::default(), + ) + .await?; // All shards that belong to collections dropped in the last epoch are - // eligible for finalization. This intentionally includes any built-in - // collections present in `drop_ids`. 
+ // eligible for finalization. // // n.b. this introduces an unlikely race condition: if a collection is // dropped from the catalog, but the dataflow is still running on a
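
For orientation, a minimal sketch of the central API change in this patch: `BuiltinItemMigrationConfig` stops being an enum that selects between the legacy and zero-downtime migrations and becomes a plain struct, so every call site constructs it the same way. The field names below come from the diff; the stub `PersistClient` type, the `Before`/`After` renames, and the `main` function are illustrative assumptions, not code from the repository.

// Illustrative Rust sketch (not part of the patch). `PersistClient` is stubbed out here;
// in the real code it is the persist client type used throughout the diff.
#[derive(Clone)]
struct PersistClient;

// Before: callers picked a migration strategy via an enum.
#[allow(dead_code)]
enum BuiltinItemMigrationConfigBefore {
    Legacy,
    ZeroDownTime {
        persist_client: PersistClient,
        read_only: bool,
    },
}

// After: the zero-downtime path is the only one, so the config is a plain struct.
#[allow(dead_code)]
struct BuiltinItemMigrationConfigAfter {
    persist_client: PersistClient,
    read_only: bool,
}

fn main() {
    let persist_client = PersistClient;
    let read_only = false;
    // Mirrors the construction used in `Catalog::open_debug_catalog`, `coord::serve`,
    // and the catalog-debug tool in the diff above.
    let _config = BuiltinItemMigrationConfigAfter {
        persist_client: persist_client.clone(),
        read_only,
    };
}

The same simplification removes the `storage_collections_to_drop`/`drop_ids` plumbing: `StorageCollections::initialize_state` now takes only the set of collections to initialize, since only the removed legacy migration ever produced collections that had to be dropped at startup.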