Skip to content

Commit b9e810f

Browse files
authored
Merge pull request #1619 from input-output-hk/ensemble/1591/incremental-storage-of-cardano-tx
Incremental storage of Cardano transactions
2 parents 336c070 + ed4d46d commit b9e810f

25 files changed

+1076
-256
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ As a minor extension, we have adopted a slightly different versioning convention
1313
- Certificate chain structure has been modified to remove coupling with immutable file number.
1414
- Client needs to be updated to verify certificate chain.
1515

16+
- Support incremental import for Cardano Transactions instead of scanning the whole immutable database for every signing round.
17+
1618
- Crates versions:
1719

1820
| Crate | Version |

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-aggregator"
3-
version = "0.4.53"
3+
version = "0.4.54"
44
description = "A Mithril Aggregator server"
55
authors = { workspace = true }
66
edition = { workspace = true }

mithril-aggregator/src/database/cardano_transaction_migration.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,13 @@ alter table cardano_tx add column block_hash text not null;
5151
vacuum;
5252
"#,
5353
),
54+
// Migration 4
55+
// Add index on `block_number` column of `cardano_tx` table
56+
SqlMigration::new(
57+
4,
58+
r#"
59+
create index block_number_index on cardano_tx(block_number);
60+
"#,
61+
),
5462
]
5563
}

mithril-aggregator/src/database/provider/cardano_transaction.rs

