Skip to content

Commit ed3c244

Browse files
authored
Issue #61 sentry paginated channel list (#63)
* domain - channel - ChannelRepository - list_count: - persistence - channel - memory - impl `list_count` * application - channel resource - channel_list - show `total_pages` * domain - channel - ChannelRepository list_count - change count to `u64` and: - app - resource - channel_list - response of `total_pages` to be `u64` - persistence - postgres - impl `list_count` * sentry - util - bb8 - util for a `query_result` * validator - persistence - channel AllResponse: - add `total_pages` - add `serde(rename_all = camelCase)` attribute
1 parent 61a7acb commit ed3c244

File tree

9 files changed

+109
-44
lines changed

9 files changed

+109
-44
lines changed

sentry/src/application/resource/channel/channel_list/handler.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@ impl ChannelListHandler {
3434
let list_fut = self.channel_repository.list(&channel_list_params);
3535
// @TODO: Proper error handling
3636
let channels = await!(list_fut).unwrap();
37+
let channels_count =
38+
await!(self.channel_repository.list_count(&channel_list_params)).unwrap();
3739

38-
Ok(ChannelListResponse { channels })
40+
Ok(ChannelListResponse {
41+
channels,
42+
total_pages: channels_count,
43+
})
3944
}
4045
}

sentry/src/application/resource/channel/channel_list/response.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ use domain::Channel;
66
#[derive(Debug, Response)]
77
pub struct ChannelListResponse {
88
pub channels: Vec<Channel>,
9+
pub total_pages: u64,
910
}

