Skip to content

Commit d7ef701

Browse files
authored
Issue #66 Worker main loop (#67)
* validator - infra - validator - follower & leader add `derive(Clone)`` * validator - infra - worker - single & infinite tick workers * validator - infra - validator - re-export `Leader` & `Follower` * validator - domain - worker - add `Worker` trait & `WorkerFuture` * validator - infra - worker - impl `Worker` and cleanup for `TickWorker` & `InfiniteWorker` * validator - main - use `TickWorker` & `InfiniteWorker` * validator - domain - Validator - `tick()` to method and add `Channel` * validator - infra - Worker: - add handle_channel and call the validators - add identity to the worker - Call the `ChannelRepository::all()` with the worker identity * fix `clippy` warnings
1 parent 0e88635 commit d7ef701

File tree

9 files changed

+159
-33
lines changed

9 files changed

+159
-33
lines changed

validator/src/domain.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
1+
pub use self::worker::{Worker, WorkerFuture};
2+
13
pub mod channel;
24
pub mod validator;
5+
pub mod worker;

validator/src/domain/validator.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
use futures::Future;
21
use std::pin::Pin;
32

3+
use futures::Future;
4+
5+
use domain::Channel;
6+
47
pub type ValidatorFuture<T> = Pin<Box<dyn Future<Output = Result<T, ValidatorError>> + Send>>;
58

69
#[derive(Debug)]
@@ -9,5 +12,5 @@ pub enum ValidatorError {
912
}
1013

1114
pub trait Validator {
12-
fn tick() -> ValidatorFuture<()>;
15+
fn tick(&self, channel: Channel) -> ValidatorFuture<()>;
1316
}

validator/src/domain/worker.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use futures::Future;
2+
use std::pin::Pin;
3+
4+
pub type WorkerFuture = Pin<Box<dyn Future<Output = Result<(), ()>> + Send>>;
5+
6+
pub trait Worker {
7+
fn run(&self) -> WorkerFuture;
8+
}

validator/src/infrastructure.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod persistence;
22
pub mod sentry;
33
pub mod validator;
4+
pub mod worker;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
1+
pub use self::follower::Follower;
2+
pub use self::leader::Leader;
3+
14
pub mod follower;
25
pub mod leader;
Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
use domain::Channel;
2+
13
use crate::domain::validator::{Validator, ValidatorFuture};
4+
use futures::future::FutureExt;
25

6+
#[derive(Clone)]
37
pub struct Follower {}
48

59
impl Validator for Follower {
6-
fn tick() -> ValidatorFuture<()> {
7-
unimplemented!()
10+
fn tick(&self, _channel: Channel) -> ValidatorFuture<()> {
11+
futures::future::ok(()).boxed()
812
}
913
}
Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
use domain::Channel;
2+
13
use crate::domain::validator::{Validator, ValidatorFuture};
4+
use futures::future::FutureExt;
25

6+
#[derive(Clone)]
37
pub struct Leader {}
48

59
impl Validator for Leader {
6-
fn tick() -> ValidatorFuture<()> {
7-
unimplemented!()
10+
fn tick(&self, _channel: Channel) -> ValidatorFuture<()> {
11+
futures::future::ok(()).boxed()
812
}
913
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
pub use self::infinite::InfiniteWorker;
2+
pub use self::single::TickWorker;
3+
4+
pub mod single {
5+
use std::sync::Arc;
6+
7+
use futures::future::FutureExt;
8+
9+
use crate::domain::channel::ChannelRepository;
10+
use crate::domain::validator::Validator;
11+
use crate::domain::{Worker, WorkerFuture};
12+
use crate::infrastructure::validator::follower::Follower;
13+
use crate::infrastructure::validator::leader::Leader;
14+
use domain::channel::SpecValidator;
15+
use domain::Channel;
16+
17+
#[derive(Clone)]
18+
pub struct TickWorker {
19+
pub leader: Leader,
20+
pub follower: Follower,
21+
pub channel_repository: Arc<dyn ChannelRepository>,
22+
pub identity: String,
23+
}
24+
25+
/// Single tick worker
26+
impl TickWorker {
27+
pub async fn tick(self) -> Result<(), ()> {
28+
let all_channels = await!(self.channel_repository.all(&self.identity));
29+
30+
match all_channels {
31+
Ok(channels) => {
32+
for channel in channels {
33+
await!(self.clone().handle_channel(channel)).unwrap();
34+
}
35+
}
36+
Err(error) => eprintln!("Error occurred: {:#?}", error),
37+
};
38+
39+
Ok(())
40+
}
41+
42+
async fn handle_channel(self, channel: Channel) -> Result<(), ()> {
43+
let channel_id = channel.id;
44+
45+
match &channel.spec.validators.find(&self.identity) {
46+
SpecValidator::Leader(_) => {
47+
self.leader.tick(channel);
48+
eprintln!("Channel {} handled as __Leader__", channel_id.to_string());
49+
}
50+
SpecValidator::Follower(_) => {
51+
self.follower.tick(channel);
52+
eprintln!("Channel {} handled as __Follower__", channel_id.to_string());
53+
}
54+
SpecValidator::None => {
55+
eprintln!("Channel {} is not validated by us", channel_id.to_string());
56+
}
57+
};
58+
59+
Ok(())
60+
}
61+
}
62+
63+
impl Worker for TickWorker {
64+
fn run(&self) -> WorkerFuture {
65+
self.clone().tick().boxed()
66+
}
67+
}
68+
}
69+
pub mod infinite {
70+
use std::ops::Add;
71+
use std::time::{Duration, Instant};
72+
73+
use futures::compat::Future01CompatExt;
74+
use futures::future::{join, FutureExt};
75+
use tokio::timer::Delay;
76+
77+
use crate::domain::{Worker, WorkerFuture};
78+
use crate::infrastructure::worker::TickWorker;
79+
80+
#[derive(Clone)]
81+
pub struct InfiniteWorker {
82+
pub tick_worker: TickWorker,
83+
}
84+
85+
/// Infinite tick worker
86+
impl InfiniteWorker {
87+
pub async fn infinite(self) -> Result<(), ()> {
88+
let handle = self.clone();
89+
loop {
90+
let future = handle.clone().tick_worker.tick();
91+
// let tick_future = Delay::new(Instant::now().add(CONFIG.ticks_wait_time));
92+
let tick_future = Delay::new(Instant::now().add(Duration::from_millis(5000)));
93+
94+
let joined = join(future, tick_future.compat());
95+
96+
await!(joined);
97+
}
98+
}
99+
}
100+
101+
impl Worker for InfiniteWorker {
102+
fn run(&self) -> WorkerFuture {
103+
self.clone().infinite().boxed()
104+
}
105+
}
106+
}

validator/src/main.rs

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@
22
#![deny(rust_2018_idioms)]
33
#![deny(clippy::all)]
44

5-
use futures::compat::Future01CompatExt;
6-
use futures::future::{join, FutureExt, TryFutureExt};
5+
use futures::future::{FutureExt, TryFutureExt};
76
use reqwest::r#async::Client;
87

98
use lazy_static::lazy_static;
10-
use std::ops::Add;
11-
use std::time::{Duration, Instant};
12-
use tokio::timer::Delay;
13-
use validator::domain::channel::ChannelRepository;
9+
use std::sync::Arc;
10+
use std::time::Duration;
11+
use validator::domain::worker::Worker;
1412
use validator::infrastructure::persistence::channel::api::ApiChannelRepository;
1513
use validator::infrastructure::sentry::SentryApi;
14+
use validator::infrastructure::validator::{Follower, Leader};
15+
use validator::infrastructure::worker::{InfiniteWorker, TickWorker};
1616

1717
lazy_static! {
1818
static ref CONFIG: Config = {
@@ -33,31 +33,25 @@ lazy_static! {
3333
}
3434

3535
fn main() {
36-
let worker = async {
37-
loop {
38-
let future = async {
39-
let sentry = SentryApi {
40-
client: Client::new(),
41-
sentry_url: CONFIG.sentry_url.clone(),
42-
};
43-
let repo = ApiChannelRepository { sentry };
44-
45-
let all_channels = await!(repo.all("0x2892f6C41E0718eeeDd49D98D648C789668cA67d"));
46-
47-
match all_channels {
48-
Ok(channel) => println!("{:#?}", channel),
49-
Err(error) => eprintln!("Error occurred: {:#?}", error),
50-
};
51-
};
52-
let tick_future = Delay::new(Instant::now().add(CONFIG.ticks_wait_time));
36+
let sentry = SentryApi {
37+
client: Client::new(),
38+
sentry_url: CONFIG.sentry_url.clone(),
39+
};
5340

54-
let joined = join(future, tick_future.compat());
41+
let channel_repository = Arc::new(ApiChannelRepository {
42+
sentry: sentry.clone(),
43+
});
5544

56-
await!(joined);
57-
}
45+
let tick_worker = TickWorker {
46+
leader: Leader {},
47+
follower: Follower {},
48+
channel_repository,
49+
identity: "0x2892f6C41E0718eeeDd49D98D648C789668cA67d".to_string(),
5850
};
5951

60-
tokio::run(worker.unit_error().boxed().compat());
52+
let worker = InfiniteWorker { tick_worker };
53+
54+
tokio::run(worker.run().boxed().compat());
6155
}
6256

6357
struct Config {

0 commit comments

Comments
 (0)