Skip to content

Commit 62d2c7d

Browse files
authored
Improve subsource planning (#31885)
Improve subsource planning by making various parts of the adapter more scalable. This PR includes changes to: * The *durable transaction* implementation to improve updates and validity checks. Updates are more efficient by cloning less and passing references where possible. Validity checks are more efficient by converting some that assert on name equality to a $n\cdot\log n$, where it previously was $n^2$. * *Planning subsources* is more efficient by allocating all IDs in one operation rather than one-by-one. * Avoiding *computing trace data* in the storage client when the level isn't high enough. Takes planning 10k subsources from I-terminated-it-after-one-hour to around one minute. ### Checklist - [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/)) - [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design. <!-- Reference the design in the description. --> - [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label. - [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)). <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. --> - [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post. --------- Signed-off-by: Moritz Hoffmann <mh@materialize.com>
1 parent 7ce5904 commit 62d2c7d

File tree

7 files changed

+313
-143
lines changed

7 files changed

+313
-143
lines changed

src/adapter/src/catalog.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,20 @@ impl Catalog {
781781
.err_into()
782782
}
783783

784+
/// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
785+
pub async fn allocate_user_ids(
786+
&self,
787+
amount: u64,
788+
commit_ts: mz_repr::Timestamp,
789+
) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
790+
self.storage()
791+
.await
792+
.allocate_user_ids(amount, commit_ts)
793+
.await
794+
.maybe_terminate("allocating user ids")
795+
.err_into()
796+
}
797+
784798
pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
785799
let commit_ts = self.storage().await.current_upper().await;
786800
self.allocate_user_id(commit_ts).await

src/adapter/src/catalog/migrate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ pub(crate) async fn migrate(
9999
) -> Result<MigrateResult, anyhow::Error> {
100100
let catalog_version = tx.get_catalog_content_version();
101101
let catalog_version = match catalog_version {
102-
Some(v) => Version::parse(&v)?,
102+
Some(v) => Version::parse(v)?,
103103
None => Version::new(0, 0, 0),
104104
};
105105

src/adapter/src/catalog/open.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,8 @@ impl Catalog {
386386

387387
let last_seen_version = txn
388388
.get_catalog_content_version()
389-
.unwrap_or_else(|| "new".to_string());
389+
.unwrap_or("new")
390+
.to_string();
390391

391392
// Migrate item ASTs.
392393
let builtin_table_update = if !config.skip_migrations {
@@ -930,7 +931,7 @@ fn add_new_remove_old_builtin_introspection_source_migration(
930931
removed_indexes.extend(
931932
introspection_source_index_ids
932933
.into_keys()
933-
.map(|name| (cluster.id, name)),
934+
.map(|name| (cluster.id, name.to_string())),
934935
);
935936
}
936937
txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
@@ -1090,7 +1091,7 @@ fn remove_invalid_config_param_role_defaults_migration(
10901091
/// Cluster Replicas may be created ephemerally during an alter statement, these replicas
10911092
/// are marked as pending and should be cleaned up on catalog opsn.
10921093
fn remove_pending_cluster_replicas_migration(tx: &mut Transaction) -> Result<(), anyhow::Error> {
1093-
for replica in tx.get_cluster_replicas() {
1094+
for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
10941095
if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
10951096
replica.config.location
10961097
{

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -409,16 +409,18 @@ impl Coordinator {
409409
/// Because of this, we have usually "missed" the opportunity to plan them
410410
/// through the normal statement execution life cycle (the exception being
411411
/// during bootstrapping).
412-
pub(crate) async fn plan_subsource(
413-
&mut self,
412+
///
413+
/// The caller needs to provide a `CatalogItemId` and `GlobalId` for the sub-source.
414+
pub(crate) fn plan_subsource(
415+
&self,
414416
session: &Session,
415417
params: &mz_sql::plan::Params,
416418
subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
419+
item_id: CatalogItemId,
420+
global_id: GlobalId,
417421
) -> Result<CreateSourcePlanBundle, AdapterError> {
418422
let catalog = self.catalog().for_session(session);
419423
let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
420-
let id_ts = self.get_catalog_write_ts().await;
421-
let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
422424

423425
let plan = self.plan_statement(
424426
session,
@@ -467,10 +469,15 @@ impl Coordinator {
467469
let ingestion_id = source.global_id();
468470
let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
469471

470-
for subsource_stmt in subsource_stmts {
471-
let s = self
472-
.plan_subsource(session, &params, subsource_stmt)
473-
.await?;
472+
let id_ts = self.get_catalog_write_ts().await;
473+
let ids = self
474+
.catalog_mut()
475+
.allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
476+
.await?;
477+
for (subsource_stmt, (item_id, global_id)) in
478+
subsource_stmts.into_iter().zip(ids.into_iter())
479+
{
480+
let s = self.plan_subsource(session, &params, subsource_stmt, item_id, global_id)?;
474481
subsource_plans.push(s);
475482
}
476483

@@ -539,9 +546,10 @@ impl Coordinator {
539546
// guaranteeing that the shard ID is discoverable is to create this
540547
// collection first.
541548
assert_none!(progress_stmt.of_source);
542-
let progress_plan = self
543-
.plan_subsource(ctx.session(), &params, progress_stmt)
544-
.await?;
549+
let id_ts = self.get_catalog_write_ts().await;
550+
let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
551+
let progress_plan =
552+
self.plan_subsource(ctx.session(), &params, progress_stmt, item_id, global_id)?;
545553
let progress_full_name = self
546554
.catalog()
547555
.resolve_full_name(&progress_plan.plan.name, None);
@@ -599,8 +607,13 @@ impl Coordinator {
599607
});
600608

601609
// 3. Finally, plan all the subsources
602-
for stmt in subsource_stmts {
603-
let plan = self.plan_subsource(ctx.session(), &params, stmt).await?;
610+
let id_ts = self.get_catalog_write_ts().await;
611+
let ids = self
612+
.catalog_mut()
613+
.allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
614+
.await?;
615+
for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip(ids.into_iter()) {
616+
let plan = self.plan_subsource(ctx.session(), &params, stmt, item_id, global_id)?;
604617
create_source_plans.push(plan);
605618
}
606619

src/catalog/src/durable.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,24 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
336336
Ok(ids)
337337
}
338338

339+
/// Allocates and returns `amount` many user [`CatalogItemId`] and [`GlobalId`].
340+
///
341+
/// See [`Self::commit_transaction`] for details on `commit_ts`.
342+
async fn allocate_user_ids(
343+
&mut self,
344+
amount: u64,
345+
commit_ts: Timestamp,
346+
) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
347+
let ids = self
348+
.allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
349+
.await?;
350+
let ids = ids
351+
.iter()
352+
.map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
353+
.collect();
354+
Ok(ids)
355+
}
356+
339357
/// Allocates and returns both a user [`CatalogItemId`] and [`GlobalId`].
340358
///
341359
/// See [`Self::commit_transaction`] for details on `commit_ts`.

0 commit comments

Comments
 (0)