Skip to content

Commit c3b5b58

Browse files
authored
Merge pull request #34476 from teskje/apply-always-enable_for_item_parsing
adapter: always use enable_for_item_parsing when applying state
2 parents d381ca3 + d601be3 commit c3b5b58

File tree

6 files changed

+125
-139
lines changed

6 files changed

+125
-139
lines changed

src/adapter/src/catalog.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1532,7 +1532,10 @@ impl Catalog {
15321532
CatalogError,
15331533
> {
15341534
let updates = self.storage().await.sync_to_current_updates().await?;
1535-
let (builtin_table_updates, catalog_updates) = self.state.apply_updates(updates)?;
1535+
let (builtin_table_updates, catalog_updates) = self
1536+
.state
1537+
.apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1538+
.await;
15361539
Ok((builtin_table_updates, catalog_updates))
15371540
}
15381541
}

src/adapter/src/catalog/apply.rs

Lines changed: 74 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,9 @@ impl CatalogState {
9494
/// Update in-memory catalog state from a list of updates made to the durable catalog state.
9595
///
9696
/// Returns builtin table updates corresponding to the changes to catalog state.
97-
///
98-
/// This is meant specifically for bootstrapping because it batches and applies builtin view
99-
/// additions separately from other update types.
10097
#[must_use]
10198
#[instrument]
102-
pub(crate) async fn apply_updates_for_bootstrap(
99+
pub(crate) async fn apply_updates(
103100
&mut self,
104101
updates: Vec<StateUpdate>,
105102
local_expression_cache: &mut LocalExpressionCache,
@@ -109,21 +106,31 @@ impl CatalogState {
109106
) {
110107
let mut builtin_table_updates = Vec::with_capacity(updates.len());
111108
let mut catalog_updates = Vec::with_capacity(updates.len());
112-
let updates = sort_updates(updates);
113109

110+
// First, consolidate updates. The code that applies parsed state
111+
// updates _requires_ that the given updates are consolidated. There
112+
// must be at most one addition and/or one retraction for a given item,
113+
// as identified by that items ID type.
114+
let updates = Self::consolidate_updates(updates);
115+
116+
// Apply updates in groups, according to their timestamps.
114117
let mut groups: Vec<Vec<_>> = Vec::new();
115118
for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
116-
groups.push(updates.collect());
119+
// Bring the updates into the pseudo-topological order that we need
120+
// for updating our in-memory state and generating builtin table
121+
// updates.
122+
let updates = sort_updates(updates.collect());
123+
groups.push(updates);
117124
}
125+
118126
for updates in groups {
119-
let mut apply_state = BootstrapApplyState::Updates(Vec::new());
127+
let mut apply_state = ApplyState::Updates(Vec::new());
120128
let mut retractions = InProgressRetractions::default();
121129

122130
for update in updates {
123-
let next_apply_state = BootstrapApplyState::new(update);
124131
let (next_apply_state, (builtin_table_update, catalog_update)) = apply_state
125132
.step(
126-
next_apply_state,
133+
ApplyState::new(update),
127134
self,
128135
&mut retractions,
129136
local_expression_cache,
@@ -145,47 +152,6 @@ impl CatalogState {
145152
(builtin_table_updates, catalog_updates)
146153
}
147154

148-
/// Update in-memory catalog state from a list of updates made to the durable catalog state.
149-
///
150-
/// Returns builtin table updates corresponding to the changes to catalog state.
151-
#[instrument]
152-
pub(crate) fn apply_updates(
153-
&mut self,
154-
updates: Vec<StateUpdate>,
155-
) -> Result<
156-
(
157-
Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
158-
Vec<ParsedStateUpdate>,
159-
),
160-
CatalogError,
161-
> {
162-
let mut builtin_table_updates = Vec::with_capacity(updates.len());
163-
let mut catalog_updates = Vec::with_capacity(updates.len());
164-
165-
// First, consolidate updates. The code that applies parsed state
166-
// updates _requires_ that the given updates are consolidated. There
167-
// must be at most one addition and/or one retraction for a given item,
168-
// as identified by that items ID type.
169-
let updates = Self::consolidate_updates(updates);
170-
171-
// Then bring it into the pseudo-topological order that we need for
172-
// updating our in-memory state and generating builtin table updates.
173-
let updates = sort_updates(updates);
174-
175-
for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
176-
let mut retractions = InProgressRetractions::default();
177-
let (builtin_table_update, catalog_updates_op) = self.apply_updates_inner(
178-
updates.collect(),
179-
&mut retractions,
180-
&mut LocalExpressionCache::Closed,
181-
)?;
182-
builtin_table_updates.extend(builtin_table_update);
183-
catalog_updates.extend(catalog_updates_op);
184-
}
185-
186-
Ok((builtin_table_updates, catalog_updates))
187-
}
188-
189155
/// It can happen that the sequencing logic creates "fluctuating" updates
190156
/// for a given catalog ID. For example, when doing a `DROP OWNED BY ...`,
191157
/// for a table, there will be a retraction of the original table state,
@@ -203,7 +169,6 @@ impl CatalogState {
203169

204170
updates
205171
.into_iter()
206-
.filter(|(_kind, _ts, diff)| *diff != 0.into())
207172
.map(|(kind, ts, diff)| StateUpdate {
208173
kind,
209174
ts,
@@ -1956,21 +1921,13 @@ impl CatalogState {
19561921
}
19571922
}
19581923

1959-
/// Sort [`StateUpdate`]s in timestamp then dependency order
1960-
fn sort_updates(mut updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1961-
let mut sorted_updates = Vec::with_capacity(updates.len());
1962-
1963-
updates.sort_by_key(|update| update.ts);
1964-
for (_, updates) in &updates.into_iter().chunk_by(|update| update.ts) {
1965-
let sorted_ts_updates = sort_updates_inner(updates.collect());
1966-
sorted_updates.extend(sorted_ts_updates);
1967-
}
1968-
1969-
sorted_updates
1970-
}
1971-
1972-
/// Sort [`StateUpdate`]s in dependency order for a single timestamp.
1973-
fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
1924+
/// Sort [`StateUpdate`]s in dependency order.
1925+
///
1926+
/// # Panics
1927+
///
1928+
/// This function assumes that all provided `updates` have the same timestamp
1929+
/// and will panic otherwise.
1930+
fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
19741931
fn push_update<T>(
19751932
update: T,
19761933
diff: StateDiff,
@@ -2387,46 +2344,54 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
23872344
.collect()
23882345
}
23892346

2390-
/// Most updates are applied one at a time, but during bootstrap, certain types are applied
2391-
/// separately in a batch for performance reasons. A constraint is that updates must be applied in
2392-
/// order. This process is modeled as a state machine that batches then applies groups of updates.
2393-
enum BootstrapApplyState {
2347+
/// Groups of updates of certain types are applied in batches to improve
2348+
/// performance. A constraint is that updates must be applied in order. This
2349+
/// process is modeled as a state machine that batches then applies groups of
2350+
/// updates.
2351+
enum ApplyState {
23942352
/// Additions of builtin views.
23952353
BuiltinViewAdditions(Vec<(&'static BuiltinView, CatalogItemId, GlobalId)>),
23962354
/// Item updates that aren't builtin view additions.
2355+
///
2356+
/// This contains all updates whose application requires calling
2357+
/// `parse_item` and thus toggling the `enable_for_item_parsing` feature
2358+
/// flags.
23972359
Items(Vec<StateUpdate>),
23982360
/// All other updates.
23992361
Updates(Vec<StateUpdate>),
24002362
}
24012363

2402-
impl BootstrapApplyState {
2403-
fn new(update: StateUpdate) -> BootstrapApplyState {
2404-
match update {
2405-
StateUpdate {
2406-
kind: StateUpdateKind::SystemObjectMapping(system_object_mapping),
2407-
diff: StateDiff::Addition,
2408-
..
2409-
} if matches!(
2410-
system_object_mapping.description.object_type,
2411-
CatalogItemType::View
2412-
) =>
2364+
impl ApplyState {
2365+
fn new(update: StateUpdate) -> Self {
2366+
use StateUpdateKind::*;
2367+
match &update.kind {
2368+
SystemObjectMapping(som)
2369+
if som.description.object_type == CatalogItemType::View
2370+
&& update.diff == StateDiff::Addition =>
24132371
{
2414-
let view_addition = lookup_builtin_view_addition(system_object_mapping);
2415-
BootstrapApplyState::BuiltinViewAdditions(vec![view_addition])
2416-
}
2417-
StateUpdate {
2418-
kind: StateUpdateKind::IntrospectionSourceIndex(_),
2419-
..
2420-
}
2421-
| StateUpdate {
2422-
kind: StateUpdateKind::SystemObjectMapping(_),
2423-
..
2424-
}
2425-
| StateUpdate {
2426-
kind: StateUpdateKind::Item(_),
2427-
..
2428-
} => BootstrapApplyState::Items(vec![update]),
2429-
update => BootstrapApplyState::Updates(vec![update]),
2372+
let view_addition = lookup_builtin_view_addition(som.clone());
2373+
Self::BuiltinViewAdditions(vec![view_addition])
2374+
}
2375+
2376+
IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
2377+
Self::Items(vec![update])
2378+
}
2379+
2380+
Role(_)
2381+
| RoleAuth(_)
2382+
| Database(_)
2383+
| Schema(_)
2384+
| DefaultPrivilege(_)
2385+
| SystemPrivilege(_)
2386+
| SystemConfiguration(_)
2387+
| Cluster(_)
2388+
| NetworkPolicy(_)
2389+
| ClusterReplica(_)
2390+
| SourceReferences(_)
2391+
| Comment(_)
2392+
| AuditLog(_)
2393+
| StorageCollectionMetadata(_)
2394+
| UnfinalizedShard(_) => Self::Updates(vec![update]),
24302395
}
24312396
}
24322397

@@ -2445,7 +2410,7 @@ impl BootstrapApplyState {
24452410
Vec<ParsedStateUpdate>,
24462411
) {
24472412
match self {
2448-
BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions) => {
2413+
Self::BuiltinViewAdditions(builtin_view_additions) => {
24492414
let restore = state.system_configuration.clone();
24502415
state.system_configuration.enable_for_item_parsing();
24512416
let builtin_table_updates = CatalogState::parse_builtin_views(
@@ -2458,60 +2423,51 @@ impl BootstrapApplyState {
24582423
state.system_configuration = restore;
24592424
(builtin_table_updates, Vec::new())
24602425
}
2461-
BootstrapApplyState::Items(updates) => state.with_enable_for_item_parsing(|state| {
2426+
Self::Items(updates) => state.with_enable_for_item_parsing(|state| {
24622427
state
24632428
.apply_updates_inner(updates, retractions, local_expression_cache)
24642429
.expect("corrupt catalog")
24652430
}),
2466-
BootstrapApplyState::Updates(updates) => state
2431+
Self::Updates(updates) => state
24672432
.apply_updates_inner(updates, retractions, local_expression_cache)
24682433
.expect("corrupt catalog"),
24692434
}
24702435
}
24712436

24722437
async fn step(
24732438
self,
2474-
next: BootstrapApplyState,
2439+
next: Self,
24752440
state: &mut CatalogState,
24762441
retractions: &mut InProgressRetractions,
24772442
local_expression_cache: &mut LocalExpressionCache,
24782443
) -> (
2479-
BootstrapApplyState,
2444+
Self,
24802445
(
24812446
Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
24822447
Vec<ParsedStateUpdate>,
24832448
),
24842449
) {
24852450
match (self, next) {
24862451
(
2487-
BootstrapApplyState::BuiltinViewAdditions(mut builtin_view_additions),
2488-
BootstrapApplyState::BuiltinViewAdditions(next_builtin_view_additions),
2452+
Self::BuiltinViewAdditions(mut builtin_view_additions),
2453+
Self::BuiltinViewAdditions(next_builtin_view_additions),
24892454
) => {
24902455
// Continue batching builtin view additions.
24912456
builtin_view_additions.extend(next_builtin_view_additions);
24922457
(
2493-
BootstrapApplyState::BuiltinViewAdditions(builtin_view_additions),
2458+
Self::BuiltinViewAdditions(builtin_view_additions),
24942459
(Vec::new(), Vec::new()),
24952460
)
24962461
}
2497-
(BootstrapApplyState::Items(mut updates), BootstrapApplyState::Items(next_updates)) => {
2462+
(Self::Items(mut updates), Self::Items(next_updates)) => {
24982463
// Continue batching item updates.
24992464
updates.extend(next_updates);
2500-
(
2501-
BootstrapApplyState::Items(updates),
2502-
(Vec::new(), Vec::new()),
2503-
)
2465+
(Self::Items(updates), (Vec::new(), Vec::new()))
25042466
}
2505-
(
2506-
BootstrapApplyState::Updates(mut updates),
2507-
BootstrapApplyState::Updates(next_updates),
2508-
) => {
2467+
(Self::Updates(mut updates), Self::Updates(next_updates)) => {
25092468
// Continue batching updates.
25102469
updates.extend(next_updates);
2511-
(
2512-
BootstrapApplyState::Updates(updates),
2513-
(Vec::new(), Vec::new()),
2514-
)
2470+
(Self::Updates(updates), (Vec::new(), Vec::new()))
25152471
}
25162472
(apply_state, next_apply_state) => {
25172473
// Apply the current batch and start batching new apply state.

src/adapter/src/catalog/migrate.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -194,9 +194,8 @@ pub(crate) async fn migrate(
194194
.expect("known parameter");
195195
}
196196

197-
let (mut ast_builtin_table_updates, mut ast_catalog_updates) = state
198-
.apply_updates_for_bootstrap(item_updates, local_expr_cache)
199-
.await;
197+
let (mut ast_builtin_table_updates, mut ast_catalog_updates) =
198+
state.apply_updates(item_updates, local_expr_cache).await;
200199

201200
info!("migrating from catalog version {:?}", catalog_version);
202201

@@ -240,9 +239,8 @@ pub(crate) async fn migrate(
240239
// input and stages arbitrary transformations to the catalog on `tx`.
241240

242241
let op_item_updates = tx.get_and_commit_op_updates();
243-
let (item_builtin_table_updates, item_catalog_updates) = state
244-
.apply_updates_for_bootstrap(op_item_updates, local_expr_cache)
245-
.await;
242+
let (item_builtin_table_updates, item_catalog_updates) =
243+
state.apply_updates(op_item_updates, local_expr_cache).await;
246244

247245
ast_builtin_table_updates.extend(item_builtin_table_updates);
248246
ast_catalog_updates.extend(item_catalog_updates);

src/adapter/src/catalog/open.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ impl Catalog {
314314
}
315315

316316
let (builtin_table_update, _catalog_updates) = state
317-
.apply_updates_for_bootstrap(pre_item_updates, &mut LocalExpressionCache::Closed)
317+
.apply_updates(pre_item_updates, &mut LocalExpressionCache::Closed)
318318
.await;
319319
builtin_table_updates.extend(builtin_table_update);
320320

@@ -416,7 +416,7 @@ impl Catalog {
416416
// and return and use the updates from here. But that's at the very
417417
// least future work.
418418
let (builtin_table_update, _catalog_updates) = state
419-
.apply_updates_for_bootstrap(system_item_updates, &mut local_expr_cache)
419+
.apply_updates(system_item_updates, &mut local_expr_cache)
420420
.await;
421421
builtin_table_updates.extend(builtin_table_update);
422422

@@ -467,7 +467,7 @@ impl Catalog {
467467
)
468468
} else {
469469
state
470-
.apply_updates_for_bootstrap(item_updates, &mut local_expr_cache)
470+
.apply_updates(item_updates, &mut local_expr_cache)
471471
.await
472472
};
473473
builtin_table_updates.extend(builtin_table_update);
@@ -481,7 +481,7 @@ impl Catalog {
481481
})
482482
.collect();
483483
let (builtin_table_update, _catalog_updates) = state
484-
.apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache)
484+
.apply_updates(post_item_updates, &mut local_expr_cache)
485485
.await;
486486
builtin_table_updates.extend(builtin_table_update);
487487

@@ -511,7 +511,7 @@ impl Catalog {
511511
// and return and use the updates from here. But that's at the very
512512
// least future work.
513513
let (table_updates, _catalog_updates) = state
514-
.apply_updates_for_bootstrap(state_updates, &mut local_expr_cache)
514+
.apply_updates(state_updates, &mut local_expr_cache)
515515
.await;
516516
builtin_table_updates.extend(table_updates);
517517
let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);
@@ -656,7 +656,9 @@ impl Catalog {
656656
.map_err(mz_catalog::durable::DurableCatalogError::from)?;
657657

658658
let updates = txn.get_and_commit_op_updates();
659-
let (builtin_updates, catalog_updates) = state.apply_updates(updates)?;
659+
let (builtin_updates, catalog_updates) = state
660+
.apply_updates(updates, &mut LocalExpressionCache::Closed)
661+
.await;
660662
assert!(
661663
builtin_updates.is_empty(),
662664
"storage is not allowed to generate catalog changes that would cause changes to builtin tables"

0 commit comments

Comments
 (0)