Skip to content

Commit b559ad7

Browse files
dlachaumesfauvel
andcommitted
Add create_artifact_return_join_handle that call create_artifact in a separate task
Co-authored-by: Sébastien Fauvel <[email protected]>
1 parent ac55f5f commit b559ad7

File tree

1 file changed

+75
-87
lines changed

1 file changed

+75
-87
lines changed

mithril-aggregator/src/services/signed_entity.rs

Lines changed: 75 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use async_trait::async_trait;
77
use chrono::Utc;
88
use slog_scope::info;
99
use std::sync::Arc;
10+
use tokio::{runtime::Handle, task::JoinHandle};
1011

1112
use mithril_common::{
1213
entities::{
@@ -69,6 +70,7 @@ pub trait SignedEntityService: Send + Sync {
6970
}
7071

7172
/// Mithril ArtifactBuilder Service
73+
#[derive(Clone)]
7274
pub struct MithrilSignedEntityService {
7375
signed_entity_storer: Arc<dyn SignedEntityStorer>,
7476
mithril_stake_distribution_artifact_builder:
@@ -101,6 +103,22 @@ impl MithrilSignedEntityService {
101103
}
102104
}
103105

106+
fn create_artifact_return_join_handle(
107+
&self,
108+
signed_entity_type: SignedEntityType,
109+
certificate: &Certificate,
110+
) -> JoinHandle<StdResult<()>> {
111+
let service = self.clone();
112+
let certificate_cloned = certificate.clone();
113+
tokio::task::spawn_blocking(|| {
114+
Handle::current().block_on(async move {
115+
service
116+
.create_artifact(signed_entity_type, &certificate_cloned)
117+
.await
118+
})
119+
})
120+
}
121+
104122
/// Compute artifact from signed entity type
105123
async fn compute_artifact(
106124
&self,
@@ -299,19 +317,14 @@ impl SignedEntityService for MithrilSignedEntityService {
299317

300318
#[cfg(test)]
301319
mod tests {
302-
use std::{
303-
sync::atomic::{AtomicBool, Ordering},
304-
thread::sleep,
305-
time::Duration,
306-
};
320+
use std::time::Duration;
307321

308322
use mithril_common::{
309323
entities::{CardanoTransactionsSnapshot, Epoch},
310324
signable_builder,
311325
test_utils::fake_data,
312326
};
313327
use serde::{de::DeserializeOwned, Serialize};
314-
use tokio::task::JoinSet;
315328

316329
use crate::artifact_builder::MockArtifactBuilder;
317330
use crate::database::repository::MockSignedEntityStorer;
@@ -530,141 +543,116 @@ mod tests {
530543
.expect(error_message_str);
531544
}
532545

546+
// TODO: Verify the relevance of this test
533547
#[tokio::test]
534548
async fn create_artifact_for_two_signed_entity_types_in_sequence() {
535-
let signed_entity_type_immutable =
536-
SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
537-
let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
538-
let artifact_snapshot = fake_data::snapshots(1).first().unwrap().to_owned();
539-
let artifact_msd = create_stake_distribution(Epoch(1), 5);
540549
let mut mock_container = MockDependencyInjector::new();
541550
{
542-
let artifact_snapshot_clone: Arc<dyn Artifact> = Arc::new(artifact_snapshot.clone());
543-
let signed_entity_artifact = serde_json::to_string(&artifact_snapshot_clone).unwrap();
551+
let snapshot = fake_data::snapshots(1).first().unwrap().to_owned();
552+
let artifact_snapshot: Arc<dyn Artifact> = Arc::new(snapshot.clone());
553+
let artifact = serde_json::to_string(&artifact_snapshot).unwrap();
544554
mock_container
545-
.mock_signed_entity_storer
546-
.expect_store_signed_entity()
547-
.withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
548-
.return_once(|_| Ok(()));
555+
.mock_cardano_immutable_files_full_artifact_builder
556+
.expect_compute_artifact()
557+
.times(1)
558+
.return_once(|_, _| Ok(snapshot));
549559

550-
let artifact_msd_clone: Arc<dyn Artifact> = Arc::new(artifact_msd.clone());
551-
let signed_entity_artifact_msd = serde_json::to_string(&artifact_msd_clone).unwrap();
552560
mock_container
553561
.mock_signed_entity_storer
554562
.expect_store_signed_entity()
555-
.withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact_msd)
563+
.withf(move |signed_entity| signed_entity.artifact == artifact)
564+
.times(1)
556565
.return_once(|_| Ok(()));
557566
}
558567
{
559-
let artifact_cloned = artifact_snapshot.clone();
568+
let msd = create_stake_distribution(Epoch(1), 5);
569+
let artifact_msd: Arc<dyn Artifact> = Arc::new(msd.clone());
570+
let artifact = serde_json::to_string(&artifact_msd).unwrap();
560571
mock_container
561-
.mock_cardano_immutable_files_full_artifact_builder
572+
.mock_mithril_stake_distribution_artifact_builder
562573
.expect_compute_artifact()
563574
.times(1)
564-
.return_once(|_, _| Ok(artifact_cloned));
575+
.return_once(|_, _| Ok(msd));
565576

566-
let artifact_msd_cloned = artifact_msd.clone();
567577
mock_container
568-
.mock_mithril_stake_distribution_artifact_builder
569-
.expect_compute_artifact()
578+
.mock_signed_entity_storer
579+
.expect_store_signed_entity()
580+
.withf(move |signed_entity| signed_entity.artifact == artifact)
570581
.times(1)
571-
.return_once(|_, _| Ok(artifact_msd_cloned));
582+
.return_once(|_| Ok(()));
572583
}
573584
let artifact_builder_service = mock_container.build_artifact_builder_service();
574585

575586
let certificate = fake_data::certificate("hash".to_string());
576587

588+
let signed_entity_type_immutable =
589+
SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
577590
artifact_builder_service
578591
.create_artifact(signed_entity_type_immutable, &certificate)
579592
.await
580593
.unwrap();
581594

595+
let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
582596
artifact_builder_service
583597
.create_artifact(signed_entity_type_msd, &certificate)
584598
.await
585599
.unwrap();
586600
}
587601

588602
#[tokio::test]
589-
async fn create_artifact_for_two_signed_entity_types_in_parallel() {
590-
let signed_entity_type_immutable =
591-
SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
592-
let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
593-
let artifact_snapshot = fake_data::snapshots(1).first().unwrap().to_owned();
594-
let artifact_msd = create_stake_distribution(Epoch(1), 5);
603+
async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() {
595604
let mut mock_container = MockDependencyInjector::new();
596605
{
597-
let artifact_snapshot_clone: Arc<dyn Artifact> = Arc::new(artifact_snapshot.clone());
598-
let signed_entity_artifact = serde_json::to_string(&artifact_snapshot_clone).unwrap();
606+
let snapshot = fake_data::snapshots(1).first().unwrap().to_owned();
607+
let artifact_snapshot: Arc<dyn Artifact> = Arc::new(snapshot.clone());
608+
let artifact = serde_json::to_string(&artifact_snapshot).unwrap();
599609
mock_container
600-
.mock_signed_entity_storer
601-
.expect_store_signed_entity()
602-
.withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
603-
.return_once(|_| Ok(()));
610+
.mock_cardano_immutable_files_full_artifact_builder
611+
.expect_compute_artifact()
612+
.times(1)
613+
.return_once(|_, _| {
614+
std::thread::sleep(Duration::from_millis(1000));
615+
Ok(snapshot)
616+
});
604617

605-
let artifact_msd_clone: Arc<dyn Artifact> = Arc::new(artifact_msd.clone());
606-
let signed_entity_artifact_msd = serde_json::to_string(&artifact_msd_clone).unwrap();
607618
mock_container
608619
.mock_signed_entity_storer
609620
.expect_store_signed_entity()
610-
.withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact_msd)
621+
.withf(move |signed_entity| signed_entity.artifact == artifact)
622+
.never()
611623
.return_once(|_| Ok(()));
612624
}
613625
{
614-
let artifact_cloned = artifact_snapshot.clone();
626+
let msd = create_stake_distribution(Epoch(1), 5);
627+
let artifact_msd: Arc<dyn Artifact> = Arc::new(msd.clone());
628+
let artifact = serde_json::to_string(&artifact_msd).unwrap();
615629
mock_container
616-
.mock_cardano_immutable_files_full_artifact_builder
630+
.mock_mithril_stake_distribution_artifact_builder
617631
.expect_compute_artifact()
618632
.times(1)
619-
.return_once(|_, _| {
620-
sleep(Duration::from_millis(1000));
621-
Ok(artifact_cloned)
622-
});
633+
.return_once(|_, _| Ok(msd));
623634

624-
let artifact_msd_cloned = artifact_msd.clone();
625635
mock_container
626-
.mock_mithril_stake_distribution_artifact_builder
627-
.expect_compute_artifact()
636+
.mock_signed_entity_storer
637+
.expect_store_signed_entity()
638+
.withf(move |signed_entity| signed_entity.artifact == artifact)
628639
.times(1)
629-
.return_once(|_, _| Ok(artifact_msd_cloned));
640+
.return_once(|_| Ok(()));
630641
}
631642
let artifact_builder_service = Arc::new(mock_container.build_artifact_builder_service());
632643

633-
let first_task_started = Arc::new(AtomicBool::new(false));
634-
let mut set = JoinSet::new();
635-
for _ in 0..2 {
636-
let first_task_started_cloned = first_task_started.clone();
637-
let artifact_builder_service_cloned = artifact_builder_service.clone();
638-
let signed_entity_type_msd_cloned = signed_entity_type_msd.clone();
639-
let signed_entity_type_immutable_cloned = signed_entity_type_immutable.clone();
640-
set.spawn(async move {
641-
if first_task_started_cloned.swap(true, Ordering::Relaxed) {
642-
println!("Task signed_entity_type_msd_cloned");
643-
artifact_builder_service_cloned
644-
.create_artifact(
645-
signed_entity_type_msd_cloned,
646-
&fake_data::certificate("hash".to_string()),
647-
)
648-
.await
649-
.unwrap();
650-
"signed_entity_type_msd"
651-
} else {
652-
println!("Task signed_entity_type_immutable_cloned");
653-
artifact_builder_service_cloned
654-
.create_artifact(
655-
signed_entity_type_immutable_cloned,
656-
&fake_data::certificate("hash".to_string()),
657-
)
658-
.await
659-
.unwrap();
660-
"signed_entity_type_immutable"
661-
}
662-
});
663-
}
644+
let certificate = fake_data::certificate("hash".to_string());
664645

665-
while let Some(res) = set.join_next().await {
666-
let idx = res.unwrap();
667-
println!("Task finished: {}", idx);
668-
}
646+
let signed_entity_type_immutable =
647+
SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
648+
let first_task_that_never_finished = artifact_builder_service
649+
.create_artifact_return_join_handle(signed_entity_type_immutable, &certificate);
650+
651+
let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
652+
let second_task_that_finish_first = artifact_builder_service
653+
.create_artifact_return_join_handle(signed_entity_type_msd, &certificate);
654+
655+
let _ = second_task_that_finish_first.await;
656+
assert!(!first_task_that_never_finished.is_finished());
669657
}
670658
}

0 commit comments

Comments
 (0)