Skip to content

Commit bdc2209

Browse files
committed
Speed up subsource planning
Signed-off-by: Moritz Hoffmann <mh@materialize.com>
1 parent 69b597c commit bdc2209

File tree

7 files changed

+296
-127
lines changed

7 files changed

+296
-127
lines changed

src/adapter/src/catalog.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,20 @@ impl Catalog {
771771
.err_into()
772772
}
773773

774+
/// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
775+
pub async fn allocate_user_ids(
776+
&self,
777+
amount: u64,
778+
commit_ts: mz_repr::Timestamp,
779+
) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
780+
self.storage()
781+
.await
782+
.allocate_user_ids(amount, commit_ts)
783+
.await
784+
.maybe_terminate("allocating user ids")
785+
.err_into()
786+
}
787+
774788
pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
775789
let commit_ts = self.storage().await.current_upper().await;
776790
self.allocate_user_id(commit_ts).await

src/adapter/src/catalog/migrate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ pub(crate) async fn migrate(
9999
) -> Result<MigrateResult, anyhow::Error> {
100100
let catalog_version = tx.get_catalog_content_version();
101101
let catalog_version = match catalog_version {
102-
Some(v) => Version::parse(&v)?,
102+
Some(v) => Version::parse(v)?,
103103
None => Version::new(0, 0, 0),
104104
};
105105

src/adapter/src/catalog/open.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,8 @@ impl Catalog {
386386

387387
let last_seen_version = txn
388388
.get_catalog_content_version()
389-
.unwrap_or_else(|| "new".to_string());
389+
.unwrap_or_else(|| "new")
390+
.to_string();
390391

391392
// Migrate item ASTs.
392393
let builtin_table_update = if !config.skip_migrations {
@@ -930,7 +931,7 @@ fn add_new_remove_old_builtin_introspection_source_migration(
930931
removed_indexes.extend(
931932
introspection_source_index_ids
932933
.into_keys()
933-
.map(|name| (cluster.id, name)),
934+
.map(|name| (cluster.id, name.to_string())),
934935
);
935936
}
936937
txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
@@ -1090,7 +1091,7 @@ fn remove_invalid_config_param_role_defaults_migration(
10901091
/// Cluster Replicas may be created ephemerally during an alter statement, these replicas
10911092
/// are marked as pending and should be cleaned up on catalog opsn.
10921093
fn remove_pending_cluster_replicas_migration(tx: &mut Transaction) -> Result<(), anyhow::Error> {
1093-
for replica in tx.get_cluster_replicas() {
1094+
for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
10941095
if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
10951096
replica.config.location
10961097
{

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -409,16 +409,18 @@ impl Coordinator {
409409
/// Because of this, we have usually "missed" the opportunity to plan them
410410
/// through the normal statement execution life cycle (the exception being
411411
/// during bootstrapping).
412+
///
413+
/// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
412414
pub(crate) async fn plan_subsource(
413415
&mut self,
414416
session: &Session,
415417
params: &mz_sql::plan::Params,
416418
subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
419+
item_id: CatalogItemId,
420+
global_id: GlobalId,
417421
) -> Result<CreateSourcePlanBundle, AdapterError> {
418422
let catalog = self.catalog().for_session(session);
419423
let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
420-
let id_ts = self.get_catalog_write_ts().await;
421-
let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
422424

423425
let plan = self.plan_statement(
424426
session,
@@ -467,9 +469,15 @@ impl Coordinator {
467469
let ingestion_id = source.global_id();
468470
let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
469471

472+
let id_ts = self.get_catalog_write_ts().await;
473+
let mut ids = self
474+
.catalog_mut()
475+
.allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
476+
.await?;
470477
for subsource_stmt in subsource_stmts {
478+
let (item_id, global_id) = ids.pop().unwrap();
471479
let s = self
472-
.plan_subsource(session, &params, subsource_stmt)
480+
.plan_subsource(session, &params, subsource_stmt, item_id, global_id)
473481
.await?;
474482
subsource_plans.push(s);
475483
}
@@ -539,8 +547,10 @@ impl Coordinator {
539547
// guaranteeing that the shard ID is discoverable is to create this
540548
// collection first.
541549
assert_none!(progress_stmt.of_source);
550+
let id_ts = self.get_catalog_write_ts().await;
551+
let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
542552
let progress_plan = self
543-
.plan_subsource(ctx.session(), &params, progress_stmt)
553+
.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)
544554
.await?;
545555
let progress_full_name = self
546556
.catalog()
@@ -599,8 +609,16 @@ impl Coordinator {
599609
});
600610

601611
// 3. Finally, plan all the subsources
612+
let id_ts = self.get_catalog_write_ts().await;
613+
let mut ids = self
614+
.catalog_mut()
615+
.allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
616+
.await?;
602617
for stmt in subsource_stmts {
603-
let plan = self.plan_subsource(ctx.session(), &params, stmt).await?;
618+
let (item_id, global_id) = ids.pop().unwrap();
619+
let plan = self
620+
.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)
621+
.await?;
604622
create_source_plans.push(plan);
605623
}
606624

src/catalog/src/durable.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,24 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
336336
Ok(ids)
337337
}
338338

339+
/// Allocates and returns `amount` many user [`CatalogItemId`] and [`GlobalId`].
340+
///
341+
/// See [`Self::commit_transaction`] for details on `commit_ts`.
342+
async fn allocate_user_ids(
343+
&mut self,
344+
amount: u64,
345+
commit_ts: Timestamp,
346+
) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
347+
let ids = self
348+
.allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
349+
.await?;
350+
let ids = ids
351+
.iter()
352+
.map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
353+
.collect();
354+
Ok(ids)
355+
}
356+
339357
/// Allocates and returns both a user [`CatalogItemId`] and [`GlobalId`].
340358
///
341359
/// See [`Self::commit_transaction`] for details on `commit_ts`.

0 commit comments

Comments
 (0)