Skip to content

Commit d2de546

Browse files
goffrieConvex, Inc.
authored andcommitted
Move FinalTransaction::validate_memory_index_sizes to CommitterClient::_commit (#42982)
GitOrigin-RevId: 30773859123f6706a43847aea5707063fbad800c
1 parent 0c77fb5 commit d2de546

File tree

2 files changed

+27
-34
lines changed

2 files changed

+27
-34
lines changed

crates/database/src/committer.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1164,7 +1164,13 @@ impl CommitterClient {
11641164
self.check_generated_ids(&transaction).await?;
11651165

11661166
// Finish reading everything from persistence.
1167-
let transaction = transaction.finalize(self.snapshot_reader.clone()).await?;
1167+
let transaction = transaction.finalize()?;
1168+
1169+
// Note that we do a best effort validation for memory index sizes. We
1170+
// use the latest snapshot instead of the transaction base snapshot. This
1171+
// is both more accurate and also avoids pedant hitting transient errors.
1172+
let latest_snapshot = self.snapshot_reader.lock().latest_snapshot();
1173+
transaction.validate_memory_index_sizes(&latest_snapshot)?;
11681174

11691175
let queue_timer = metrics::commit_queue_timer();
11701176
let (tx, rx) = oneshot::channel();

crates/database/src/transaction.rs

Lines changed: 20 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ use common::{
5353
},
5454
runtime::Runtime,
5555
schemas::DatabaseSchema,
56-
sync::split_rw_lock::Reader,
5756
types::{
5857
GenericIndexName,
5958
IndexId,
@@ -120,10 +119,7 @@ use crate::{
120119
},
121120
reads::TransactionReadSet,
122121
schema_registry::SchemaRegistry,
123-
snapshot_manager::{
124-
Snapshot,
125-
SnapshotManager,
126-
},
122+
snapshot_manager::Snapshot,
127123
table_summary::table_summary_bootstrapping_error,
128124
token::Token,
129125
transaction_id_generator::TransactionIdGenerator,
@@ -1119,11 +1115,8 @@ impl<RT: Runtime> Transaction<RT> {
11191115
self.index_size_override = Some(size);
11201116
}
11211117

1122-
pub async fn finalize(
1123-
self,
1124-
snapshot_reader: Reader<SnapshotManager>,
1125-
) -> anyhow::Result<FinalTransaction> {
1126-
FinalTransaction::new(self, snapshot_reader).await
1118+
pub fn finalize(self) -> anyhow::Result<FinalTransaction> {
1119+
FinalTransaction::new(self)
11271120
}
11281121
}
11291122

@@ -1149,73 +1142,67 @@ pub struct FinalTransaction {
11491142
pub(crate) writes: Writes,
11501143

11511144
pub(crate) usage_tracker: FunctionUsageTracker,
1145+
1146+
#[cfg(any(test, feature = "testing"))]
1147+
index_size_override: Option<usize>,
11521148
}
11531149

11541150
impl FinalTransaction {
11551151
pub fn is_readonly(&self) -> bool {
11561152
self.writes.is_empty()
11571153
}
11581154

1159-
pub async fn new<RT: Runtime>(
1160-
mut transaction: Transaction<RT>,
1161-
snapshot_reader: Reader<SnapshotManager>,
1162-
) -> anyhow::Result<Self> {
1155+
fn new<RT: Runtime>(transaction: Transaction<RT>) -> anyhow::Result<Self> {
11631156
// All subtransactions must have committed or rolled back.
11641157
transaction.require_not_nested()?;
11651158

11661159
let begin_timestamp = transaction.begin_timestamp();
1167-
let table_mapping = transaction.table_mapping().clone();
1168-
let component_registry = transaction.component_registry.deref().clone();
1169-
// Note that we do a best effort validation for memory index sizes. We
1170-
// use the latest snapshot instead of the transaction base snapshot. This
1171-
// is both more accurate and also avoids pedant hitting transient errors.
1172-
let latest_snapshot = snapshot_reader.lock().latest_snapshot();
1173-
Self::validate_memory_index_sizes(&table_mapping, &transaction, &latest_snapshot)?;
11741160
Ok(Self {
11751161
begin_timestamp,
1176-
table_mapping,
1177-
component_registry,
1162+
table_mapping: transaction.metadata.into_flat()?.table_mapping().clone(),
1163+
component_registry: transaction.component_registry.into_flat()?,
11781164
reads: transaction.reads,
11791165
writes: transaction.writes.into_flat()?,
1180-
usage_tracker: transaction.usage_tracker.clone(),
1166+
usage_tracker: transaction.usage_tracker,
1167+
1168+
#[cfg(any(test, feature = "testing"))]
1169+
index_size_override: transaction.index_size_override,
11811170
})
11821171
}
11831172

1184-
fn validate_memory_index_sizes<RT: Runtime>(
1185-
table_mapping: &TableMapping,
1186-
transaction: &Transaction<RT>,
1173+
pub(crate) fn validate_memory_index_sizes(
1174+
&self,
11871175
base_snapshot: &Snapshot,
11881176
) -> anyhow::Result<()> {
11891177
#[allow(unused_mut)]
11901178
let mut vector_size_limit = *VECTOR_INDEX_SIZE_HARD_LIMIT;
11911179
#[cfg(any(test, feature = "testing"))]
1192-
if let Some(size) = transaction.index_size_override {
1180+
if let Some(size) = self.index_size_override {
11931181
vector_size_limit = size;
11941182
}
11951183

11961184
#[allow(unused_mut)]
11971185
let mut search_size_limit = *TEXT_INDEX_SIZE_HARD_LIMIT;
11981186
#[cfg(any(test, feature = "testing"))]
1199-
if let Some(size) = transaction.index_size_override {
1187+
if let Some(size) = self.index_size_override {
12001188
search_size_limit = size;
12011189
}
12021190

1203-
let modified_tables: BTreeSet<_> = transaction
1191+
let modified_tables: BTreeSet<_> = self
12041192
.writes
1205-
.as_flat()?
12061193
.coalesced_writes()
12071194
.map(|update| update.id.tablet_id)
12081195
.collect();
12091196
Self::validate_memory_index_size(
1210-
table_mapping,
1197+
&self.table_mapping,
12111198
base_snapshot,
12121199
&modified_tables,
12131200
base_snapshot.text_indexes.in_memory_sizes().into_iter(),
12141201
search_size_limit,
12151202
SearchType::Text,
12161203
)?;
12171204
Self::validate_memory_index_size(
1218-
table_mapping,
1205+
&self.table_mapping,
12191206
base_snapshot,
12201207
&modified_tables,
12211208
base_snapshot.vector_indexes.in_memory_sizes().into_iter(),

0 commit comments

Comments
 (0)