Skip to content

Commit 930ee6e

Browse files
committed
feat(aggregator): add follower sync in state machine + wire synchroniser
1 parent 8931cb8 commit 930ee6e

File tree

8 files changed

+124
-9
lines changed

8 files changed

+124
-9
lines changed

mithril-aggregator/src/dependency_injection/builder/mod.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ use crate::{
5656
file_uploaders::FileUploader,
5757
http_server::routes::router::{self, RouterConfig, RouterState},
5858
services::{
59-
AggregatorHTTPClient, CertifierService, MessageService, MithrilSignerRegistrationFollower,
60-
ProverService, SignedEntityService, SignerSynchronizer, Snapshotter,
61-
StakeDistributionService, UpkeepService,
59+
AggregatorHTTPClient, CertificateChainSynchronizer, CertifierService, MessageService,
60+
MithrilSignerRegistrationFollower, ProverService, SignedEntityService, SignerSynchronizer,
61+
Snapshotter, StakeDistributionService, UpkeepService,
6262
},
6363
tools::file_archiver::FileArchiver,
6464
};
@@ -185,6 +185,9 @@ pub struct DependenciesBuilder {
185185
/// Genesis signature verifier service.
186186
pub genesis_verifier: Option<Arc<ProtocolGenesisVerifier>>,
187187

188+
/// Certificate chain synchroniser service
189+
pub certificate_chain_synchroniser: Option<Arc<dyn CertificateChainSynchronizer>>,
190+
188191
/// Mithril signer registration leader service
189192
pub mithril_signer_registration_leader: Option<Arc<MithrilSignerRegistrationLeader>>,
190193

@@ -312,6 +315,7 @@ impl DependenciesBuilder {
312315
snapshotter: None,
313316
certificate_verifier: None,
314317
genesis_verifier: None,
318+
certificate_chain_synchroniser: None,
315319
mithril_signer_registration_leader: None,
316320
mithril_signer_registration_follower: None,
317321
signer_registerer: None,
@@ -374,6 +378,7 @@ impl DependenciesBuilder {
374378
verification_key_store: self.get_verification_key_store().await?,
375379
epoch_settings_storer: self.get_epoch_settings_store().await?,
376380
chain_observer: self.get_chain_observer().await?,
381+
certificate_chain_synchroniser: self.get_certificate_chain_synchroniser().await?,
377382
signer_registerer: self.get_signer_registerer().await?,
378383
signer_synchronizer: self.get_signer_synchronizer().await?,
379384
signer_registration_round_opener: self.get_signer_registration_round_opener().await?,

mithril-aggregator/src/dependency_injection/builder/protocol/certificates.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ use crate::database::repository::{BufferedSingleSignatureRepository, SingleSigna
1010
use crate::dependency_injection::{DependenciesBuilder, DependenciesBuilderError, Result};
1111
use crate::get_dependency;
1212
use crate::services::{
13-
BufferedCertifierService, CertifierService, MithrilCertifierService,
14-
MithrilSignerRegistrationFollower, SignerSynchronizer,
13+
BufferedCertifierService, CertificateChainSynchronizer, CertifierService,
14+
MithrilCertificateChainSynchroniserNoop, MithrilCertificateChainSynchronizer,
15+
MithrilCertifierService, MithrilSignerRegistrationFollower, SignerSynchronizer,
1516
};
1617
use crate::{
1718
ExecutionEnvironment, MithrilSignerRegistrationLeader, MithrilSignerRegistrationVerifier,
@@ -84,6 +85,39 @@ impl DependenciesBuilder {
8485
get_dependency!(self.multi_signer)
8586
}
8687

88+
async fn build_certificate_chain_synchroniser(
89+
&mut self,
90+
) -> Result<Arc<dyn CertificateChainSynchronizer>> {
91+
let synchroniser: Arc<dyn CertificateChainSynchronizer> =
92+
if self.configuration.is_follower_aggregator() {
93+
let leader_aggregator_client = self.get_leader_aggregator_client().await?;
94+
let verifier = Arc::new(MithrilCertificateVerifier::new(
95+
self.root_logger(),
96+
leader_aggregator_client.clone(),
97+
));
98+
99+
Arc::new(MithrilCertificateChainSynchronizer::new(
100+
leader_aggregator_client,
101+
self.get_certificate_repository().await?,
102+
verifier,
103+
self.get_genesis_verifier().await?,
104+
self.get_open_message_repository().await?,
105+
self.root_logger(),
106+
))
107+
} else {
108+
Arc::new(MithrilCertificateChainSynchroniserNoop)
109+
};
110+
111+
Ok(synchroniser)
112+
}
113+
114+
/// [CertificateChainSynchronizer] service
115+
pub async fn get_certificate_chain_synchroniser(
116+
&mut self,
117+
) -> Result<Arc<dyn CertificateChainSynchronizer>> {
118+
get_dependency!(self.certificate_chain_synchroniser)
119+
}
120+
87121
async fn build_certificate_verifier(&mut self) -> Result<Arc<dyn CertificateVerifier>> {
88122
let verifier = Arc::new(MithrilCertificateVerifier::new(
89123
self.root_logger(),

mithril-aggregator/src/dependency_injection/containers/serve.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ use crate::{
2727
entities::AggregatorEpochSettings,
2828
event_store::{EventMessage, TransmitterService},
2929
services::{
30-
CertifierService, EpochService, MessageService, ProverService, SignedEntityService,
31-
SignerRecorder, SignerSynchronizer, StakeDistributionService, UpkeepService,
30+
CertificateChainSynchronizer, CertifierService, EpochService, MessageService,
31+
ProverService, SignedEntityService, SignerRecorder, SignerSynchronizer,
32+
StakeDistributionService, UpkeepService,
3233
},
3334
};
3435

@@ -57,6 +58,9 @@ pub struct ServeCommandDependenciesContainer {
5758
/// Chain observer service.
5859
pub(crate) chain_observer: Arc<dyn ChainObserver>,
5960

61+
/// Certificate chain synchroniser service
62+
pub(crate) certificate_chain_synchroniser: Arc<dyn CertificateChainSynchronizer>,
63+
6064
/// Signer registerer service
6165
pub signer_registerer: Arc<dyn SignerRegisterer>,
6266

mithril-aggregator/src/runtime/runner.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ pub trait AggregatorRunnerTrait: Sync + Send {
7272
/// Synchronize the follower aggregator signer registration.
7373
async fn synchronize_follower_aggregator_signer_registration(&self) -> StdResult<()>;
7474

75+
/// Synchronise the follower aggregator certificate chain
76+
async fn synchronize_follower_aggregator_certificate_chain(
77+
&self,
78+
force_sync: bool,
79+
) -> StdResult<()>;
80+
7581
/// Ask the EpochService to update the epoch settings.
7682
async fn update_epoch_settings(&self) -> StdResult<()>;
7783

@@ -506,6 +512,20 @@ impl AggregatorRunnerTrait for AggregatorRunner {
506512
.get_runtime_cycle_total_since_startup()
507513
.increment();
508514
}
515+
516+
async fn synchronize_follower_aggregator_certificate_chain(
517+
&self,
518+
force_sync: bool,
519+
) -> StdResult<()> {
520+
debug!(
521+
self.logger,
522+
">> synchronize_follower_aggregator_certificate_chain(force_sync:{force_sync})"
523+
);
524+
self.dependencies
525+
.certificate_chain_synchroniser
526+
.synchronize_certificate_chain(force_sync)
527+
.await
528+
}
509529
}
510530

511531
#[cfg(test)]

mithril-aggregator/src/runtime/state_machine.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,13 +277,21 @@ impl AggregatorRuntime {
277277
self.runner.precompute_epoch_data().await?;
278278
}
279279

280-
self.runner
280+
let chain_validity_result = self
281+
.runner
281282
.is_certificate_chain_valid(&new_time_point)
282283
.await
283284
.map_err(|e| RuntimeError::KeepState {
284285
message: "certificate chain is invalid".to_string(),
285286
nested_error: e.into(),
286-
})?;
287+
});
288+
if self.config.is_follower {
289+
let force_sync = chain_validity_result.is_err();
290+
self.runner
291+
.synchronize_follower_aggregator_certificate_chain(force_sync)
292+
.await?;
293+
}
294+
chain_validity_result?;
287295

288296
Ok(())
289297
}
@@ -834,6 +842,8 @@ mod tests {
834842
}
835843

836844
mod follower {
845+
use mockall::predicate::eq;
846+
837847
use super::*;
838848

839849
#[tokio::test]
@@ -911,6 +921,11 @@ mod tests {
911921
.expect_is_certificate_chain_valid()
912922
.once()
913923
.returning(|_| Ok(()));
924+
runner
925+
.expect_synchronize_follower_aggregator_certificate_chain()
926+
.once()
927+
.with(eq(false)) // Certificate chain valid so force_sync must be false
928+
.returning(|_| Ok(()));
914929
runner
915930
.expect_update_era_checker()
916931
.with(predicate::eq(new_time_point_clone.clone().epoch))

mithril-aggregator/src/services/aggregator_client.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use thiserror::Error;
1111
use mithril_common::{
1212
MITHRIL_AGGREGATOR_VERSION_HEADER, MITHRIL_API_VERSION_HEADER, StdError, StdResult,
1313
api_version::APIVersionProvider,
14+
certificate_chain::{CertificateRetriever, CertificateRetrieverError},
1415
entities::{Certificate, ClientError, ServerError},
1516
logging::LoggerExtensions,
1617
messages::{
@@ -334,6 +335,27 @@ impl LeaderAggregatorClient for AggregatorHTTPClient {
334335
}
335336
}
336337

338+
#[async_trait]
339+
impl CertificateRetriever for AggregatorHTTPClient {
340+
async fn get_certificate_details(
341+
&self,
342+
certificate_hash: &str,
343+
) -> Result<Certificate, CertificateRetrieverError> {
344+
let message = self
345+
.certificates_details(certificate_hash)
346+
.await
347+
.with_context(|| {
348+
format!("Failed to retrieve certificate with hash: '{certificate_hash}'")
349+
})
350+
.map_err(CertificateRetrieverError)?
351+
.ok_or(CertificateRetrieverError(anyhow!(
352+
"Certificate does not exist: '{certificate_hash}'"
353+
)))?;
354+
355+
message.try_into().map_err(CertificateRetrieverError)
356+
}
357+
}
358+
337359
#[async_trait]
338360
impl RemoteCertificateRetriever for AggregatorHTTPClient {
339361
async fn get_latest_certificate_details(&self) -> StdResult<Option<Certificate>> {
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
mod interface;
2+
mod noop;
23
mod synchroniser_service;
34

45
pub use interface::*;
6+
pub use noop::*;
57
pub use synchroniser_service::*;
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use mithril_common::StdResult;
2+
3+
use crate::services::CertificateChainSynchronizer;
4+
5+
/// A noop [CertificateChainSynchronizer] for leader aggregators
6+
pub struct MithrilCertificateChainSynchroniserNoop;
7+
8+
#[async_trait::async_trait]
9+
impl CertificateChainSynchronizer for MithrilCertificateChainSynchroniserNoop {
10+
async fn synchronize_certificate_chain(&self, _force: bool) -> StdResult<()> {
11+
Ok(())
12+
}
13+
}

0 commit comments

Comments
 (0)