Skip to content

Commit 00f5ec1

Browse files
ldanilekConvex, Inc.
authored andcommitted
run SchemaWorker in all namespaces (#26817)
run schema worker in every namespace that has a `_schemas` table GitOrigin-RevId: e1a9776a68c7c9bed8300f682c62f41c30bc28f5
1 parent b162768 commit 00f5ec1

File tree

1 file changed

+45
-30
lines changed
  • crates/application/src/schema_worker

1 file changed

+45
-30
lines changed

crates/application/src/schema_worker/mod.rs

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use database::{
1212
IndexModel,
1313
SchemaModel,
1414
Transaction,
15+
SCHEMAS_TABLE,
1516
};
1617
use errors::ErrorMetadataAnyhowExt;
1718
use futures::{
@@ -25,7 +26,6 @@ use metrics::{
2526
log_document_validated,
2627
schema_validation_timer,
2728
};
28-
use value::TableNamespace;
2929

3030
use crate::metrics::log_worker_starting;
3131

@@ -68,36 +68,51 @@ impl<RT: Runtime> SchemaWorker<RT> {
6868
let status = log_worker_starting("SchemaWorker");
6969
let mut tx: Transaction<RT> = self.database.begin(Identity::system()).await?;
7070
let snapshot = self.database.snapshot(tx.begin_timestamp())?;
71-
let mut pending_schema_work = None;
72-
if let Some((id, db_schema)) = SchemaModel::new(&mut tx, TableNamespace::Global)
73-
.get_by_state(SchemaState::Pending)
74-
.await?
75-
{
76-
tracing::debug!("SchemaWorker found a pending schema and is validating it...");
77-
let timer = schema_validation_timer();
78-
let table_mapping = tx.table_mapping().namespace(TableNamespace::Global);
79-
let virtual_table_mapping = tx.virtual_table_mapping().clone();
80-
81-
let active_schema = SchemaModel::new(&mut tx, TableNamespace::Global)
82-
.get_by_state(SchemaState::Active)
71+
let mut pending_schema_work = Vec::new();
72+
let namespaces: Vec<_> = tx
73+
.table_mapping()
74+
.iter()
75+
.filter_map(|(_, namespace, _, table_name)| {
76+
if *table_name == *SCHEMAS_TABLE {
77+
Some(namespace)
78+
} else {
79+
None
80+
}
81+
})
82+
.collect();
83+
for namespace in namespaces {
84+
if let Some((id, db_schema)) = SchemaModel::new(&mut tx, namespace)
85+
.get_by_state(SchemaState::Pending)
8386
.await?
84-
.map(|(_id, active_schema)| active_schema);
85-
let ts = tx.begin_timestamp();
86-
let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?;
87-
pending_schema_work = Some((
88-
id,
89-
timer,
90-
table_mapping,
91-
virtual_table_mapping,
92-
db_schema,
93-
ts,
94-
active_schema,
95-
by_id_indexes,
96-
));
87+
{
88+
tracing::debug!("SchemaWorker found a pending schema and is validating it...");
89+
let timer = schema_validation_timer();
90+
let table_mapping = tx.table_mapping().namespace(namespace);
91+
let virtual_table_mapping = tx.virtual_table_mapping().clone();
92+
93+
let active_schema = SchemaModel::new(&mut tx, namespace)
94+
.get_by_state(SchemaState::Active)
95+
.await?
96+
.map(|(_id, active_schema)| active_schema);
97+
let ts = tx.begin_timestamp();
98+
let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?;
99+
pending_schema_work.push((
100+
namespace,
101+
id,
102+
timer,
103+
table_mapping,
104+
virtual_table_mapping,
105+
db_schema,
106+
ts,
107+
active_schema,
108+
by_id_indexes,
109+
));
110+
}
97111
}
98112
let token = tx.into_token()?;
99113

100-
if let Some((
114+
for (
115+
namespace,
101116
id,
102117
timer,
103118
table_mapping,
@@ -106,7 +121,7 @@ impl<RT: Runtime> SchemaWorker<RT> {
106121
ts,
107122
active_schema,
108123
by_id_indexes,
109-
)) = pending_schema_work
124+
) in pending_schema_work
110125
{
111126
let tables_to_check = DatabaseSchema::tables_to_validate(
112127
&db_schema,
@@ -141,7 +156,7 @@ impl<RT: Runtime> SchemaWorker<RT> {
141156
let mut backoff = Backoff::new(INITIAL_COMMIT_BACKOFF, MAX_COMMIT_BACKOFF);
142157
while backoff.failures() < MAX_COMMIT_FAILURES {
143158
let mut tx = self.database.begin(Identity::system()).await?;
144-
SchemaModel::new(&mut tx, TableNamespace::Global)
159+
SchemaModel::new(&mut tx, namespace)
145160
.mark_failed(id, schema_error.clone())
146161
.await?;
147162
if let Err(e) = self
@@ -171,7 +186,7 @@ impl<RT: Runtime> SchemaWorker<RT> {
171186
}
172187
}
173188
let mut tx = self.database.begin(Identity::system()).await?;
174-
if let Err(error) = SchemaModel::new(&mut tx, TableNamespace::Global)
189+
if let Err(error) = SchemaModel::new(&mut tx, namespace)
175190
.mark_validated(id)
176191
.await
177192
{

0 commit comments

Comments
 (0)