Skip to content

Commit 90e1af3

Browse files
emmaling27 authored and Convex, Inc. committed
Make SchemaWorker use MultiTableIterator to make it faster (#39361)
GitOrigin-RevId: e5973470da553783542e5b1a459680010dabb05f
1 parent c24c4f6 commit 90e1af3

File tree

1 file changed

+49
-39
lines changed
  • crates/application/src/schema_worker

1 file changed

+49
-39
lines changed

crates/application/src/schema_worker/mod.rs

Lines changed: 49 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -158,9 +158,15 @@ impl<RT: Runtime> SchemaWorker<RT> {
158158
},
159159
)?;
160160

161-
for table_name in tables_to_check {
162-
let table_iterator = self.database.table_iterator(ts, 1000);
163-
let tablet_id = table_mapping.name_to_tablet()(table_name.clone())?;
161+
let tablet_ids = tables_to_check
162+
.into_iter()
163+
.map(|table_name| table_mapping.name_to_tablet()(table_name.clone()))
164+
.collect::<Result<Vec<_>, _>>()?;
165+
let mut table_iterator = self
166+
.database
167+
.table_iterator(ts, 1000)
168+
.multi(tablet_ids.clone());
169+
for tablet_id in tablet_ids {
164170
let stream = table_iterator.stream_documents_in_table(
165171
tablet_id,
166172
*by_id_indexes.get(&tablet_id).ok_or_else(|| {
@@ -169,48 +175,52 @@ impl<RT: Runtime> SchemaWorker<RT> {
169175
None,
170176
);
171177

172-
pin_mut!(stream);
173-
while let Some(LatestDocument { value: doc, .. }) = stream.try_next().await? {
174-
let table_name = table_mapping.tablet_name(doc.id().tablet_id)?;
175-
log_document_validated();
176-
log_document_bytes(doc.size());
177-
if let Err(schema_error) = db_schema.check_existing_document(
178-
&doc,
179-
table_name,
180-
&table_mapping,
181-
&virtual_system_mapping,
182-
) {
183-
let mut backoff = Backoff::new(INITIAL_COMMIT_BACKOFF, MAX_COMMIT_BACKOFF);
184-
while backoff.failures() < MAX_COMMIT_FAILURES {
185-
let mut tx = self.database.begin(Identity::system()).await?;
186-
SchemaModel::new(&mut tx, namespace)
187-
.mark_failed(id, schema_error.clone())
188-
.await?;
189-
if let Err(e) = self
190-
.database
191-
.commit_with_write_source(tx, "schema_worker_mark_failed")
192-
.await
193-
{
194-
if e.is_occ() {
195-
let delay = backoff.fail(&mut self.runtime.rng());
196-
tracing::error!(
197-
"Schema worker failed to commit ({e}), retrying after \
198-
{delay:?}"
199-
);
200-
self.runtime.wait(delay).await;
178+
{
179+
pin_mut!(stream);
180+
while let Some(LatestDocument { value: doc, .. }) = stream.try_next().await? {
181+
let table_name = table_mapping.tablet_name(doc.id().tablet_id)?;
182+
log_document_validated();
183+
log_document_bytes(doc.size());
184+
if let Err(schema_error) = db_schema.check_existing_document(
185+
&doc,
186+
table_name,
187+
&table_mapping,
188+
&virtual_system_mapping,
189+
) {
190+
let mut backoff =
191+
Backoff::new(INITIAL_COMMIT_BACKOFF, MAX_COMMIT_BACKOFF);
192+
while backoff.failures() < MAX_COMMIT_FAILURES {
193+
let mut tx = self.database.begin(Identity::system()).await?;
194+
SchemaModel::new(&mut tx, namespace)
195+
.mark_failed(id, schema_error.clone())
196+
.await?;
197+
if let Err(e) = self
198+
.database
199+
.commit_with_write_source(tx, "schema_worker_mark_failed")
200+
.await
201+
{
202+
if e.is_occ() {
203+
let delay = backoff.fail(&mut self.runtime.rng());
204+
tracing::error!(
205+
"Schema worker failed to commit ({e}), retrying after \
206+
{delay:?}"
207+
);
208+
self.runtime.wait(delay).await;
209+
} else {
210+
return Err(e);
211+
}
201212
} else {
202-
return Err(e);
213+
break;
203214
}
204-
} else {
205-
break;
206215
}
207-
}
208216

209-
tracing::info!("Schema is invalid");
210-
timer.finish_developer_error();
211-
return Ok(());
217+
tracing::info!("Schema is invalid");
218+
timer.finish_developer_error();
219+
return Ok(());
220+
}
212221
}
213222
}
223+
table_iterator.unregister_table(tablet_id)?;
214224
}
215225
let mut tx = self.database.begin(Identity::system()).await?;
216226
if let Err(error) = SchemaModel::new(&mut tx, namespace)

0 commit comments

Comments
 (0)