sentry/src/domain/channel.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ pub trait ChannelRepository: Debug + Send + Sync {
5353
/// Returns a list of channels, based on the passed Parameters for this method
5454
fn list(&self, params: &ChannelListParams) -> RepositoryFuture<Vec<Channel>>;
5555

56+
fn list_count(&self, params: &ChannelListParams) -> RepositoryFuture<u64>;
57+
5658
fn find(&self, channel_id: &ChannelId) -> RepositoryFuture<Option<Channel>>;
5759

5860
fn create(&self, channel: Channel) -> RepositoryFuture<()>;

sentry/src/infrastructure.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pub(crate) mod field;
22
pub mod persistence;
3+
pub(crate) mod util;

sentry/src/infrastructure/persistence/channel/memory.rs

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,28 +36,10 @@ impl ChannelRepository for MemoryChannelRepository {
3636
Ok(reader) => {
3737
let channels = reader
3838
.iter()
39-
.filter_map(|channel| {
40-
let valid_until_filter = channel.valid_until >= params.valid_until_ge;
41-
42-
let validator_filter_passed = match &params.validator {
43-
Some(validator_id) => {
44-
// check if there is any validator in the current
45-
// `channel.spec.validators` that has the same `id`
46-
channel.spec.validators.find(&validator_id).is_some()
47-
}
48-
// if None -> the current channel has passed, since we don't need to filter by anything
49-
None => true,
50-
};
51-
52-
match (valid_until_filter, validator_filter_passed) {
53-
(true, true) => Some(channel.clone()),
54-
(_, _) => None,
55-
}
56-
})
39+
.filter_map(|channel| list_filter(&params, channel))
5740
.skip(skip_results)
5841
.take(take)
5942
.collect();
60-
6143
ok(channels)
6244
}
6345
Err(error) => err(MemoryPersistenceError::from(error).into()),
@@ -66,6 +48,22 @@ impl ChannelRepository for MemoryChannelRepository {
6648
res_fut.boxed()
6749
}
6850

51+
fn list_count(&self, params: &ChannelListParams) -> RepositoryFuture<u64> {
52+
let res_fut = match self.records.read() {
53+
Ok(reader) => {
54+
let filtered_count = reader
55+
.iter()
56+
.filter_map(|channel| list_filter(&params, channel))
57+
.count();
58+
let pages = (filtered_count as f64 / f64::from(params.limit)).ceil() as u64;
59+
ok(pages)
60+
}
61+
Err(error) => err(MemoryPersistenceError::from(error).into()),
62+
};
63+
64+
res_fut.boxed()
65+
}
66+
6967
fn find(&self, channel_id: &ChannelId) -> RepositoryFuture<Option<Channel>> {
7068
let res_fut = match self.records.read() {
7169
Ok(reader) => {
@@ -113,3 +111,22 @@ impl ChannelRepository for MemoryChannelRepository {
113111
create_fut.boxed()
114112
}
115113
}
114+
115+
fn list_filter(params: &ChannelListParams, channel: &Channel) -> Option<Channel> {
116+
let valid_until_filter = channel.valid_until >= params.valid_until_ge;
117+
118+
let validator_filter_passed = match &params.validator {
119+
Some(validator_id) => {
120+
// check if there is any validator in the current
121+
// `channel.spec.validators` that has the same `id`
122+
channel.spec.validators.find(&validator_id).is_some()
123+
}
124+
// if None -> the current channel has passed, since we don't need to filter by anything
125+
None => true,
126+
};
127+
128+
match (valid_until_filter, validator_filter_passed) {
129+
(true, true) => Some(channel.clone()),
130+
(_, _) => None,
131+
}
132+
}
Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use futures::compat::Future01CompatExt;
22
use futures::future::FutureExt;
3-
use futures_legacy::future::IntoFuture;
43
use futures_legacy::Future as OldFuture;
5-
use futures_legacy::Stream as OldStream;
64
use tokio_postgres::types::Json;
5+
use tokio_postgres::Row;
76

87
use domain::{Channel, ChannelId, ChannelSpec, RepositoryFuture};
98
use try_future::try_future;
@@ -12,6 +11,7 @@ use crate::domain::channel::{ChannelListParams, ChannelRepository};
1211
use crate::infrastructure::field::{asset::AssetPg, bignum::BigNumPg, channel_id::ChannelIdPg};
1312
use crate::infrastructure::persistence::postgres::PostgresPersistenceError;
1413
use crate::infrastructure::persistence::DbPool;
14+
use crate::infrastructure::util::bb8::query_result;
1515

1616
#[derive(Debug)]
1717
pub struct PostgresChannelRepository {
@@ -31,31 +31,11 @@ impl ChannelRepository for PostgresChannelRepository {
3131
.run(move |mut conn| {
3232
conn.prepare("SELECT channel_id, creator, deposit_asset, deposit_amount, valid_until, spec FROM channels")
3333
.then(move |res| match res {
34-
Ok(stmt) => {
35-
conn
36-
.query(&stmt, &[])
37-
.collect()
38-
.into_future()
39-
.then(|res| match res {
40-
Ok(rows) => Ok((rows, conn)),
41-
Err(err) => Err((err, conn)),
42-
})
43-
.into()
44-
}
34+
Ok(stmt) => query_result(conn.query(&stmt, &[]), conn),
4535
Err(err) => try_future!(Err((err, conn))),
4636
})
4737
.and_then(|(rows, conn)| {
48-
let channels = rows.iter().map(|row| {
49-
let spec: ChannelSpec = row.get::<_, Json<ChannelSpec>>("spec").0;
50-
Channel {
51-
id: row.get::<_, ChannelIdPg>("channel_id").into(),
52-
creator: row.get("creator"),
53-
deposit_asset: row.get::<_, AssetPg>("deposit_asset").into(),
54-
deposit_amount: row.get::<_, BigNumPg>("deposit_amount").into(),
55-
valid_until: row.get("valid_until"),
56-
spec,
57-
}
58-
}).collect();
38+
let channels = rows.iter().map(|row| channel_map(row)).collect();
5939

6040
Ok((channels, conn))
6141
})
@@ -65,6 +45,29 @@ impl ChannelRepository for PostgresChannelRepository {
6545
fut.compat().boxed()
6646
}
6747

48+
fn list_count(&self, _params: &ChannelListParams) -> RepositoryFuture<u64> {
49+
let fut = self
50+
.db_pool
51+
.run(move |mut conn| {
52+
conn.prepare("SELECT COUNT(channel_id)::TEXT FROM channels")
53+
.then(move |res| match res {
54+
Ok(stmt) => query_result(conn.query(&stmt, &[]), conn),
55+
Err(err) => try_future!(Err((err, conn))),
56+
})
57+
.and_then(|(rows, conn)| {
58+
let count = rows[0]
59+
.get::<_, &str>(0)
60+
.parse::<u64>()
61+
.expect("Not possible to have that many rows");
62+
63+
Ok((count, conn))
64+
})
65+
})
66+
.map_err(|err| PostgresPersistenceError::from(err).into());
67+
68+
fut.compat().boxed()
69+
}
70+
6871
fn find(&self, _channel_id: &ChannelId) -> RepositoryFuture<Option<Channel>> {
6972
unimplemented!("find() for Postgres still needs to be implemented")
7073
}
@@ -73,3 +76,15 @@ impl ChannelRepository for PostgresChannelRepository {
7376
unimplemented!("create() for Postgres still needs to be implemented")
7477
}
7578
}
79+
80+
fn channel_map(row: &Row) -> Channel {
81+
let spec: ChannelSpec = row.get::<_, Json<ChannelSpec>>("spec").0;
82+
Channel {
83+
id: row.get::<_, ChannelIdPg>("channel_id").into(),
84+
creator: row.get("creator"),
85+
deposit_asset: row.get::<_, AssetPg>("deposit_asset").into(),
86+
deposit_amount: row.get::<_, BigNumPg>("deposit_amount").into(),
87+
valid_until: row.get("valid_until"),
88+
spec,
89+
}
90+
}

sentry/src/infrastructure/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub(crate) mod bb8;

sentry/src/infrastructure/util/bb8.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use futures_legacy::future::IntoFuture;
2+
use futures_legacy::stream::Stream;
3+
use futures_legacy::Future;
4+
use tokio_postgres::impls::Query;
5+
use tokio_postgres::{Client, Row};
6+
use try_future::TryFuture;
7+
8+
pub(crate) fn query_result(
9+
query: Query,
10+
client: Client,
11+
) -> TryFuture<impl Future<Item = (Vec<Row>, Client), Error = (tokio_postgres::Error, Client)>> {
12+
query
13+
.collect()
14+
.into_future()
15+
.then(|res| match res {
16+
Ok(rows) => Ok((rows, client)),
17+
Err(err) => Err((err, client)),
18+
})
19+
.into_future()
20+
.into()
21+
}

validator/src/infrastructure/persistence/channel/api.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ impl ChannelRepository for ApiChannelRepository {
4040
}
4141

4242
#[derive(Deserialize)]
43+
#[serde(rename_all = "camelCase")]
4344
struct AllResponse {
4445
pub channels: Vec<Channel>,
46+
pub total_pages: u64,
4547
}

0 commit comments

Comments
 (0)