Skip to content

Commit 0e88635

Browse files
authored
Issue 60 make sentry url configurable (#65)
* domain - repository - remove Stream future * validator - infrastructure - sentry - add sentry API struct * validator - persistence - channel - api - use `SentryApi` * cleanup * Config * validator - Cargo.toml - add `dotenv` for now * validator - load `sentry_url` from `.env` file for now * validator - main - POC for the validator ticks * run `rustfmt`
1 parent 519dff3 commit 0e88635

File tree

9 files changed

+159
-88
lines changed

9 files changed

+159
-88
lines changed

.env.dist

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
CHANNEL_LIST_LIMIT=200
1+
SENTRY_CHANNEL_LIST_LIMIT=200
2+
3+
VALIDATOR_TICKS_WAIT_TIME=500

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

domain/src/lib.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ impl error::Error for DomainError {
5555
pub mod repository {
5656
use std::pin::Pin;
5757

58-
use futures::{Future, Stream};
58+
use futures::Future;
5959

6060
pub trait IOError: std::error::Error + Send {}
6161

@@ -69,6 +69,4 @@ pub mod repository {
6969
}
7070

7171
pub type RepositoryFuture<T> = Pin<Box<dyn Future<Output = Result<T, RepositoryError>> + Send>>;
72-
73-
pub type RepositoryStream<T> = Pin<Box<dyn Stream<Item = Result<T, RepositoryError>> + Send>>;
7472
}

sentry/src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,19 +82,19 @@ impl TryFrom<Vars> for Config {
8282
fn try_from(mut vars: Vars) -> Result<Self, Self::Error> {
8383
let limit = vars
8484
.find_map(|(key, value)| {
85-
if key == "CHANNEL_LIST_LIMIT" {
85+
if key == "SENTRY_CHANNEL_LIST_LIMIT" {
8686
Some(value)
8787
} else {
8888
None
8989
}
9090
})
9191
.ok_or(DomainError::InvalidArgument(
92-
"CHANNEL_LIST_LIMIT evn. variable was not passed".to_string(),
92+
"SENTRY_CHANNEL_LIST_LIMIT evn. variable was not passed".to_string(),
9393
))
9494
.and_then(|value| {
9595
value.parse::<u32>().map_err(|_| {
9696
DomainError::InvalidArgument(
97-
"CHANNEL_LIST_LIMIT is not a u32 value".to_string(),
97+
"SENTRY_CHANNEL_LIST_LIMIT is not a u32 value".to_string(),
9898
)
9999
})
100100
});

validator/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ futures_legacy = { version = "0.1", package = "futures" }
1818
tokio = { version = "=0.1.19" }
1919
# API client
2020
reqwest = "0.9.18"
21+
# Configuration
22+
lazy_static = "1.3"
23+
dotenv = "0.14"
2124
# (De)Serialization
2225
serde = { version = "^1.0", features = ['derive'] }
2326
serde_json = "1.0"

validator/src/infrastructure.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pub mod persistence;
2+
pub mod sentry;
23
pub mod validator;
Lines changed: 8 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,24 @@
1-
use std::iter::once;
2-
use std::sync::Arc;
3-
4-
use futures::compat::Future01CompatExt;
5-
use futures::future::{ok, try_join_all};
61
use futures::{FutureExt, TryFutureExt};
7-
use futures_legacy::Future as LegacyFuture;
8-
use reqwest::r#async::{Client, Response};
9-
use serde::Deserialize;
102

113
use domain::{Channel, RepositoryFuture};
124

135
use crate::domain::channel::ChannelRepository;
146
use crate::infrastructure::persistence::api::ApiPersistenceError;
7+
use crate::infrastructure::sentry::SentryApi;
158

169
#[derive(Clone)]
10+
// @TODO: make pub(crate)
1711
pub struct ApiChannelRepository {
18-
pub client: Client,
19-
}
20-
21-
impl ApiChannelRepository {
22-
fn fetch_page(&self, page: u64, identity: &str) -> RepositoryFuture<ChannelAllResponse> {
23-
self.client
24-
// call Sentry and fetch first page, where validator = identity
25-
.get(
26-
format!(
27-
"http://localhost:8005/channel/list?validator={}&page={}",
28-
identity, page
29-
)
30-
.as_str(),
31-
)
32-
.send()
33-
.and_then(|mut res: Response| res.json::<ChannelAllResponse>())
34-
// @TODO: Error handling
35-
.map_err(|_error| ApiPersistenceError::Reading.into())
36-
.compat()
37-
.boxed()
38-
}
12+
pub sentry: SentryApi,
3913
}
4014

4115
impl ChannelRepository for ApiChannelRepository {
4216
fn all(&self, identity: &str) -> RepositoryFuture<Vec<Channel>> {
43-
let identity = Arc::new(identity.to_string());
44-
let handle = self.clone();
45-
46-
let first_page = handle.fetch_page(1, &identity.clone());
47-
48-
// call Sentry again and concat all the Channels in Future
49-
// fetching them until no more Channels are returned
50-
first_page
51-
.and_then(move |response| {
52-
let first_page_future = ok(response.channels).boxed();
53-
54-
if response.total_pages < 2 {
55-
first_page_future
56-
} else {
57-
let identity = identity.clone();
58-
let futures = (2..=response.total_pages)
59-
.map(|page| {
60-
handle
61-
.fetch_page(page, &identity)
62-
.map(|response_result| {
63-
response_result.and_then(|response| Ok(response.channels))
64-
})
65-
.boxed()
66-
})
67-
.chain(once(first_page_future));
68-
69-
try_join_all(futures)
70-
.map(|result_all| {
71-
result_all
72-
.and_then(|all| Ok(all.into_iter().flatten().collect::<Vec<_>>()))
73-
})
74-
.boxed()
75-
}
76-
})
17+
self.sentry
18+
.clone()
19+
.all_channels(Some(identity.to_string()))
20+
// @TODO: Error handling
21+
.map_err(|_error| ApiPersistenceError::Reading.into())
7722
.boxed()
7823
}
7924
}
80-
81-
#[derive(Deserialize, Debug)]
82-
#[serde(rename_all = "camelCase")]
83-
struct ChannelAllResponse {
84-
pub channels: Vec<Channel>,
85-
pub total_pages: u64,
86-
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
use domain::Channel;
2+
use futures::compat::Future01CompatExt;
3+
use futures::future::{ok, try_join_all, FutureExt, TryFutureExt};
4+
use futures::Future;
5+
use futures_legacy::Future as LegacyFuture;
6+
use reqwest::r#async::{Client, Response};
7+
use reqwest::Error;
8+
use serde::Deserialize;
9+
use std::iter::once;
10+
11+
#[derive(Clone)]
12+
// @TODO: make pub(crate)
13+
pub struct SentryApi {
14+
pub sentry_url: String,
15+
pub client: Client,
16+
}
17+
18+
impl SentryApi {
19+
pub fn all_channels(
20+
&self,
21+
validator: Option<String>,
22+
) -> impl Future<Output = Result<Vec<Channel>, Error>> {
23+
let first_page = self.clone().fetch_page(1, validator.clone());
24+
25+
// call Sentry again and concat all the Channels in Future
26+
// fetching them until no more Channels are returned
27+
let handle = self.clone();
28+
first_page
29+
.and_then(move |response| {
30+
let first_page_future = ok(response.channels).boxed();
31+
32+
if response.total_pages < 2 {
33+
first_page_future
34+
} else {
35+
let futures = (2..=response.total_pages)
36+
.map(|page| {
37+
handle
38+
.clone()
39+
.fetch_page(page, validator.clone())
40+
.map(|response_result| {
41+
response_result.and_then(|response| Ok(response.channels))
42+
})
43+
.boxed()
44+
})
45+
.chain(once(first_page_future));
46+
47+
try_join_all(futures)
48+
.map(|result_all| {
49+
result_all
50+
.and_then(|all| Ok(all.into_iter().flatten().collect::<Vec<_>>()))
51+
})
52+
.boxed()
53+
}
54+
})
55+
.boxed()
56+
}
57+
58+
async fn fetch_page(
59+
self,
60+
page: u64,
61+
validator: Option<String>,
62+
) -> Result<ChannelAllResponse, reqwest::Error> {
63+
let mut query = vec![format!("page={}", page)];
64+
65+
if let Some(validator) = validator {
66+
query.push(format!("validator={}", validator));
67+
}
68+
69+
let future = self
70+
.client
71+
// call Sentry and fetch first page, where validator = identity
72+
.get(format!("{}/channel/list?{}", self.sentry_url, query.join("&")).as_str())
73+
.send()
74+
.and_then(|mut res: Response| res.json::<ChannelAllResponse>());
75+
76+
await!(future.compat())
77+
}
78+
}
79+
80+
#[derive(Deserialize, Debug)]
81+
#[serde(rename_all = "camelCase")]
82+
struct ChannelAllResponse {
83+
pub channels: Vec<Channel>,
84+
pub total_pages: u64,
85+
}

validator/src/main.rs

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,66 @@
11
#![feature(async_await, await_macro)]
22
#![deny(rust_2018_idioms)]
33
#![deny(clippy::all)]
4+
5+
use futures::compat::Future01CompatExt;
6+
use futures::future::{join, FutureExt, TryFutureExt};
47
use reqwest::r#async::Client;
58

6-
use futures::future::{FutureExt, TryFutureExt};
9+
use lazy_static::lazy_static;
10+
use std::ops::Add;
11+
use std::time::{Duration, Instant};
12+
use tokio::timer::Delay;
713
use validator::domain::channel::ChannelRepository;
14+
use validator::infrastructure::persistence::channel::api::ApiChannelRepository;
15+
use validator::infrastructure::sentry::SentryApi;
16+
17+
lazy_static! {
18+
static ref CONFIG: Config = {
19+
dotenv::dotenv().ok();
20+
21+
let ticks_wait_time = std::env::var("VALIDATOR_TICKS_WAIT_TIME")
22+
.unwrap()
23+
.parse()
24+
.unwrap();
25+
Config {
26+
ticks_wait_time: Duration::from_millis(ticks_wait_time),
27+
sentry_url: std::env::var("VALIDATOR_SENTRY_URL")
28+
.unwrap()
29+
.parse()
30+
.unwrap(),
31+
}
32+
};
33+
}
834

935
fn main() {
10-
let future = async {
11-
let repo = validator::infrastructure::persistence::channel::api::ApiChannelRepository {
12-
client: Client::new(),
13-
};
36+
let worker = async {
37+
loop {
38+
let future = async {
39+
let sentry = SentryApi {
40+
client: Client::new(),
41+
sentry_url: CONFIG.sentry_url.clone(),
42+
};
43+
let repo = ApiChannelRepository { sentry };
1444

15-
let all_channels = await!(repo.all("0x2892f6C41E0718eeeDd49D98D648C789668cA67d"));
45+
let all_channels = await!(repo.all("0x2892f6C41E0718eeeDd49D98D648C789668cA67d"));
1646

17-
match all_channels {
18-
Ok(channel) => println!("{:#?}", channel),
19-
Err(error) => eprintln!("Error occurred: {:#?}", error),
20-
};
47+
match all_channels {
48+
Ok(channel) => println!("{:#?}", channel),
49+
Err(error) => eprintln!("Error occurred: {:#?}", error),
50+
};
51+
};
52+
let tick_future = Delay::new(Instant::now().add(CONFIG.ticks_wait_time));
53+
54+
let joined = join(future, tick_future.compat());
55+
56+
await!(joined);
57+
}
2158
};
2259

23-
tokio::run(future.unit_error().boxed().compat());
60+
tokio::run(worker.unit_error().boxed().compat());
61+
}
62+
63+
struct Config {
64+
pub ticks_wait_time: Duration,
65+
pub sentry_url: String,
2466
}

0 commit comments

Comments
 (0)