Skip to content

Commit 8908f6c

Browse files
samparskyelpiel
authored andcommitted
fix: validator worker startup (#155)
* fix: validator worker startup * fix: validator_worker async process * fix: validator messages response * fix: rebase /dev, clippy warnings * fix: rename SentryApi::new -> init * fix: clippy warnings, remove uneeded clones * fix: validator_worker return err on failed fetch channel * fix: get_our_latest_msg, remove uneeded option * fix: add futures_locks, change to RwLock futures compatible
1 parent f79ae27 commit 8908f6c

File tree

11 files changed

+281
-136
lines changed

11 files changed

+281
-136
lines changed

Cargo.lock

Lines changed: 13 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

primitives/src/adapter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub struct Session {
5555
pub uid: String,
5656
}
5757

58-
pub trait Adapter: ChannelValidator + Clone + Debug {
58+
pub trait Adapter: ChannelValidator + Clone + Debug + Send {
5959
type Output;
6060

6161
/// Initialize adapter

primitives/src/sentry.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,16 @@ pub struct SuccessResponse {
8080
}
8181

8282
#[derive(Serialize, Deserialize, Debug)]
83-
pub struct ValidatorMessageResponse {
83+
pub struct ValidatorMessage {
8484
pub from: String,
8585
pub received: DateTime<Utc>,
86-
pub msg: Vec<MessageTypes>,
86+
pub msg: MessageTypes,
87+
}
88+
89+
#[derive(Serialize, Deserialize, Debug)]
90+
#[serde(rename_all = "camelCase")]
91+
pub enum ValidatorMessageResponse {
92+
ValidatorMessages(Vec<ValidatorMessage>),
8793
}
8894

8995
#[derive(Serialize, Deserialize, Debug)]

validator_worker/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ hex = {version = "0.3.2"}
2020
byteorder = "1.3"
2121
# Futures
2222
futures-preview = { version = "=0.3.0-alpha.19", features = ["compat"]}
23-
futures_legacy = { version = "0.1", package = "futures" }
23+
futures_legacy = { version = "0.1.20", package = "futures" }
24+
futures-locks = "0.3"
2425
# Concurrency
2526
tokio = { version = "=0.1.19" }
2627
# API client

validator_worker/src/follower.rs

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::error::Error;
22

3+
use futures::compat::Future01CompatExt;
34
use primitives::adapter::Adapter;
45
use primitives::validator::{ApproveState, MessageTypes, NewState, RejectState};
56
use primitives::BalancesMap;
@@ -23,25 +24,21 @@ enum NewStateResult {
2324

2425
pub async fn tick<A: Adapter + 'static>(iface: &SentryApi<A>) -> Result<(), Box<dyn Error>> {
2526
let from = iface.channel.spec.validators.leader().id.clone();
26-
let new_msg_response = iface.get_latest_msg(from, "NewState".to_string()).await?;
27-
let new_msg = new_msg_response
28-
.msg
29-
.get(0)
30-
.and_then(|message_types| match message_types {
31-
MessageTypes::NewState(new_state) => Some(new_state.clone()),
32-
_ => None,
33-
});
27+
let new_msg_response = iface.get_latest_msg(from, &["NewState"]).await?;
28+
let new_msg = match new_msg_response {
29+
Some(MessageTypes::NewState(new_state)) => Some(new_state),
30+
_ => None,
31+
};
32+
3433
let our_latest_msg_response = iface
35-
.get_our_latest_msg("ApproveState+RejectState".to_string())
34+
.get_our_latest_msg(&["ApproveState", "RejectState"])
3635
.await?;
37-
let our_latest_msg_state_root = our_latest_msg_response
38-
.msg
39-
.get(0)
40-
.and_then(|message_types| match message_types {
41-
MessageTypes::ApproveState(approve_state) => Some(approve_state.state_root.clone()),
42-
MessageTypes::RejectState(reject_state) => Some(reject_state.state_root.clone()),
43-
_ => None,
44-
});
36+
37+
let our_latest_msg_state_root = match our_latest_msg_response {
38+
Some(MessageTypes::ApproveState(approve_state)) => Some(approve_state.state_root),
39+
Some(MessageTypes::RejectState(reject_state)) => Some(reject_state.state_root),
40+
_ => None,
41+
};
4542

4643
let latest_is_responded_to = match (&new_msg, &our_latest_msg_state_root) {
4744
(Some(new_msg), Some(state_root)) => &new_msg.state_root == state_root,
@@ -72,6 +69,8 @@ async fn on_new_state<'a, A: Adapter + 'static>(
7269
let adapter = iface
7370
.adapter
7471
.read()
72+
.compat()
73+
.await
7574
.expect("on_new_state: failed to acquire read lock adapter");
7675

7776
if !adapter.verify(
@@ -95,16 +94,18 @@ async fn on_new_state<'a, A: Adapter + 'static>(
9594
let signature = adapter.sign(&new_state.state_root)?;
9695
let health_threshold = u64::from(iface.config.health_threshold_promilles).into();
9796

98-
iface.propagate(&[&MessageTypes::ApproveState(ApproveState {
99-
state_root: proposed_state_root,
100-
signature,
101-
is_healthy: is_healthy(
102-
&iface.channel,
103-
balances,
104-
&proposed_balances,
105-
&health_threshold,
106-
),
107-
})]);
97+
iface
98+
.propagate(&[&MessageTypes::ApproveState(ApproveState {
99+
state_root: proposed_state_root,
100+
signature,
101+
is_healthy: is_healthy(
102+
&iface.channel,
103+
balances,
104+
&proposed_balances,
105+
&health_threshold,
106+
),
107+
})])
108+
.await;
108109

109110
Ok(NewStateResult::Ok)
110111
}
@@ -122,14 +123,16 @@ async fn on_error<'a, A: Adapter + 'static>(
122123
}
123124
.to_string();
124125

125-
iface.propagate(&[&MessageTypes::RejectState(RejectState {
126-
reason,
127-
state_root: new_state.state_root.clone(),
128-
signature: new_state.signature.clone(),
129-
balances: Some(new_state.balances.clone()),
130-
/// The NewState timestamp that is being rejected
131-
timestamp: Some(Utc::now()),
132-
})]);
126+
iface
127+
.propagate(&[&MessageTypes::RejectState(RejectState {
128+
reason,
129+
state_root: new_state.state_root.clone(),
130+
signature: new_state.signature.clone(),
131+
balances: Some(new_state.balances.clone()),
132+
/// The NewState timestamp that is being rejected
133+
timestamp: Some(Utc::now()),
134+
})])
135+
.await;
133136

134137
NewStateResult::Err(status)
135138
}

validator_worker/src/heartbeat.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::convert::TryFrom;
22
use std::error::Error;
33

44
use chrono::{Duration, Utc};
5+
use futures::compat::Future01CompatExt;
56

67
use adapter::get_signable_state_root;
78
use byteorder::{BigEndian, ByteOrder};
@@ -23,10 +24,13 @@ async fn send_heartbeat<A: Adapter + 'static>(iface: &SentryApi<A>) -> Result<()
2324

2425
let state_root_raw = get_signable_state_root(&iface.channel.id, &info_root_raw)?;
2526
let state_root = hex::encode(state_root_raw);
27+
2628
let signature = iface
2729
.adapter
2830
.read()
29-
.expect("send_heartbeat: failed to acquire adapter read lock")
31+
.compat()
32+
.await
33+
.expect("on_new_state: failed to acquire read lock adapter")
3034
.sign(&state_root)?;
3135

3236
let message_types = MessageTypes::Heartbeat(Heartbeat {
@@ -35,7 +39,7 @@ async fn send_heartbeat<A: Adapter + 'static>(iface: &SentryApi<A>) -> Result<()
3539
timestamp: Utc::now(),
3640
});
3741

38-
iface.propagate(&[&message_types]);
42+
iface.propagate(&[&message_types]).await;
3943

4044
Ok(())
4145
}
@@ -44,15 +48,12 @@ pub async fn heartbeat<A: Adapter + 'static>(
4448
iface: &SentryApi<A>,
4549
balances: BalancesMap,
4650
) -> Result<(), Box<dyn Error>> {
47-
let validator_message_response = iface.get_our_latest_msg("Heartbeat".into()).await?;
48-
49-
let heartbeat_msg = validator_message_response
50-
.msg
51-
.get(0)
52-
.and_then(|message_types| match message_types {
53-
MessageTypes::Heartbeat(heartbeat) => Some(heartbeat.clone()),
54-
_ => None,
55-
});
51+
let validator_message_response = iface.get_our_latest_msg(&["Heartbeat"]).await?;
52+
53+
let heartbeat_msg = match validator_message_response {
54+
Some(MessageTypes::Heartbeat(heartbeat)) => Some(heartbeat),
55+
_ => None,
56+
};
5657

5758
let should_send = heartbeat_msg.map_or(true, |heartbeat| {
5859
let duration = Utc::now() - heartbeat.timestamp;

validator_worker/src/leader.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::error::Error;
22

3+
use futures::compat::Future01CompatExt;
34
use primitives::adapter::Adapter;
45
use primitives::validator::{Accounting, MessageTypes, NewState};
56
use primitives::BalancesMap;
@@ -12,13 +13,13 @@ pub async fn tick<A: Adapter + 'static>(iface: &SentryApi<A>) -> Result<(), Box<
1213
let (balances, new_accounting) = producer::tick(&iface).await?;
1314

1415
if let Some(new_accounting) = new_accounting {
15-
on_new_accounting(&iface, (&balances, &new_accounting))?;
16+
on_new_accounting(&iface, (&balances, &new_accounting)).await?;
1617
}
1718

1819
heartbeat(&iface, balances).await.map(|_| ())
1920
}
2021

21-
fn on_new_accounting<A: Adapter + 'static>(
22+
async fn on_new_accounting<A: Adapter + 'static>(
2223
iface: &SentryApi<A>,
2324
(balances, new_accounting): (&BalancesMap, &Accounting),
2425
) -> Result<(), Box<dyn Error>> {
@@ -28,14 +29,18 @@ fn on_new_accounting<A: Adapter + 'static>(
2829
let signature = iface
2930
.adapter
3031
.read()
31-
.expect("on_new_accounting: failed to acquire read lock")
32+
.compat()
33+
.await
34+
.expect("on_new_state: failed to acquire read lock adapter")
3235
.sign(&state_root)?;
3336

34-
iface.propagate(&[&MessageTypes::NewState(NewState {
35-
state_root,
36-
signature,
37-
balances: new_accounting.balances.clone(),
38-
})]);
37+
iface
38+
.propagate(&[&MessageTypes::NewState(NewState {
39+
state_root,
40+
signature,
41+
balances: new_accounting.balances.clone(),
42+
})])
43+
.await;
3944

4045
Ok(())
4146
}

validator_worker/src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@
33

44
use std::error::Error;
55

6-
use crate::sentry_interface::SentryApi;
76
use adapter::{get_balance_leaf, get_signable_state_root};
87
use primitives::adapter::Adapter;
98
use primitives::merkle_tree::MerkleTree;
109
use primitives::BalancesMap;
1110

12-
pub use self::sentry_interface::all_channels;
11+
pub use self::sentry_interface::{all_channels, SentryApi};
1312

1413
pub mod error;
1514
pub mod follower;
@@ -47,11 +46,12 @@ mod test {
4746
use super::*;
4847

4948
use adapter::DummyAdapter;
49+
use futures_locks::RwLock;
5050
use primitives::adapter::AdapterOptions;
5151
use primitives::config::configuration;
5252
use primitives::util::tests::prep_db::{AUTH, DUMMY_CHANNEL, IDS};
5353
use primitives::{BalancesMap, Channel};
54-
use std::sync::{Arc, RwLock};
54+
use std::sync::Arc;
5555

5656
fn setup_iface(channel: &Channel) -> SentryApi<DummyAdapter> {
5757
let adapter_options = AdapterOptions::DummAdapter {
@@ -61,12 +61,14 @@ mod test {
6161
};
6262
let config = configuration("development", None).expect("Dev config should be available");
6363
let dummy_adapter = DummyAdapter::init(adapter_options, &config).expect("init adadpter");
64+
let whoami = dummy_adapter.whoami();
6465

65-
SentryApi::new(
66+
SentryApi::init(
6667
Arc::new(RwLock::new(dummy_adapter)),
6768
&channel,
6869
&config,
6970
false,
71+
&whoami,
7072
)
7173
.expect("should succeed")
7274
}

0 commit comments

Comments
 (0)