Skip to content

Commit c1ded99

Browse files
ldanilekConvex, Inc.
authored andcommitted
SchemaWorker prefetch work before validating (#26790)
refactor SchemaWorker to fetch data from a transaction, and construct the subscription token, all before we do the long-duration schema validation step. This will allow us to fetch data for multiple schemas in a loop, quickly within one transaction, before doing the actual validation. GitOrigin-RevId: 82b2850812c2f6bc4a9ad5a5ff78fb96d4249309
1 parent 32a822a commit c1ded99

File tree

1 file changed

+28
-4
lines changed
  • crates/application/src/schema_worker

1 file changed

+28
-4
lines changed

crates/application/src/schema_worker/mod.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ impl<RT: Runtime> SchemaWorker<RT> {
6868
let status = log_worker_starting("SchemaWorker");
6969
let mut tx: Transaction<RT> = self.database.begin(Identity::system()).await?;
7070
let snapshot = self.database.snapshot(tx.begin_timestamp())?;
71+
let mut pending_schema_work = None;
7172
if let Some((id, db_schema)) = SchemaModel::new(&mut tx, TableNamespace::Global)
7273
.get_by_state(SchemaState::Pending)
7374
.await?
@@ -81,15 +82,39 @@ impl<RT: Runtime> SchemaWorker<RT> {
8182
.get_by_state(SchemaState::Active)
8283
.await?
8384
.map(|(_id, active_schema)| active_schema);
85+
let ts = tx.begin_timestamp();
86+
let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?;
87+
pending_schema_work = Some((
88+
id,
89+
timer,
90+
table_mapping,
91+
virtual_table_mapping,
92+
db_schema,
93+
ts,
94+
active_schema,
95+
by_id_indexes,
96+
));
97+
}
98+
let token = tx.into_token()?;
99+
100+
if let Some((
101+
id,
102+
timer,
103+
table_mapping,
104+
virtual_table_mapping,
105+
db_schema,
106+
ts,
107+
active_schema,
108+
by_id_indexes,
109+
)) = pending_schema_work
110+
{
84111
let tables_to_check = DatabaseSchema::tables_to_validate(
85112
&db_schema,
86113
active_schema,
87114
&table_mapping,
88115
&virtual_table_mapping,
89116
&|table_name| snapshot.table_summary(table_name).inferred_type().clone(),
90117
)?;
91-
let ts = tx.begin_timestamp();
92-
let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?;
93118

94119
for table_name in tables_to_check {
95120
let table_iterator = self.database.table_iterator(ts, 1000, None);
@@ -111,7 +136,7 @@ impl<RT: Runtime> SchemaWorker<RT> {
111136
&doc,
112137
table_name,
113138
&table_mapping,
114-
&tx.virtual_table_mapping().clone(),
139+
&virtual_table_mapping,
115140
) {
116141
let mut backoff = Backoff::new(INITIAL_COMMIT_BACKOFF, MAX_COMMIT_BACKOFF);
117142
while backoff.failures() < MAX_COMMIT_FAILURES {
@@ -164,7 +189,6 @@ impl<RT: Runtime> SchemaWorker<RT> {
164189
}
165190

166191
drop(status);
167-
let token = tx.into_token()?;
168192
tracing::debug!("SchemaWorker waiting...");
169193
let subscription = self.database.subscribe(token).await?;
170194
subscription.wait_for_invalidation().await;

0 commit comments

Comments
 (0)