Skip to content

Commit c6af155

Browse files
authored
Merge pull request #203 from AdExNetwork/issue-9-channel-list
Issue #9 channel list
2 parents d446e59 + 3dd8472 commit c6af155

File tree

8 files changed

+174
-56
lines changed

8 files changed

+174
-56
lines changed

primitives/src/sentry.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ pub struct AggregateEvents {
7575
pub event_payouts: HashMap<String, BigNum>,
7676
}
7777

78-
#[derive(Deserialize, Debug)]
78+
#[derive(Debug, Serialize, Deserialize)]
7979
#[serde(rename_all = "camelCase")]
80-
pub struct ChannelAllResponse {
80+
pub struct ChannelListResponse {
8181
pub channels: Vec<Channel>,
8282
pub total_pages: u64,
8383
}

sentry/migrations/20190806011140_initial_tables/up.sql renamed to sentry/migrations/20190806011140_initial-tables/up.sql

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,33 @@ CREATE TABLE channels
33
id VARCHAR(66) NOT NULL,
44
creator VARCHAR(255) NOT NULL,
55
deposit_asset VARCHAR(42) NOT NULL,
6-
deposit_amount VARCHAR(255) NOT NULL, -- @TODO change the deposit to BigNum compatible field
6+
deposit_amount VARCHAR(255) NOT NULL,
77
valid_until TIMESTAMP WITH TIME ZONE NOT NULL,
88
spec JSONB NOT NULL,
99

1010
PRIMARY KEY (id)
1111
);
1212

13-
CREATE INDEX idx_valid_until ON channels (valid_until);
14-
CREATE INDEX idx_spec ON channels ((spec -> 'validator' ->> 'id'));
13+
CREATE INDEX idx_channel_valid_until ON channels (valid_until);
14+
CREATE INDEX idx_channels_spec_created ON channels ((spec ->> 'created'));
1515

1616
CREATE TABLE validator_messages
1717
(
1818
channel_id VARCHAR(66) NOT NULL REFERENCES channels (id) ON DELETE RESTRICT,
1919
"from" VARCHAR(255) NOT NULL,
2020
msg JSONB NOT NULL,
21-
received TIMESTAMP WITH TIME ZONE NOT NULL,
22-
23-
PRIMARY KEY (channel_id)
21+
received TIMESTAMP WITH TIME ZONE NOT NULL
2422
);
2523

26-
CREATE INDEX idx_received ON validator_messages (received);
27-
CREATE INDEX ON validator_messages ((msg ->> 'type'));
28-
CREATE INDEX ON validator_messages ((msg ->> 'stateRoot'));
24+
CREATE INDEX idx_validator_messages_received ON validator_messages (received);
25+
CREATE INDEX idx_validator_messages_msg_type ON validator_messages ((msg ->> 'type'));
26+
CREATE INDEX idx_validator_messages_msg_state_root ON validator_messages ((msg ->> 'stateRoot'));
2927

3028
CREATE TABLE event_aggregates
3129
(
3230
channel_id VARCHAR(66) NOT NULL REFERENCES channels (id) ON DELETE RESTRICT,
3331
created TIMESTAMP WITH TIME ZONE NOT NULL,
34-
events JSONB NOT NULL,
35-
36-
PRIMARY KEY (channel_id)
32+
events JSONB NOT NULL
3733
);
3834

39-
CREATE INDEX idx_created ON event_aggregates (created);
35+
CREATE INDEX idx_event_aggregates_created ON event_aggregates (created);

