Skip to content

Commit a3215f5

Browse files
ldanilekConvex, Inc.
authored andcommitted
[components] pass namespace into ResolvedQuery/DeveloperQuery (#26021)
some more threading of TableNamespace -- this time from ResolvedQuery all the way down to the table mapping, which involves passing it to everywhere we resolve `IndexName<TableName>` -> `TabletIndexName`. Everything still uses TableNamespace::Global, it's just being passed explicitly to more places. This refactor will allow different module definitions to be looked up in different namespaces. GitOrigin-RevId: 8dd562c578c7d622984c8e1cb4ea8e5b0db50e2e
1 parent ba68229 commit a3215f5

File tree

45 files changed

+321
-150
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+321
-150
lines changed

crates/application/src/cron_jobs/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,10 @@ use model::{
7878
modules::ModuleModel,
7979
};
8080
use usage_tracking::FunctionUsageTracker;
81-
use value::ResolvedDocumentId;
81+
use value::{
82+
ResolvedDocumentId,
83+
TableNamespace,
84+
};
8285

8386
use crate::{
8487
application_function_runner::ApplicationFunctionRunner,
@@ -162,7 +165,8 @@ impl<RT: Runtime> CronJobExecutor<RT> {
162165
range: vec![],
163166
order: Order::Asc,
164167
});
165-
let mut query_stream = ResolvedQuery::new(&mut tx, index_query)?;
168+
let mut query_stream =
169+
ResolvedQuery::new(&mut tx, TableNamespace::Global, index_query)?;
166170

167171
let mut next_job_wait = None;
168172
while let Some(doc) = query_stream.next(&mut tx, None).await? {

crates/application/src/export_worker.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ use usage_tracking::{
112112
use value::{
113113
export::ValueFormat,
114114
id_v6::DeveloperDocumentId,
115+
TableNamespace,
115116
TableNumber,
116117
TabletId,
117118
VirtualTableMapping,
@@ -263,7 +264,7 @@ impl<RT: Runtime> ExportWorker<RT> {
263264
order: Order::Asc,
264265
};
265266
let query = common::query::Query::index_range(index_range);
266-
let mut query_stream = ResolvedQuery::new(tx, query)?;
267+
let mut query_stream = ResolvedQuery::new(tx, TableNamespace::Global, query)?;
267268
query_stream
268269
.expect_at_most_one(tx)
269270
.await?
@@ -287,7 +288,7 @@ impl<RT: Runtime> ExportWorker<RT> {
287288
order: Order::Desc,
288289
};
289290
let query = common::query::Query::index_range(index_range);
290-
let mut query_stream = ResolvedQuery::new(tx, query)?;
291+
let mut query_stream = ResolvedQuery::new(tx, TableNamespace::Global, query)?;
291292
query_stream.expect_at_most_one(tx).await
292293
}
293294

crates/application/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ use value::{
264264
ConvexValue,
265265
Namespace,
266266
ResolvedDocumentId,
267+
TableNamespace,
267268
};
268269
use vector::{
269270
PublicVectorSearchQueryResult,
@@ -2041,8 +2042,8 @@ impl<RT: Runtime> Application<RT> {
20412042
IndexMetadata::new_backfilling(*tx.begin_timestamp(), index_name, index_fields);
20422043
let mut model = IndexModel::new(&mut tx);
20432044
if let Some(existing_index_metadata) = model
2044-
.pending_index_metadata(&index_metadata.name)?
2045-
.or(model.enabled_index_metadata(&index_metadata.name)?)
2045+
.pending_index_metadata(TableNamespace::Global, &index_metadata.name)?
2046+
.or(model.enabled_index_metadata(TableNamespace::Global, &index_metadata.name)?)
20462047
{
20472048
if !index_metadata
20482049
.config

crates/application/src/scheduled_jobs/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ use model::{
8585
use parking_lot::Mutex;
8686
use sync_types::Timestamp;
8787
use usage_tracking::FunctionUsageTracker;
88-
use value::ResolvedDocumentId;
88+
use value::{
89+
ResolvedDocumentId,
90+
TableNamespace,
91+
};
8992

9093
use crate::{
9194
application_function_runner::ApplicationFunctionRunner,
@@ -326,7 +329,7 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
326329
)],
327330
order: Order::Asc,
328331
});
329-
let mut query_stream = ResolvedQuery::new(tx, index_query)?;
332+
let mut query_stream = ResolvedQuery::new(tx, TableNamespace::Global, index_query)?;
330333
while let Some(doc) = query_stream.next(tx, None).await? {
331334
let job: ParsedDocument<ScheduledJob> = doc.try_into()?;
332335
let (job_id, job) = job.clone().into_id_and_value();
@@ -820,7 +823,8 @@ impl<RT: Runtime> ScheduledJobGarbageCollector<RT> {
820823
order: Order::Asc,
821824
})
822825
.limit(*SCHEDULED_JOB_GARBAGE_COLLECTION_BATCH_SIZE);
823-
let mut query_stream = ResolvedQuery::new(&mut tx, index_query)?;
826+
let mut query_stream =
827+
ResolvedQuery::new(&mut tx, TableNamespace::Global, index_query)?;
824828

825829
let mut next_job_wait = None;
826830
let mut jobs_to_delete = vec![];

crates/application/src/snapshot_import.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2378,6 +2378,7 @@ mod tests {
23782378
ConvexObject,
23792379
FieldName,
23802380
TableName,
2381+
TableNamespace,
23812382
};
23822383

23832384
use super::{
@@ -2992,7 +2993,7 @@ a
29922993
let mut tx = app.begin(identity.clone()).await?;
29932994
let mut index_model = IndexModel::new(&mut tx);
29942995
let index = index_model
2995-
.enabled_index_metadata(&index_name)?
2996+
.enabled_index_metadata(TableNamespace::Global, &index_name)?
29962997
.context("index does not exist")?;
29972998
assert_ne!(index.id(), index_id);
29982999
assert!(index.config.is_enabled());
@@ -3072,7 +3073,7 @@ a
30723073
let mut tx = app.begin(new_admin_id()).await?;
30733074
let table_name = TableName::from_str(table_name)?;
30743075
let query = common::query::Query::full_table_scan(table_name.clone(), Order::Asc);
3075-
let mut query_stream = ResolvedQuery::new(&mut tx, query)?;
3076+
let mut query_stream = ResolvedQuery::new(&mut tx, TableNamespace::Global, query)?;
30763077

30773078
let mut docs: Vec<ResolvedDocument> = Vec::new();
30783079
while let Some(doc) = query_stream.next(&mut tx, None).await? {

crates/application/src/tests/cron_jobs.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use model::{
4545
};
4646
use runtime::testing::TestRuntime;
4747
use serde_json::Value as JsonValue;
48+
use value::TableNamespace;
4849

4950
use crate::{
5051
test_helpers::{
@@ -88,6 +89,7 @@ async fn create_cron_job(
8889
fn cron_log_query<RT: Runtime>(tx: &mut Transaction<RT>) -> anyhow::Result<DeveloperQuery<RT>> {
8990
DeveloperQuery::new(
9091
tx,
92+
TableNamespace::Global,
9193
Query::index_range(IndexRange {
9294
index_name: CRON_JOB_LOGS_INDEX_BY_NAME_TS.clone(),
9395
range: vec![IndexRangeExpression::Eq(

crates/common/src/components/mod.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use std::sync::LazyLock;
22

33
use cmd_util::env::env_config;
4-
use value::InternalId;
4+
use value::{
5+
InternalId,
6+
TableNamespace,
7+
};
58

69
mod component_definition_path;
710
mod component_path;
@@ -91,3 +94,14 @@ impl ComponentDefinitionId {
9194
matches!(self, ComponentDefinitionId::Root)
9295
}
9396
}
97+
98+
impl From<ComponentDefinitionId> for TableNamespace {
99+
fn from(value: ComponentDefinitionId) -> Self {
100+
// TODO(lee) convert these to the correct namespaces:
101+
// RootComponentDefinition and ByComponentDefinition respectively.
102+
match value {
103+
ComponentDefinitionId::Root => TableNamespace::Global,
104+
ComponentDefinitionId::Child(_id) => TableNamespace::Global,
105+
}
106+
}
107+
}

crates/database/src/bootstrap_model/components/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ impl<'a, RT: Runtime> BootstrapComponentsModel<'a, RT> {
107107
};
108108
let mut query = ResolvedQuery::new(
109109
self.tx,
110+
TableNamespace::Global,
110111
Query::index_range(IndexRange {
111112
index_name: COMPONENTS_BY_PARENT_INDEX.clone(),
112113
range,

crates/database/src/bootstrap_model/index.rs

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,13 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
169169
}
170170

171171
#[cfg(any(test, feature = "testing"))]
172-
pub async fn enable_index_for_testing(&mut self, index: &IndexName) -> anyhow::Result<()> {
172+
pub async fn enable_index_for_testing(
173+
&mut self,
174+
namespace: TableNamespace,
175+
index: &IndexName,
176+
) -> anyhow::Result<()> {
173177
let metadata = self
174-
.pending_index_metadata(index)?
178+
.pending_index_metadata(namespace, index)?
175179
.ok_or_else(|| anyhow::anyhow!("Failed to find pending index: {}", index))?;
176180
self.enable_index(&metadata.into_value()).await
177181
}
@@ -369,7 +373,7 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
369373
for new_index in indexes_in_schema {
370374
remaining_indexes.remove(&new_index.name);
371375

372-
match self.compare_new_and_existing_indexes(new_index)? {
376+
match self.compare_new_and_existing_indexes(TableNamespace::Global, new_index)? {
373377
IndexComparison::Added(index) => diff.added.push(index),
374378
IndexComparison::Identical(index) => diff.identical.push(index),
375379
IndexComparison::Replaced {
@@ -402,10 +406,11 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
402406

403407
fn compare_new_and_existing_indexes(
404408
&mut self,
409+
namespace: TableNamespace,
405410
new_index: DeveloperIndexMetadata,
406411
) -> anyhow::Result<IndexComparison> {
407-
let pending_index = self.pending_index_metadata(&new_index.name)?;
408-
let enabled_index = self.enabled_index_metadata(&new_index.name)?;
412+
let pending_index = self.pending_index_metadata(namespace, &new_index.name)?;
413+
let enabled_index = self.enabled_index_metadata(namespace, &new_index.name)?;
409414

410415
fn identical_or_replaced(
411416
existing_index: ParsedDocument<TabletIndexMetadata>,
@@ -543,9 +548,10 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
543548
/// pending_index_metadata.
544549
pub fn enabled_index_metadata(
545550
&mut self,
551+
namespace: TableNamespace,
546552
index_name: &IndexName,
547553
) -> anyhow::Result<Option<ParsedDocument<TabletIndexMetadata>>> {
548-
self._index_metadata(index_name, |indexes, reads, index_name| {
554+
self._index_metadata(namespace, index_name, |indexes, reads, index_name| {
549555
indexes.get_enabled(reads, &index_name)
550556
})
551557
}
@@ -558,9 +564,10 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
558564
/// or require_enabled_index_metadata instead.
559565
pub fn pending_index_metadata(
560566
&mut self,
567+
namespace: TableNamespace,
561568
index_name: &IndexName,
562569
) -> anyhow::Result<Option<ParsedDocument<TabletIndexMetadata>>> {
563-
self._index_metadata(index_name, |indexes, reads, index_name| {
570+
self._index_metadata(namespace, index_name, |indexes, reads, index_name| {
564571
indexes.get_pending(reads, &index_name)
565572
})
566573
}
@@ -575,6 +582,7 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
575582

576583
fn _index_metadata<'b>(
577584
&'b mut self,
585+
namespace: TableNamespace,
578586
index_name: &IndexName,
579587
getter: impl FnOnce(
580588
&'b mut TransactionIndex,
@@ -585,18 +593,19 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
585593
if !self
586594
.tx
587595
.table_mapping()
588-
.namespace(TableNamespace::Global)
596+
.namespace(namespace)
589597
.name_exists(index_name.table())
590598
{
591599
return Ok(None);
592600
}
593-
let index_name = self.resolve_index_name(index_name)?;
601+
let index_name = self.resolve_index_name(namespace, index_name)?;
594602
Ok(getter(&mut self.tx.index, &mut self.tx.reads, index_name)
595603
.map(|index| index.metadata.clone()))
596604
}
597605

598606
pub fn stable_index_name(
599607
&mut self,
608+
namespace: TableNamespace,
600609
index_name: &IndexName,
601610
table_filter: TableFilter,
602611
) -> anyhow::Result<StableIndexName> {
@@ -612,24 +621,24 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
612621
.clone();
613622
Ok(StableIndexName::Virtual(
614623
index_name.clone(),
615-
self.resolve_index_name(&physical_index_name)?,
624+
self.resolve_index_name(namespace, &physical_index_name)?,
616625
))
617626
} else if self
618627
.tx
619628
.table_mapping()
620-
.namespace(TableNamespace::Global)
629+
.namespace(namespace)
621630
.name_exists(index_name.table())
622631
{
623632
match table_filter {
624633
TableFilter::IncludePrivateSystemTables => Ok(StableIndexName::Physical(
625-
self.resolve_index_name(index_name)?,
634+
self.resolve_index_name(namespace, index_name)?,
626635
)),
627636
TableFilter::ExcludePrivateSystemTables => {
628637
if index_name.table().is_system() {
629638
Ok(StableIndexName::Missing)
630639
} else {
631640
Ok(StableIndexName::Physical(
632-
self.resolve_index_name(index_name)?,
641+
self.resolve_index_name(namespace, index_name)?,
633642
))
634643
}
635644
},
@@ -639,14 +648,14 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
639648
}
640649
}
641650

642-
fn resolve_index_name(&mut self, index_name: &IndexName) -> anyhow::Result<TabletIndexName> {
643-
let resolved = index_name.clone().map_table(
644-
&self
645-
.tx
646-
.table_mapping()
647-
.namespace(TableNamespace::Global)
648-
.name_to_id(),
649-
)?;
651+
fn resolve_index_name(
652+
&mut self,
653+
namespace: TableNamespace,
654+
index_name: &IndexName,
655+
) -> anyhow::Result<TabletIndexName> {
656+
let resolved = index_name
657+
.clone()
658+
.map_table(&self.tx.table_mapping().namespace(namespace).name_to_id())?;
650659
Ok(resolved.into())
651660
}
652661

@@ -699,7 +708,7 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
699708
range: vec![],
700709
order: Order::Asc,
701710
});
702-
let mut query_stream = ResolvedQuery::new(self.tx, index_query)?;
711+
let mut query_stream = ResolvedQuery::new(self.tx, TableNamespace::Global, index_query)?;
703712

704713
let mut indexes = vec![];
705714
while let Some(document) = query_stream.next(self.tx, None).await? {

crates/database/src/bootstrap_model/index_workers/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use value::{
3030
FieldPath,
3131
InternalId,
3232
TableName,
33+
TableNamespace,
3334
};
3435

3536
use crate::{
@@ -104,7 +105,7 @@ impl<'a, RT: Runtime> IndexWorkerMetadataModel<'a, RT> {
104105
range,
105106
order: Order::Asc,
106107
});
107-
let mut query_stream = ResolvedQuery::new(self.tx, query)?;
108+
let mut query_stream = ResolvedQuery::new(self.tx, TableNamespace::Global, query)?;
108109
let result = query_stream.next(self.tx, None).await?;
109110
result
110111
.map(ParsedDocument::<IndexWorkerMetadataRecord>::try_from)

0 commit comments

Comments
 (0)