Skip to content

Commit 51e51b4

Browse files
committed
Adapt importers to the 'block streamer' api
1 parent 9fd4432 commit 51e51b4

File tree

2 files changed

+36
-34
lines changed

2 files changed

+36
-34
lines changed

mithril-aggregator/src/services/cardano_transactions_importer.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -101,19 +101,20 @@ impl CardanoTransactionsImporter {
101101
);
102102
return Ok(());
103103
}
104-
105-
// todo: temp algorithm, should be optimized to avoid loading all blocks & transactions
106-
// at once in memory (probably using iterators)
107-
let scanned_blocks = self.block_scanner.scan(&self.dirpath, from, until).await?;
108-
let parsed_transactions: Vec<CardanoTransaction> =
109-
scanned_blocks.flat_map(|b| b.into_transactions()).collect();
110104
debug!(
111105
self.logger,
112-
"TransactionsImporter retrieved '{}' Cardano transactions between immutables '{}' and '{until}'",
113-
parsed_transactions.len(),
106+
"TransactionsImporter will retrieve Cardano transactions between immutables '{}' and '{until}'",
114107
from.unwrap_or(0)
115108
);
116109

110+
let mut streamer = self.block_scanner.scan(&self.dirpath, from, until).await?;
111+
let parsed_transactions: Vec<CardanoTransaction> = streamer
112+
.poll_all()
113+
.await?
114+
.into_iter()
115+
.flat_map(|b| b.into_transactions())
116+
.collect();
117+
117118
self.transaction_store
118119
.store_transactions(parsed_transactions)
119120
.await?;
@@ -178,7 +179,9 @@ impl TransactionsImporter for CardanoTransactionsImporter {
178179
mod tests {
179180
use mockall::mock;
180181

181-
use mithril_common::cardano_block_scanner::{DumbBlockScanner, ScannedBlock};
182+
use mithril_common::cardano_block_scanner::{
183+
BlockStreamer, DumbBlockScanner, DumbBlockStreamer, ScannedBlock,
184+
};
182185
use mithril_common::crypto_helper::MKTree;
183186
use mithril_common::entities::{BlockNumber, BlockRangesSequence};
184187

@@ -197,7 +200,7 @@ mod tests {
197200
dirpath: &Path,
198201
from_immutable: Option<ImmutableFileNumber>,
199202
until_immutable: ImmutableFileNumber,
200-
) -> StdResult<Box<dyn Iterator<Item = ScannedBlock>>>;
203+
) -> StdResult<Box<dyn BlockStreamer>>;
201204
}
202205
}
203206

@@ -265,7 +268,7 @@ mod tests {
265268
scanner_mock
266269
.expect_scan()
267270
.withf(move |_, from, until| from.is_none() && until == &up_to_beacon)
268-
.return_once(move |_, _, _| Ok(Box::new(blocks.into_iter())));
271+
.return_once(move |_, _, _| Ok(Box::new(DumbBlockStreamer::new(vec![blocks]))));
269272
CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
270273
};
271274

@@ -427,7 +430,9 @@ mod tests {
427430
scanner_mock
428431
.expect_scan()
429432
.withf(move |_, from, until| from == &Some(12) && until == &up_to_beacon)
430-
.return_once(move |_, _, _| Ok(Box::new(scanned_blocks.into_iter())))
433+
.return_once(move |_, _, _| {
434+
Ok(Box::new(DumbBlockStreamer::new(vec![scanned_blocks])))
435+
})
431436
.once();
432437
CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
433438
};
@@ -605,13 +610,9 @@ mod tests {
605610
let transactions = into_transactions(&blocks);
606611
let importer = {
607612
let connection = cardano_tx_db_connection().unwrap();
608-
let mut scanner = MockBlockScannerImpl::new();
609-
scanner
610-
.expect_scan()
611-
.return_once(move |_, _, _| Ok(Box::new(blocks.into_iter())));
612613

613614
CardanoTransactionsImporter::new_for_test(
614-
Arc::new(scanner),
615+
Arc::new(DumbBlockScanner::new(blocks.clone())),
615616
Arc::new(CardanoTransactionRepository::new(Arc::new(connection))),
616617
)
617618
};

mithril-signer/src/cardano_transactions_importer.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -101,19 +101,20 @@ impl CardanoTransactionsImporter {
101101
);
102102
return Ok(());
103103
}
104-
105-
// todo: temp algorithm, should be optimized to avoid loading all blocks & transactions
106-
// at once in memory (probably using iterators)
107-
let scanned_blocks = self.block_scanner.scan(&self.dirpath, from, until).await?;
108-
let parsed_transactions: Vec<CardanoTransaction> =
109-
scanned_blocks.flat_map(|b| b.into_transactions()).collect();
110104
debug!(
111105
self.logger,
112-
"TransactionsImporter retrieved '{}' Cardano transactions between immutables '{}' and '{until}'",
113-
parsed_transactions.len(),
106+
"TransactionsImporter will retrieve Cardano transactions between immutables '{}' and '{until}'",
114107
from.unwrap_or(0)
115108
);
116109

