Skip to content

Commit fd345f4

Browse files
committed
Synchronise TransactionsImporter change from signer to aggregator
1 parent 580b38b commit fd345f4

File tree

5 files changed

+375
-68
lines changed

5 files changed

+375
-68
lines changed

mithril-aggregator/src/database/provider/cardano_transaction.rs

Lines changed: 88 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,11 @@ impl<'client> CardanoTransactionProvider<'client> {
125125

126126
pub(crate) fn get_transaction_up_to_beacon_condition(
127127
&self,
128-
beacon: &CardanoDbBeacon,
128+
beacon: ImmutableFileNumber,
129129
) -> WhereCondition {
130130
WhereCondition::new(
131131
"immutable_file_number <= ?*",
132-
vec![Value::Integer(beacon.immutable_file_number as i64)],
132+
vec![Value::Integer(beacon as i64)],
133133
)
134134
}
135135
}
@@ -235,7 +235,7 @@ impl CardanoTransactionRepository {
235235
/// chronological order.
236236
pub async fn get_transactions_up_to(
237237
&self,
238-
beacon: &CardanoDbBeacon,
238+
beacon: ImmutableFileNumber,
239239
) -> StdResult<Vec<CardanoTransactionRecord>> {
240240
let provider = CardanoTransactionProvider::new(&self.connection);
241241
let filters = provider.get_transaction_up_to_beacon_condition(beacon);
@@ -279,12 +279,15 @@ impl CardanoTransactionRepository {
279279
}
280280

281281
/// Create new [CardanoTransactionRecord]s in the database.
282-
pub async fn create_transactions(
282+
pub async fn create_transactions<T: Into<CardanoTransactionRecord>>(
283283
&self,
284-
transactions: Vec<CardanoTransactionRecord>,
284+
transactions: Vec<T>,
285285
) -> StdResult<Vec<CardanoTransactionRecord>> {
286+
let records: Vec<CardanoTransactionRecord> =
287+
transactions.into_iter().map(|tx| tx.into()).collect();
288+
286289
let provider = InsertCardanoTransactionProvider::new(&self.connection);
287-
let filters = provider.get_insert_many_condition(transactions)?;
290+
let filters = provider.get_insert_many_condition(records)?;
288291
let cursor = provider.find(filters)?;
289292

290293
Ok(cursor.collect())
@@ -293,10 +296,43 @@ impl CardanoTransactionRepository {
293296

294297
#[async_trait]
295298
impl TransactionStore for CardanoTransactionRepository {
299+
async fn get_highest_beacon(&self) -> StdResult<Option<ImmutableFileNumber>> {
300+
let sql = "select max(immutable_file_number) as highest from cardano_tx;";
301+
match self
302+
.connection
303+
.prepare(sql)
304+
.with_context(|| {
305+
format!(
306+
"Prepare query error: SQL=`{}`",
307+
&sql.replace('\n', " ").trim()
308+
)
309+
})?
310+
.iter()
311+
.next()
312+
{
313+
None => Ok(None),
314+
Some(row) => {
315+
let highest = row?.read::<Option<i64>, _>(0);
316+
highest
317+
.map(u64::try_from)
318+
.transpose()
319+
.with_context(||
320+
format!("Integer field max(immutable_file_number) (value={highest:?}) is incompatible with u64 representation.")
321+
)
322+
}
323+
}
324+
}
325+
326+
async fn get_up_to(&self, beacon: ImmutableFileNumber) -> StdResult<Vec<CardanoTransaction>> {
327+
self.get_transactions_up_to(beacon).await.map(|v| {
328+
v.into_iter()
329+
.map(|record| record.into())
330+
.collect::<Vec<CardanoTransaction>>()
331+
})
332+
}
333+
296334
async fn store_transactions(&self, transactions: &[CardanoTransaction]) -> StdResult<()> {
297-
let records: Vec<CardanoTransactionRecord> =
298-
transactions.iter().map(|tx| tx.to_owned().into()).collect();
299-
self.create_transactions(records)
335+
self.create_transactions(transactions.to_vec())
300336
.await
301337
.with_context(|| "CardanoTransactionRepository can not store transactions")?;
302338

@@ -307,11 +343,13 @@ impl TransactionStore for CardanoTransactionRepository {
307343
#[async_trait]
308344
impl TransactionsRetriever for CardanoTransactionRepository {
309345
async fn get_up_to(&self, beacon: &CardanoDbBeacon) -> StdResult<Vec<CardanoTransaction>> {
310-
self.get_transactions_up_to(beacon).await.map(|v| {
311-
v.into_iter()
312-
.map(|record| record.into())
313-
.collect::<Vec<CardanoTransaction>>()
314-
})
346+
self.get_transactions_up_to(beacon.immutable_file_number)
347+
.await
348+
.map(|v| {
349+
v.into_iter()
350+
.map(|record| record.into())
351+
.collect::<Vec<CardanoTransaction>>()
352+
})
315353
}
316354
}
317355

@@ -368,10 +406,7 @@ mod tests {
368406
let connection = Connection::open_thread_safe(":memory:").unwrap();
369407
let provider = CardanoTransactionProvider::new(&connection);
370408
let (expr, params) = provider
371-
.get_transaction_up_to_beacon_condition(&CardanoDbBeacon {
372-
immutable_file_number: 2309,
373-
..CardanoDbBeacon::default()
374-
})
409+
.get_transaction_up_to_beacon_condition(2309)
375410
.expand();
376411

377412
assert_eq!("immutable_file_number <= ?1".to_string(), expr);
@@ -557,8 +592,8 @@ mod tests {
557592
let connection = get_connection().await;
558593
let repository = CardanoTransactionRepository::new(connection.clone());
559594

560-
let cardano_transactions: Vec<CardanoTransaction> = (20..=40)
561-
.map(|i| CardanoTransaction {
595+
let cardano_transactions: Vec<CardanoTransactionRecord> = (20..=40)
596+
.map(|i| CardanoTransactionRecord {
562597
transaction_hash: format!("tx-hash-{i}"),
563598
block_number: i % 10,
564599
slot_number: i * 100,
@@ -567,33 +602,18 @@ mod tests {
567602
})
568603
.collect();
569604
repository
570-
.store_transactions(&cardano_transactions)
571-
.await
572-
.unwrap();
573-
574-
let transaction_result = repository
575-
.get_up_to(&CardanoDbBeacon::new("".to_string(), 1, 34))
605+
.create_transactions(cardano_transactions.clone())
576606
.await
577607
.unwrap();
578608

609+
let transaction_result = repository.get_transactions_up_to(34).await.unwrap();
579610
assert_eq!(cardano_transactions[0..=14].to_vec(), transaction_result);
580611

581-
let transaction_result = repository
582-
.get_up_to(&CardanoDbBeacon::new("".to_string(), 1, 300))
583-
.await
584-
.unwrap();
612+
let transaction_result = repository.get_transactions_up_to(300).await.unwrap();
613+
assert_eq!(cardano_transactions.clone(), transaction_result);
585614

586-
assert_eq!(
587-
cardano_transactions.into_iter().collect::<Vec<_>>(),
588-
transaction_result
589-
);
590-
591-
let transaction_result = repository
592-
.get_up_to(&CardanoDbBeacon::new("".to_string(), 1, 19))
593-
.await
594-
.unwrap();
595-
596-
assert_eq!(Vec::<CardanoTransaction>::new(), transaction_result);
615+
let transaction_result = repository.get_transactions_up_to(19).await.unwrap();
616+
assert_eq!(Vec::<CardanoTransactionRecord>::new(), transaction_result);
597617
}
598618

599619
#[tokio::test]
@@ -654,4 +674,31 @@ mod tests {
654674
transaction_result
655675
);
656676
}
677+
678+
#[tokio::test]
679+
async fn repository_get_highest_beacon_without_transactions_in_db() {
680+
let connection = get_connection().await;
681+
let repository = CardanoTransactionRepository::new(connection.clone());
682+
683+
let highest_beacon = repository.get_highest_beacon().await.unwrap();
684+
assert_eq!(None, highest_beacon);
685+
}
686+
687+
#[tokio::test]
688+
async fn repository_get_highest_beacon_with_transactions_in_db() {
689+
let connection = get_connection().await;
690+
let repository = CardanoTransactionRepository::new(connection.clone());
691+
692+
let cardano_transactions = vec![
693+
CardanoTransaction::new("tx-hash-123".to_string(), 10, 50, "block-hash-123", 50),
694+
CardanoTransaction::new("tx-hash-456".to_string(), 11, 51, "block-hash-456", 100),
695+
];
696+
repository
697+
.store_transactions(&cardano_transactions)
698+
.await
699+
.unwrap();
700+
701+
let highest_beacon = repository.get_highest_beacon().await.unwrap();
702+
assert_eq!(Some(100), highest_beacon);
703+
}
657704
}

mithril-aggregator/src/database/provider/test_helper.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use uuid::Uuid;
66
use mithril_common::{entities::Epoch, test_utils::fake_keys, StdResult};
77
use mithril_persistence::sqlite::SqliteConnection;
88

9-
use crate::database::{migration::get_migrations, provider::UpdateSingleSignatureRecordProvider};
9+
use crate::database::provider::UpdateSingleSignatureRecordProvider;
1010

1111
use super::SingleSignatureRecord;
1212

@@ -45,7 +45,15 @@ pub fn disable_foreign_key_support(connection: &SqliteConnection) -> StdResult<(
4545
}
4646

4747
pub fn apply_all_migrations_to_db(connection: &SqliteConnection) -> StdResult<()> {
48-
for migration in get_migrations() {
48+
for migration in crate::database::migration::get_migrations() {
49+
connection.execute(&migration.alterations)?;
50+
}
51+
52+
Ok(())
53+
}
54+
55+
pub fn apply_all_transactions_db_migrations(connection: &SqliteConnection) -> StdResult<()> {
56+
for migration in crate::database::cardano_transaction_migration::get_migrations() {
4957
connection.execute(&migration.alterations)?;
5058
}
5159

mithril-aggregator/src/dependency_injection/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1084,6 +1084,8 @@ impl DependenciesBuilder {
10841084
self.get_transaction_parser().await?,
10851085
self.get_transaction_store().await?,
10861086
&self.configuration.db_directory,
1087+
// Rescan the last two immutables when importing transactions
1088+
Some(2),
10871089
self.get_logger().await?,
10881090
));
10891091
let cardano_transactions_builder = Arc::new(CardanoTransactionsSignableBuilder::new(

mithril-aggregator/src/lib.rs

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -65,21 +65,14 @@ pub use tools::{
6565
pub use dependency_injection::tests::initialize_dependencies;
6666

6767
#[cfg(test)]
68-
/// Create a [slog scope global logger][slog_scope::GlobalLoggerGuard] to use
69-
/// when debugging some tests that have logs.
70-
///
71-
/// * Remove it after use: it's only mean for debugging, leaving it expose the tests to the two
72-
/// following points.
73-
/// * Don't put it in more than one tests at a time since it is set globally meaning that a test that
74-
/// end will clean up the logger for the test that are still running.
75-
/// * Don't run more than one test at a time with it: logs from more than one tests will be mixed
76-
/// together otherwise.
77-
pub fn global_logger_for_tests() -> slog_scope::GlobalLoggerGuard {
68+
pub(crate) mod test_tools {
7869
use slog::Drain;
7970
use std::sync::Arc;
8071

81-
let decorator = slog_term::PlainDecorator::new(slog_term::TestStdoutWriter);
82-
let drain = slog_term::CompactFormat::new(decorator).build().fuse();
83-
let drain = slog_async::Async::new(drain).build().fuse();
84-
slog_scope::set_global_logger(slog::Logger::root(Arc::new(drain), slog::o!()))
72+
pub fn logger_for_tests() -> slog::Logger {
73+
let decorator = slog_term::PlainDecorator::new(slog_term::TestStdoutWriter);
74+
let drain = slog_term::CompactFormat::new(decorator).build().fuse();
75+
let drain = slog_async::Async::new(drain).build().fuse();
76+
slog::Logger::root(Arc::new(drain), slog::o!())
77+
}
8578
}

0 commit comments

Comments
 (0)