Lines changed: 97 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use mithril_common::{
33
BlockHash, BlockNumber, CardanoDbBeacon, CardanoTransaction, ImmutableFileNumber,
44
SlotNumber, TransactionHash,
55
},
6-
signable_builder::TransactionStore,
76
StdResult,
87
};
98
use mithril_persistence::sqlite::{
@@ -16,7 +15,7 @@ use async_trait::async_trait;
1615
use sqlite::{Row, Value};
1716
use std::{iter::repeat, sync::Arc};
1817

19-
use crate::services::TransactionsRetriever;
18+
use crate::services::{TransactionStore, TransactionsRetriever};
2019

2120
/// Cardano Transaction record is the representation of a cardano transaction.
2221
#[derive(Debug, PartialEq, Clone)]
@@ -126,11 +125,11 @@ impl<'client> CardanoTransactionProvider<'client> {
126125

127126
pub(crate) fn get_transaction_up_to_beacon_condition(
128127
&self,
129-
beacon: &CardanoDbBeacon,
128+
beacon: ImmutableFileNumber,
130129
) -> WhereCondition {
131130
WhereCondition::new(
132131
"immutable_file_number <= ?*",
133-
vec![Value::Integer(beacon.immutable_file_number as i64)],
132+
vec![Value::Integer(beacon as i64)],
134133
)
135134
}
136135
}
@@ -236,7 +235,7 @@ impl CardanoTransactionRepository {
236235
/// chronological order.
237236
pub async fn get_transactions_up_to(
238237
&self,
239-
beacon: &CardanoDbBeacon,
238+
beacon: ImmutableFileNumber,
240239
) -> StdResult<Vec<CardanoTransactionRecord>> {
241240
let provider = CardanoTransactionProvider::new(&self.connection);
242241
let filters = provider.get_transaction_up_to_beacon_condition(beacon);
@@ -280,12 +279,15 @@ impl CardanoTransactionRepository {
280279
}
281280

282281
/// Create new [CardanoTransactionRecord]s in the database.
283-
pub async fn create_transactions(
282+
pub async fn create_transactions<T: Into<CardanoTransactionRecord>>(
284283
&self,
285-
transactions: Vec<CardanoTransactionRecord>,
284+
transactions: Vec<T>,
286285
) -> StdResult<Vec<CardanoTransactionRecord>> {
286+
let records: Vec<CardanoTransactionRecord> =
287+
transactions.into_iter().map(|tx| tx.into()).collect();
288+
287289
let provider = InsertCardanoTransactionProvider::new(&self.connection);
288-
let filters = provider.get_insert_many_condition(transactions)?;
290+
let filters = provider.get_insert_many_condition(records)?;
289291
let cursor = provider.find(filters)?;
290292

291293
Ok(cursor.collect())
@@ -294,25 +296,62 @@ impl CardanoTransactionRepository {
294296

295297
#[async_trait]
296298
impl TransactionStore for CardanoTransactionRepository {
297-
async fn store_transactions(&self, transactions: &[CardanoTransaction]) -> StdResult<()> {
298-
let records: Vec<CardanoTransactionRecord> =
299-
transactions.iter().map(|tx| tx.to_owned().into()).collect();
300-
self.create_transactions(records)
301-
.await
302-
.with_context(|| "CardanoTransactionRepository can not store transactions")?;
299+
async fn get_highest_beacon(&self) -> StdResult<Option<ImmutableFileNumber>> {
300+
let sql = "select max(immutable_file_number) as highest from cardano_tx;";
301+
match self
302+
.connection
303+
.prepare(sql)
304+
.with_context(|| {
305+
format!(
306+
"Prepare query error: SQL=`{}`",
307+
&sql.replace('\n', " ").trim()
308+
)
309+
})?
310+
.iter()
311+
.next()
312+
{
313+
None => Ok(None),
314+
Some(row) => {
315+
let highest = row?.read::<Option<i64>, _>(0);
316+
highest
317+
.map(u64::try_from)
318+
.transpose()
319+
.with_context(||
320+
format!("Integer field max(immutable_file_number) (value={highest:?}) is incompatible with u64 representation.")
321+
)
322+
}
323+
}
324+
}
325+
326+
async fn get_up_to(&self, beacon: ImmutableFileNumber) -> StdResult<Vec<CardanoTransaction>> {
327+
self.get_transactions_up_to(beacon).await.map(|v| {
328+
v.into_iter()
329+
.map(|record| record.into())
330+
.collect::<Vec<CardanoTransaction>>()
331+
})
332+
}
303333

334+
async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()> {
335+
// Chunk transactions to avoid an error when we exceed sqlite binding limitations
336+
for transactions_in_chunk in transactions.chunks(100) {
337+
self.create_transactions(transactions_in_chunk.to_vec())
338+
.await
339+
.with_context(|| "CardanoTransactionRepository can not store transactions")?;
340+
}
304341
Ok(())
305342
}
306343
}
307344

308345
#[async_trait]
309346
impl TransactionsRetriever for CardanoTransactionRepository {
310347
async fn get_up_to(&self, beacon: &CardanoDbBeacon) -> StdResult<Vec<CardanoTransaction>> {
311-
self.get_transactions_up_to(beacon).await.map(|v| {
312-
v.into_iter()
313-
.map(|record| record.into())
314-
.collect::<Vec<CardanoTransaction>>()
315-
})
348+
self.get_transactions_up_to(beacon.immutable_file_number)
349+
.await
350+
.map(|v| {
351+
v.into_iter()
352+
.map(|record| record.into())
353+
.collect::<Vec<CardanoTransaction>>()
354+
})
316355
}
317356
}
318357

@@ -369,10 +408,7 @@ mod tests {
369408
let connection = Connection::open_thread_safe(":memory:").unwrap();
370409
let provider = CardanoTransactionProvider::new(&connection);
371410
let (expr, params) = provider
372-
.get_transaction_up_to_beacon_condition(&CardanoDbBeacon {
373-
immutable_file_number: 2309,
374-
..CardanoDbBeacon::default()
375-
})
411+
.get_transaction_up_to_beacon_condition(2309)
376412
.expand();
377413

378414
assert_eq!("immutable_file_number <= ?1".to_string(), expr);
@@ -522,7 +558,7 @@ mod tests {
522558
CardanoTransaction::new("tx-hash-456", 11, 51, "block-hash-456", 100),
523559
];
524560
repository
525-
.store_transactions(&cardano_transactions)
561+
.create_transactions(cardano_transactions)
526562
.await
527563
.unwrap();
528564

@@ -558,8 +594,8 @@ mod tests {
558594
let connection = get_connection().await;
559595
let repository = CardanoTransactionRepository::new(connection.clone());
560596

561-
let cardano_transactions: Vec<CardanoTransaction> = (20..=40)
562-
.map(|i| CardanoTransaction {
597+
let cardano_transactions: Vec<CardanoTransactionRecord> = (20..=40)
598+
.map(|i| CardanoTransactionRecord {
563599
transaction_hash: format!("tx-hash-{i}"),
564600
block_number: i % 10,
565601
slot_number: i * 100,
@@ -568,33 +604,18 @@ mod tests {
568604
})
569605
.collect();
570606
repository
571-
.store_transactions(&cardano_transactions)
572-
.await
573-
.unwrap();
574-
575-
let transaction_result = repository
576-
.get_up_to(&CardanoDbBeacon::new("".to_string(), 1, 34))
607+
.create_transactions(cardano_transactions.clone())
577608
.await
578609
.unwrap();
579610

611+
let transaction_result = repository.get_transactions_up_to(34).await.unwrap();
580612
assert_eq!(cardano_transactions[0..=14].to_vec(), transaction_result);
581613

582-
let transaction_result = repository
583-
.get_up_to(&CardanoDbBeacon::new("".to_string(), 1, 300))
584-
.await
585-
.unwrap();
586-
587-
assert_eq!(
588-
cardano_transactions.into_iter().collect::<Vec<_>>(),
589-
transaction_result
590-
);
591-
592-
let transaction_result = repository
593-
.get_up_to(&CardanoDbBeacon::new("".to_string(), 1, 19))
594-
.await
595-
.unwrap();
614+
let transaction_result = repository.get_transactions_up_to(300).await.unwrap();
615+
assert_eq!(cardano_transactions.clone(), transaction_result);
596616

597-
assert_eq!(Vec::<CardanoTransaction>::new(), transaction_result);
617+
let transaction_result = repository.get_transactions_up_to(19).await.unwrap();
618+
assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
598619
}
599620

600621
#[tokio::test]
@@ -607,7 +628,7 @@ mod tests {
607628
CardanoTransaction::new("tx-hash-456".to_string(), 11, 51, "block-hash-456", 100),
608629
];
609630
repository
610-
.store_transactions(&cardano_transactions)
631+
.create_transactions(cardano_transactions.clone())
611632
.await
612633
.unwrap();
613634

@@ -638,7 +659,7 @@ mod tests {
638659
99,
639660
)];
640661
repository
641-
.store_transactions(&cardano_transactions)
662+
.create_transactions(cardano_transactions)
642663
.await
643664
.unwrap();
644665

@@ -655,4 +676,31 @@ mod tests {
655676
transaction_result
656677
);
657678
}
679+
680+
#[tokio::test]
681+
async fn repository_get_highest_beacon_without_transactions_in_db() {
682+
let connection = get_connection().await;
683+
let repository = CardanoTransactionRepository::new(connection.clone());
684+
685+
let highest_beacon = repository.get_highest_beacon().await.unwrap();
686+
assert_eq!(None, highest_beacon);
687+
}
688+
689+
#[tokio::test]
690+
async fn repository_get_highest_beacon_with_transactions_in_db() {
691+
let connection = get_connection().await;
692+
let repository = CardanoTransactionRepository::new(connection.clone());
693+
694+
let cardano_transactions = vec![
695+
CardanoTransaction::new("tx-hash-123".to_string(), 10, 50, "block-hash-123", 50),
696+
CardanoTransaction::new("tx-hash-456".to_string(), 11, 51, "block-hash-456", 100),
697+
];
698+
repository
699+
.create_transactions(cardano_transactions)
700+
.await
701+
.unwrap();
702+
703+
let highest_beacon = repository.get_highest_beacon().await.unwrap();
704+
assert_eq!(Some(100), highest_beacon);
705+
}
658706
}

mithril-aggregator/src/database/provider/test_helper.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use uuid::Uuid;
66
use mithril_common::{entities::Epoch, test_utils::fake_keys, StdResult};
77
use mithril_persistence::sqlite::SqliteConnection;
88

9-
use crate::database::{migration::get_migrations, provider::UpdateSingleSignatureRecordProvider};
9+
use crate::database::provider::UpdateSingleSignatureRecordProvider;
1010

1111
use super::SingleSignatureRecord;
1212

@@ -45,7 +45,15 @@ pub fn disable_foreign_key_support(connection: &SqliteConnection) -> StdResult<(
4545
}
4646

4747
pub fn apply_all_migrations_to_db(connection: &SqliteConnection) -> StdResult<()> {
48-
for migration in get_migrations() {
48+
for migration in crate::database::migration::get_migrations() {
49+
connection.execute(&migration.alterations)?;
50+
}
51+
52+
Ok(())
53+
}
54+
55+
pub fn apply_all_transactions_db_migrations(connection: &SqliteConnection) -> StdResult<()> {
56+
for migration in crate::database::cardano_transaction_migration::get_migrations() {
4957
connection.execute(&migration.alterations)?;
5058
}
5159

mithril-aggregator/src/dependency_injection/builder.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use mithril_common::{
3333
signable_builder::{
3434
CardanoImmutableFilesFullSignableBuilder, CardanoTransactionsSignableBuilder,
3535
MithrilSignableBuilderService, MithrilStakeDistributionSignableBuilder,
36-
SignableBuilderService, TransactionStore,
36+
SignableBuilderService,
3737
},
3838
TimePointProvider, TimePointProviderImpl,
3939
};
@@ -57,10 +57,11 @@ use crate::{
5757
event_store::{EventMessage, EventStore, TransmitterService},
5858
http_server::routes::router,
5959
services::{
60-
CertifierService, MessageService, MithrilCertifierService, MithrilEpochService,
61-
MithrilMessageService, MithrilProverService, MithrilSignedEntityService,
62-
MithrilStakeDistributionService, MithrilTickerService, ProverService, SignedEntityService,
63-
StakeDistributionService, TickerService,
60+
CardanoTransactionsImporter, CertifierService, MessageService, MithrilCertifierService,
61+
MithrilEpochService, MithrilMessageService, MithrilProverService,
62+
MithrilSignedEntityService, MithrilStakeDistributionService, MithrilTickerService,
63+
ProverService, SignedEntityService, StakeDistributionService, TickerService,
64+
TransactionStore,
6465
},
6566
tools::{CExplorerSignerRetriever, GcpFileUploader, GenesisToolsDependency, SignersImporter},
6667
AggregatorConfig, AggregatorRunner, AggregatorRuntime, CertificatePendingStore,
@@ -1079,10 +1080,16 @@ impl DependenciesBuilder {
10791080
&self.configuration.db_directory,
10801081
self.get_logger().await?,
10811082
));
1082-
let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::new(
1083+
let transactions_importer = Arc::new(CardanoTransactionsImporter::new(
10831084
self.get_transaction_parser().await?,
10841085
self.get_transaction_store().await?,
10851086
&self.configuration.db_directory,
1087+
// Rescan the last immutable when importing transactions, it may have been partially imported
1088+
Some(1),
1089+
self.get_logger().await?,
1090+
));
1091+
let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::new(
1092+
transactions_importer,
10861093
self.get_logger().await?,
10871094
));
10881095
let signable_builder_service = Arc::new(MithrilSignableBuilderService::new(

0 commit comments

Comments
 (0)