Skip to content

Commit f7f0c84

Browse files
committed
validator_worker - follower - complete the fns tick & on_new_state
1 parent 155233e commit f7f0c84

File tree

1 file changed

+96
-11
lines changed

1 file changed

+96
-11
lines changed

validator_worker/src/follower.rs

Lines changed: 96 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,25 @@
11
use std::error::Error;
22

33
use primitives::adapter::Adapter;
4-
use primitives::validator::{MessageTypes, NewState};
4+
use primitives::validator::{ApproveState, MessageTypes, NewState, RejectState};
55
use primitives::BalancesMap;
66

7+
use crate::core::follower_rules::{is_healthy, is_valid_transition};
78
use crate::heartbeat::heartbeat;
89
use crate::sentry_interface::SentryApi;
910
use crate::{get_state_root_hash, producer};
11+
use chrono::Utc;
12+
13+
enum InvalidNewState {
14+
RootHash,
15+
Signature,
16+
Transition,
17+
}
18+
19+
enum NewStateResult {
20+
Ok,
21+
Err(InvalidNewState),
22+
}
1023

1124
pub async fn tick<A: Adapter + 'static>(iface: &SentryApi<A>) -> Result<(), Box<dyn Error>> {
1225
let from = iface.channel.spec.validators.leader().id.clone();
@@ -36,19 +49,91 @@ pub async fn tick<A: Adapter + 'static>(iface: &SentryApi<A>) -> Result<(), Box<
3649

3750
let (balances, _) = await!(producer::tick(&iface))?;
3851

39-
match (new_msg, latest_is_responded_to) {
40-
(Some(new_state), false) => on_new_state(&iface, &balances, &new_state)?,
41-
(_, _) => {}
52+
if let (Some(new_state), false) = (new_msg, latest_is_responded_to) {
53+
await!(on_new_state(&iface, &balances, &new_state))?;
4254
}
4355

44-
// TODO: Pass the heartbeat time from the Configuration
4556
await!(heartbeat(&iface, balances)).map(|_| ())
4657
}
4758

48-
fn on_new_state<A: Adapter + 'static>(
49-
_iface: &SentryApi<A>,
50-
_balances: &BalancesMap,
51-
_new_state: &NewState,
52-
) -> Result<(), Box<dyn Error>> {
53-
unimplemented!();
59+
async fn on_new_state<'a, A: Adapter + 'static>(
60+
iface: &'a SentryApi<A>,
61+
balances: &'a BalancesMap,
62+
new_state: &'a NewState,
63+
) -> Result<NewStateResult, Box<dyn Error>> {
64+
let proposed_balances = new_state.balances.clone();
65+
let proposed_state_root = new_state.state_root.clone();
66+
67+
if proposed_state_root != hex::encode(get_state_root_hash(&iface, &proposed_balances)?) {
68+
return Ok(await!(on_error(
69+
&iface,
70+
&new_state,
71+
InvalidNewState::RootHash
72+
)));
73+
}
74+
75+
if !iface.adapter.verify(
76+
&iface.channel.spec.validators.leader().id,
77+
&proposed_state_root,
78+
&new_state.signature,
79+
)? {
80+
return Ok(await!(on_error(
81+
&iface,
82+
&new_state,
83+
InvalidNewState::Signature
84+
)));
85+
}
86+
87+
let last_approve_response = await!(iface.get_last_approved())?;
88+
// TODO: Check if it's possible to have an empty response (i.e. we don't have LastApprove)
89+
let prev_balances = last_approve_response.last_approved.new_state.balances;
90+
if !is_valid_transition(&iface.channel, &prev_balances, &proposed_balances) {
91+
return Ok(await!(on_error(
92+
&iface,
93+
&new_state,
94+
InvalidNewState::Transition
95+
)));
96+
}
97+
98+
let signature = iface.adapter.sign(&new_state.state_root)?;
99+
let health_threshold = u64::from(iface.config.health_threshold_promilles).into();
100+
101+
iface.propagate(&[&MessageTypes::ApproveState(ApproveState {
102+
state_root: proposed_state_root,
103+
signature,
104+
is_healthy: is_healthy(
105+
&iface.channel,
106+
balances,
107+
&proposed_balances,
108+
&health_threshold,
109+
),
110+
})]);
111+
112+
Ok(NewStateResult::Ok)
113+
}
114+
115+
async fn on_error<'a, A: Adapter + 'static>(
116+
iface: &'a SentryApi<A>,
117+
new_state: &'a NewState,
118+
status: InvalidNewState,
119+
) -> NewStateResult {
120+
use InvalidNewState::*;
121+
let reason = match &status {
122+
RootHash => "InvalidRootHash",
123+
Signature => "InvalidSignature",
124+
Transition => "InvalidTransition",
125+
}
126+
.to_string();
127+
128+
iface.propagate(&[&MessageTypes::RejectState(RejectState {
129+
reason,
130+
state_root: new_state.state_root.clone(),
131+
signature: new_state.signature.clone(),
132+
balances: Some(new_state.balances.clone()),
133+
/// The NewState timestamp that is being rejected
134+
// TODO: Double check this, if we decide to have 2 timestamps - 1 for the RejectState & 1 for NewState timestamp
135+
timestamp: Some(Utc::now()),
136+
})]);
137+
138+
NewStateResult::Err(status)
54139
}

0 commit comments

Comments
 (0)