
Commit 1f20196

goffrie (Convex, Inc.) authored and committed

Reduce cloning of ResolvedDocuments in Writes (#42966)
1. Do not use OrdMap in FunctionWrites, which does not need to be cheaply cloneable.
2. Store an Arc inside `Writes`'s OrdMap (which I have changed to an OrdSet), because OrdMap frequently clones its contents.

GitOrigin-RevId: 9a0e97384db9c83588f55ebcdda2b1a4d681c25d
1 parent 4fe00a3 commit 1f20196
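Why change (2) helps: as the message notes, imbl's OrdMap frequently clones its contents (whenever a shared tree node has to be copied), so keeping full document updates inline means the documents themselves get deep-cloned repeatedly. Wrapping each element in an `Arc` turns those copies into refcount bumps. A minimal sketch of the pattern, using hypothetical stand-in types (`Entry`, `Update` with a `u64` id) rather than the real `Writes` internals:

```rust
// Sketch only: stand-in types, not the actual Convex Writes structure.
use std::sync::Arc;

use imbl::OrdSet;

#[derive(Clone, Debug)]
struct Update {
    id: u64,              // stand-in for ResolvedDocumentId
    new_document: String, // stand-in for the (potentially large) document
}

// Set element ordered by document id; cloning an Entry only bumps the Arc
// refcount, so the persistent tree never deep-copies the document itself.
#[derive(Clone, Debug)]
struct Entry(Arc<Update>);

impl PartialEq for Entry {
    fn eq(&self, other: &Self) -> bool {
        self.0.id == other.0.id
    }
}
impl Eq for Entry {}
impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for Entry {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.0.id.cmp(&other.0.id)
    }
}

fn main() {
    let mut writes: OrdSet<Entry> = OrdSet::new();
    for id in 0..1_000 {
        writes.insert(Entry(Arc::new(Update {
            id,
            new_document: "pretend this is a large ResolvedDocument".into(),
        })));
    }
    // Snapshots and internal node copies share the Arc'd updates instead of
    // cloning the documents.
    let snapshot = writes.clone();
    assert_eq!(snapshot.len(), 1_000);
}
```

An OrdSet ordered by the element's own key (here, the id behind the Arc) plays the role the OrdMap key used to play, which is consistent with the updates carrying their own `id` field in the hunks below.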

File tree

7 files changed: +151 -91 lines changed

crates/application/src/application_function_runner/mod.rs

Lines changed: 3 additions & 1 deletion
@@ -402,7 +402,9 @@ impl<RT: Runtime> FunctionRouter<RT> {
             udf_type,
             tx.identity().clone(),
             tx.begin_timestamp(),
-            tx.writes().as_flat()?.clone().into(),
+            FunctionWrites {
+                updates: tx.writes().as_flat()?.coalesced_writes().cloned().collect(),
+            },
             log_line_sender,
             function_metadata,
             http_action_metadata,
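Per change (1), `FunctionWrites` is built once here and handed to the function runner, so it does not need a persistent map; the hunk collects the coalesced write iterator directly into its `updates` field. A rough sketch of the shape this implies, with hypothetical simplified types (the real field types are not shown in this diff):

```rust
// Hypothetical simplification; DocumentUpdate stands in for DocumentUpdateWithPrevTs.
#[derive(Clone)]
struct DocumentUpdate {
    id: u64,
    new_document: Option<String>,
}

// Built once per function call, so a plain Vec is enough; no cheaply cloneable
// OrdMap needed (change 1 in the commit message).
struct FunctionWrites {
    updates: Vec<DocumentUpdate>,
}

fn to_function_writes<'a>(
    coalesced_writes: impl Iterator<Item = &'a DocumentUpdate>,
) -> FunctionWrites {
    FunctionWrites {
        // Mirrors `coalesced_writes().cloned().collect()` in the hunk above.
        updates: coalesced_writes.cloned().collect(),
    }
}
```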

crates/database/src/committer.rs

Lines changed: 10 additions & 14 deletions
@@ -73,7 +73,6 @@ use common::{
         Timestamp,
         WriteTimestamp,
     },
-    value::ResolvedDocumentId,
 };
 use errors::ErrorMetadata;
 use fastrace::prelude::*;
@@ -142,7 +141,6 @@ use crate::{
         PendingWrites,
         WriteSource,
     },
-    writes::DocumentWrite,
     ComponentRegistry,
     Snapshot,
     Transaction,
@@ -695,10 +693,10 @@ impl<RT: Runtime> Committer<RT> {
         // which is the same order they should be applied to database metadata
         // and index data structures
         let mut ordered_updates = updates;
-        ordered_updates.sort_by_key(|(id, update)| {
+        ordered_updates.sort_by_key(|update| {
             table_dependency_sort_key(
                 BootstrapTableIds::new(&transaction.table_mapping),
-                InternalDocumentId::from(**id),
+                InternalDocumentId::from(update.id),
                 update.new_document.as_ref(),
             )
         });
@@ -717,7 +715,7 @@
             commit_ts,
             ordered_updates
                 .into_iter()
-                .map(|(&id, update)| (id, PackedDocumentUpdate::pack(update)))
+                .map(|update| (update.id, PackedDocumentUpdate::pack(update)))
                 .collect(),
             write_source,
             snapshot,
@@ -735,7 +733,7 @@
     fn compute_writes(
         &self,
         commit_ts: Timestamp,
-        ordered_updates: &Vec<(&ResolvedDocumentId, &DocumentUpdateWithPrevTs)>,
+        ordered_updates: &Vec<&DocumentUpdateWithPrevTs>,
     ) -> anyhow::Result<(
         Vec<ValidatedDocumentWrite>,
         BTreeSet<(Timestamp, DatabaseIndexUpdate)>,
@@ -754,16 +752,14 @@
             .pending_writes
             .latest_snapshot()
             .unwrap_or_else(|| self.snapshot_manager.read().latest_snapshot());
-        for &(id, document_update) in ordered_updates.iter() {
+        for &document_update in ordered_updates.iter() {
             let (updates, doc_in_vector_index) =
                 latest_pending_snapshot.update(document_update, commit_ts)?;
             index_writes.extend(updates);
             document_writes.push(ValidatedDocumentWrite {
                 commit_ts,
-                id: (*id).into(),
-                write: DocumentWrite {
-                    document: document_update.new_document.clone(),
-                },
+                id: document_update.id.into(),
+                write: document_update.new_document.clone(),
                 doc_in_vector_index,
                 prev_ts: document_update.old_document.as_ref().map(|&(_, ts)| ts),
             });
@@ -934,7 +930,7 @@
                 .map(|write| DocumentLogEntry {
                     ts: write.commit_ts,
                     id: write.id,
-                    value: write.write.document,
+                    value: write.write,
                     prev_ts: write.prev_ts,
                 })
                 .collect_vec(),
@@ -1021,7 +1017,7 @@
         }
         for validated_write in document_writes {
             let ValidatedDocumentWrite {
-                write: DocumentWrite { document },
+                write: document,
                 doc_in_vector_index,
                 ..
             } = validated_write;
@@ -1090,7 +1086,7 @@
 struct ValidatedDocumentWrite {
     commit_ts: Timestamp,
     id: InternalDocumentId,
-    write: DocumentWrite,
+    write: Option<ResolvedDocument>,
     doc_in_vector_index: DocInVectorIndex,
     prev_ts: Option<Timestamp>,
 }
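The committer hunks above all follow from one structural change: the document id now lives on `DocumentUpdateWithPrevTs` itself, so the committer passes plain `&DocumentUpdateWithPrevTs` references instead of `(&ResolvedDocumentId, &DocumentUpdateWithPrevTs)` pairs, and `ValidatedDocumentWrite::write` holds the optional document directly rather than a one-field `DocumentWrite` wrapper. A minimal sketch with stand-in types and a placeholder sort key in place of `table_dependency_sort_key`:

```rust
// Stand-ins for the real types; the sort key below is a placeholder for
// table_dependency_sort_key(BootstrapTableIds, id, new_document).
#[derive(Clone)]
struct DocumentUpdate {
    id: u64,
    new_document: Option<String>,
}

// `write` is now the optional document itself, not a DocumentWrite wrapper.
struct ValidatedDocumentWrite {
    id: u64,
    write: Option<String>,
}

fn sort_for_commit(updates: &mut Vec<&DocumentUpdate>) {
    // Placeholder ordering; the real code sorts by table dependency.
    updates.sort_by_key(|update| update.id);
}

fn compute_writes(ordered_updates: &[&DocumentUpdate]) -> Vec<ValidatedDocumentWrite> {
    ordered_updates
        .iter()
        .map(|update| ValidatedDocumentWrite {
            // The id is read off the update; no parallel id is threaded through.
            id: update.id,
            write: update.new_document.clone(),
        })
        .collect()
}
```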

crates/database/src/lib.rs

Lines changed: 0 additions & 1 deletion
@@ -93,7 +93,6 @@ pub use write_log::{
     WriteSource,
 };
 pub use writes::{
-    DocumentWrite,
     TransactionWriteSize,
     Writes,
 };

crates/database/src/tests/apply_function_runner_tx.rs

Lines changed: 31 additions & 6 deletions
@@ -80,7 +80,12 @@ async fn test_apply_function_runner_tx_new_table(rt: TestRuntime) -> anyhow::Res
         .iter()
         .map(|(table, stats)| (*table, stats.rows_read))
         .collect();
-    let updates = function_runner_tx.writes.as_flat()?.clone().into_updates();
+    let updates = function_runner_tx
+        .writes
+        .as_flat()?
+        .coalesced_writes()
+        .cloned()
+        .collect();
     backend_tx.apply_function_runner_tx(
         *begin_timestamp,
         reads,
@@ -133,7 +138,12 @@ async fn test_apply_function_runner_tx_read_only(rt: TestRuntime) -> anyhow::Res
         .iter()
         .map(|(table, stats)| (*table, stats.rows_read))
         .collect();
-    let updates = function_runner_tx.writes.as_flat()?.clone().into_updates();
+    let updates = function_runner_tx
+        .writes
+        .as_flat()?
+        .coalesced_writes()
+        .cloned()
+        .collect();
     backend_tx.apply_function_runner_tx(
         *begin_timestamp,
         reads,
@@ -183,7 +193,12 @@ async fn test_apply_function_runner_tx_replace(rt: TestRuntime) -> anyhow::Resul
         .iter()
         .map(|(table, stats)| (*table, stats.rows_read))
         .collect();
-    let updates = function_runner_tx.writes.as_flat()?.clone().into_updates();
+    let updates = function_runner_tx
+        .writes
+        .as_flat()?
+        .coalesced_writes()
+        .cloned()
+        .collect();
     backend_tx.apply_function_runner_tx(
         *begin_timestamp,
         reads,
@@ -218,7 +233,7 @@ async fn test_apply_function_runner_tx_merge_existing_writes(
         FunctionUsageTracker::new(),
     )
     .await?;
-    let updates = backend_tx.writes().as_flat()?.clone().into_updates();
+    let updates = backend_tx.writes().as_flat()?.coalesced_writes().cloned();
     function_runner_tx.merge_writes(updates)?;
 
     // Perform writes as if in funrun
@@ -236,7 +251,12 @@ async fn test_apply_function_runner_tx_merge_existing_writes(
         .iter()
         .map(|(table, stats)| (*table, stats.rows_read))
         .collect();
-    let updates = function_runner_tx.writes.as_flat()?.clone().into_updates();
+    let updates = function_runner_tx
+        .writes
+        .as_flat()?
+        .coalesced_writes()
+        .cloned()
+        .collect();
     backend_tx.apply_function_runner_tx(
         *begin_timestamp,
         reads,
@@ -288,7 +308,12 @@ async fn test_apply_function_runner_tx_merge_existing_writes_bad(
         .iter()
         .map(|(table, stats)| (*table, stats.rows_read))
         .collect();
-    let updates = function_runner_tx.writes.as_flat()?.clone().into_updates();
+    let updates = function_runner_tx
+        .writes
+        .as_flat()?
+        .coalesced_writes()
+        .cloned()
+        .collect();
     assert!(backend_tx
         .apply_function_runner_tx(
             *begin_timestamp,
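The same six-line collection expression now appears in each of these tests. A hypothetical helper (not part of this commit) could capture the repeated pattern, assuming `writes()`, `as_flat()`, and `coalesced_writes()` have the signatures implied by the hunks above:

```rust
// Hypothetical test helper, not in this commit: collect a transaction's
// coalesced writes into an owned Vec, as each test above does inline.
fn collected_updates<RT: Runtime>(
    tx: &mut Transaction<RT>,
) -> anyhow::Result<Vec<DocumentUpdateWithPrevTs>> {
    Ok(tx.writes().as_flat()?.coalesced_writes().cloned().collect())
}
```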

crates/database/src/transaction.rs

Lines changed: 17 additions & 10 deletions
@@ -77,7 +77,6 @@ use common::{
     virtual_system_mapping::VirtualSystemMapping,
 };
 use errors::ErrorMetadata;
-use imbl::OrdMap;
 use indexing::backend_in_memory_indexes::RangeRequest;
 use keybroker::{
     Identity,
@@ -394,8 +393,11 @@ impl<RT: Runtime> Transaction<RT> {
         let mut biggest_document_id = None;
         let mut max_nesting = 0;
         let mut most_nested_document_id = None;
-        for (document_id, DocumentUpdateWithPrevTs { new_document, .. }) in
-            self.writes.coalesced_writes()
+        for DocumentUpdateWithPrevTs {
+            id: document_id,
+            new_document,
+            ..
+        } in self.writes.coalesced_writes()
         {
             let (size, nesting) = new_document
                 .as_ref()
@@ -440,7 +442,7 @@
         num_intervals: usize,
         user_tx_size: crate::reads::TransactionReadSize,
         system_tx_size: crate::reads::TransactionReadSize,
-        updates: OrdMap<ResolvedDocumentId, DocumentUpdateWithPrevTs>,
+        updates: Vec<DocumentUpdateWithPrevTs>,
         rows_read_by_tablet: BTreeMap<TabletId, u64>,
     ) -> anyhow::Result<()> {
         anyhow::ensure!(
@@ -467,24 +469,29 @@
     // In most scenarios this transaction will have no writes.
     pub fn merge_writes(
         &mut self,
-        updates: OrdMap<ResolvedDocumentId, DocumentUpdateWithPrevTs>,
+        updates: impl IntoIterator<Item = DocumentUpdateWithPrevTs>,
     ) -> anyhow::Result<()> {
         let existing_updates = self.writes().as_flat()?.clone().into_updates();
 
         let mut updates = updates.into_iter().collect::<Vec<_>>();
         let bootstrap_tables = self.bootstrap_tables();
-        updates.sort_by_cached_key(|(id, update)| {
-            table_dependency_sort_key(bootstrap_tables, (*id).into(), update.new_document.as_ref())
+        updates.sort_by_cached_key(|update| {
+            table_dependency_sort_key(
+                bootstrap_tables,
+                update.id.into(),
+                update.new_document.as_ref(),
+            )
         });
 
         let mut preserved_update_count = 0;
-        for (id, update) in updates {
+        for update in updates {
+            let id = update.id;
             // Ensure that the existing update matches, and that
             // that the merged-in writes didn't otherwise modify documents
             // already written to in this transaction.
             if let Some(existing_update) = existing_updates.get(&id) {
                 anyhow::ensure!(
-                    *existing_update == update,
+                    **existing_update == update,
                     "Conflicting updates for document {id}"
                 );
                 preserved_update_count += 1;
@@ -1197,7 +1204,7 @@ impl FinalTransaction {
             .writes
             .as_flat()?
             .coalesced_writes()
-            .map(|(id, _)| id.tablet_id)
+            .map(|update| update.id.tablet_id)
             .collect();
         Self::validate_memory_index_size(
             table_mapping,
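One subtle change above is `*existing_update == update` becoming `**existing_update == update`: with Arc-wrapped updates stored in `Writes` (change 2), `into_updates()` now plausibly hands back `Arc`-wrapped values, so the lookup yields a reference to an `Arc` and needs two dereferences to reach the update for comparison. A small illustration with stand-in types (the `BTreeMap<u64, Arc<Update>>` here is an assumption, not the real return type of `into_updates()`):

```rust
// Illustration only: Update stands in for DocumentUpdateWithPrevTs, and the
// BTreeMap<u64, Arc<Update>> stands in for whatever into_updates() returns.
use std::{collections::BTreeMap, sync::Arc};

#[derive(PartialEq)]
struct Update {
    id: u64,
}

fn matches_existing(existing: &BTreeMap<u64, Arc<Update>>, update: Update) -> bool {
    match existing.get(&update.id) {
        // `existing_update` is `&Arc<Update>`: the first `*` reaches the Arc,
        // the second reaches the Update, mirroring `**existing_update == update`.
        Some(existing_update) => **existing_update == update,
        None => false,
    }
}
```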
