Skip to content

Commit 9a4ee61

Browse files
authored
Merge pull request #29673 from aljoscha/adapter-synthesize-controller-commands
adapter: derive implications from catalog changes
2 parents 42c0ae7 + 74f4bd5 commit 9a4ee61

File tree

17 files changed

+2526
-986
lines changed

17 files changed

+2526
-986
lines changed

src/adapter/src/catalog.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ pub use crate::catalog::transact::{
9797
};
9898
use crate::command::CatalogDump;
9999
use crate::coord::TargetCluster;
100+
#[cfg(test)]
101+
use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
100102
use crate::session::{Portal, PreparedStatement, Session};
101103
use crate::util::ResultExt;
102104
use crate::{AdapterError, AdapterNotice, ExecuteResponse};
@@ -1509,10 +1511,16 @@ impl Catalog {
15091511
#[cfg(test)]
15101512
async fn sync_to_current_updates(
15111513
&mut self,
1512-
) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
1514+
) -> Result<
1515+
(
1516+
Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1517+
Vec<ParsedStateUpdate>,
1518+
),
1519+
CatalogError,
1520+
> {
15131521
let updates = self.storage().await.sync_to_current_updates().await?;
1514-
let builtin_table_updates = self.state.apply_updates(updates)?;
1515-
Ok(builtin_table_updates)
1522+
let (builtin_table_updates, catalog_updates) = self.state.apply_updates(updates)?;
1523+
Ok((builtin_table_updates, catalog_updates))
15161524
}
15171525
}
15181526

src/adapter/src/catalog/apply.rs

Lines changed: 220 additions & 52 deletions
Large diffs are not rendered by default.

src/adapter/src/catalog/migrate.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use uuid::Uuid;
4141
use crate::catalog::open::into_consolidatable_updates_startup;
4242
use crate::catalog::state::LocalExpressionCache;
4343
use crate::catalog::{BuiltinTableUpdate, CatalogState, ConnCatalog};
44+
use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
4445

4546
/// Catalog key of the `migration_version` setting.
4647
///
@@ -117,6 +118,7 @@ where
117118

118119
/// Outputs of a catalog migration, as returned by `migrate`.
pub(crate) struct MigrateResult {
119120
/// Builtin table updates accumulated from the `apply_updates_for_bootstrap`
/// calls made during the migration.
pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
121+
/// Parsed catalog state updates accumulated from the same
/// `apply_updates_for_bootstrap` calls.
pub(crate) catalog_updates: Vec<ParsedStateUpdate>,
120122
/// NOTE(review): how these updates are produced is not visible in this
/// hunk; presumably the caller applies them after the item updates — confirm.
pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
121123
}
122124

@@ -192,7 +194,7 @@ pub(crate) async fn migrate(
192194
.expect("known parameter");
193195
}
194196

195-
let mut ast_builtin_table_updates = state
197+
let (mut ast_builtin_table_updates, mut ast_catalog_updates) = state
196198
.apply_updates_for_bootstrap(item_updates, local_expr_cache)
197199
.await;
198200

@@ -238,18 +240,21 @@ pub(crate) async fn migrate(
238240
// input and stages arbitrary transformations to the catalog on `tx`.
239241

240242
let op_item_updates = tx.get_and_commit_op_updates();
241-
let item_builtin_table_updates = state
243+
let (item_builtin_table_updates, item_catalog_updates) = state
242244
.apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
243245
.await;
244246

245247
ast_builtin_table_updates.extend(item_builtin_table_updates);
248+
ast_catalog_updates.extend(item_catalog_updates);
246249

247250
info!(
248251
"migration from catalog version {:?} complete",
249252
catalog_version
250253
);
254+
251255
Ok(MigrateResult {
252256
builtin_table_updates: ast_builtin_table_updates,
257+
catalog_updates: ast_catalog_updates,
253258
post_item_updates,
254259
})
255260
}

src/adapter/src/catalog/open.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ impl Catalog {
313313
}
314314
}
315315

316-
let builtin_table_update = state
316+
let (builtin_table_update, _catalog_updates) = state
317317
.apply_updates_for_bootstrap(pre_item_updates, &mut LocalExpressionCache::Closed)
318318
.await;
319319
builtin_table_updates.extend(builtin_table_update);
@@ -410,7 +410,12 @@ impl Catalog {
410410
expr_cache_start.elapsed()
411411
);
412412

