Skip to content

Commit 94a103c

Browse files
authored
Merge pull request #1672 from input-output-hk/djo/1657/consistent_transactions_sort
Consistent transactions sort, use block number then transaction hash instead of row id
2 parents 936ffe2 + 24d5c95 commit 94a103c

File tree

13 files changed

+329
-134
lines changed

13 files changed

+329
-134
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-aggregator"
3-
version = "0.5.5"
3+
version = "0.5.6"
44
description = "A Mithril Aggregator server"
55
authors = { workspace = true }
66
edition = { workspace = true }
@@ -13,6 +13,10 @@ repository = { workspace = true }
1313
name = "cardano_transactions_import"
1414
harness = false
1515

16+
[[bench]]
17+
name = "cardano_transactions_get"
18+
harness = false
19+
1620
[dependencies]
1721
anyhow = "1.0.79"
1822
async-trait = "0.1.77"
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use std::sync::Arc;
2+
3+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
4+
use sqlite::ConnectionThreadSafe;
5+
6+
use mithril_aggregator::{
7+
database::repository::CardanoTransactionRepository, services::TransactionStore,
8+
};
9+
use mithril_common::{entities::CardanoTransaction, test_utils::TempDir};
10+
use mithril_persistence::sqlite::ConnectionBuilder;
11+
12+
fn cardano_tx_db_connection(db_file_name: &str) -> ConnectionThreadSafe {
13+
let db_path =
14+
TempDir::create("aggregator_benches", "bench_get_transactions").join(db_file_name);
15+
16+
if db_path.exists() {
17+
std::fs::remove_file(db_path.clone()).unwrap();
18+
}
19+
20+
ConnectionBuilder::open_file(&db_path)
21+
.with_migrations(
22+
mithril_aggregator::database::cardano_transaction_migration::get_migrations(),
23+
)
24+
.build()
25+
.unwrap()
26+
}
27+
28+
fn generate_transactions(nb_transactions: usize) -> Vec<CardanoTransaction> {
29+
// Note: we irrealistically generate transactions where each are on a different block.
30+
// This is to trick the repository `get_transactions_in_range` method to read the expected number
31+
// of transactions.
32+
(0..nb_transactions)
33+
.map(|i| {
34+
CardanoTransaction::new(
35+
format!("tx_hash-{}", i),
36+
i as u64,
37+
i as u64 * 100,
38+
format!("block_hash-{}", i),
39+
i as u64 + 1,
40+
)
41+
})
42+
.collect()
43+
}
44+
45+
async fn init_db(nb_transaction_in_db: usize) -> CardanoTransactionRepository {
46+
println!("Generating a db with {nb_transaction_in_db} transactions, one per block ...");
47+
let transactions = generate_transactions(nb_transaction_in_db);
48+
let connection = Arc::new(cardano_tx_db_connection(&format!(
49+
"cardano_tx-{nb_transaction_in_db}.db",
50+
)));
51+
let repository = CardanoTransactionRepository::new(connection);
52+
repository.store_transactions(transactions).await.unwrap();
53+
54+
repository
55+
}
56+
57+
fn run_bench(c: &mut Criterion, nb_transaction_in_db: usize) {
58+
let runtime = tokio::runtime::Runtime::new().unwrap();
59+
let repository = runtime.block_on(async { init_db(nb_transaction_in_db).await });
60+
61+
let mut group = c.benchmark_group(format!(
62+
"Get transactions - {nb_transaction_in_db} tx in db"
63+
));
64+
for max_block_number in [100, 10_000, 100_000, 1_000_000] {
65+
group.bench_with_input(
66+
BenchmarkId::from_parameter(format!(
67+
"get_transactions_in_range(0..{max_block_number})"
68+
)),
69+
&max_block_number,
70+
|b, &max_block_number| {
71+
b.to_async(&runtime).iter(|| async {
72+
let _transactions = repository
73+
.get_transactions_in_range(0..max_block_number)
74+
.await
75+
.unwrap();
76+
});
77+
},
78+
);
79+
}
80+
group.finish();
81+
}
82+
83+
fn bench_get_transactions(c: &mut Criterion) {
84+
// Two rounds of benchmarks: one with 1M transactions in the db, and one with 10M transactions.
85+
// Each time the number of transactions to read is 100, 10_000, 100_000, 1_000_000.
86+
run_bench(c, 1_000_000);
87+
run_bench(c, 10_000_000);
88+
}
89+
90+
criterion_group! {
91+
name = benches;
92+
config = Criterion::default().sample_size(20);
93+
targets = bench_get_transactions
94+
}
95+
criterion_main!(benches);

