Skip to content

Commit d6c66fd

Browse files
authored
Remove unnecessary clones and use multi threads in default processor. (#512)
1 parent c42d401 commit d6c66fd

File tree

2 files changed

+32
-9
lines changed

2 files changed

+32
-9
lines changed

rust/processor/src/db/common/models/default_models/transactions.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use aptos_protos::transaction::v1::{
2525
};
2626
use bigdecimal::BigDecimal;
2727
use field_count::FieldCount;
28+
use rayon::prelude::*;
2829
use serde::{Deserialize, Serialize};
2930

3031
#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
@@ -330,9 +331,13 @@ impl Transaction {
330331
let mut wscs = vec![];
331332
let mut wsc_details = vec![];
332333

333-
for txn in transactions {
334-
let (txn, block_metadata, mut wsc_list, mut wsc_detail_list) =
335-
Self::from_transaction(txn);
334+
let processed_txns: Vec<_> = transactions
335+
.par_iter()
336+
.map(Self::from_transaction)
337+
.collect();
338+
339+
for processed_txn in processed_txns {
340+
let (txn, block_metadata, mut wsc_list, mut wsc_detail_list) = processed_txn;
336341
txns.push(txn);
337342
if let Some(a) = block_metadata {
338343
block_metadata_txns.push(a);

rust/processor/src/processors/default_processor.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,19 @@ impl ProcessorTrait for DefaultProcessor {
355355
)
356356
.await;
357357

358+
// These vectors could be super large and take a lot of time to drop, move to background to
359+
// make it faster.
360+
tokio::task::spawn(async move {
361+
drop(txns);
362+
drop(block_metadata_transactions);
363+
drop(write_set_changes);
364+
drop(move_modules);
365+
drop(move_resources);
366+
drop(table_items);
367+
drop(current_table_items);
368+
drop(table_metadata);
369+
});
370+
358371
let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64();
359372
match tx_result {
360373
Ok(_) => Ok(ProcessingResult::DefaultProcessingResult(
@@ -403,7 +416,7 @@ fn process_transactions(
403416
TransactionModel::from_transactions(&transactions);
404417
let mut block_metadata_transactions = vec![];
405418
for block_metadata_txn in block_metadata_txns {
406-
block_metadata_transactions.push(block_metadata_txn.clone());
419+
block_metadata_transactions.push(block_metadata_txn);
407420
}
408421
let mut move_modules = vec![];
409422
let mut move_resources = vec![];
@@ -412,19 +425,19 @@ fn process_transactions(
412425
let mut table_metadata = AHashMap::new();
413426
for detail in wsc_details {
414427
match detail {
415-
WriteSetChangeDetail::Module(module) => move_modules.push(module.clone()),
416-
WriteSetChangeDetail::Resource(resource) => move_resources.push(resource.clone()),
428+
WriteSetChangeDetail::Module(module) => move_modules.push(module),
429+
WriteSetChangeDetail::Resource(resource) => move_resources.push(resource),
417430
WriteSetChangeDetail::Table(item, current_item, metadata) => {
418-
table_items.push(item.clone());
431+
table_items.push(item);
419432
current_table_items.insert(
420433
(
421434
current_item.table_handle.clone(),
422435
current_item.key_hash.clone(),
423436
),
424-
current_item.clone(),
437+
current_item,
425438
);
426439
if let Some(meta) = metadata {
427-
table_metadata.insert(meta.handle.clone(), meta.clone());
440+
table_metadata.insert(meta.handle.clone(), meta);
428441
}
429442
},
430443
}
@@ -440,6 +453,11 @@ fn process_transactions(
440453
.sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash)));
441454
table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle));
442455

456+
println!(
457+
"table_items: {}, current_table_items: {}",
458+
table_items.len(),
459+
current_table_items.len()
460+
);
443461
if flags.contains(TableFlags::MOVE_RESOURCES) {
444462
move_resources.clear();
445463
}

0 commit comments

Comments
 (0)