Skip to content

Commit a451ebb

Browse files
authored
Merge branch 'dev' into issue-9-validator-messages
2 parents 931aba4 + c6af155 commit a451ebb

File tree

13 files changed

+187
-125
lines changed

13 files changed

+187
-125
lines changed

Cargo.lock

Lines changed: 11 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

primitives/src/ad_unit.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
use chrono::serde::ts_milliseconds;
1+
use chrono::serde::{ts_milliseconds, ts_milliseconds_option};
22
use chrono::{DateTime, Utc};
33
use serde::{Deserialize, Serialize};
44

5-
use crate::util::serde::ts_milliseconds_option;
65
use crate::TargetingTag;
76

87
#[derive(Serialize, Deserialize, Debug, Clone)]

primitives/src/channel.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
use std::error::Error;
22
use std::fmt;
33

4-
use chrono::serde::{ts_milliseconds, ts_seconds};
4+
use chrono::serde::{ts_milliseconds, ts_milliseconds_option, ts_seconds};
55
use chrono::{DateTime, Utc};
66
use serde::{Deserialize, Serialize};
77
use serde_hex::{SerHex, StrictPfx};
88

99
use crate::big_num::BigNum;
10-
use crate::util::serde::ts_milliseconds_option;
1110
use crate::{AdUnit, EventSubmission, TargetingTag, ValidatorDesc, ValidatorId};
1211
use hex::{FromHex, FromHexError};
1312
use std::ops::Deref;

primitives/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ pub mod util {
2222
}
2323

2424
pub mod logging;
25-
pub mod serde;
2625
}
2726
pub mod validator;
2827

primitives/src/sentry.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ pub struct AggregateEvents {
7575
pub event_payouts: HashMap<String, BigNum>,
7676
}
7777

78-
#[derive(Deserialize, Debug)]
78+
#[derive(Debug, Serialize, Deserialize)]
7979
#[serde(rename_all = "camelCase")]
80-
pub struct ChannelAllResponse {
80+
pub struct ChannelListResponse {
8181
pub channels: Vec<Channel>,
8282
pub total_pages: u64,
8383
}

primitives/src/util/serde.rs

Lines changed: 0 additions & 52 deletions
This file was deleted.

sentry/migrations/20190806011140_initial_tables/up.sql renamed to sentry/migrations/20190806011140_initial-tables/up.sql

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,33 @@ CREATE TABLE channels
33
id VARCHAR(66) NOT NULL,
44
creator VARCHAR(255) NOT NULL,
55
deposit_asset VARCHAR(42) NOT NULL,
6-
deposit_amount VARCHAR(255) NOT NULL, -- @TODO change the deposit to BigNum compatible field
6+
deposit_amount VARCHAR(255) NOT NULL,
77
valid_until TIMESTAMP WITH TIME ZONE NOT NULL,
88
spec JSONB NOT NULL,
99

1010
PRIMARY KEY (id)
1111
);
1212

13-
CREATE INDEX idx_valid_until ON channels (valid_until);
14-
CREATE INDEX idx_spec ON channels ((spec -> 'validator' ->> 'id'));
13+
CREATE INDEX idx_channel_valid_until ON channels (valid_until);
14+
CREATE INDEX idx_channels_spec_created ON channels ((spec ->> 'created'));
1515

1616
CREATE TABLE validator_messages
1717
(
1818
channel_id VARCHAR(66) NOT NULL REFERENCES channels (id) ON DELETE RESTRICT,
1919
"from" VARCHAR(255) NOT NULL,
2020
msg JSONB NOT NULL,
21-
received TIMESTAMP WITH TIME ZONE NOT NULL,
22-
23-
PRIMARY KEY (channel_id)
21+
received TIMESTAMP WITH TIME ZONE NOT NULL
2422
);
2523

26-
CREATE INDEX idx_received ON validator_messages (received);
27-
CREATE INDEX ON validator_messages ((msg ->> 'type'));
28-
CREATE INDEX ON validator_messages ((msg ->> 'stateRoot'));
24+
CREATE INDEX idx_validator_messages_received ON validator_messages (received);
25+
CREATE INDEX idx_validator_messages_msg_type ON validator_messages ((msg ->> 'type'));
26+
CREATE INDEX idx_validator_messages_msg_state_root ON validator_messages ((msg ->> 'stateRoot'));
2927

3028
CREATE TABLE event_aggregates
3129
(
3230
channel_id VARCHAR(66) NOT NULL REFERENCES channels (id) ON DELETE RESTRICT,
3331
created TIMESTAMP WITH TIME ZONE NOT NULL,
34-
events JSONB NOT NULL,
35-
36-
PRIMARY KEY (channel_id)
32+
events JSONB NOT NULL
3733
);
3834

39-
CREATE INDEX idx_created ON event_aggregates (created);
35+
CREATE INDEX idx_event_aggregates_created ON event_aggregates (created);

