Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 9 additions & 126 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
pub use crate::catalog::open::{
BuiltinMigrationMetadata, InitializeStateResult, OpenCatalogResult,
};
pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
pub use crate::catalog::state::CatalogState;
pub use crate::catalog::transact::{
DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
Expand Down Expand Up @@ -671,9 +669,9 @@ impl Catalog {
// debugging/testing.
let previous_ts = now().into();
let replica_size = &bootstrap_args.default_cluster_replica_size;
let read_only = false;
let OpenCatalogResult {
catalog,
storage_collections_to_drop: _,
migrated_storage_collections_0dt: _,
new_builtin_collections: _,
builtin_table_updates: _,
Expand All @@ -687,7 +685,7 @@ impl Catalog {
all_features: false,
build_info: &DUMMY_BUILD_INFO,
environment_id: environment_id.unwrap_or(EnvironmentId::for_tests()),
read_only: false,
read_only,
now,
boot_ts: previous_ts,
skip_migrations: true,
Expand All @@ -705,7 +703,10 @@ impl Catalog {
aws_privatelink_availability_zones: None,
http_host_name: None,
connection_context: ConnectionContext::for_tests(secrets_reader),
builtin_item_migration_config: BuiltinItemMigrationConfig::Legacy,
builtin_item_migration_config: BuiltinItemMigrationConfig {
persist_client: persist_client.clone(),
read_only,
},
persist_client,
enable_expression_cache_override,
enable_0dt_deployment: true,
Expand Down Expand Up @@ -2267,10 +2268,7 @@ mod tests {
use tokio_postgres::NoTls;
use uuid::Uuid;

use mz_catalog::builtin::{
Builtin, BuiltinType, UnsafeBuiltinTableFingerprintWhitespace, BUILTINS,
UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE,
};
use mz_catalog::builtin::{Builtin, BuiltinType, BUILTINS};
use mz_catalog::durable::{test_bootstrap_args, CatalogError, DurableCatalogError, FenceError};
use mz_catalog::SYSTEM_CONN_ID;
use mz_controller_types::{ClusterId, ReplicaId};
Expand All @@ -2285,9 +2283,7 @@ mod tests {
CatalogItemId, Datum, GlobalId, RelationType, RelationVersionSelector, RowArena,
ScalarType, Timestamp,
};
use mz_sql::catalog::{
BuiltinsConfig, CatalogDatabase, CatalogSchema, CatalogType, SessionCatalog,
};
use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
use mz_sql::func::{Func, FuncImpl, Operation, OP_IMPLS};
use mz_sql::names::{
self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
Expand Down Expand Up @@ -3507,119 +3503,6 @@ mod tests {
.await
}

#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_builtin_migrations() {
let persist_client = PersistClient::new_for_tests().await;
let bootstrap_args = test_bootstrap_args();
let organization_id = Uuid::new_v4();
let mv_name = "mv";
let (mz_tables_id, mv_id) = {
let mut catalog = Catalog::open_debug_catalog(
persist_client.clone(),
organization_id.clone(),
&bootstrap_args,
)
.await
.expect("unable to open debug catalog");

// Create a materialized view over `mz_tables`.
let database_id = DatabaseId::User(1);
let database = catalog.get_database(&database_id);
let database_name = database.name();
let schemas = database.schemas();
let schema = schemas.first().expect("must have at least one schema");
let schema_spec = schema.id().clone();
let schema_name = &schema.name().schema;
let database_spec = ResolvedDatabaseSpecifier::Id(database_id);
let id_ts = catalog.storage().await.current_upper().await;
let (mv_id, mv_gid) = catalog
.allocate_user_id(id_ts)
.await
.expect("unable to allocate id");
let mv = catalog
.state()
.deserialize_item(
mv_gid,
&format!("CREATE MATERIALIZED VIEW {database_name}.{schema_name}.{mv_name} AS SELECT name FROM mz_tables"),
&BTreeMap::new(),
&mut LocalExpressionCache::Closed,
None,
)
.expect("unable to deserialize item");
let commit_ts = catalog.current_upper().await;
catalog
.transact(
None,
commit_ts,
None,
vec![Op::CreateItem {
id: mv_id,
name: QualifiedItemName {
qualifiers: ItemQualifiers {
database_spec,
schema_spec,
},
item: mv_name.to_string(),
},
item: mv,
owner_id: MZ_SYSTEM_ROLE_ID,
}],
)
.await
.expect("unable to transact");

let mz_tables_id = catalog
.entries()
.find(|entry| &entry.name.item == "mz_tables" && entry.is_table())
.expect("mz_tables doesn't exist")
.id();
let check_mv_id = catalog
.entries()
.find(|entry| &entry.name.item == mv_name && entry.is_materialized_view())
.unwrap_or_else(|| panic!("{mv_name} doesn't exist"))
.id();
assert_eq!(check_mv_id, mv_id);
catalog.expire().await;
(mz_tables_id, mv_id)
};
// Forcibly migrate all tables.
{
let mut guard =
UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE
.lock()
.expect("lock poisoned");
*guard = Some((
UnsafeBuiltinTableFingerprintWhitespace::All,
"\n".to_string(),
));
}
{
let catalog =
Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
.await
.expect("unable to open debug catalog");

let new_mz_tables_id = catalog
.entries()
.find(|entry| &entry.name.item == "mz_tables" && entry.is_table())
.expect("mz_tables doesn't exist")
.id();
// Assert that the table was migrated and got a new ID.
assert_ne!(new_mz_tables_id, mz_tables_id);

let new_mv_id = catalog
.entries()
.find(|entry| &entry.name.item == mv_name && entry.is_materialized_view())
.unwrap_or_else(|| panic!("{mv_name} doesn't exist"))
.id();
// Assert that the materialized view was migrated and got a new ID.
assert_ne!(new_mv_id, mv_id);

catalog.expire().await;
}
}

#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_multi_subscriber_catalog() {
Expand Down
Loading