mithril-aggregator/src/database/cardano_transaction_migration.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,20 @@ create table block_range_root (
7070
merkle_root text not null,
7171
primary key (start, end)
7272
);
73+
"#,
74+
),
75+
// Migration 6
76+
// Add composite index on `block_number/transaction_hash` column of `cardano_tx` table
77+
// Truncate `block_range_root` table after changing the order of retrieval of the transactions
78+
SqlMigration::new(
79+
6,
80+
r#"
81+
create index block_number_transaction_hash_index on cardano_tx(block_number, transaction_hash);
82+
83+
-- remove all data from the block_range_root table since the order used to create them has changed
84+
delete from block_range_root;
85+
86+
vacuum;
7387
"#,
7488
),
7589
]

mithril-aggregator/src/database/provider/cardano_transaction/get_cardano_transaction.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@ use std::ops::Range;
33
use sqlite::Value;
44

55
use mithril_common::entities::{BlockNumber, BlockRange, TransactionHash};
6+
#[cfg(test)]
7+
use mithril_persistence::sqlite::GetAllCondition;
68
use mithril_persistence::sqlite::{
79
Provider, SourceAlias, SqLiteEntity, SqliteConnection, WhereCondition,
810
};
911

10-
#[cfg(test)]
11-
use mithril_persistence::sqlite::GetAllCondition;
12-
1312
use crate::database::record::CardanoTransactionRecord;
1413

1514
/// Simple queries to retrieve [CardanoTransaction] from the sqlite database.
@@ -87,7 +86,7 @@ impl<'client> Provider<'client> for GetCardanoTransactionProvider<'client> {
8786
let aliases = SourceAlias::new(&[("{:cardano_tx:}", "cardano_tx")]);
8887
let projection = Self::Entity::get_projection().expand(aliases);
8988

90-
format!("select {projection} from cardano_tx where {condition} order by rowid")
89+
format!("select {projection} from cardano_tx where {condition} order by block_number, transaction_hash")
9190
}
9291
}
9392

mithril-aggregator/src/database/repository/cardano_transaction_repository.rs

Lines changed: 91 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,6 @@ use crate::database::provider::{
2222
use crate::database::record::{BlockRangeRootRecord, CardanoTransactionRecord};
2323
use crate::services::{TransactionStore, TransactionsRetriever};
2424

25-
#[cfg(test)]
26-
use mithril_persistence::sqlite::GetAllProvider;
27-
2825
/// ## Cardano transaction repository
2926
///
3027
/// This is a business oriented layer to perform actions on the database through
@@ -177,12 +174,21 @@ impl CardanoTransactionRepository {
177174
}
178175
}
179176

180-
#[cfg(test)]
181-
pub(crate) async fn get_all(&self) -> StdResult<Vec<CardanoTransaction>> {
182-
let provider = GetCardanoTransactionProvider::new(&self.connection);
183-
let records = provider.get_all()?;
177+
/// Retrieve all the Block Range Roots in database up to the given end block number excluded.
178+
pub async fn retrieve_block_range_roots_up_to(
179+
&self,
180+
end_block_number: BlockNumber,
181+
) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)>>> {
182+
let provider = GetBlockRangeRootProvider::new(&self.connection);
183+
let filters = provider.get_up_to_block_number_condition(end_block_number);
184+
let block_range_roots = provider.find(filters)?;
185+
let iterator = block_range_roots
186+
.into_iter()
187+
.map(|record| -> (BlockRange, MKTreeNode) { record.into() })
188+
.collect::<Vec<_>>() // TODO: remove this collect when we should ba able return the iterator directly
189+
.into_iter();
184190

185-
Ok(records.map(|record| record.into()).collect())
191+
Ok(Box::new(iterator))
186192
}
187193
}
188194