sentry/src/db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ pub async fn setup_migrations() {
8181

8282
// Define Migrations
8383
config
84-
.use_migrations(&[make_migration!("20190806011140_initial_tables")])
84+
.use_migrations(&[make_migration!("20190806011140_initial-tables")])
8585
.expect("Loading migrations failed");
8686

8787
// Reload config, ping the database for applied migrations

sentry/src/db/channel.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use bb8::RunError;
33
use primitives::{Channel, ChannelId, ValidatorId};
44
use std::str::FromStr;
55

6+
pub use list_channels::list_channels;
7+
68
pub async fn get_channel_by_id(
79
pool: &DbPool,
810
id: &ChannelId,
@@ -67,3 +69,123 @@ pub async fn insert_channel(
6769
})
6870
.await
6971
}
72+
73+
mod list_channels {
74+
use crate::db::DbPool;
75+
use bb8::RunError;
76+
use bb8_postgres::tokio_postgres::types::{accepts, FromSql, ToSql, Type};
77+
use chrono::{DateTime, Utc};
78+
use primitives::sentry::ChannelListResponse;
79+
use primitives::{Channel, ValidatorId};
80+
use std::error::Error;
81+
use std::str::FromStr;
82+
83+
struct TotalCount(pub u64);
84+
impl<'a> FromSql<'a> for TotalCount {
85+
fn from_sql(ty: &Type, raw: &'a [u8]) -> Result<Self, Box<dyn Error + Sync + Send>> {
86+
let str_slice = <&str as FromSql>::from_sql(ty, raw)?;
87+
88+
Ok(Self(u64::from_str(str_slice)?))
89+
}
90+
91+
// Use a varchar or text, since otherwise `int8` fails deserialization
92+
accepts!(VARCHAR, TEXT);
93+
}
94+
95+
pub async fn list_channels(
96+
pool: &DbPool,
97+
skip: u64,
98+
limit: u32,
99+
creator: &Option<String>,
100+
validator: &Option<ValidatorId>,
101+
valid_until_ge: &DateTime<Utc>,
102+
) -> Result<ChannelListResponse, RunError<bb8_postgres::tokio_postgres::Error>> {
103+
let validator = validator.as_ref().map(|validator_id| {
104+
serde_json::Value::from_str(&format!(r#"[{{"id": "{}"}}]"#, validator_id))
105+
.expect("Not a valid json")
106+
});
107+
let (where_clauses, params) =
108+
channel_list_query_params(creator, validator.as_ref(), valid_until_ge);
109+
let total_count_params = (where_clauses.clone(), params.clone());
110+
111+
let channels = pool
112+
.run(move |connection| {
113+
async move {
114+
// To understand why we use Order by, see Postgres Documentation: https://www.postgresql.org/docs/8.1/queries-limit.html
115+
let statement = format!("SELECT id, creator, deposit_asset, deposit_amount, valid_until, spec FROM channels WHERE {} ORDER BY spec->>'created' DESC LIMIT {} OFFSET {}", where_clauses.join(" AND "), limit, skip);
116+
match connection.prepare(&statement).await {
117+
Ok(stmt) => {
118+
match connection.query(&stmt, params.as_slice()).await {
119+
Ok(rows) => {
120+
let channels = rows.iter().map(Channel::from).collect();
121+
122+
Ok((channels, connection))
123+
},
124+
Err(e) => Err((e, connection)),
125+
}
126+
},
127+
Err(e) => Err((e, connection)),
128+
}
129+
}
130+
})
131+
.await?;
132+
133+
let total_count =
134+
list_channels_total_count(&pool, (&total_count_params.0, total_count_params.1)).await?;
135+
136+
// fast ceil for total_pages
137+
let total_pages = if total_count == 0 {
138+
1
139+
} else {
140+
1 + ((total_count - 1) / limit as u64)
141+
};
142+
143+
Ok(ChannelListResponse {
144+
total_pages,
145+
channels,
146+
})
147+
}
148+
149+
async fn list_channels_total_count<'a>(
150+
pool: &DbPool,
151+
(where_clauses, params): (&'a [String], Vec<&'a (dyn ToSql + Sync)>),
152+
) -> Result<u64, RunError<bb8_postgres::tokio_postgres::Error>> {
153+
pool.run(move |connection| {
154+
async move {
155+
let statement = format!(
156+
"SELECT COUNT(id)::varchar FROM channels WHERE {}",
157+
where_clauses.join(" AND ")
158+
);
159+
match connection.prepare(&statement).await {
160+
Ok(stmt) => match connection.query_one(&stmt, params.as_slice()).await {
161+
Ok(row) => Ok((row.get::<_, TotalCount>(0).0, connection)),
162+
Err(e) => Err((e, connection)),
163+
},
164+
Err(e) => Err((e, connection)),
165+
}
166+
}
167+
})
168+
.await
169+
}
170+
171+
fn channel_list_query_params<'a>(
172+
creator: &'a Option<String>,
173+
validator: Option<&'a serde_json::Value>,
174+
valid_until_ge: &'a DateTime<Utc>,
175+
) -> (Vec<String>, Vec<&'a (dyn ToSql + Sync)>) {
176+
let mut where_clauses = vec!["valid_until >= $1".to_string()];
177+
let mut params: Vec<&(dyn ToSql + Sync)> = vec![valid_until_ge];
178+
179+
if let Some(creator) = creator {
180+
where_clauses.push(format!("creator = ${}", params.len() + 1));
181+
params.push(creator);
182+
}
183+
184+
if let Some(validator) = validator {
185+
where_clauses.push(format!("spec->'validators' @> ${}", params.len() + 1));
186+
params.push(validator);
187+
}
188+
189+
(where_clauses, params)
190+
}
191+
}

0 commit comments

Comments
 (0)