Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,20 @@ impl Catalog {
.err_into()
}

/// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
pub async fn allocate_user_ids(
&self,
amount: u64,
commit_ts: mz_repr::Timestamp,
) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
self.storage()
.await
.allocate_user_ids(amount, commit_ts)
.await
.maybe_terminate("allocating user ids")
.err_into()
}

pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
let commit_ts = self.storage().await.current_upper().await;
self.allocate_user_id(commit_ts).await
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub(crate) async fn migrate(
) -> Result<MigrateResult, anyhow::Error> {
let catalog_version = tx.get_catalog_content_version();
let catalog_version = match catalog_version {
Some(v) => Version::parse(&v)?,
Some(v) => Version::parse(v)?,
None => Version::new(0, 0, 0),
};

Expand Down
7 changes: 4 additions & 3 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ impl Catalog {

let last_seen_version = txn
.get_catalog_content_version()
.unwrap_or_else(|| "new".to_string());
.unwrap_or_else(|| "new")
.to_string();

// Migrate item ASTs.
let builtin_table_update = if !config.skip_migrations {
Expand Down Expand Up @@ -930,7 +931,7 @@ fn add_new_remove_old_builtin_introspection_source_migration(
removed_indexes.extend(
introspection_source_index_ids
.into_keys()
.map(|name| (cluster.id, name)),
.map(|name| (cluster.id, name.to_string())),
);
}
txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
Expand Down Expand Up @@ -1090,7 +1091,7 @@ fn remove_invalid_config_param_role_defaults_migration(
/// Cluster Replicas may be created ephemerally during an alter statement, these replicas
/// are marked as pending and should be cleaned up on catalog opsn.
fn remove_pending_cluster_replicas_migration(tx: &mut Transaction) -> Result<(), anyhow::Error> {
for replica in tx.get_cluster_replicas() {
for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
replica.config.location
{
Expand Down
28 changes: 23 additions & 5 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,16 +409,18 @@ impl Coordinator {
/// Because of this, we have usually "missed" the opportunity to plan them
/// through the normal statement execution life cycle (the exception being
/// during bootstrapping).
///
/// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
pub(crate) async fn plan_subsource(
&mut self,
session: &Session,
params: &mz_sql::plan::Params,
subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
item_id: CatalogItemId,
global_id: GlobalId,
) -> Result<CreateSourcePlanBundle, AdapterError> {
let catalog = self.catalog().for_session(session);
let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
let id_ts = self.get_catalog_write_ts().await;
let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;

let plan = self.plan_statement(
session,
Expand Down Expand Up @@ -467,9 +469,15 @@ impl Coordinator {
let ingestion_id = source.global_id();
let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;

let id_ts = self.get_catalog_write_ts().await;
let mut ids = self
.catalog_mut()
.allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
.await?;
for subsource_stmt in subsource_stmts {
let (item_id, global_id) = ids.pop().unwrap();
let s = self
.plan_subsource(session, &params, subsource_stmt)
.plan_subsource(session, &params, subsource_stmt, item_id, global_id)
.await?;
subsource_plans.push(s);
}
Expand Down Expand Up @@ -539,8 +547,10 @@ impl Coordinator {
// guaranteeing that the shard ID is discoverable is to create this
// collection first.
assert_none!(progress_stmt.of_source);
let id_ts = self.get_catalog_write_ts().await;
let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
let progress_plan = self
.plan_subsource(ctx.session(), &params, progress_stmt)
.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)
.await?;
let progress_full_name = self
.catalog()
Expand Down Expand Up @@ -599,8 +609,16 @@ impl Coordinator {
});

// 3. Finally, plan all the subsources
let id_ts = self.get_catalog_write_ts().await;
let mut ids = self
.catalog_mut()
.allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
.await?;
for stmt in subsource_stmts {
let plan = self.plan_subsource(ctx.session(), &params, stmt).await?;
let (item_id, global_id) = ids.pop().unwrap();
let plan = self
.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)
.await?;
create_source_plans.push(plan);
}

Expand Down
18 changes: 18 additions & 0 deletions src/catalog/src/durable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,24 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
Ok(ids)
}

/// Allocates and returns `amount` many user [`CatalogItemId`] and [`GlobalId`].
///
/// See [`Self::commit_transaction`] for details on `commit_ts`.
async fn allocate_user_ids(
&mut self,
amount: u64,
commit_ts: Timestamp,
) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
let ids = self
.allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
.await?;
let ids = ids
.iter()
.map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
.collect();
Ok(ids)
}

/// Allocates and returns both a user [`CatalogItemId`] and [`GlobalId`].
///
/// See [`Self::commit_transaction`] for details on `commit_ts`.
Expand Down
Loading