Skip to content

Commit 519dff3

Browse files
authored
Issue 62 validator all channel list pages (#64)
* domain - repository - RepositoryStream * sentry - resource - channel_list - response - add `serde(rename_all = "camelCase")` * validator - persistence - channel - api - fetch all channels in one future * validator - persistence - channel - api - fetch all channels in one future * fix `clippy` warning
1 parent ed3c244 commit 519dff3

File tree

4 files changed

+71
-26
lines changed

4 files changed

+71
-26
lines changed

domain/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ impl error::Error for DomainError {
5555
pub mod repository {
5656
use std::pin::Pin;
5757

58-
use futures::Future;
58+
use futures::{Future, Stream};
5959

6060
pub trait IOError: std::error::Error + Send {}
6161

@@ -69,4 +69,6 @@ pub mod repository {
6969
}
7070

7171
pub type RepositoryFuture<T> = Pin<Box<dyn Future<Output = Result<T, RepositoryError>> + Send>>;
72+
73+
pub type RepositoryStream<T> = Pin<Box<dyn Stream<Item = Result<T, RepositoryError>> + Send>>;
7274
}

sentry/src/application/resource/channel/channel_list/response.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use tower_web::Response;
33

44
use domain::Channel;
55

6-
#[derive(Debug, Response)]
6+
#[derive(Debug, Response, Serialize)]
7+
#[serde(rename_all = "camelCase")]
78
pub struct ChannelListResponse {
89
pub channels: Vec<Channel>,
910
pub total_pages: u64,
Lines changed: 59 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
use std::iter::once;
2+
use std::sync::Arc;
3+
14
use futures::compat::Future01CompatExt;
2-
use futures::future::FutureExt;
5+
use futures::future::{ok, try_join_all};
6+
use futures::{FutureExt, TryFutureExt};
37
use futures_legacy::Future as LegacyFuture;
48
use reqwest::r#async::{Client, Response};
59
use serde::Deserialize;
@@ -9,39 +13,74 @@ use domain::{Channel, RepositoryFuture};
913
use crate::domain::channel::ChannelRepository;
1014
use crate::infrastructure::persistence::api::ApiPersistenceError;
1115

16+
#[derive(Clone)]
1217
pub struct ApiChannelRepository {
1318
pub client: Client,
1419
}
1520

16-
impl ChannelRepository for ApiChannelRepository {
17-
fn all(&self, identity: &str) -> RepositoryFuture<Vec<Channel>> {
18-
let list_page_url = |page: u32| {
19-
format!(
20-
"http://localhost:8005/channel/list?validator={}&page={}",
21-
identity, page
21+
impl ApiChannelRepository {
22+
fn fetch_page(&self, page: u64, identity: &str) -> RepositoryFuture<ChannelAllResponse> {
23+
self.client
24+
// call Sentry and fetch first page, where validator = identity
25+
.get(
26+
format!(
27+
"http://localhost:8005/channel/list?validator={}&page={}",
28+
identity, page
29+
)
30+
.as_str(),
2231
)
23-
};
24-
let first_page = self
25-
.client
26-
// call Sentry and fetch first page, where validator = params.identifier
27-
.get(&list_page_url(1))
2832
.send()
29-
.and_then(|mut res: Response| res.json::<AllResponse>())
30-
.map(|response| response.channels)
33+
.and_then(|mut res: Response| res.json::<ChannelAllResponse>())
3134
// @TODO: Error handling
32-
.map_err(|_error| ApiPersistenceError::Reading.into());
35+
.map_err(|_error| ApiPersistenceError::Reading.into())
36+
.compat()
37+
.boxed()
38+
}
39+
}
3340

34-
// call Sentry again and concat all the Channels in 1 Stream
41+
impl ChannelRepository for ApiChannelRepository {
42+
fn all(&self, identity: &str) -> RepositoryFuture<Vec<Channel>> {
43+
let identity = Arc::new(identity.to_string());
44+
let handle = self.clone();
45+
46+
let first_page = handle.fetch_page(1, &identity.clone());
47+
48+
// call Sentry again and concat all the Channels in Future
3549
// fetching them until no more Channels are returned
36-
// @TODO: fetch the rest of the results
50+
first_page
51+
.and_then(move |response| {
52+
let first_page_future = ok(response.channels).boxed();
53+
54+
if response.total_pages < 2 {
55+
first_page_future
56+
} else {
57+
let identity = identity.clone();
58+
let futures = (2..=response.total_pages)
59+
.map(|page| {
60+
handle
61+
.fetch_page(page, &identity)
62+
.map(|response_result| {
63+
response_result.and_then(|response| Ok(response.channels))
64+
})
65+
.boxed()
66+
})
67+
.chain(once(first_page_future));
3768

38-
first_page.compat().boxed()
69+
try_join_all(futures)
70+
.map(|result_all| {
71+
result_all
72+
.and_then(|all| Ok(all.into_iter().flatten().collect::<Vec<_>>()))
73+
})
74+
.boxed()
75+
}
76+
})
77+
.boxed()
3978
}
4079
}
4180

42-
#[derive(Deserialize)]
81+
#[derive(Deserialize, Debug)]
4382
#[serde(rename_all = "camelCase")]
44-
struct AllResponse {
83+
struct ChannelAllResponse {
4584
pub channels: Vec<Channel>,
4685
pub total_pages: u64,
4786
}

validator/src/main.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@ fn main() {
1111
let repo = validator::infrastructure::persistence::channel::api::ApiChannelRepository {
1212
client: Client::new(),
1313
};
14-
println!(
15-
"{:#?}",
16-
await!(repo.all("0x2892f6C41E0718eeeDd49D98D648C789668cA67d"))
17-
);
14+
15+
let all_channels = await!(repo.all("0x2892f6C41E0718eeeDd49D98D648C789668cA67d"));
16+
17+
match all_channels {
18+
Ok(channel) => println!("{:#?}", channel),
19+
Err(error) => eprintln!("Error occurred: {:#?}", error),
20+
};
1821
};
1922

2023
tokio::run(future.unit_error().boxed().compat());

0 commit comments

Comments
 (0)