@@ -195,6 +201,13 @@ pub mod test_extensions {
195201
use super::*;
196202

197203
impl CardanoTransactionRepository {
204+
pub async fn get_all(&self) -> StdResult<Vec<CardanoTransaction>> {
205+
let provider = GetCardanoTransactionProvider::new(&self.connection);
206+
let records = provider.get_all()?;
207+
208+
Ok(records.map(|record| record.into()).collect())
209+
}
210+
198211
pub fn get_all_block_range_root(&self) -> StdResult<Vec<BlockRangeRootRecord>> {
199212
let provider = GetBlockRangeRootProvider::new(&self.connection);
200213
let records = provider.get_all()?;
@@ -233,14 +246,6 @@ impl TransactionStore for CardanoTransactionRepository {
233246
}
234247
}
235248

236-
async fn get_up_to(&self, beacon: ImmutableFileNumber) -> StdResult<Vec<CardanoTransaction>> {
237-
self.get_transactions_up_to(beacon).await.map(|v| {
238-
v.into_iter()
239-
.map(|record| record.into())
240-
.collect::<Vec<CardanoTransaction>>()
241-
})
242-
}
243-
244249
async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()> {
245250
const DB_TRANSACTION_SIZE: usize = 100000;
246251
for transactions_in_db_transaction_chunk in transactions.chunks(DB_TRANSACTION_SIZE) {
@@ -348,16 +353,8 @@ impl BlockRangeRootRetriever for CardanoTransactionRepository {
348353
.get_highest_block_number_for_immutable_number(up_to_beacon)
349354
.await?
350355
.unwrap_or(0);
351-
let provider = GetBlockRangeRootProvider::new(&self.connection);
352-
let filters = provider.get_up_to_block_number_condition(block_number);
353-
let block_range_roots = provider.find(filters)?;
354-
let block_range_roots = block_range_roots
355-
.into_iter()
356-
.map(|record| -> (BlockRange, MKTreeNode) { record.into() })
357-
.collect::<Vec<_>>() // TODO: remove this collect when we should ba able return the iterator directly
358-
.into_iter();
359356

360-
Ok(Box::new(block_range_roots))
357+
self.retrieve_block_range_roots_up_to(block_number).await
361358
}
362359
}
363360

@@ -815,4 +812,73 @@ mod tests {
815812
record
816813
);
817814
}
815+
816+
#[tokio::test]
817+
async fn repository_retrieve_block_range_roots_up_to() {
818+
let connection = Arc::new(cardano_tx_db_connection().unwrap());
819+
let repository = CardanoTransactionRepository::new(connection);
820+
let block_range_roots = vec![
821+
(
822+
BlockRange::from_block_number(15),
823+
MKTreeNode::from_hex("AAAA").unwrap(),
824+
),
825+
(
826+
BlockRange::from_block_number(30),
827+
MKTreeNode::from_hex("BBBB").unwrap(),
828+
),
829+
(
830+
BlockRange::from_block_number(45),
831+
MKTreeNode::from_hex("CCCC").unwrap(),
832+
),
833+
];
834+
repository
835+
.store_block_range_roots(block_range_roots.clone())
836+
.await
837+
.unwrap();
838+
839+
// Retrieve with a block far higher than the highest block range - should return all
840+
{
841+
let retrieved_block_ranges = repository
842+
.retrieve_block_range_roots_up_to(1000)
843+
.await
844+
.unwrap();
845+
assert_eq!(
846+
block_range_roots,
847+
retrieved_block_ranges.collect::<Vec<_>>()
848+
);
849+
}
850+
// Retrieve with a block bellow than the smallest block range - should return none
851+
{
852+
let retrieved_block_ranges = repository
853+
.retrieve_block_range_roots_up_to(2)
854+
.await
855+
.unwrap();
856+
assert_eq!(
857+
Vec::<(BlockRange, MKTreeNode)>::new(),
858+
retrieved_block_ranges.collect::<Vec<_>>()
859+
);
860+
}
861+
// The given block is matched to the end (excluded) - should return the first of the three
862+
{
863+
let retrieved_block_ranges = repository
864+
.retrieve_block_range_roots_up_to(45)
865+
.await
866+
.unwrap();
867+
assert_eq!(
868+
vec![block_range_roots[0].clone()],
869+
retrieved_block_ranges.collect::<Vec<_>>()
870+
);
871+
}
872+
// Right after the end of the second block range - should return first two of the three
873+
{
874+
let retrieved_block_ranges = repository
875+
.retrieve_block_range_roots_up_to(46)
876+
.await
877+
.unwrap();
878+
assert_eq!(
879+
block_range_roots[0..=1].to_vec(),
880+
retrieved_block_ranges.collect::<Vec<_>>()
881+
);
882+
}
883+
}
818884
}

mithril-aggregator/src/services/cardano_transactions_importer.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,6 @@ pub trait TransactionStore: Send + Sync {
1919
/// Get the highest known transaction beacon
2020
async fn get_highest_beacon(&self) -> StdResult<Option<ImmutableFileNumber>>;
2121

22-
/// Get stored transactions up to the given beacon
23-
async fn get_up_to(
24-
&self,
25-
immutable_file_number: ImmutableFileNumber,
26-
) -> StdResult<Vec<CardanoTransaction>>;
27-
2822
/// Store list of transactions
2923
async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()>;
3024

mithril-signer/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-signer"
3-
version = "0.2.133"
3+
version = "0.2.134"
44
description = "A Mithril Signer"
55
authors = { workspace = true }
66
edition = { workspace = true }

0 commit comments

Comments
 (0)