Skip to content

Commit 836d83a

Browse files
authored
Simplify account_transaction_processor and use rayon to speed it up. (#495)
1 parent 82ae5ff commit 836d83a

File tree

3 files changed

+47
-85
lines changed

3 files changed

+47
-85
lines changed

rust/processor/src/db/common/models/account_transaction_models/account_transactions.rs

Lines changed: 28 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,8 @@ use crate::{
1313
schema::account_transactions,
1414
utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address},
1515
};
16-
use ahash::AHashMap;
17-
use aptos_protos::transaction::v1::{
18-
transaction::TxnData, write_set_change::Change, DeleteResource, Event, Transaction,
19-
WriteResource,
20-
};
16+
use ahash::AHashSet;
17+
use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Change, Transaction};
2118
use field_count::FieldCount;
2219
use serde::{Deserialize, Serialize};
2320

@@ -39,7 +36,7 @@ impl AccountTransaction {
3936
/// We will also consider transactions that the account signed or is part of a multi sig / multi agent.
4037
/// TODO: recursively find the parent account of an object
4138
/// TODO: include table items in the detection path
42-
pub fn from_transaction(transaction: &Transaction) -> AHashMap<AccountTransactionPK, Self> {
39+
pub fn get_accounts(transaction: &Transaction) -> AHashSet<String> {
4340
let txn_version = transaction.version as i64;
4441
let txn_data = match transaction.txn_data.as_ref() {
4542
Some(data) => data,
@@ -51,7 +48,7 @@ impl AccountTransaction {
5148
transaction_version = transaction.version,
5249
"Transaction data doesn't exist",
5350
);
54-
return AHashMap::new();
51+
return AHashSet::new();
5552
},
5653
};
5754
let transaction_info = transaction.info.as_ref().unwrap_or_else(|| {
@@ -73,82 +70,43 @@ impl AccountTransaction {
7370
TxnData::BlockMetadata(inner) => (&inner.events, vec![]),
7471
TxnData::Validator(inner) => (&inner.events, vec![]),
7572
_ => {
76-
return AHashMap::new();
73+
return AHashSet::new();
7774
},
7875
};
79-
let mut account_transactions = AHashMap::new();
80-
for sig in &signatures {
81-
account_transactions.insert((sig.signer.clone(), txn_version), Self {
82-
transaction_version: txn_version,
83-
account_address: sig.signer.clone(),
84-
});
76+
let mut accounts = AHashSet::new();
77+
for sig in signatures {
78+
accounts.insert(sig.signer);
8579
}
8680
for event in events {
87-
account_transactions.extend(Self::from_event(event, txn_version));
81+
// Record event account address. We don't really have to worry about objects here
82+
// because it'll be taken care of in the resource section.
83+
accounts.insert(standardize_address(
84+
event.key.as_ref().unwrap().account_address.as_str(),
85+
));
8886
}
8987
for wsc in wscs {
9088
match wsc.change.as_ref().unwrap() {
9189
Change::DeleteResource(res) => {
92-
account_transactions
93-
.extend(Self::from_delete_resource(res, txn_version).unwrap());
90+
// Record resource account.
91+
// TODO: If the resource is an object, then we need to look for the latest
92+
// owner. This isn't really possible right now given we have parallel threads
93+
// so it'll be very difficult to ensure that we have the correct latest owner.
94+
accounts.insert(standardize_address(res.address.as_str()));
9495
},
9596
Change::WriteResource(res) => {
96-
account_transactions
97-
.extend(Self::from_write_resource(res, txn_version).unwrap());
97+
// Record resource account. If the resource is an object, then we record the
98+
// owner as well.
99+
// This handles partial deletes as well.
100+
accounts.insert(standardize_address(res.address.as_str()));
101+
if let Some(inner) =
102+
&ObjectWithMetadata::from_write_resource(res, txn_version).unwrap()
103+
{
104+
accounts.insert(inner.object_core.get_owner_address());
105+
}
98106
},
99107
_ => {},
100108
}
101109
}
102-
account_transactions
103-
}
104-
105-
/// Base case, record event account address. We don't really have to worry about
106-
/// objects here because it'll be taken care of in the resource section
107-
fn from_event(event: &Event, txn_version: i64) -> AHashMap<AccountTransactionPK, Self> {
108-
let account_address =
109-
standardize_address(event.key.as_ref().unwrap().account_address.as_str());
110-
AHashMap::from([((account_address.clone(), txn_version), Self {
111-
transaction_version: txn_version,
112-
account_address,
113-
})])
114-
}
115-
116-
/// Base case, record resource account. If the resource is an object, then we record the owner as well
117-
/// This handles partial deletes as well
118-
fn from_write_resource(
119-
write_resource: &WriteResource,
120-
txn_version: i64,
121-
) -> anyhow::Result<AHashMap<AccountTransactionPK, Self>> {
122-
let mut result = AHashMap::new();
123-
let account_address = standardize_address(write_resource.address.as_str());
124-
result.insert((account_address.clone(), txn_version), Self {
125-
transaction_version: txn_version,
126-
account_address,
127-
});
128-
if let Some(inner) = &ObjectWithMetadata::from_write_resource(write_resource, txn_version)?
129-
{
130-
result.insert((inner.object_core.get_owner_address(), txn_version), Self {
131-
transaction_version: txn_version,
132-
account_address: inner.object_core.get_owner_address(),
133-
});
134-
}
135-
Ok(result)
136-
}
137-
138-
/// Base case, record resource account.
139-
/// TODO: If the resource is an object, then we need to look for the latest owner. This isn't really possible
140-
/// right now given we have parallel threads so it'll be very difficult to ensure that we have the correct
141-
/// latest owner
142-
fn from_delete_resource(
143-
delete_resource: &DeleteResource,
144-
txn_version: i64,
145-
) -> anyhow::Result<AHashMap<AccountTransactionPK, Self>> {
146-
let mut result = AHashMap::new();
147-
let account_address = standardize_address(delete_resource.address.as_str());
148-
result.insert((account_address.clone(), txn_version), Self {
149-
transaction_version: txn_version,
150-
account_address,
151-
});
152-
Ok(result)
110+
accounts
153111
}
154112
}

rust/processor/src/processors/account_transactions_processor.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use anyhow::bail;
1313
use aptos_protos::transaction::v1::Transaction;
1414
use async_trait::async_trait;
1515
use diesel::{pg::Pg, query_builder::QueryFragment};
16+
use rayon::prelude::*;
1617
use std::fmt::Debug;
1718
use tracing::error;
1819

@@ -101,20 +102,23 @@ impl ProcessorTrait for AccountTransactionsProcessor {
101102
let processing_start = std::time::Instant::now();
102103
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
103104

104-
let mut account_transactions = AHashMap::new();
105-
106-
for txn in &transactions {
107-
account_transactions.extend(AccountTransaction::from_transaction(txn));
108-
}
109-
let mut account_transactions = account_transactions
110-
.into_values()
111-
.collect::<Vec<AccountTransaction>>();
112-
113-
// Sort by PK
114-
account_transactions.sort_by(|a, b| {
115-
(&a.transaction_version, &a.account_address)
116-
.cmp(&(&b.transaction_version, &b.account_address))
117-
});
105+
let account_transactions: Vec<_> = transactions
106+
.into_par_iter()
107+
.map(|txn| {
108+
let transaction_version = txn.version as i64;
109+
let accounts = AccountTransaction::get_accounts(&txn);
110+
accounts
111+
.into_iter()
112+
.map(|account_address| AccountTransaction {
113+
transaction_version,
114+
account_address,
115+
})
116+
.collect()
117+
})
118+
.collect::<Vec<Vec<_>>>()
119+
.into_iter()
120+
.flatten()
121+
.collect();
118122

119123
let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
120124
let db_insertion_start = std::time::Instant::now();

rust/processor/src/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ impl Worker {
562562

563563
let num_processed = (last_txn_version - first_txn_version) + 1;
564564

565-
debug!(
565+
info!(
566566
processor_name = processor_name,
567567
service_type = PROCESSOR_SERVICE_TYPE,
568568
first_txn_version,

0 commit comments

Comments
 (0)