Skip to content

Commit 35d6bd0

Browse files
authored
Merge pull request #934 from input-output-hk/jpraynaud/933-enhance-state-machines-aggregator-signer
Update state machines runtime in Aggregator/Signer
2 parents df2e8a0 + 72d2859 commit 35d6bd0

File tree

14 files changed

+341
-231
lines changed

14 files changed

+341
-231
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
-2.21 KB
Loading
-28.6 KB
Loading

mithril-aggregator/src/runtime/runner.rs

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ pub trait AggregatorRunnerTrait: Sync + Send {
5555
/// Return the current beacon from the chain
5656
async fn get_beacon_from_chain(&self) -> Result<Beacon, Box<dyn StdError + Sync + Send>>;
5757

58+
/// Retrieves the current non certified open message for a given signed entity type.
59+
async fn get_current_non_certified_open_message_for_signed_entity_type(
60+
&self,
61+
signed_entity_type: &SignedEntityType,
62+
) -> Result<Option<OpenMessage>, Box<dyn StdError + Sync + Send>>;
63+
5864
/// Retrieves the current non certified open message.
5965
async fn get_current_non_certified_open_message(
6066
&self,
@@ -175,6 +181,38 @@ impl AggregatorRunnerTrait for AggregatorRunner {
175181
Ok(beacon)
176182
}
177183

184+
async fn get_current_non_certified_open_message_for_signed_entity_type(
185+
&self,
186+
signed_entity_type: &SignedEntityType,
187+
) -> Result<Option<OpenMessage>, Box<dyn StdError + Sync + Send>> {
188+
debug!("RUNNER: get_current_non_certified_open_message_for_signed_entity_type"; "signed_entity_type" => ?signed_entity_type);
189+
let open_message = match self
190+
.dependencies
191+
.certifier_service
192+
.get_open_message(signed_entity_type)
193+
.await?
194+
{
195+
Some(existing_open_message) => {
196+
info!(
197+
"RUNNER: get_current_non_certified_open_message_for_signed_entity_type: existing open message found";
198+
"signed_entity_type" => ?signed_entity_type
199+
);
200+
existing_open_message
201+
}
202+
None => {
203+
info!(
204+
"RUNNER: get_current_non_certified_open_message_for_signed_entity_type: no open message found, a new one will be created";
205+
"signed_entity_type" => ?signed_entity_type
206+
);
207+
let protocol_message = self.compute_protocol_message(signed_entity_type).await?;
208+
self.create_open_message(signed_entity_type, &protocol_message)
209+
.await?
210+
}
211+
};
212+
213+
Ok((!open_message.is_certified).then_some(open_message))
214+
}
215+
178216
async fn get_current_non_certified_open_message(
179217
&self,
180218
) -> Result<Option<OpenMessage>, Box<dyn StdError + Sync + Send>> {
@@ -192,32 +230,10 @@ impl AggregatorRunnerTrait for AggregatorRunner {
192230
];
193231

194232
for signed_entity_type in signed_entity_types {
195-
let open_message = match self
196-
.dependencies
197-
.certifier_service
198-
.get_open_message(&signed_entity_type)
233+
if let Some(open_message) = self
234+
.get_current_non_certified_open_message_for_signed_entity_type(&signed_entity_type)
199235
.await?
200236
{
201-
Some(existing_open_message) => {
202-
info!(
203-
"RUNNER: get_current_non_certified_open_message: existing open message found";
204-
"signed_entity_type" => ?signed_entity_type
205-
);
206-
existing_open_message
207-
}
208-
None => {
209-
info!(
210-
"RUNNER: get_current_non_certified_open_message: no open message found, a new one will be created";
211-
"signed_entity_type" => ?signed_entity_type
212-
);
213-
let protocol_message =
214-
self.compute_protocol_message(&signed_entity_type).await?;
215-
self.create_open_message(&signed_entity_type, &protocol_message)
216-
.await?
217-
}
218-
};
219-
220-
if !open_message.is_certified {
221237
return Ok(Some(open_message));
222238
}
223239
info!(

mithril-aggregator/src/runtime/state_machine.rs

Lines changed: 86 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -136,34 +136,20 @@ impl AggregatorRuntime {
136136
AggregatorState::Idle(state) => {
137137
let chain_beacon = self.runner.get_beacon_from_chain().await?;
138138

139-
if state.current_beacon.is_none()
140-
|| chain_beacon
141-
.compare_to_older(state.current_beacon.as_ref().unwrap())
142-
.map_err(|e|
143-
RuntimeError::keep_state(
144-
&format!("Beacon in the state ({:?}) is newer than the beacon read on chain '{:?})", state.current_beacon, chain_beacon), Some(e.into())))?
145-
.is_new_beacon()
146-
{
147-
info!(
148-
"→ new Beacon settings found, trying to transition to READY";
149-
"new_beacon" => ?chain_beacon
150-
);
139+
info!(
140+
"→ new Beacon settings found, trying to transition to READY";
141+
"new_beacon" => ?chain_beacon
142+
);
151143

152-
if self
153-
.try_transition_from_idle_to_ready(
154-
state.current_beacon,
155-
chain_beacon.clone(),
156-
)
157-
.await?
158-
{
159-
self.state = AggregatorState::Ready(ReadyState {
160-
current_beacon: chain_beacon,
161-
});
162-
} else {
163-
info!(" ⋅ could not transition from IDLE to READY");
164-
}
144+
if self
145+
.try_transition_from_idle_to_ready(state.current_beacon, chain_beacon.clone())
146+
.await?
147+
{
148+
self.state = AggregatorState::Ready(ReadyState {
149+
current_beacon: chain_beacon,
150+
});
165151
} else {
166-
info!(" ⋅ Beacon didn't change, waiting…")
152+
info!(" ⋅ could not transition from IDLE to READY");
167153
}
168154
}
169155
AggregatorState::Ready(state) => {
@@ -189,7 +175,7 @@ impl AggregatorRuntime {
189175
// transition READY > SIGNING
190176
info!("→ transitioning to SIGNING");
191177
let new_state = self
192-
.transition_from_ready_to_signing(state.current_beacon, open_message)
178+
.transition_from_ready_to_signing(chain_beacon, open_message)
193179
.await?;
194180
self.state = AggregatorState::Signing(new_state);
195181
} else {
@@ -205,25 +191,37 @@ impl AggregatorRuntime {
205191
}
206192
AggregatorState::Signing(state) => {
207193
let chain_beacon: Beacon = self.runner.get_beacon_from_chain().await?;
208-
209-
if chain_beacon
210-
.compare_to_older(&state.current_beacon)
211-
.map_err(|e|
212-
RuntimeError::keep_state(
213-
&format!("Beacon in the state ({:?}) is newer than the beacon read on chain '{:?})", state.current_beacon, chain_beacon), Some(e.into())))?
214-
.is_new_beacon()
194+
let has_newer_open_message = if let Some(open_message_new) = self
195+
.runner
196+
.get_current_non_certified_open_message_for_signed_entity_type(
197+
&state.open_message.signed_entity_type,
198+
)
199+
.await?
215200
{
216-
info!("→ Beacon changed, transitioning to IDLE"; "new_beacon" => ?chain_beacon);
201+
open_message_new.signed_entity_type != state.open_message.signed_entity_type
202+
} else {
203+
false
204+
};
205+
206+
if state.current_beacon.epoch < chain_beacon.epoch {
207+
// SIGNING > IDLE
208+
info!("→ Epoch changed, transitioning to IDLE");
209+
let new_state = self.transition_from_signing_to_idle(state).await?;
210+
self.state = AggregatorState::Idle(new_state);
211+
} else if has_newer_open_message {
212+
// SIGNING > READY
213+
info!("→ Open message changed, transitioning to READY");
217214
let new_state = self
218-
.transition_from_signing_to_idle_new_beacon(state)
215+
.transition_from_signing_to_ready_new_open_message(state)
219216
.await?;
220-
self.state = AggregatorState::Idle(new_state);
217+
self.state = AggregatorState::Ready(new_state);
221218
} else {
219+
// SIGNING > READY
222220
let new_state = self
223-
.transition_from_signing_to_idle_multisignature(state)
224-
.await?;
225-
info!("→ a multi-signature have been created, build a snapshot & a certificate and transitioning back to IDLE");
226-
self.state = AggregatorState::Idle(new_state);
221+
.transition_from_signing_to_ready_multisignature(state)
222+
.await?;
223+
info!("→ a multi-signature have been created, build a snapshot & a certificate and transitioning back to READY");
224+
self.state = AggregatorState::Ready(new_state);
227225
}
228226
}
229227
}
@@ -267,13 +265,13 @@ impl AggregatorRuntime {
267265
Ok(is_chain_valid)
268266
}
269267

270-
/// Perform a transition from `SIGNING` state to `IDLE` state when a new
268+
/// Perform a transition from `SIGNING` state to `READY` state when a new
271269
/// multi-signature is issued.
272-
async fn transition_from_signing_to_idle_multisignature(
270+
async fn transition_from_signing_to_ready_multisignature(
273271
&self,
274272
state: SigningState,
275-
) -> Result<IdleState, RuntimeError> {
276-
trace!("launching transition from SIGNING to IDLE state");
273+
) -> Result<ReadyState, RuntimeError> {
274+
trace!("launching transition from SIGNING to READY state");
277275
let certificate = self
278276
.runner
279277
.create_certificate(&state.open_message.signed_entity_type)
@@ -288,14 +286,14 @@ impl AggregatorRuntime {
288286
.create_artifact(&state.open_message.signed_entity_type, &certificate)
289287
.await?;
290288

291-
Ok(IdleState {
292-
current_beacon: Some(state.current_beacon),
289+
Ok(ReadyState {
290+
current_beacon: state.current_beacon,
293291
})
294292
}
295293

296294
/// Perform a transition from `SIGNING` state to `IDLE` state when a new
297-
/// beacon is detected.
298-
async fn transition_from_signing_to_idle_new_beacon(
295+
/// epoch is detected.
296+
async fn transition_from_signing_to_idle(
299297
&self,
300298
state: SigningState,
301299
) -> Result<IdleState, RuntimeError> {
@@ -307,6 +305,20 @@ impl AggregatorRuntime {
307305
})
308306
}
309307

308+
/// Perform a transition from `SIGNING` state to `READY` state when a new
309+
/// open message is detected.
310+
async fn transition_from_signing_to_ready_new_open_message(
311+
&self,
312+
state: SigningState,
313+
) -> Result<ReadyState, RuntimeError> {
314+
trace!("launching transition from SIGNING to READY state");
315+
self.runner.drop_pending_certificate().await?;
316+
317+
Ok(ReadyState {
318+
current_beacon: state.current_beacon,
319+
})
320+
}
321+
310322
/// Perform a transition from `READY` state to `SIGNING` state when a new
311323
/// beacon is detected.
312324
async fn transition_from_ready_to_signing(
@@ -344,6 +356,7 @@ mod tests {
344356
use super::super::runner::MockAggregatorRunner;
345357
use super::*;
346358

359+
use mithril_common::entities::{Epoch, SignedEntityType};
347360
use mithril_common::era::UnsupportedEraError;
348361
use mithril_common::test_utils::fake_data;
349362
use mockall::predicate;
@@ -357,25 +370,6 @@ mod tests {
357370
.unwrap()
358371
}
359372

360-
#[tokio::test]
361-
pub async fn idle_check_no_new_beacon_with_current_beacon() {
362-
let mut runner = MockAggregatorRunner::new();
363-
runner
364-
.expect_get_beacon_from_chain()
365-
.once()
366-
.returning(|| Ok(fake_data::beacon()));
367-
let mut runtime = init_runtime(
368-
Some(AggregatorState::Idle(IdleState {
369-
current_beacon: Some(fake_data::beacon()),
370-
})),
371-
runner,
372-
)
373-
.await;
374-
runtime.cycle().await.unwrap();
375-
376-
assert_eq!("idle".to_string(), runtime.get_state());
377-
}
378-
379373
#[tokio::test]
380374
pub async fn idle_check_certificate_chain_is_not_valid() {
381375
let mut runner = MockAggregatorRunner::new();
@@ -593,32 +587,37 @@ mod tests {
593587
}
594588

595589
#[tokio::test]
596-
async fn signing_changing_beacon_to_idle() {
590+
async fn signing_changing_open_message_to_ready() {
597591
let mut runner = MockAggregatorRunner::new();
598592
runner
599593
.expect_get_beacon_from_chain()
600594
.once()
601595
.returning(|| Ok(fake_data::beacon()));
596+
runner
597+
.expect_get_current_non_certified_open_message_for_signed_entity_type()
598+
.once()
599+
.returning(|_| {
600+
Ok(Some(OpenMessage {
601+
signed_entity_type: SignedEntityType::MithrilStakeDistribution(Epoch(1)),
602+
..OpenMessage::dummy()
603+
}))
604+
});
602605
runner
603606
.expect_drop_pending_certificate()
604607
.once()
605608
.returning(|| Ok(Some(fake_data::certificate_pending())));
606609

607610
let state = SigningState {
608-
// this current beacon must be outdated so the state machine will
609-
// return to idle state
610-
current_beacon: {
611-
let mut beacon = fake_data::beacon();
612-
beacon.immutable_file_number -= 1;
613-
614-
beacon
611+
current_beacon: fake_data::beacon(),
612+
open_message: OpenMessage {
613+
signed_entity_type: SignedEntityType::MithrilStakeDistribution(Epoch(2)),
614+
..OpenMessage::dummy()
615615
},
616-
open_message: OpenMessage::dummy(),
617616
};
618617
let mut runtime = init_runtime(Some(AggregatorState::Signing(state)), runner).await;
619618
runtime.cycle().await.unwrap();
620619

621-
assert_eq!("idle".to_string(), runtime.get_state());
620+
assert_eq!("ready".to_string(), runtime.get_state());
622621
}
623622

624623
#[tokio::test]
@@ -628,6 +627,10 @@ mod tests {
628627
.expect_get_beacon_from_chain()
629628
.once()
630629
.returning(|| Ok(fake_data::beacon()));
630+
runner
631+
.expect_get_current_non_certified_open_message_for_signed_entity_type()
632+
.once()
633+
.returning(|_| Ok(Some(OpenMessage::dummy())));
631634
runner
632635
.expect_create_certificate()
633636
.once()
@@ -652,6 +655,10 @@ mod tests {
652655
.expect_get_beacon_from_chain()
653656
.once()
654657
.returning(|| Ok(fake_data::beacon()));
658+
runner
659+
.expect_get_current_non_certified_open_message_for_signed_entity_type()
660+
.once()
661+
.returning(|_| Ok(Some(OpenMessage::dummy())));
655662
runner
656663
.expect_create_certificate()
657664
.return_once(move |_| Ok(Some(fake_data::certificate("whatever".to_string()))));
@@ -671,7 +678,7 @@ mod tests {
671678
let mut runtime = init_runtime(Some(AggregatorState::Signing(state)), runner).await;
672679
runtime.cycle().await.unwrap();
673680

674-
assert_eq!("idle".to_string(), runtime.get_state());
681+
assert_eq!("ready".to_string(), runtime.get_state());
675682
}
676683

677684
#[tokio::test]

0 commit comments

Comments
 (0)