Skip to content

Commit 0c3be32

Browse files
committed
add: approve_state, latest_new_state, last_heartbeats db queries
1 parent 2368fbc commit 0c3be32

File tree

5 files changed

+90
-6
lines changed

5 files changed

+90
-6
lines changed

sentry/src/analytics_recorder.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ fn get_payout(channel: &Channel, event: &Event) -> BigNum {
1212
Event::Impression { .. } => channel.spec.min_per_impression.clone(),
1313
Event::Click { .. } => {
1414
if let Some(pricing) = channel.spec.pricing_bounds.clone() {
15-
pricing.click.min
15+
if let Some(click) = pricing.click {
16+
click.min
17+
} else {
18+
BigNum::from(0)
19+
}
1620
} else {
1721
BigNum::from(0)
1822
}

sentry/src/db.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,10 @@ pub async fn setup_migrations(environment: &str) {
8484

8585
let mut migrations = vec![make_migration!("20190806011140_initial-tables")];
8686

87+
8788
if environment == "development" {
8889
// seeds database tables for testing
89-
migrations.push(make_migration!("20190806011140_initial-tables/seed"))
90+
migrations.push(make_migration!("20190806011140_initial-tables/seed"));
9091
}
9192

9293
// Define Migrations
@@ -97,6 +98,18 @@ pub async fn setup_migrations(environment: &str) {
9798
// Reload config, ping the database for applied migrations
9899
let config = config.reload().expect("Should reload applied migrations");
99100

101+
if environment == "development" {
102+
// delete all existing data to make tests reproducible
103+
Migrator::with_config(&config)
104+
.all(true)
105+
.direction(Direction::Down)
106+
.swallow_completion(true)
107+
.apply()
108+
.expect("Applying migrations failed");
109+
}
110+
111+
let config = config.reload().expect("Should reload applied migrations");
112+
100113
Migrator::with_config(&config)
101114
// set `swallow_completion` to `true`
102115
// so no error will be returned if all migrations have already been ran

sentry/src/db/event_aggregate.rs

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,73 @@ use bb8_postgres::tokio_postgres::types::{ToSql, Type};
55
use bb8_postgres::tokio_postgres::Error;
66
use chrono::{DateTime, Utc};
77
use futures::pin_mut;
8-
use primitives::sentry::EventAggregate;
8+
use primitives::sentry::{EventAggregate, LastApprovedResponse, ApproveStateValidatorMessage, NewStateValidatorMessage, HeartbeatValidatorMessage};
99
use primitives::BigNum;
10-
use primitives::{ChannelId, ValidatorId};
10+
use primitives::{ChannelId, ValidatorId, Channel};
1111
use std::ops::Add;
1212

13+
14+
pub async fn approve_state(
15+
pool: &DbPool,
16+
channel: &Channel
17+
) -> Result<Vec<ApproveStateValidatorMessage>, RunError<bb8_postgres::tokio_postgres::Error>> {
18+
/// select (from, msg, received) from validator_messages where channel_id = channel_id, from = from, msg ->> 'type'->>'ApproveState'
19+
/// select (from, msg, received) from validator_messages where channel_id = channel_id, from = from, msg ->> 'type'->>'NewState', msg ->> 'stateRoot'->>'0xx'
20+
pool
21+
.run(move |connection| {
22+
async move {
23+
match connection.prepare("SELECT from, msg, received FROM validator_messages WHERE channel_id = $1 AND from = $2 AND msg ->> 'type' = 'ApproveState' ORDER BY received DESC").await {
24+
Ok(select) => match connection.query(&select, &[&channel.id, &channel.spec.validators.follower().id]).await {
25+
Ok(rows) => Ok((rows.iter().map(ApproveStateValidatorMessage::from).collect(), connection)),
26+
Err(e) => Err((e, connection)),
27+
},
28+
Err(e) => Err((e, connection)),
29+
}
30+
}
31+
})
32+
.await
33+
}
34+
35+
pub async fn latest_new_state(
36+
pool: &DbPool,
37+
channel: &Channel,
38+
state_root: &str,
39+
) -> Result<Option<NewStateValidatorMessage>, RunError<bb8_postgres::tokio_postgres::Error>> {
40+
pool
41+
.run(move |connection| {
42+
async move {
43+
match connection.prepare("SELECT from, msg, received FROM validator_messages WHERE channel_id = $1 AND from = $2 AND msg ->> 'type' = 'NewState' AND msg->> 'stateRoot' = $3 ORDER BY received DESC LIMIT 1").await {
44+
Ok(select) => match connection.query(&select, &[&channel.id, &channel.spec.validators.leader().id, &state_root]).await {
45+
Ok(rows) => Ok((rows.get(0).map(NewStateValidatorMessage::from), connection)),
46+
Err(e) => Err((e, connection)),
47+
},
48+
Err(e) => Err((e, connection)),
49+
}
50+
}
51+
})
52+
.await
53+
}
54+
55+
pub async fn last_heartbeats(
56+
pool: &DbPool,
57+
channel_id: &ChannelId,
58+
validator_id: &ValidatorId
59+
) -> Result<Vec<HeartbeatValidatorMessage>, RunError<bb8_postgres::tokio_postgres::Error>> {
60+
pool
61+
.run(move |connection| {
62+
async move {
63+
match connection.prepare("SELECT from, msg, received FROM validator_messages WHERE channel_id = $1 AND from = $2 AND msg ->> 'type' = 'Heartbeat' ORDER BY received DESC LIMIT 2").await {
64+
Ok(select) => match connection.query(&select, &[&channel_id, &validator_id]).await {
65+
Ok(rows) => Ok((rows.iter().map(HeartbeatValidatorMessage::from).collect(), connection)),
66+
Err(e) => Err((e, connection)),
67+
},
68+
Err(e) => Err((e, connection)),
69+
}
70+
}
71+
})
72+
.await
73+
}
74+
1375
pub async fn list_event_aggregates(
1476
pool: &DbPool,
1577
channel_id: &ChannelId,
@@ -153,8 +215,8 @@ pub async fn insert_event_aggregate(
153215
pin_mut!(writer);
154216
for item in data {
155217
if let Err(e) = writer.as_mut().write(&[&item.id, &item.created, &item.event_type, &item.event_count, &item.event_payout, &item.earner]).await {
156-
err = Some(e);
157-
break;
218+
err = Some(e);
219+
break;
158220
}
159221
}
160222

sentry/src/event_aggregator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ impl EventAggregator {
156156
drop(channel_recorder);
157157

158158
if aggr_throttle == 0 {
159+
println!("in store");
159160
store(
160161
&app.pool,
161162
&channel_id,

sentry/src/routes/channel.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ pub async fn last_approved<A: Adapter>(
101101
let channel_id = ChannelId::from_hex(route_params.index(0))?;
102102
let channel = get_channel_by_id(&app.pool, &channel_id).await?.unwrap();
103103

104+
105+
104106
Ok(Response::builder()
105107
.header("Content-type", "application/json")
106108
.body(serde_json::to_string(&channel)?.into())
@@ -126,7 +128,9 @@ pub async fn insert_events<A: Adapter + 'static>(
126128

127129
let into_body = req.into_body();
128130
let body = hyper::body::to_bytes(into_body).await?;
131+
println!("deserializing request body");
129132
let request_body = serde_json::from_slice::<HashMap<String, Vec<Event>>>(&body)?;
133+
println!("deserializing request body 2");
130134
let events = request_body
131135
.get("events")
132136
.ok_or_else(|| ResponseError::BadRequest("invalid request".to_string()))?;

0 commit comments

Comments
 (0)