413-
let builtin_table_update = state
413+
// When initializing/bootstrapping, we don't use the catalog updates but
414+
// instead load the catalog fully and then go ahead and apply commands
415+
// to the controller(s). Maybe we _should_ instead use the same logic
416+
// and consume the updates returned from here, but that is future work
417+
// at the very least.
418+
let (builtin_table_update, _catalog_updates) = state
414419
.apply_updates_for_bootstrap(system_item_updates, &mut local_expr_cache)
415420
.await;
416421
builtin_table_updates.extend(builtin_table_update);
@@ -426,7 +431,7 @@ impl Catalog {
426431
state.mock_authentication_nonce = Some(mz_authentication_mock_nonce);
427432

428433
// Migrate item ASTs.
429-
let builtin_table_update = if !config.skip_migrations {
434+
let (builtin_table_update, _catalog_updates) = if !config.skip_migrations {
430435
let migrate_result = migrate::migrate(
431436
&mut state,
432437
&mut txn,
@@ -456,7 +461,10 @@ impl Catalog {
456461
differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
457462
}
458463

459-
migrate_result.builtin_table_updates
464+
(
465+
migrate_result.builtin_table_updates,
466+
migrate_result.catalog_updates,
467+
)
460468
} else {
461469
state
462470
.apply_updates_for_bootstrap(item_updates, &mut local_expr_cache)
@@ -472,7 +480,7 @@ impl Catalog {
472480
diff: diff.try_into().expect("valid diff"),
473481
})
474482
.collect();
475-
let builtin_table_update = state
483+
let (builtin_table_update, _catalog_updates) = state
476484
.apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache)
477485
.await;
478486
builtin_table_updates.extend(builtin_table_update);
@@ -496,7 +504,13 @@ impl Catalog {
496504
.await?;
497505

498506
let state_updates = txn.get_and_commit_op_updates();
499-
let table_updates = state
507+
508+
// When initializing/bootstrapping, we don't use the catalog updates but
509+
// instead load the catalog fully and then go ahead and apply commands
510+
// to the controller(s). Maybe we _should_ instead use the same logic
511+
// and consume the updates returned from here, but that is future work
512+
// at the very least.
513+
let (table_updates, _catalog_updates) = state
500514
.apply_updates_for_bootstrap(state_updates, &mut local_expr_cache)
501515
.await;
502516
builtin_table_updates.extend(table_updates);
@@ -642,8 +656,15 @@ impl Catalog {
642656
.map_err(mz_catalog::durable::DurableCatalogError::from)?;
643657

644658
let updates = txn.get_and_commit_op_updates();
645-
let builtin_updates = state.apply_updates(updates)?;
646-
assert!(builtin_updates.is_empty());
659+
let (builtin_updates, catalog_updates) = state.apply_updates(updates)?;
660+
assert!(
661+
builtin_updates.is_empty(),
662+
"storage is not allowed to generate catalog changes that would cause changes to builtin tables"
663+
);
664+
assert!(
665+
catalog_updates.is_empty(),
666+
"storage is not allowed to generate catalog changes that would change the catalog or controller state"
667+
);
647668
let commit_ts = txn.upper();
648669
txn.commit(commit_ts).await?;
649670
drop(storage);

src/adapter/src/catalog/state.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1533,7 +1533,7 @@ impl CatalogState {
15331533
/// Gets a reference to the specified replica of the specified cluster.
15341534
///
15351535
/// Panics if either the cluster or the replica does not exist.
1536-
pub(super) fn get_cluster_replica(
1536+
pub(crate) fn get_cluster_replica(
15371537
&self,
15381538
cluster_id: ClusterId,
15391539
replica_id: ReplicaId,
@@ -1629,6 +1629,13 @@ impl CatalogState {
16291629
.into_first()
16301630
}
16311631

1632+
/// Finds the temporary schema with the given `schema_id`.
///
/// Scans the values of `self.temporary_schemas` and returns the first schema
/// whose id equals `SchemaSpecifier::from(*schema_id)`.
///
/// NOTE(review): presumably panics if no temporary schema has this id, since
/// the result comes from `into_first()` and is not optional — confirm.
pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1633+
self.temporary_schemas
1634+
.values()
1635+
// Schema ids are stored as `SchemaSpecifier`s, so convert before comparing.
.filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1636+
.into_first()
1637+
}
1638+
16321639
pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
16331640
self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
16341641
}

src/adapter/src/catalog/transact.rs

Lines changed: 81 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ use crate::catalog::{
7070
system_object_type_to_audit_object_type,
7171
};
7272
use crate::coord::ConnMeta;
73+
use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
7374
use crate::coord::cluster_scheduling::SchedulingDecision;
7475
use crate::util::ResultExt;
7576

@@ -327,6 +328,8 @@ impl ReplicaCreateDropReason {
327328

328329
/// Outputs collected while applying the ops of a catalog transaction.
pub struct TransactionResult {
329330
/// Builtin table updates collected while the ops were applied.
pub builtin_table_updates: Vec<BuiltinTableUpdate>,
331+
/// Parsed catalog updates from which we will derive catalog implications.
332+
pub catalog_updates: Vec<ParsedStateUpdate>,
330333
/// Audit events collected while the ops were applied.
pub audit_events: Vec<VersionedEvent>,
331334
}
332335

@@ -421,6 +424,7 @@ impl Catalog {
421424

422425
let temporary_ids = self.temporary_ids(&ops, temporary_drops)?;
423426
let mut builtin_table_updates = vec![];
427+
let mut catalog_updates = vec![];
424428
let mut audit_events = vec![];
425429
let mut storage = self.storage().await;
426430
let mut tx = storage
@@ -435,6 +439,7 @@ impl Catalog {
435439
ops,
436440
temporary_ids,
437441
&mut builtin_table_updates,
442+
&mut catalog_updates,
438443
&mut audit_events,
439444
&mut tx,
440445
&self.state,
@@ -470,6 +475,7 @@ impl Catalog {
470475

471476
Ok(TransactionResult {
472477
builtin_table_updates,
478+
catalog_updates,
473479
audit_events,
474480
})
475481
}
@@ -491,10 +497,40 @@ impl Catalog {
491497
mut ops: Vec<Op>,
492498
temporary_ids: BTreeSet<CatalogItemId>,
493499
builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
500+
parsed_catalog_updates: &mut Vec<ParsedStateUpdate>,
494501
audit_events: &mut Vec<VersionedEvent>,
495502
tx: &mut Transaction<'_>,
496503
state: &CatalogState,
497504
) -> Result<Option<CatalogState>, AdapterError> {
505+
// We derive the new catalog state, builtin table updates, and parsed
506+
// catalog updates (for deriving catalog implications) in two phases:
507+
//
508+
// 1. We (cow)-clone catalog state as `preliminary_state` and apply ops
509+
// one-by-one. This will give us the full list of updates to apply to
510+
// the catalog, which will allow us to apply it in one batch, which
511+
// in turn will allow the apply machinery to consolidate the updates.
512+
// 2. We do one final apply call with all updates, which gives us the
513+
// final builtin table updates and parsed catalog updates.
514+
//
515+
// Phase 1 is needed because the loop that works through ops first calls
516+
// transact_op to derive the state updates for that op, and then calls
517+
// apply_updates on the catalog state, and successive ops might expect
518+
// the catalog state to reflect the modified state _after_ applying
519+
// previous ops.
520+
//
521+
// However, we want one final apply_updates call that takes all the
522+
// accumulated updates to derive the required controller updates and the
523+
// builtin table updates.
524+
//
525+
// We won't win any DDL throughput benchmarks, but so far that's not
526+
// what we're optimizing for, and we would probably hit other
527+
// bottlenecks before this one.
528+
//
529+
// We could work around this by refactoring how the interplay of
530+
// transact_op and apply_updates works, but that's a larger undertaking.
531+
let mut preliminary_state = Cow::Borrowed(state);
532+
533+
// The final state that we will return, if modified.
498534
let mut state = Cow::Borrowed(state);
499535

500536
let dry_run_ops = match ops.last() {
@@ -512,6 +548,8 @@ impl Catalog {
512548
let mut storage_collections_to_drop = BTreeSet::new();
513549
let mut storage_collections_to_register = BTreeMap::new();
514550

551+
let mut updates = Vec::new();
552+
515553
for op in ops {
516554
let (weird_builtin_table_update, temporary_item_updates) = Self::transact_op(
517555
oracle_write_ts,
@@ -520,7 +558,7 @@ impl Catalog {
520558
&temporary_ids,
521559
audit_events,
522560
tx,
523-
&*state,
561+
&*preliminary_state,
524562
&mut storage_collections_to_create,
525563
&mut storage_collections_to_drop,
526564
&mut storage_collections_to_register,
@@ -543,25 +581,35 @@ impl Catalog {
543581
// separately for updating state and builtin tables.
544582
// TODO(jkosh44) Some more thought needs to be given as to how temporary tables work
545583
// in a multi-subscriber catalog world.
546-
let op_id = tx.op_id().into();
584+
let upper = tx.upper();
547585
let temporary_item_updates =
548586
temporary_item_updates
549587
.into_iter()
550588
.map(|(item, diff)| StateUpdate {
551589
kind: StateUpdateKind::TemporaryItem(item),
552-
ts: op_id,
590+
ts: upper,
553591
diff,
554592
});
555593

556-
let mut updates: Vec<_> = tx.get_and_commit_op_updates();
557-
updates.extend(temporary_item_updates);
558-
if !updates.is_empty() {
559-
let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
560-
let op_builtin_table_updates = state
561-
.to_mut()
562-
.resolve_builtin_table_updates(op_builtin_table_updates);
563-
builtin_table_updates.extend(op_builtin_table_updates);
594+
let mut op_updates: Vec<_> = tx.get_and_commit_op_updates();
595+
op_updates.extend(temporary_item_updates);
596+
if !op_updates.is_empty() {
597+
let (_op_builtin_table_updates, _op_catalog_updates) =
598+
preliminary_state
599+
.to_mut()
600+
.apply_updates(op_updates.clone())?;
564601
}
602+
updates.append(&mut op_updates);
603+
}
604+
605+
if !updates.is_empty() {
606+
let (op_builtin_table_updates, op_catalog_updates) =
607+
state.to_mut().apply_updates(updates.clone())?;
608+
let op_builtin_table_updates = state
609+
.to_mut()
610+
.resolve_builtin_table_updates(op_builtin_table_updates);
611+
builtin_table_updates.extend(op_builtin_table_updates);
612+
parsed_catalog_updates.extend(op_catalog_updates);
565613
}
566614

567615
if dry_run_ops.is_empty() {
@@ -578,11 +626,13 @@ impl Catalog {
578626

579627
let updates = tx.get_and_commit_op_updates();
580628
if !updates.is_empty() {
581-
let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
629+
let (op_builtin_table_updates, op_catalog_updates) =
630+
state.to_mut().apply_updates(updates.clone())?;
582631
let op_builtin_table_updates = state
583632
.to_mut()
584633
.resolve_builtin_table_updates(op_builtin_table_updates);
585634
builtin_table_updates.extend(op_builtin_table_updates);
635+
parsed_catalog_updates.extend(op_catalog_updates);
586636
}
587637

588638
match state {
@@ -1146,15 +1196,31 @@ impl Catalog {
11461196
)));
11471197
}
11481198
let oid = tx.allocate_oid(&temporary_oids)?;
1199+
1200+
let schema_id = name.qualifiers.schema_spec.clone().into();
1201+
let item_type = item.typ();
1202+
let (create_sql, global_id, versions) = item.to_serialized();
1203+
11491204
let item = TemporaryItem {
11501205
id,
11511206
oid,
1152-
name: name.clone(),
1153-
item: item.clone(),
1207+
global_id,
1208+
schema_id,
1209+
name: name.item.clone(),
1210+
create_sql,
1211+
conn_id: item.conn_id().cloned(),
11541212
owner_id,
1155-
privileges: PrivilegeMap::from_mz_acl_items(privileges),
1213+
privileges: privileges.clone(),
1214+
extra_versions: versions,
11561215
};
11571216
temporary_item_updates.push((item, StateDiff::Addition));
1217+
1218+
info!(
1219+
"create temporary {} {} ({})",
1220+
item_type,
1221+
state.resolve_full_name(&name, None),
1222+
id
1223+
);
11581224
} else {
11591225
if let Some(temp_id) =
11601226
item.uses()

src/adapter/src/coord.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ pub(crate) mod statement_logging;
221221
pub(crate) mod timeline;
222222
pub(crate) mod timestamp_selection;
223223

224+
pub mod catalog_implications;
224225
mod caught_up;
225226
mod command_handler;
226227
mod ddl;

0 commit comments

Comments
 (0)