Skip to content

Commit de0524d

Browse files
committed
fix; validator_worker: remove RwLock from Adapter
1 parent 6ab7303 commit de0524d

File tree

8 files changed

+44
-64
lines changed

8 files changed

+44
-64
lines changed

adapter/src/ethereum.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use primitives::{
1111
};
1212
use serde::{Deserialize, Serialize};
1313
use serde_json::Value;
14-
use std::collections::HashMap;
1514
use std::convert::TryFrom;
1615
use std::error::Error;
1716
use std::fs;
@@ -38,8 +37,6 @@ pub struct EthereumAdapter {
3837
keystore_json: Value,
3938
keystore_pwd: Password,
4039
config: Config,
41-
// Auth tokens that we've generated to authenticate with someone (address => token)
42-
authorization_tokens: HashMap<String, String>,
4340
wallet: Option<SafeAccount>,
4441
}
4542

@@ -70,7 +67,6 @@ impl EthereumAdapter {
7067
address,
7168
keystore_json,
7269
keystore_pwd: opts.keystore_pwd.into(),
73-
authorization_tokens: HashMap::new(),
7470
wallet: None,
7571
config: config.to_owned(),
7672
})
@@ -228,9 +224,10 @@ impl Adapter for EthereumAdapter {
228224
}
229225

230226
fn get_auth(&self, validator: &ValidatorId) -> AdapterResult<String> {
231-
let wallet = self.wallet.clone().ok_or_else(|| AdapterError::Configuration(
232-
"failed to unlock wallet".to_string()
233-
))?;
227+
let wallet = self
228+
.wallet
229+
.clone()
230+
.ok_or_else(|| AdapterError::Configuration("failed to unlock wallet".to_string()))?;
234231

235232
let era = Utc::now().timestamp_millis() as f64 / 60000.0;
236233
let payload = Payload {
@@ -241,7 +238,7 @@ impl Adapter for EthereumAdapter {
241238
};
242239
let token = ewt_sign(&wallet, &self.keystore_pwd, &payload)
243240
.map_err(|_| map_error("Failed to sign token"))?;
244-
241+
245242
Ok(token)
246243
}
247244
}

primitives/src/adapter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub struct Session {
5959
pub uid: ValidatorId,
6060
}
6161