sentry/src/db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ pub async fn setup_migrations() {
7878

7979
// Define Migrations
8080
config
81-
.use_migrations(&[make_migration!("20190806011140_initial_tables")])
81+
.use_migrations(&[make_migration!("20190806011140_initial-tables")])
8282
.expect("Loading migrations failed");
8383

8484
// Reload config, ping the database for applied migrations

sentry/src/db/channel.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use bb8::RunError;
33
use primitives::{Channel, ChannelId, ValidatorId};
44
use std::str::FromStr;
55

6+
pub use list_channels::list_channels;
7+
68
pub async fn get_channel_by_id(
79
pool: &DbPool,
810
id: &ChannelId,
@@ -67,3 +69,123 @@ pub async fn insert_channel(
6769
})
6870
.await
6971
}
72+
73+
mod list_channels {
74+
use crate::db::DbPool;
75+
use bb8::RunError;
76+
use bb8_postgres::tokio_postgres::types::{accepts, FromSql, ToSql, Type};
77+
use chrono::{DateTime, Utc};
78+
use primitives::sentry::ChannelListResponse;
79+
use primitives::{Channel, ValidatorId};
80+
use std::error::Error;
81+
use std::str::FromStr;
82+
83+
struct TotalCount(pub u64);
84+
impl<'a> FromSql<'a> for TotalCount {
85+
fn from_sql(ty: &Type, raw: &'a [u8]) -> Result<Self, Box<dyn Error + Sync + Send>> {
86+
let str_slice = <&str as FromSql>::from_sql(ty, raw)?;
87+
88+
Ok(Self(u64::from_str(str_slice)?))
89+
}
90+
91+
// Use a varchar or text, since otherwise `int8` fails deserialization
92+
accepts!(VARCHAR, TEXT);
93+
}
94+
95+
pub async fn list_channels(
96+
pool: &DbPool,
97+
skip: u64,
98+
limit: u32,
99+
creator: &Option<String>,
100+
validator: &Option<ValidatorId>,
101+
valid_until_ge: &DateTime<Utc>,
102+
) -> Result<ChannelListResponse, RunError<bb8_postgres::tokio_postgres::Error>> {
103+
let validator = validator.as_ref().map(|validator_id| {
104+
serde_json::Value::from_str(&format!(r#"[{{"id": "{}"}}]"#, validator_id))
105+
.expect("Not a valid json")
106+
});
107+
let (where_clauses, params) =
108+
channel_list_query_params(creator, validator.as_ref(), valid_until_ge);
109+
let total_count_params = (where_clauses.clone(), params.clone());
110+
111+
let channels = pool
112+
.run(move |connection| {
113+
async move {
114+
// To understand why we use Order by, see Postgres Documentation: https://www.postgresql.org/docs/8.1/queries-limit.html
115+
let statement = format!("SELECT id, creator, deposit_asset, deposit_amount, valid_until, spec FROM channels WHERE {} ORDER BY spec->>'created' DESC LIMIT {} OFFSET {}", where_clauses.join(" AND "), limit, skip);
116+
match connection.prepare(&statement).await {
117+
Ok(stmt) => {
118+
match connection.query(&stmt, params.as_slice()).await {
119+
Ok(rows) => {
120+
let channels = rows.iter().map(Channel::from).collect();
121+
122+
Ok((channels, connection))
123+
},
124+
Err(e) => Err((e, connection)),
125+
}
126+
},
127+
Err(e) => Err((e, connection)),
128+
}
129+
}
130+
})
131+
.await?;
132+
133+
let total_count =
134+
list_channels_total_count(&pool, (&total_count_params.0, total_count_params.1)).await?;
135+
136+
// fast ceil for total_pages
137+
let total_pages = if total_count == 0 {
138+
1
139+
} else {
140+
1 + ((total_count - 1) / limit as u64)
141+
};
142+
143+
Ok(ChannelListResponse {
144+
total_pages,
145+
channels,
146+
})
147+
}
148+
149+
async fn list_channels_total_count<'a>(
150+
pool: &DbPool,
151+
(where_clauses, params): (&'a [String], Vec<&'a (dyn ToSql + Sync)>),
152+
) -> Result<u64, RunError<bb8_postgres::tokio_postgres::Error>> {
153+
pool.run(move |connection| {
154+
async move {
155+
let statement = format!(
156+
"SELECT COUNT(id)::varchar FROM channels WHERE {}",
157+
where_clauses.join(" AND ")
158+
);
159+
match connection.prepare(&statement).await {
160+
Ok(stmt) => match connection.query_one(&stmt, params.as_slice()).await {
161+
Ok(row) => Ok((row.get::<_, TotalCount>(0).0, connection)),
162+
Err(e) => Err((e, connection)),
163+
},
164+
Err(e) => Err((e, connection)),
165+
}
166+
}
167+
})
168+
.await
169+
}
170+
171+
fn channel_list_query_params<'a>(
172+
creator: &'a Option<String>,
173+
validator: Option<&'a serde_json::Value>,
174+
valid_until_ge: &'a DateTime<Utc>,
175+
) -> (Vec<String>, Vec<&'a (dyn ToSql + Sync)>) {
176+
let mut where_clauses = vec!["valid_until >= $1".to_string()];
177+
let mut params: Vec<&(dyn ToSql + Sync)> = vec![valid_until_ge];
178+
179+
if let Some(creator) = creator {
180+
where_clauses.push(format!("creator = ${}", params.len() + 1));
181+
params.push(creator);
182+
}
183+
184+
if let Some(validator) = validator {
185+
where_clauses.push(format!("spec->'validators' @> ${}", params.len() + 1));
186+
params.push(validator);
187+
}
188+
189+
(where_clauses, params)
190+
}
191+
}

