Skip to content

Commit 556b687

Browse files
authored
Merge pull request #1805 from input-output-hk/djo/remove_block_in_place
Remove `task::block_in_place` in transaction importer
2 parents 6288370 + 252f549 commit 556b687

File tree

9 files changed

+216
-38
lines changed

9 files changed

+216
-38
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-aggregator"
3-
version = "0.5.41"
3+
version = "0.5.42"
44
description = "A Mithril Aggregator server"
55
authors = { workspace = true }
66
edition = { workspace = true }

mithril-aggregator/src/services/cardano_transactions_importer.rs

Lines changed: 104 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::ops::Range;
33
use std::path::{Path, PathBuf};
44
use std::sync::Arc;
55

6+
use anyhow::Context;
67
use async_trait::async_trait;
78
use slog::{debug, Logger};
89
use tokio::{runtime::Handle, task};
@@ -51,6 +52,7 @@ pub trait TransactionStore: Send + Sync {
5152
}
5253

5354
/// Import and store [CardanoTransaction].
55+
#[derive(Clone)]
5456
pub struct CardanoTransactionsImporter {
5557
block_scanner: Arc<dyn BlockScanner>,
5658
transaction_store: Arc<dyn TransactionStore>,
@@ -174,31 +176,29 @@ impl CardanoTransactionsImporter {
174176
.store_block_range_roots(block_ranges_with_merkle_root)
175177
.await
176178
}
177-
178-
async fn import_transactions_and_block_ranges(
179-
&self,
180-
up_to_beacon: BlockNumber,
181-
) -> StdResult<()> {
182-
self.import_transactions(up_to_beacon).await?;
183-
self.import_block_ranges(up_to_beacon).await
184-
}
185179
}
186180

