Skip to content

Commit 63b1269

Browse files
dlachaumesfauvel
authored andcommitted
Implement lock/unlock mechanism for signed entity types during the create artifact process
1 parent 534a2dd commit 63b1269

File tree

2 files changed

+234
-28
lines changed

2 files changed

+234
-28
lines changed

mithril-aggregator/src/dependency_injection/builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1135,6 +1135,7 @@ impl DependenciesBuilder {
11351135
mithril_stake_distribution_artifact_builder,
11361136
cardano_immutable_files_full_artifact_builder,
11371137
cardano_transactions_artifact_builder,
1138+
self.get_signed_entity_lock().await?,
11381139
));
11391140

11401141
// Compute the cache pool for prover service

mithril-aggregator/src/services/signed_entity.rs

Lines changed: 233 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//!
33
//! This service is responsible for dealing with [SignedEntity] type.
44
//! It creates [Artifact] that can be accessed by clients.
5-
use anyhow::Context;
5+
use anyhow::{anyhow, Context};
66
use async_trait::async_trait;
77
use chrono::Utc;
88
use slog_scope::info;
@@ -16,6 +16,7 @@ use mithril_common::{
1616
Snapshot,
1717
},
1818
signable_builder::Artifact,
19+
signed_entity_type_lock::SignedEntityTypeLock,
1920
StdResult,
2021
};
2122

@@ -79,6 +80,7 @@ pub struct MithrilSignedEntityService {
7980
Arc<dyn ArtifactBuilder<CardanoDbBeacon, Snapshot>>,
8081
cardano_transactions_artifact_builder:
8182
Arc<dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>>,
83+
signed_entity_type_lock: Arc<SignedEntityTypeLock>,
8284
}
8385