sentry/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use primitives::Config;
1414
use redis::aio::MultiplexedConnection;
1515
use regex::Regex;
1616
use routes::cfg::config;
17-
use routes::channel::{create_channel, last_approved};
17+
use routes::channel::{channel_list, create_channel, last_approved};
1818
use slog::{error, Logger};
1919
use std::collections::HashMap;
2020

@@ -109,7 +109,7 @@ impl<A: Adapter + 'static> Application<A> {
109109
let mut response = match (req.uri().path(), req.method()) {
110110
("/cfg", &Method::GET) => config(req, &self).await,
111111
("/channel", &Method::POST) => create_channel(req, &self).await,
112-
("/channel/list", &Method::GET) => Err(ResponseError::NotFound),
112+
("/channel/list", &Method::GET) => channel_list(req, &self).await,
113113
// This is important becuase it prevents us from doing
114114
// expensive regex matching for routes without /channel
115115
(route, method) if route.starts_with("/channel") => {

sentry/src/routes/channel.rs

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use self::channel_list::ChannelListQuery;
2-
use crate::db::{get_channel_by_id, insert_channel};
2+
use crate::db::{get_channel_by_id, insert_channel, list_channels};
33
use crate::success_response;
44
use crate::Application;
55
use crate::ResponseError;
@@ -62,16 +62,27 @@ pub async fn create_channel<A: Adapter>(
6262
Ok(success_response(serde_json::to_string(&create_response)?))
6363
}
6464

65-
pub async fn channel_list(req: Request<Body>) -> Result<Response<Body>, ResponseError> {
66-
// @TODO: Get from Config
67-
let _channel_find_limit = 5;
68-
65+
pub async fn channel_list<A: Adapter>(
66+
req: Request<Body>,
67+
app: &Application<A>,
68+
) -> Result<Response<Body>, ResponseError> {
6969
let query = serde_urlencoded::from_str::<ChannelListQuery>(&req.uri().query().unwrap_or(""))?;
70-
71-
// @TODO: List all channels returned from the DB
72-
println!("{:?}", query);
73-
74-
Err(ResponseError::NotFound)
70+
let skip = query
71+
.page
72+
.checked_mul(app.config.channels_find_limit.into())
73+
.ok_or_else(|| ResponseError::BadRequest("Page and/or limit is too large".into()))?;
74+
75+
let list_response = list_channels(
76+
&app.pool,
77+
skip,
78+
app.config.channels_find_limit,
79+
&query.creator,
80+
&query.validator,
81+
&query.valid_until_ge,
82+
)
83+
.await?;
84+
85+
Ok(success_response(serde_json::to_string(&list_response)?))
7586
}
7687

7788
pub async fn last_approved<A: Adapter>(
@@ -93,40 +104,29 @@ pub async fn last_approved<A: Adapter>(
93104
}
94105

95106
mod channel_list {
107+
use chrono::serde::ts_seconds::deserialize as ts_seconds;
96108
use chrono::{DateTime, Utc};
97-
use serde::{Deserialize, Deserializer};
109+
use primitives::ValidatorId;
110+
use serde::Deserialize;
98111

99112
#[derive(Debug, Deserialize)]
100-
pub(crate) struct ChannelListQuery {
101-
/// page to show, should be >= 1
113+
pub(super) struct ChannelListQuery {
102114
#[serde(default = "default_page")]
103115
pub page: u64,
104-
/// channels limit per page, should be >= 1
105-
#[serde(default = "default_limit")]
106-
pub limit: u32,
107116
/// filters the list on `valid_until >= valid_until_ge`
108-
#[serde(default = "Utc::now")]
117+
/// It should be the same timestamp format as the `Channel.valid_until`: **seconds**
118+
#[serde(
119+
deserialize_with = "ts_seconds",
120+
default = "Utc::now",
121+
rename = "validUntil"
122+
)]
109123
pub valid_until_ge: DateTime<Utc>,
124+
pub creator: Option<String>,
110125
/// filters the channels containing a specific validator if provided
111-
#[serde(default, deserialize_with = "deserialize_validator")]
112-
pub validator: Option<String>,
113-
}
114-
115-
/// Deserialize the `Option<String>`, but if the `String` is empty it will return `None`
116-
fn deserialize_validator<'de, D>(de: D) -> Result<Option<String>, D::Error>
117-
where
118-
D: Deserializer<'de>,
119-
{
120-
let value: String = Deserialize::deserialize(de)?;
121-
let option = Some(value).filter(|string| !string.is_empty());
122-
Ok(option)
123-
}
124-
125-
fn default_limit() -> u32 {
126-
1
126+
pub validator: Option<ValidatorId>,
127127
}
128128

129129
fn default_page() -> u64 {
130-
1
130+
0
131131
}
132132
}

validator_worker/src/sentry_interface.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use futures_legacy::Future as LegacyFuture;
66
use primitives::adapter::Adapter;
77
use primitives::channel::SpecValidator;
88
use primitives::sentry::{
9-
ChannelAllResponse, EventAggregateResponse, LastApprovedResponse, SuccessResponse,
9+
ChannelListResponse, EventAggregateResponse, LastApprovedResponse, SuccessResponse,
1010
ValidatorMessageResponse,
1111
};
1212
use primitives::validator::MessageTypes;
@@ -230,7 +230,7 @@ pub async fn all_channels(
230230
if first_page.total_pages < 2 {
231231
Ok(first_page.channels)
232232
} else {
233-
let mut all: Vec<ChannelAllResponse> = try_join_all(
233+
let mut all: Vec<ChannelListResponse> = try_join_all(
234234
(1..first_page.total_pages).map(|i| fetch_page(url.clone(), i, whoami.clone())),
235235
)
236236
.await?;
@@ -248,7 +248,7 @@ async fn fetch_page(
248248
sentry_url: String,
249249
page: u64,
250250
validator: String,
251-
) -> Result<ChannelAllResponse, reqwest::Error> {
251+
) -> Result<ChannelListResponse, reqwest::Error> {
252252
let client = Client::new();
253253

254254
let mut query = vec![format!("page={}", page)];
@@ -257,7 +257,7 @@ async fn fetch_page(
257257
let future = client
258258
.get(format!("{}/channel/list?{}", sentry_url, query.join("&")).as_str())
259259
.send()
260-
.and_then(|mut res: Response| res.json::<ChannelAllResponse>());
260+
.and_then(|mut res: Response| res.json::<ChannelListResponse>());
261261

262262
future.compat().await
263263
}

0 commit comments

Comments
 (0)