Skip to content
Merged
42 changes: 42 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2776,6 +2776,48 @@ mod tests {
.await;
}

#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn verify_builtin_descs() {
Catalog::with_debug(|catalog| async move {
let conn_catalog = catalog.for_system_session();

let builtins_cfg = BuiltinsConfig {
include_continual_tasks: true,
};
for builtin in BUILTINS::iter(&builtins_cfg) {
let (schema, name, expected_desc) = match builtin {
Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
Builtin::View(v) => (&v.schema, &v.name, &v.desc),
Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
Builtin::Log(_)
| Builtin::Type(_)
| Builtin::Func(_)
| Builtin::ContinualTask(_)
| Builtin::Index(_)
| Builtin::Connection(_) => continue,
};
let item = conn_catalog
.resolve_item(&PartialItemName {
database: None,
schema: Some(schema.to_string()),
item: name.to_string(),
})
.expect("unable to resolve item")
.at_version(RelationVersionSelector::Latest);

let full_name = conn_catalog.resolve_full_name(item.name());
let actual_desc = item.desc(&full_name).expect("invalid item type");
assert_eq!(
&*actual_desc, expected_desc,
"item {schema}.{name} did not match its expected RelationDesc"
);
}
catalog.expire().await;
})
.await
}

// Connect to a running Postgres server and verify that our builtin
// types and functions match it, in addition to some other things.
#[mz_ore::test(tokio::test)]
Expand Down
47 changes: 44 additions & 3 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use mz_sql::catalog::{
BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
};
use mz_sql::func::OP_IMPLS;
use mz_sql::names::CommentObjectId;
use mz_sql::rbac;
use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
Expand Down Expand Up @@ -727,10 +728,13 @@ fn add_new_remove_old_builtin_items_migration(
}
});
let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
let new_builtins = new_builtins.into_iter().zip(new_builtin_ids.clone());
let new_builtins: Vec<_> = new_builtins
.into_iter()
.zip(new_builtin_ids.clone())
.collect();

// Look for migrated builtins.
for (builtin, system_object_mapping, fingerprint) in existing_builtins {
for (builtin, system_object_mapping, fingerprint) in existing_builtins.iter().cloned() {
if system_object_mapping.unique_identifier.fingerprint != fingerprint {
// `mz_storage_usage_by_shard` cannot be migrated for multiple reasons. Firstly,
// it was cause the table to be truncated because the contents are not also
Expand Down Expand Up @@ -760,7 +764,7 @@ fn add_new_remove_old_builtin_items_migration(
}

// Add new builtin items to catalog.
for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins {
for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
new_builtin_mappings.push(SystemObjectMapping {
description: SystemObjectDescription {
schema_name: builtin.schema().to_string(),
Expand Down Expand Up @@ -812,6 +816,43 @@ fn add_new_remove_old_builtin_items_migration(
}
txn.set_system_object_mappings(new_builtin_mappings)?;

// Update comments of all builtin objects
let builtins_with_catalog_ids = existing_builtins
.iter()
.map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
.chain(
new_builtins
.into_iter()
.map(|((b, _), (catalog_id, _))| (b, catalog_id)),
);

for (builtin, id) in builtins_with_catalog_ids {
let (comment_id, desc, comments) = match builtin {
Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
Builtin::Log(_)
| Builtin::Type(_)
| Builtin::Func(_)
| Builtin::ContinualTask(_)
| Builtin::Index(_)
| Builtin::Connection(_) => continue,
};
txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;

let mut comments = comments.clone();
for (col_idx, name) in desc.iter_names().enumerate() {
if let Some(comment) = comments.remove(name.as_str()) {
// Comment column indices are 1 based
txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
}
}
assert!(
comments.is_empty(),
"builtin object contains dangling comments that don't correspond to columns {comments:?}"
);
}

// Anything left in `system_object_mappings` must have been deleted and should be removed from
// the catalog.
let mut deleted_system_objects = BTreeSet::new();
Expand Down
Loading