8486
impl MithrilSignedEntityService {
@@ -94,29 +96,56 @@ impl MithrilSignedEntityService {
9496
cardano_transactions_artifact_builder: Arc<
9597
dyn ArtifactBuilder<BlockNumber, CardanoTransactionsSnapshot>,
9698
>,
99+
signed_entity_type_lock: Arc<SignedEntityTypeLock>,
97100
) -> Self {
98101
Self {
99102
signed_entity_storer,
100103
mithril_stake_distribution_artifact_builder,
101104
cardano_immutable_files_full_artifact_builder,
102105
cardano_transactions_artifact_builder,
106+
signed_entity_type_lock,
103107
}
104108
}
105109

106-
fn create_artifact_return_join_handle(
110+
async fn create_artifact_return_join_handle(
107111
&self,
108112
signed_entity_type: SignedEntityType,
109113
certificate: &Certificate,
110-
) -> JoinHandle<StdResult<()>> {
114+
) -> StdResult<JoinHandle<StdResult<()>>> {
115+
if self
116+
.signed_entity_type_lock
117+
.is_locked(&signed_entity_type)
118+
.await
119+
{
120+
return Err(anyhow!(
121+
"Signed entity type '{:?}' is already locked",
122+
signed_entity_type
123+
));
124+
}
125+
111126
let service = self.clone();
112127
let certificate_cloned = certificate.clone();
113-
tokio::task::spawn_blocking(|| {
114-
Handle::current().block_on(async move {
115-
service
116-
.create_artifact(signed_entity_type, &certificate_cloned)
128+
service
129+
.signed_entity_type_lock
130+
.lock(&signed_entity_type)
131+
.await;
132+
133+
Ok(tokio::task::spawn(async move {
134+
let signed_entity_type_clone = signed_entity_type.clone();
135+
let service_clone = service.clone();
136+
let result = tokio::task::spawn(async move {
137+
service_clone
138+
.create_artifact(signed_entity_type_clone, &certificate_cloned)
117139
.await
118140
})
119-
})
141+
.await;
142+
service
143+
.signed_entity_type_lock
144+
.release(signed_entity_type)
145+
.await;
146+
147+
result.unwrap()
148+
}))
120149
}
121150

122151
/// Compute artifact from signed entity type
@@ -190,6 +219,11 @@ impl SignedEntityService for MithrilSignedEntityService {
190219
"certificate_hash" => &certificate.hash
191220
);
192221

222+
println!(
223+
"MithrilSignedEntityService::create_artifact: signed_entity_type: {:?}, certificate_hash: {}",
224+
signed_entity_type, certificate.hash
225+
);
226+
193227
let mut remaining_retries = 2;
194228
let artifact = loop {
195229
remaining_retries -= 1;
@@ -317,14 +351,15 @@ impl SignedEntityService for MithrilSignedEntityService {
317351

318352
#[cfg(test)]
319353
mod tests {
320-
use std::time::Duration;
354+
use std::{sync::atomic::Ordering, time::Duration};
321355

322356
use mithril_common::{
323357
entities::{CardanoTransactionsSnapshot, Epoch},
324358
signable_builder,
325359
test_utils::fake_data,
326360
};
327361
use serde::{de::DeserializeOwned, Serialize};
362+
use std::sync::atomic::AtomicBool;
328363

329364
use crate::artifact_builder::MockArtifactBuilder;
330365
use crate::database::repository::MockSignedEntityStorer;
@@ -386,6 +421,58 @@ mod tests {
386421
Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
387422
Arc::new(self.mock_cardano_immutable_files_full_artifact_builder),
388423
Arc::new(self.mock_cardano_transactions_artifact_builder),
424+
Arc::new(SignedEntityTypeLock::default()),
425+
)
426+
}
427+
428+
fn build_artifact_builder_service_with_time_consuming_process(
429+
mut self,
430+
atomic_stop: Arc<AtomicBool>,
431+
) -> MithrilSignedEntityService {
432+
struct LongArtifactBuilder {
433+
atomic_stop: Arc<AtomicBool>,
434+
snapshot: Snapshot,
435+
}
436+
impl LongArtifactBuilder {}
437+
438+
let snapshot = fake_data::snapshots(1).first().unwrap().to_owned();
439+
440+
#[async_trait]
441+
impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for LongArtifactBuilder {
442+
async fn compute_artifact(
443+
&self,
444+
_beacon: CardanoDbBeacon,
445+
_certificate: &Certificate,
446+
) -> StdResult<Snapshot> {
447+
let mut max_iteration = 100;
448+
while !self.atomic_stop.load(Ordering::Relaxed) {
449+
max_iteration -= 1;
450+
if max_iteration <= 0 {
451+
return Err(anyhow!("Test should handle the stop"));
452+
}
453+
tokio::time::sleep(Duration::from_millis(10)).await;
454+
}
455+
Ok(self.snapshot.clone())
456+
}
457+
}
458+
let cardano_immutable_files_full_long_artifact_builder = LongArtifactBuilder {
459+
atomic_stop: atomic_stop.clone(),
460+
snapshot: snapshot.clone(),
461+
};
462+
463+
let artifact_clone: Arc<dyn Artifact> = Arc::new(snapshot);
464+
let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap();
465+
self.mock_signed_entity_storer
466+
.expect_store_signed_entity()
467+
.withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact)
468+
.return_once(|_| Ok(()));
469+
470+
MithrilSignedEntityService::new(
471+
Arc::new(self.mock_signed_entity_storer),
472+
Arc::new(self.mock_mithril_stake_distribution_artifact_builder),
473+
Arc::new(cardano_immutable_files_full_long_artifact_builder),
474+
Arc::new(self.mock_cardano_transactions_artifact_builder),
475+
Arc::new(SignedEntityTypeLock::default()),
389476
)
390477
}
391478

@@ -585,7 +672,7 @@ mod tests {
585672
// TODO: Verify the relevance of this test
586673
#[tokio::test]
587674
async fn create_artifact_for_two_signed_entity_types_in_sequence() {
588-
let artifact_builder_service = {
675+
let signed_entity_type_service = {
589676
let mut mock_container = MockDependencyInjector::new();
590677
mock_container.mock_immutable_files_processing(
591678
fake_data::snapshots(1).first().unwrap().to_owned(),
@@ -599,49 +686,167 @@ mod tests {
599686

600687
let signed_entity_type_immutable =
601688
SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
602-
artifact_builder_service
689+
signed_entity_type_service
603690
.create_artifact(signed_entity_type_immutable, &certificate)
604691
.await
605692
.unwrap();
606693

607694
let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
608-
artifact_builder_service
695+
signed_entity_type_service
609696
.create_artifact(signed_entity_type_msd, &certificate)
610697
.await
611698
.unwrap();
612699
}
613700

614701
#[tokio::test]
615702
async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() {
616-
let artifact_builder_service = {
703+
let atomic_stop = Arc::new(AtomicBool::new(false));
704+
let signed_entity_type_service = {
617705
let mut mock_container = MockDependencyInjector::new();
618-
let snapshot = fake_data::snapshots(1).first().unwrap().to_owned();
619-
mock_container
620-
.mock_cardano_immutable_files_full_artifact_builder
621-
.expect_compute_artifact()
622-
.times(1)
623-
.return_once(|_, _| {
624-
std::thread::sleep(Duration::from_millis(1000));
625-
Ok(snapshot)
626-
});
627706

628707
let msd = create_stake_distribution(Epoch(1), 5);
629708
mock_container.mock_stake_distribution_processing(msd);
630709

631-
Arc::new(mock_container.build_artifact_builder_service())
710+
mock_container
711+
.build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone())
632712
};
633713
let certificate = fake_data::certificate("hash".to_string());
634714

635715
let signed_entity_type_immutable =
636716
SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
637-
let first_task_that_never_finished = artifact_builder_service
638-
.create_artifact_return_join_handle(signed_entity_type_immutable, &certificate);
717+
let first_task_that_never_finished = signed_entity_type_service
718+
.create_artifact_return_join_handle(signed_entity_type_immutable, &certificate)
719+
.await
720+
.unwrap();
639721

640722
let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1));
641-
let second_task_that_finish_first = artifact_builder_service
642-
.create_artifact_return_join_handle(signed_entity_type_msd, &certificate);
723+
let second_task_that_finish_first = signed_entity_type_service
724+
.create_artifact_return_join_handle(signed_entity_type_msd, &certificate)
725+
.await
726+
.unwrap();
643727

644-
let _ = second_task_that_finish_first.await.unwrap();
728+
second_task_that_finish_first.await.unwrap().unwrap();
645729
assert!(!first_task_that_never_finished.is_finished());
730+
731+
atomic_stop.swap(true, Ordering::Relaxed);
732+
}
733+
734+
#[tokio::test]
735+
async fn create_artifact_lock_unlock_signed_entity_type_while_processing() {
736+
let atomic_stop = Arc::new(AtomicBool::new(false));
737+
let signed_entity_type_service = MockDependencyInjector::new()
738+
.build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
739+
let certificate = fake_data::certificate("hash".to_string());
740+
741+
let signed_entity_type_immutable =
742+
SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
743+
assert!(
744+
!signed_entity_type_service
745+
.signed_entity_type_lock
746+
.is_locked(&signed_entity_type_immutable)
747+
.await
748+
);
749+
let join_handle = signed_entity_type_service
750+
.create_artifact_return_join_handle(signed_entity_type_immutable.clone(), &certificate)
751+
.await
752+
.unwrap();
753+
754+
// Results are stored to finalize the task before assertions,
755+
// ensuring 'atomic_stop' is always assigned a new value.
756+
let is_locked = signed_entity_type_service
757+
.signed_entity_type_lock
758+
.is_locked(&signed_entity_type_immutable)
759+
.await;
760+
let is_finished = join_handle.is_finished();
761+
762+
atomic_stop.swap(true, Ordering::Relaxed);
763+
join_handle.await.unwrap().unwrap();
764+
765+
assert!(is_locked);
766+
assert!(!is_finished);
767+
768+
assert!(
769+
!signed_entity_type_service
770+
.signed_entity_type_lock
771+
.is_locked(&signed_entity_type_immutable)
772+
.await
773+
);
774+
}
775+
776+
#[tokio::test]
777+
async fn create_artifact_unlock_signed_entity_type_when_error() {
778+
let signed_entity_type_service = {
779+
let mut mock_container = MockDependencyInjector::new();
780+
mock_container
781+
.mock_cardano_immutable_files_full_artifact_builder
782+
.expect_compute_artifact()
783+
.returning(|_, _| Err(anyhow::anyhow!("Error while computing artifact")));
784+
785+
mock_container.build_artifact_builder_service()
786+
};
787+
let certificate = fake_data::certificate("hash".to_string());
788+
789+
let signed_entity_type_immutable =
790+
SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
791+
792+
let join_handle = signed_entity_type_service
793+
.create_artifact_return_join_handle(signed_entity_type_immutable.clone(), &certificate)
794+
.await
795+
.unwrap();
796+
797+
join_handle.await.unwrap().unwrap_err();
798+
799+
assert!(
800+
!signed_entity_type_service
801+
.signed_entity_type_lock
802+
.is_locked(&signed_entity_type_immutable)
803+
.await
804+
);
805+
}
806+
807+
#[tokio::test]
808+
async fn create_artifact_unlock_signed_entity_type_when_panic() {
809+
let signed_entity_type_service =
810+
MockDependencyInjector::new().build_artifact_builder_service();
811+
let certificate = fake_data::certificate("hash".to_string());
812+
813+
let signed_entity_type_immutable =
814+
SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
815+
816+
let join_handle = signed_entity_type_service
817+
.create_artifact_return_join_handle(signed_entity_type_immutable.clone(), &certificate)
818+
.await
819+
.unwrap();
820+
821+
join_handle.await.unwrap_err();
822+
823+
assert!(
824+
!signed_entity_type_service
825+
.signed_entity_type_lock
826+
.is_locked(&signed_entity_type_immutable)
827+
.await
828+
);
829+
}
830+
831+
#[tokio::test]
832+
async fn create_artifact_for_a_signed_entity_type_already_lock_return_error() {
833+
let atomic_stop = Arc::new(AtomicBool::new(false));
834+
let signed_entity_service = MockDependencyInjector::new()
835+
.build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone());
836+
let certificate = fake_data::certificate("hash".to_string());
837+
let signed_entity_type_immutable =
838+
SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default());
839+
840+
signed_entity_service
841+
.create_artifact_return_join_handle(signed_entity_type_immutable.clone(), &certificate)
842+
.await
843+
.unwrap();
844+
845+
signed_entity_service
846+
.create_artifact_return_join_handle(signed_entity_type_immutable, &certificate)
847+
.await
848+
.expect_err("Should return error when signed entity type is already locked");
849+
850+
atomic_stop.swap(true, Ordering::Relaxed);
646851
}
647852
}

0 commit comments

Comments
 (0)