187181
#[async_trait]
188182
impl TransactionsImporter for CardanoTransactionsImporter {
189183
async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
190-
task::block_in_place(move || {
184+
let importer = self.clone();
185+
task::spawn_blocking(move || {
191186
Handle::current().block_on(async move {
192-
self.import_transactions_and_block_ranges(up_to_beacon)
193-
.await
187+
importer.import_transactions(up_to_beacon).await?;
188+
importer.import_block_ranges(up_to_beacon).await?;
189+
Ok(())
194190
})
195191
})
192+
.await
193+
.with_context(|| "TransactionsImporter - worker thread crashed")?
196194
}
197195
}
198196

199197
#[cfg(test)]
200198
mod tests {
201-
use mithril_persistence::sqlite::SqliteConnectionPool;
199+
use std::sync::atomic::AtomicUsize;
200+
use std::time::Duration;
201+
202202
use mockall::mock;
203203

204204
use mithril_common::cardano_block_scanner::{
@@ -207,6 +207,7 @@ mod tests {
207207
use mithril_common::crypto_helper::MKTree;
208208
use mithril_common::entities::{BlockNumber, BlockRangesSequence};
209209
use mithril_persistence::database::repository::CardanoTransactionRepository;
210+
use mithril_persistence::sqlite::SqliteConnectionPool;
210211

211212
use crate::database::test_helper::cardano_tx_db_connection;
212213
use crate::test_tools::TestLogger;
@@ -717,7 +718,7 @@ mod tests {
717718
);
718719
}
719720

720-
#[tokio::test(flavor = "multi_thread")]
721+
#[tokio::test]
721722
async fn importing_twice_starting_with_nothing_in_a_real_db_should_yield_transactions_in_same_order(
722723
) {
723724
let blocks = vec![
@@ -754,7 +755,7 @@ mod tests {
754755
assert_eq!(cold_imported_transactions, warm_imported_transactions);
755756
}
756757

757-
#[tokio::test(flavor = "multi_thread")]
758+
#[tokio::test]
758759
async fn when_rollbackward_should_remove_transactions() {
759760
let connection = cardano_tx_db_connection().unwrap();
760761
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
@@ -797,7 +798,7 @@ mod tests {
797798
assert_eq!(expected_remaining_transactions, stored_transactions);
798799
}
799800

800-
#[tokio::test(flavor = "multi_thread")]
801+
#[tokio::test]
801802
async fn when_rollbackward_should_remove_block_ranges() {
802803
let connection = cardano_tx_db_connection().unwrap();
803804
let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
@@ -869,4 +870,92 @@ mod tests {
869870
.collect::<Vec<_>>()
870871
);
871872
}
873+
874+
#[tokio::test]
875+
async fn test_import_is_non_blocking() {
876+
static COUNTER: AtomicUsize = AtomicUsize::new(0);
877+
static MAX_COUNTER: usize = 25;
878+
static WAIT_TIME: u64 = 50;
879+
880+
// Use a local set to ensure the counter task is not dispatched on a different thread
881+
let local = task::LocalSet::new();
882+
local
883+
.run_until(async {
884+
let importer = CardanoTransactionsImporter::new_for_test(
885+
Arc::new(DumbBlockScanner::new()),
886+
Arc::new(BlockingRepository {
887+
wait_time: Duration::from_millis(WAIT_TIME),
888+
}),
889+
);
890+
891+
let importer_future = importer.import(100);
892+
let counter_task = task::spawn_local(async {
893+
while COUNTER.load(std::sync::atomic::Ordering::SeqCst) < MAX_COUNTER {
894+
tokio::time::sleep(Duration::from_millis(1)).await;
895+
COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
896+
}
897+
});
898+
importer_future.await.unwrap();
899+
900+
counter_task.abort();
901+
})
902+
.await;
903+
904+
assert_eq!(
905+
MAX_COUNTER,
906+
COUNTER.load(std::sync::atomic::Ordering::SeqCst)
907+
);
908+
909+
struct BlockingRepository {
910+
wait_time: Duration,
911+
}
912+
913+
impl BlockingRepository {
914+
fn block_thread(&self) {
915+
std::thread::sleep(self.wait_time);
916+
}
917+
}
918+
919+
#[async_trait]
920+
impl TransactionStore for BlockingRepository {
921+
async fn get_highest_beacon(&self) -> StdResult<Option<ChainPoint>> {
922+
self.block_thread();
923+
Ok(None)
924+
}
925+
926+
async fn get_highest_block_range(&self) -> StdResult<Option<BlockRange>> {
927+
self.block_thread();
928+
Ok(None)
929+
}
930+
931+
async fn store_transactions(&self, _: Vec<CardanoTransaction>) -> StdResult<()> {
932+
self.block_thread();
933+
Ok(())
934+
}
935+
936+
async fn get_transactions_in_range(
937+
&self,
938+
_: Range<BlockNumber>,
939+
) -> StdResult<Vec<CardanoTransaction>> {
940+
self.block_thread();
941+
Ok(vec![])
942+
}
943+
944+
async fn store_block_range_roots(
945+
&self,
946+
_: Vec<(BlockRange, MKTreeNode)>,
947+
) -> StdResult<()> {
948+
self.block_thread();
949+
Ok(())
950+
}
951+
952+
async fn remove_rolled_back_transactions_and_block_range(
953+
&self,
954+
_: SlotNumber,
955+
) -> StdResult<()> {
956+
self.block_thread();
957+
Ok(())
958+
}
959+
}
960+
}
872961
}

mithril-aggregator/tests/create_certificate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use mithril_common::{
1010
};
1111
use test_extensions::{utilities::get_test_dir, ExpectedCertificate, RuntimeTester};
1212

13-
#[tokio::test(flavor = "multi_thread")]
13+
#[tokio::test]
1414
async fn create_certificate() {
1515
let protocol_parameters = ProtocolParameters {
1616
k: 5,

mithril-aggregator/tests/prove_transactions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::test_extensions::utilities::tx_hash;
1313

1414
mod test_extensions;
1515

16-
#[tokio::test(flavor = "multi_thread")]
16+
#[tokio::test]
1717
async fn prove_transactions() {
1818
let protocol_parameters = ProtocolParameters {
1919
k: 5,

mithril-signer/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-signer"
3-
version = "0.2.163"
3+
version = "0.2.164"
44
description = "A Mithril Signer"
55
authors = { workspace = true }
66
edition = { workspace = true }

0 commit comments

Comments
 (0)