Skip to content

Commit cfc8c6d

Browse files
committed
validator_worker - SentryApi & validator worker errors
1 parent b7bb224 commit cfc8c6d

File tree

3 files changed

+138
-62
lines changed

3 files changed

+138
-62
lines changed

validator_worker/src/error.rs

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,47 @@
1+
use primitives::adapter::AdapterErrorKind;
12
use primitives::ChannelId;
2-
use serde::{Deserialize, Serialize};
3-
use std::error::Error;
4-
use std::fmt::{Display, Formatter, Result};
3+
use std::fmt;
54

6-
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
7-
pub enum ValidatorWorker {
8-
Configuration(String),
9-
Failed(String),
10-
Channel(ChannelId, String),
5+
#[derive(Debug)]
6+
pub enum TickError {
7+
TimedOut(tokio::time::Elapsed),
8+
Tick(Box<dyn std::error::Error>),
119
}
1210

13-
impl Error for ValidatorWorker {}
11+
impl fmt::Display for TickError {
12+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
13+
match self {
14+
TickError::TimedOut(err) => write!(f, "Tick timed out - {}", err),
15+
TickError::Tick(err) => write!(f, "Tick error - {}", err),
16+
}
17+
}
18+
}
19+
20+
#[derive(Debug)]
21+
pub enum Error<AE: AdapterErrorKind> {
22+
SentryApi(crate::sentry_interface::Error<AE>),
23+
LeaderTick(ChannelId, TickError),
24+
FollowerTick(ChannelId, TickError),
25+
}
26+
27+
impl<AE: AdapterErrorKind> std::error::Error for Error<AE> {}
28+
29+
impl<AE: AdapterErrorKind> fmt::Display for Error<AE> {
30+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31+
use Error::*;
1432

15-
impl Display for ValidatorWorker {
16-
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
1733
match self {
18-
ValidatorWorker::Configuration(err) => write!(f, "Configuration error: {}", err),
19-
ValidatorWorker::Failed(err) => write!(f, "error: {}", err),
20-
ValidatorWorker::Channel(channel_id, err) => {
21-
write!(f, "Channel {}: {}", channel_id, err)
22-
}
34+
SentryApi(err) => write!(f, "Sentry Api error - {}", err),
35+
LeaderTick(channel_id, err) => write!(
36+
f,
37+
"Error for Leader tick of Channel ({}) - {}",
38+
channel_id, err
39+
),
40+
FollowerTick(channel_id, err) => write!(
41+
f,
42+
"Error for Follower tick of Channel ({}) - {}",
43+
channel_id, err
44+
),
2345
}
2446
}
2547
}

validator_worker/src/main.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use primitives::config::{configuration, Config};
1616
use primitives::util::tests::prep_db::{AUTH, IDS};
1717
use primitives::{Channel, SpecValidator, ValidatorId};
1818
use slog::{error, info, Logger};
19-
use validator_worker::error::ValidatorWorker as ValidatorWorkerError;
19+
use validator_worker::error::{Error as ValidatorWorkerError, TickError};
2020
use validator_worker::{all_channels, follower, leader, SentryApi};
2121