62-
pub trait Adapter: ChannelValidator + Send + Clone + Debug {
62+
pub trait Adapter: ChannelValidator + Send + Sync + Clone + Debug {
6363
/// Unlock adapter
6464
fn unlock(&mut self) -> AdapterResult<()>;
6565

validator_worker/src/follower.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,7 @@ async fn on_new_state<'a, A: Adapter + 'static>(
6464
return Ok(on_error(&iface, &new_state, InvalidNewState::RootHash).await);
6565
}
6666

67-
let adapter = iface.adapter.read().await.clone();
68-
69-
if !adapter.verify(
67+
if !iface.adapter.verify(
7068
&iface.channel.spec.validators.leader().id,
7169
&proposed_state_root,
7270
&new_state.signature,
@@ -84,7 +82,7 @@ async fn on_new_state<'a, A: Adapter + 'static>(
8482
return Ok(on_error(&iface, &new_state, InvalidNewState::Transition).await);
8583
}
8684

87-
let signature = adapter.sign(&new_state.state_root)?;
85+
let signature = iface.adapter.sign(&new_state.state_root)?;
8886
let health_threshold = u64::from(iface.config.health_threshold_promilles).into();
8987
let health = is_healthy(
9088
&iface.channel,

validator_worker/src/heartbeat.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ async fn send_heartbeat<A: Adapter + 'static>(iface: &SentryApi<A>) -> Result<()
2323
let state_root_raw = get_signable_state_root(&iface.channel.id, &merkle_tree.root())?;
2424
let state_root = hex::encode(state_root_raw);
2525

26-
let signature = iface.adapter.read().await.sign(&state_root)?;
26+
let signature = iface.adapter.sign(&state_root)?;
2727

2828
let message_types = MessageTypes::Heartbeat(Heartbeat {
2929
signature,

validator_worker/src/leader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ async fn on_new_accounting<A: Adapter + 'static>(
2525
let state_root_raw = get_state_root_hash(&iface, &balances)?;
2626
let state_root = hex::encode(state_root_raw);
2727

28-
let signature = iface.adapter.read().await.sign(&state_root)?;
28+
let signature = iface.adapter.sign(&state_root)?;
2929

3030
iface
3131
.propagate(&[&MessageTypes::NewState(NewState {

validator_worker/src/lib.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,10 @@ mod test {
4444
use super::*;
4545

4646
use adapter::DummyAdapter;
47-
use async_std::sync::RwLock;
4847
use primitives::adapter::DummyAdapterOptions;
4948
use primitives::config::configuration;
5049
use primitives::util::tests::prep_db::{AUTH, DUMMY_CHANNEL, IDS};
5150
use primitives::{BalancesMap, Channel};
52-
use std::sync::Arc;
5351

5452
fn setup_iface(channel: &Channel) -> SentryApi<DummyAdapter> {
5553
let adapter_options = DummyAdapterOptions {
@@ -61,14 +59,7 @@ mod test {
6159
let dummy_adapter = DummyAdapter::init(adapter_options, &config);
6260
let whoami = dummy_adapter.whoami().clone();
6361

64-
SentryApi::init(
65-
Arc::new(RwLock::new(dummy_adapter)),
66-
&channel,
67-
&config,
68-
false,
69-
&whoami,
70-
)
71-
.expect("should succeed")
62+
SentryApi::init(dummy_adapter, &channel, &config, false, &whoami).expect("should succeed")
7263
}
7364

7465
#[test]

validator_worker/src/main.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
use clap::{App, Arg};
55

66
use adapter::{AdapterTypes, DummyAdapter, EthereumAdapter};
7-
use async_std::sync::RwLock;
87
use futures::compat::Future01CompatExt;
98
use futures::future::try_join_all;
109
use futures::future::{join, FutureExt, TryFutureExt};
@@ -15,7 +14,6 @@ use primitives::{Channel, SpecValidator, ValidatorId};
1514
use std::convert::TryFrom;
1615
use std::error::Error;
1716
use std::ops::Add;
18-
use std::sync::Arc;
1917
use std::time::{Duration, Instant};
2018
use tokio::timer::Delay;
2119
use tokio::util::FutureExt as TokioFutureExt;
@@ -26,7 +24,7 @@ use validator_worker::{all_channels, follower, leader, SentryApi};
2624
struct Args<A: Adapter> {
2725
sentry_url: String,
2826
config: Config,
29-
adapter: Arc<RwLock<A>>,
27+
adapter: A,
3028
whoami: ValidatorId,
3129
}
3230

@@ -126,7 +124,9 @@ fn run<A: Adapter + 'static>(
126124
config: &Config,
127125
adapter: A,
128126
) -> Result<(), Box<dyn Error>> {
129-
let sentry_adapter = Arc::new(RwLock::new(adapter.clone()));
127+
let mut sentry_adapter = adapter.clone();
128+
// unlock adapter
129+
sentry_adapter.unlock()?;
130130
let whoami = adapter.whoami().to_owned();
131131

132132
let args = Args {
@@ -188,7 +188,7 @@ async fn iterate_channels<A: Adapter + 'static>(args: Args<A>) -> Result<(), ()>
188188
}
189189

190190
async fn validator_tick<A: Adapter + 'static>(
191-
adapter: Arc<RwLock<A>>,
191+
adapter: A,
192192
channel: Channel,
193193
config: &Config,
194194
whoami: &ValidatorId,

validator_worker/src/sentry_interface.rs

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::error::ValidatorWorker;
2-
use async_std::sync::RwLock;
32
use chrono::{DateTime, Utc};
43
use futures::compat::Future01CompatExt;
54
use futures::future::try_join_all;
@@ -14,23 +13,23 @@ use primitives::validator::MessageTypes;
1413
use primitives::{Channel, Config, ValidatorDesc, ValidatorId};
1514
use reqwest::r#async::{Client, Response};
1615
use std::collections::HashMap;
17-
use std::sync::Arc;
1816
use std::time::Duration;
1917

2018
#[derive(Debug, Clone)]
2119
pub struct SentryApi<T: Adapter> {
22-
pub adapter: Arc<RwLock<T>>,
20+
pub adapter: T,
2321
pub validator_url: String,
2422
pub client: Client,
2523
pub logging: bool,
2624
pub channel: Channel,
2725
pub config: Config,
2826
pub whoami: ValidatorId,
27+
pub propagate_to: Vec<(ValidatorDesc, String)>,
2928
}
3029

3130
impl<T: Adapter + 'static> SentryApi<T> {
3231
pub fn init(
33-
adapter: Arc<RwLock<T>>,
32+
adapter: T,
3433
channel: &Channel,
3534
config: &Config,
3635
logging: bool,
@@ -46,12 +45,29 @@ impl<T: Adapter + 'static> SentryApi<T> {
4645
SpecValidator::Leader(v) | SpecValidator::Follower(v) => {
4746
let channel_id = format!("0x{}", hex::encode(&channel.id));
4847
let validator_url = format!("{}/channel/{}", v.url, channel_id);
48+
let propagate_to = channel
49+
.spec
50+
.validators
51+
.into_iter()
52+
.map(|validator| {
53+
adapter
54+
.get_auth(&validator.id)
55+
.map(|auth| (validator.to_owned(), auth))
56+
.map_err(|e| {
57+
ValidatorWorker::Failed(format!(
58+
"propagate error: get auth failed {}",
59+
e
60+
))
61+
})
62+
})
63+
.collect::<Result<Vec<_>, _>>()?;
4964

5065
Ok(Self {
5166
adapter,
5267
validator_url,
5368
client,
5469
logging,
70+
propagate_to,
5571
channel: channel.to_owned(),
5672
config: config.to_owned(),
5773
whoami: whoami.to_owned(),
@@ -64,29 +80,13 @@ impl<T: Adapter + 'static> SentryApi<T> {
6480
}
6581

6682
pub async fn propagate(&self, messages: &[&MessageTypes]) {
67-
let mut adapter = self.adapter.write().await;
68-
6983
let channel_id = format!("0x{}", hex::encode(&self.channel.id));
70-
71-
for validator in self.channel.spec.validators.into_iter() {
72-
let auth_token = adapter.get_auth(&validator.id);
73-
74-
if let Err(e) = auth_token {
75-
println!("propagate error: get auth failed {}", e);
76-
continue;
77-
}
78-
79-
if let Err(e) = propagate_to(
80-
&channel_id,
81-
&auth_token.unwrap(),
82-
&self.client,
83-
&validator,
84-
messages,
85-
)
86-
.await
87-
{
88-
handle_http_error(e, &validator.url);
89-
}
84+
if let Err(e) = try_join_all(self.propagate_to.iter().map(|(validator, auth_token)| {
85+
propagate_to(&channel_id, &auth_token, &self.client, &validator, messages)
86+
}))
87+
.await
88+
{
89+
handle_http_error(e);
9090
}
9191
}
9292

@@ -148,8 +148,6 @@ impl<T: Adapter + 'static> SentryApi<T> {
148148
) -> Result<EventAggregateResponse, Box<ValidatorWorker>> {
149149
let auth_token = self
150150
.adapter
151-
.write()
152-
.await
153151
.get_auth(&self.whoami)
154152
.map_err(|e| Box::new(ValidatorWorker::Failed(e.to_string())))?;
155153

@@ -201,11 +199,11 @@ async fn propagate_to(
201199
Ok(())
202200
}
203201

204-
fn handle_http_error(e: reqwest::Error, url: &str) {
202+
fn handle_http_error(e: reqwest::Error) {
205203
if e.is_http() {
206204
match e.url() {
207205
None => println!("No Url given"),
208-
Some(url) => println!("Problem making request to: {}", url),
206+
Some(url) => println!("erorr sending http request for validator {}", url),
209207
}
210208
}
211209
// Inspect the internal error and output it
@@ -217,10 +215,6 @@ fn handle_http_error(e: reqwest::Error, url: &str) {
217215
println!("problem parsing information {}", serde_error);
218216
}
219217

220-
if e.is_client_error() {
221-
println!("erorr sending http request for validator {}", url)
222-
}
223-
224218
if e.is_redirect() {
225219
println!("server redirecting too many times or making loop");
226220
}

0 commit comments

Comments
 (0)