110+
let mut streamer = self.block_scanner.scan(&self.dirpath, from, until).await?;
111+
let parsed_transactions: Vec<CardanoTransaction> = streamer
112+
.poll_all()
113+
.await?
114+
.into_iter()
115+
.flat_map(|b| b.into_transactions())
116+
.collect();
117+
117118
self.transaction_store
118119
.store_transactions(parsed_transactions)
119120
.await?;
@@ -178,7 +179,9 @@ impl TransactionsImporter for CardanoTransactionsImporter {
178179
mod tests {
179180
use mockall::mock;
180181

181-
use mithril_common::cardano_block_scanner::{DumbBlockScanner, ScannedBlock};
182+
use mithril_common::cardano_block_scanner::{
183+
BlockStreamer, DumbBlockScanner, DumbBlockStreamer, ScannedBlock,
184+
};
182185
use mithril_common::crypto_helper::MKTree;
183186
use mithril_common::entities::{BlockNumber, BlockRangesSequence};
184187

@@ -197,7 +200,7 @@ mod tests {
197200
dirpath: &Path,
198201
from_immutable: Option<ImmutableFileNumber>,
199202
until_immutable: ImmutableFileNumber,
200-
) -> StdResult<Box<dyn Iterator<Item = ScannedBlock>>>;
203+
) -> StdResult<Box<dyn BlockStreamer>>;
201204
}
202205
}
203206

@@ -265,7 +268,7 @@ mod tests {
265268
scanner_mock
266269
.expect_scan()
267270
.withf(move |_, from, until| from.is_none() && until == &up_to_beacon)
268-
.return_once(move |_, _, _| Ok(Box::new(blocks.into_iter())));
271+
.return_once(move |_, _, _| Ok(Box::new(DumbBlockStreamer::new(vec![blocks]))));
269272
CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
270273
};
271274

@@ -427,7 +430,9 @@ mod tests {
427430
scanner_mock
428431
.expect_scan()
429432
.withf(move |_, from, until| from == &Some(12) && until == &up_to_beacon)
430-
.return_once(move |_, _, _| Ok(Box::new(scanned_blocks.into_iter())))
433+
.return_once(move |_, _, _| {
434+
Ok(Box::new(DumbBlockStreamer::new(vec![scanned_blocks])))
435+
})
431436
.once();
432437
CardanoTransactionsImporter::new_for_test(Arc::new(scanner_mock), repository.clone())
433438
};
@@ -605,13 +610,9 @@ mod tests {
605610
let transactions = into_transactions(&blocks);
606611
let importer = {
607612
let connection = cardano_tx_db_connection().unwrap();
608-
let mut scanner = MockBlockScannerImpl::new();
609-
scanner
610-
.expect_scan()
611-
.return_once(move |_, _, _| Ok(Box::new(blocks.into_iter())));
612613

613614
CardanoTransactionsImporter::new_for_test(
614-
Arc::new(scanner),
615+
Arc::new(DumbBlockScanner::new(blocks.clone())),
615616
Arc::new(CardanoTransactionRepository::new(Arc::new(connection))),
616617
)
617618
};

0 commit comments

Comments
 (0)