2222
#[derive(Debug, Clone)]
@@ -195,39 +195,40 @@ async fn validator_tick<A: Adapter + 'static>(
195195
channel: Channel,
196196
config: &Config,
197197
logger: &Logger,
198-
) -> Result<(), ValidatorWorkerError> {
198+
) -> Result<(), ValidatorWorkerError<A::AdapterError>> {
199199
let whoami = adapter.whoami().clone();
200200
// Cloning the `Logger` is cheap, see documentation for more info
201-
let sentry = SentryApi::init(adapter, &channel, &config, logger.clone())?;
201+
let sentry = SentryApi::init(adapter, &channel, &config, logger.clone())
202+
.map_err(ValidatorWorkerError::SentryApi)?;
202203
let duration = Duration::from_millis(config.validator_tick_timeout as u64);
203204

204205
match channel.spec.validators.find(&whoami) {
205206
SpecValidator::Leader(_) => match timeout(duration, leader::tick(&sentry)).await {
206-
Err(timeout_e) => Err(ValidatorWorkerError::Channel(
207+
Err(timeout_e) => Err(ValidatorWorkerError::LeaderTick(
207208
channel.id,
208-
timeout_e.to_string(),
209+
TickError::TimedOut(timeout_e),
209210
)),
210-
Ok(Err(tick_e)) => Err(ValidatorWorkerError::Channel(
211+
Ok(Err(tick_e)) => Err(ValidatorWorkerError::LeaderTick(
211212
channel.id,
212-
tick_e.to_string(),
213+
TickError::Tick(tick_e),
213214
)),
214215
_ => Ok(()),
215216
},
216217
SpecValidator::Follower(_) => match timeout(duration, follower::tick(&sentry)).await {
217-
Err(timeout_e) => Err(ValidatorWorkerError::Channel(
218+
Err(timeout_e) => Err(ValidatorWorkerError::FollowerTick(
218219
channel.id,
219-
timeout_e.to_string(),
220+
TickError::TimedOut(timeout_e),
220221
)),
221-
Ok(Err(tick_e)) => Err(ValidatorWorkerError::Channel(
222+
Ok(Err(tick_e)) => Err(ValidatorWorkerError::FollowerTick(
222223
channel.id,
223-
tick_e.to_string(),
224+
TickError::Tick(tick_e),
224225
)),
225226
_ => Ok(()),
226227
},
227-
SpecValidator::None => Err(ValidatorWorkerError::Channel(
228-
channel.id,
229-
"validatorTick: processing a channel which we are not validating".to_string(),
230-
)),
228+
// @TODO: Can we make this so that we don't have this check at all? maybe something with the SentryApi struct?
229+
SpecValidator::None => {
230+
unreachable!("SentryApi makes a check if validator is in Channel spec on `init()`")
231+
}
231232
}
232233
}
233234

validator_worker/src/sentry_interface.rs

Lines changed: 84 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
1-
use crate::error::ValidatorWorker;
1+
use std::collections::HashMap;
2+
use std::fmt;
3+
use std::time::Duration;
4+
25
use chrono::{DateTime, Utc};
3-
use futures::future::try_join_all;
4-
use futures::future::TryFutureExt;
5-
use primitives::adapter::Adapter;
6+
use futures::future::{try_join_all, TryFutureExt};
7+
use reqwest::{Client, Response};
8+
use slog::{error, Logger};
9+
10+
use primitives::adapter::{Adapter, AdapterErrorKind, Error as AdapterError};
611
use primitives::channel::SpecValidator;
712
use primitives::sentry::{
813
ChannelListResponse, EventAggregateResponse, LastApprovedResponse, SuccessResponse,
914
ValidatorMessageResponse,
1015
};
1116
use primitives::validator::MessageTypes;
12-
use primitives::{Channel, Config, ToETHChecksum, ValidatorDesc, ValidatorId};
13-
use reqwest::{Client, Response};
14-
use slog::{error, Logger};
15-
use std::collections::HashMap;
16-
use std::time::Duration;
17+
use primitives::{Channel, ChannelId, Config, ToETHChecksum, ValidatorDesc, ValidatorId};
1718

1819
#[derive(Debug, Clone)]
1920
pub struct SentryApi<T: Adapter> {
@@ -26,17 +27,63 @@ pub struct SentryApi<T: Adapter> {
2627
pub propagate_to: Vec<(ValidatorDesc, String)>,
2728
}
2829

29-
impl<T: Adapter + 'static> SentryApi<T> {
30+
#[derive(Debug)]
31+
pub enum Error<AE: AdapterErrorKind> {
32+
BuildingClient(reqwest::Error),
33+
Request(reqwest::Error),
34+
ValidatorAuthentication(AdapterError<AE>),
35+
MissingWhoamiInChannelValidators {
36+
channel: ChannelId,
37+
validators: Vec<ValidatorId>,
38+
whoami: ValidatorId,
39+
},
40+
}
41+
42+
impl<AE: AdapterErrorKind> std::error::Error for Error<AE> {}
43+
44+
impl<AE: AdapterErrorKind> fmt::Display for Error<AE> {
45+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46+
use Error::*;
47+
48+
match self {
49+
BuildingClient(err) => write!(f, "Building client - {}", err),
50+
Request(err) => write!(f, "Making a request - {}", err),
51+
ValidatorAuthentication(err) => {
52+
write!(f, "Getting authentication for validator - {}", err)
53+
}
54+
MissingWhoamiInChannelValidators {
55+
channel,
56+
validators,
57+
whoami,
58+
} => {
59+
let validator_ids = validators
60+
.iter()
61+
.map(ToString::to_string)
62+
.collect::<Vec<_>>()
63+
.join(", ");
64+
write!(
65+
f,
66+
"We cannot find validator entry for whoami ({}) in channel {} with validators: {}",
67+
whoami,
68+
channel,
69+
validator_ids
70+
)
71+
}
72+
}
73+
}
74+
}
75+
76+
impl<A: Adapter + 'static> SentryApi<A> {
3077
pub fn init(
31-
adapter: T,
78+
adapter: A,
3279
channel: &Channel,
3380
config: &Config,
3481
logger: Logger,
35-
) -> Result<Self, ValidatorWorker> {
82+
) -> Result<Self, Error<A::AdapterError>> {
3683
let client = Client::builder()
3784
.timeout(Duration::from_millis(config.fetch_timeout.into()))
3885
.build()
39-
.map_err(|e| ValidatorWorker::Failed(format!("building Client error: {}", e)))?;
86+
.map_err(Error::BuildingClient)?;
4087

4188
// validate that we are to validate the channel
4289
match channel.spec.validators.find(adapter.whoami()) {
@@ -51,12 +98,7 @@ impl<T: Adapter + 'static> SentryApi<T> {
5198
adapter
5299
.get_auth(&validator.id)
53100
.map(|auth| (validator.to_owned(), auth))
54-
.map_err(|e| {
55-
ValidatorWorker::Failed(format!(
56-
"Propagation error: get auth failed {}",
57-
e
58-
))
59-
})
101+
.map_err(Error::ValidatorAuthentication)
60102
})
61103
.collect::<Result<Vec<_>, _>>()?;
62104

@@ -70,28 +112,36 @@ impl<T: Adapter + 'static> SentryApi<T> {
70112
config: config.to_owned(),
71113
})
72114
}
73-
SpecValidator::None => Err(ValidatorWorker::Failed(
74-
"We can not find validator entry for whoami".to_string(),
75-
)),
115+
SpecValidator::None => Err(Error::MissingWhoamiInChannelValidators {
116+
channel: channel.id,
117+
validators: channel
118+
.spec
119+
.validators
120+
.iter()
121+
.map(|v| v.id.clone())
122+
.collect(),
123+
whoami: adapter.whoami().clone(),
124+
}),
76125
}
77126
}
78127

128+
// @TODO: Remove logging & fix `try_join_all` @see: https://github.com/AdExNetwork/adex-validator-stack-rust/issues/278
79129
pub async fn propagate(&self, messages: &[&MessageTypes]) {
80130
let channel_id = format!("0x{}", hex::encode(&self.channel.id));
81131
if let Err(e) = try_join_all(self.propagate_to.iter().map(|(validator, auth_token)| {
82132
propagate_to(&channel_id, &auth_token, &self.client, &validator, messages)
83133
}))
84134
.await
85135
{
86-
error!(&self.logger, "Propagation error: {}", e; "module" => "sentry_interface", "in" => "SentryApi");
136+
error!(&self.logger, "Propagation error - {}", e; "module" => "sentry_interface", "in" => "SentryApi");
87137
}
88138
}
89139

90140
pub async fn get_latest_msg(
91141
&self,
92142
from: &ValidatorId,
93143
message_types: &[&str],
94-
) -> Result<Option<MessageTypes>, reqwest::Error> {
144+
) -> Result<Option<MessageTypes>, Error<A::AdapterError>> {
95145
let message_type = message_types.join("+");
96146
let url = format!(
97147
"{}/validator-messages/{}/{}?limit=1",
@@ -104,6 +154,7 @@ impl<T: Adapter + 'static> SentryApi<T> {
104154
.get(&url)
105155
.send()
106156
.and_then(|res: Response| res.json::<ValidatorMessageResponse>())
157+
.map_err(Error::Request)
107158
.await?;
108159

109160
Ok(result.validator_messages.first().map(|m| m.msg.clone()))
@@ -112,38 +163,40 @@ impl<T: Adapter + 'static> SentryApi<T> {
112163
pub async fn get_our_latest_msg(
113164
&self,
114165
message_types: &[&str],
115-
) -> Result<Option<MessageTypes>, reqwest::Error> {
166+
) -> Result<Option<MessageTypes>, Error<A::AdapterError>> {
116167
self.get_latest_msg(self.adapter.whoami(), message_types)
117168
.await
118169
}
119170

120-
pub async fn get_last_approved(&self) -> Result<LastApprovedResponse, reqwest::Error> {
171+
pub async fn get_last_approved(&self) -> Result<LastApprovedResponse, Error<A::AdapterError>> {
121172
self.client
122173
.get(&format!("{}/last-approved", self.validator_url))
123174
.send()
124175
.and_then(|res: Response| res.json::<LastApprovedResponse>())
176+
.map_err(Error::Request)
125177
.await
126178
}
127179

128-
pub async fn get_last_msgs(&self) -> Result<LastApprovedResponse, reqwest::Error> {
180+
pub async fn get_last_msgs(&self) -> Result<LastApprovedResponse, Error<A::AdapterError>> {
129181
self.client
130182
.get(&format!(
131183
"{}/last-approved?withHeartbeat=true",
132184
self.validator_url
133185
))
134186
.send()
135187
.and_then(|res: Response| res.json::<LastApprovedResponse>())
188+
.map_err(Error::Request)
136189
.await
137190
}
138191

139192
pub async fn get_event_aggregates(
140193
&self,
141194
after: DateTime<Utc>,
142-
) -> Result<EventAggregateResponse, Box<ValidatorWorker>> {
195+
) -> Result<EventAggregateResponse, Error<A::AdapterError>> {
143196
let auth_token = self
144197
.adapter
145198
.get_auth(self.adapter.whoami())
146-
.map_err(|e| Box::new(ValidatorWorker::Failed(e.to_string())))?;
199+
.map_err(Error::ValidatorAuthentication)?;
147200

148201
let url = format!(
149202
"{}/events-aggregates?after={}",
@@ -155,10 +208,10 @@ impl<T: Adapter + 'static> SentryApi<T> {
155208
.get(&url)
156209
.bearer_auth(&auth_token)
157210
.send()
158-
.map_err(|e| Box::new(ValidatorWorker::Failed(e.to_string())))
211+
.map_err(Error::Request)
159212
.await?
160213
.json()
161-
.map_err(|e| Box::new(ValidatorWorker::Failed(e.to_string())))
214+
.map_err(Error::Request)
162215
.await
163216
}
164217
}

0 commit comments

Comments
 (0)