Skip to content

Commit dba3051

Browse files
authored
Issue #70 validation tick timeout (#71)
* domain - re-export `SpecValidator` * domain - channel - ChannelId - impl `Display` instead of `ToString` * validator - domain - re-export some structs & traits * validator - infra - TickWorker - Validation tick timeout
1 parent 1b35dd3 commit dba3051

File tree

5 files changed

+52
-19
lines changed

5 files changed

+52
-19
lines changed

domain/src/channel.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::convert::TryFrom;
2+
use std::fmt;
23

34
use chrono::serde::{ts_milliseconds, ts_seconds};
45
use chrono::{DateTime, Utc};
@@ -17,7 +18,7 @@ pub struct ChannelId {
1718
pub id: [u8; 32],
1819
}
1920

20-
impl ToString for ChannelId {
21+
impl fmt::Display for ChannelId {
2122
/// Converts a ChannelId to hex string with prefix
2223
///
2324
/// Example:
@@ -26,11 +27,12 @@ impl ToString for ChannelId {
2627
///
2728
/// let hex_string = "0x061d5e2a67d0a9a10f1c732bca12a676d83f79663a396f7d87b3e30b9b411088";
2829
/// let channel_id = ChannelId::try_from_hex(&hex_string).expect("Should be a valid hex string already");
29-
///
30-
/// assert_eq!("0x061d5e2a67d0a9a10f1c732bca12a676d83f79663a396f7d87b3e30b9b411088", channel_id.to_string());
30+
/// let channel_hex_string = format!("{}", channel_id);
31+
/// assert_eq!("0x061d5e2a67d0a9a10f1c732bca12a676d83f79663a396f7d87b3e30b9b411088", &channel_hex_string);
3132
/// ```
32-
fn to_string(&self) -> String {
33-
SerHex::<StrictPfx>::into_hex(&self.id).unwrap()
33+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
34+
let hex_string = SerHex::<StrictPfx>::into_hex(&self.id).unwrap();
35+
write!(f, "{}", hex_string)
3436
}
3537
}
3638

domain/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pub use util::tests as test_util;
99
pub use self::ad_unit::AdUnit;
1010
pub use self::asset::Asset;
1111
pub use self::bignum::BigNum;
12-
pub use self::channel::{Channel, ChannelId, ChannelSpec, SpecValidators};
12+
pub use self::channel::{Channel, ChannelId, ChannelSpec, SpecValidator, SpecValidators};
1313
pub use self::event_submission::EventSubmission;
1414
#[cfg(feature = "repositories")]
1515
pub use self::repository::*;

validator/src/domain.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
pub use self::channel::ChannelRepository;
2+
pub use self::validator::{Validator, ValidatorError, ValidatorFuture};
13
pub use self::worker::{Worker, WorkerFuture};
24

35
pub mod channel;

validator/src/infrastructure/worker.rs

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,26 @@ pub use self::single::TickWorker;
33

44
pub mod single {
55
use std::sync::Arc;
6+
use std::time::Duration;
67

7-
use futures::future::FutureExt;
8+
use futures::compat::Future01CompatExt;
9+
use futures::future::{FutureExt, TryFutureExt};
10+
use tokio::util::FutureExt as TokioFutureExt;
811

9-
use domain::channel::SpecValidator;
10-
use domain::Channel;
12+
use domain::{Channel, SpecValidator};
1113

12-
use crate::domain::channel::ChannelRepository;
13-
use crate::domain::validator::Validator;
14-
use crate::domain::{Worker, WorkerFuture};
15-
use crate::infrastructure::validator::follower::Follower;
16-
use crate::infrastructure::validator::leader::Leader;
14+
use crate::domain::{ChannelRepository, Validator, Worker, WorkerFuture};
15+
use crate::infrastructure::validator::{Follower, Leader};
1716

1817
#[derive(Clone)]
1918
pub struct TickWorker {
2019
pub leader: Leader,
2120
pub follower: Follower,
2221
pub channel_repository: Arc<dyn ChannelRepository>,
22+
// @TODO: use the adapter(maybe?) instead of repeating the identity
2323
pub identity: String,
24+
// @TODO: Pass configuration by which this can be set
25+
pub validation_tick_timeout: Duration,
2426
}
2527

2628
/// Single tick worker
@@ -45,15 +47,33 @@ pub mod single {
4547

4648
match &channel.spec.validators.find(&self.identity) {
4749
SpecValidator::Leader(_) => {
48-
self.leader.tick(channel);
49-
eprintln!("Channel {} handled as __Leader__", channel_id.to_string());
50+
let tick_future = self.leader.tick(channel);
51+
52+
let tick_result = await!(tick_future
53+
.compat()
54+
.timeout(self.validation_tick_timeout)
55+
.compat());
56+
57+
match tick_result {
58+
Ok(_) => println!("Channel {} handled as __Leader__", channel_id),
59+
Err(_) => eprintln!("Channel {} Timed out", channel_id),
60+
}
5061
}
5162
SpecValidator::Follower(_) => {
52-
self.follower.tick(channel);
53-
eprintln!("Channel {} handled as __Follower__", channel_id.to_string());
63+
let tick_future = self.follower.tick(channel);
64+
65+
let tick_result = await!(tick_future
66+
.compat()
67+
.timeout(self.validation_tick_timeout)
68+
.compat());
69+
70+
match tick_result {
71+
Ok(_) => println!("Channel {} handled as __Follower__", channel_id),
72+
Err(_) => eprintln!("Channel {} Timed out", channel_id),
73+
}
5474
}
5575
SpecValidator::None => {
56-
eprintln!("Channel {} is not validated by us", channel_id.to_string());
76+
eprintln!("Channel {} is not validated by us", channel_id);
5777
}
5878
};
5979

validator/src/main.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,14 @@ lazy_static! {
1414
.unwrap()
1515
.parse()
1616
.unwrap();
17+
18+
let validation_tick_timeout = std::env::var("VALIDATOR_VALIDATION_TICK_TIMEOUT")
19+
.unwrap()
20+
.parse()
21+
.unwrap();
22+
1723
Config {
24+
validation_tick_timeout: Duration::from_millis(validation_tick_timeout),
1825
ticks_wait_time: Duration::from_millis(ticks_wait_time),
1926
sentry_url: std::env::var("VALIDATOR_SENTRY_URL")
2027
.unwrap()
@@ -89,6 +96,7 @@ fn run(is_single_tick: bool, adapter: impl Adapter) {
8996
follower: Follower {},
9097
channel_repository,
9198
identity: adapter.config().identity.to_string(),
99+
validation_tick_timeout: CONFIG.validation_tick_timeout,
92100
};
93101

94102
if !is_single_tick {
@@ -118,6 +126,7 @@ fn run(is_single_tick: bool, adapter: impl Adapter) {
118126
}
119127

120128
struct Config {
129+
pub validation_tick_timeout: Duration,
121130
pub ticks_wait_time: Duration,
122131
pub sentry_url: String,
123132
}

0 commit comments

Comments
 (0)