Skip to content

Commit 6a6094f

Browse files
committed
fix: last approved route impl
1 parent 0c3be32 commit 6a6094f

File tree

3 files changed

+47
-12
lines changed

3 files changed

+47
-12
lines changed

sentry/src/db.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,15 +99,15 @@ pub async fn setup_migrations(environment: &str) {
9999
let config = config.reload().expect("Should reload applied migrations");
100100

101101
if environment == "development" {
102-
// delete all existing data to make tests reproducible
102+
// delete all existing data to make tests reproducible
103103
Migrator::with_config(&config)
104104
.all(true)
105105
.direction(Direction::Down)
106106
.swallow_completion(true)
107107
.apply()
108108
.expect("Applying migrations failed");
109109
}
110-
110+
111111
let config = config.reload().expect("Should reload applied migrations");
112112

113113
Migrator::with_config(&config)

sentry/src/db/event_aggregate.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,18 @@ use primitives::{ChannelId, ValidatorId, Channel};
1111
use std::ops::Add;
1212

1313

14-
pub async fn approve_state(
14+
pub async fn lastest_approve_state(
1515
pool: &DbPool,
1616
channel: &Channel
17-
) -> Result<Vec<ApproveStateValidatorMessage>, RunError<bb8_postgres::tokio_postgres::Error>> {
17+
) -> Result<Option<ApproveStateValidatorMessage>, RunError<bb8_postgres::tokio_postgres::Error>> {
1818
/// select (from, msg, received) from validator_messages where channel_id = channel_id, from = from, msg ->> 'type'->>'ApproveState'
1919
/// select (from, msg, received) from validator_messages where channel_id = channel_id, from = from, msg ->> 'type'->>'NewState', msg ->> 'stateRoot'->>'0xx'
2020
pool
2121
.run(move |connection| {
2222
async move {
23-
match connection.prepare("SELECT from, msg, received FROM validator_messages WHERE channel_id = $1 AND from = $2 AND msg ->> 'type' = 'ApproveState' ORDER BY received DESC").await {
23+
match connection.prepare("SELECT from, msg, received FROM validator_messages WHERE channel_id = $1 AND from = $2 AND msg ->> 'type' = 'ApproveState' ORDER BY received DESC LIMIT 1").await {
2424
Ok(select) => match connection.query(&select, &[&channel.id, &channel.spec.validators.follower().id]).await {
25-
Ok(rows) => Ok((rows.iter().map(ApproveStateValidatorMessage::from).collect(), connection)),
25+
Ok(rows) => Ok((rows.get(0).map(ApproveStateValidatorMessage::from), connection)),
2626
Err(e) => Err((e, connection)),
2727
},
2828
Err(e) => Err((e, connection)),
@@ -52,10 +52,10 @@ pub async fn latest_new_state(
5252
.await
5353
}
5454

55-
pub async fn last_heartbeats(
55+
pub async fn latest_heartbeats(
5656
pool: &DbPool,
5757
channel_id: &ChannelId,
58-
validator_id: &ValidatorId
58+
validator_id: ValidatorId
5959
) -> Result<Vec<HeartbeatValidatorMessage>, RunError<bb8_postgres::tokio_postgres::Error>> {
6060
pool
6161
.run(move |connection| {

sentry/src/routes/channel.rs

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use self::channel_list::ChannelListQuery;
1+
use self::channel_list::{ChannelListQuery, LastApprovedQuery};
22
use crate::db::{get_channel_by_id, insert_channel, insert_validator_messages, list_channels};
3+
use crate::db::event_aggregate::{ lastest_approve_state, latest_new_state, latest_heartbeats};
34
use crate::success_response;
45
use crate::Application;
56
use crate::ResponseError;
@@ -10,7 +11,7 @@ use hex::FromHex;
1011
use hyper::{Body, Request, Response};
1112
use primitives::adapter::Adapter;
1213
use primitives::channel::SpecValidator;
13-
use primitives::sentry::{Event, SuccessResponse};
14+
use primitives::sentry::{Event, SuccessResponse, LastApprovedResponse, LastApproved};
1415
use primitives::validator::MessageTypes;
1516
use primitives::{Channel, ChannelId};
1617
use slog::error;
@@ -101,11 +102,39 @@ pub async fn last_approved<A: Adapter>(
101102
let channel_id = ChannelId::from_hex(route_params.index(0))?;
102103
let channel = get_channel_by_id(&app.pool, &channel_id).await?.unwrap();
103104

104-
105+
let approve_state = lastest_approve_state(&app.pool, &channel).await?;
106+
if approve_state.is_none() {
107+
return Ok(Response::builder()
108+
.header("Content-type", "application/json")
109+
.body(serde_json::to_string(&LastApprovedResponse{ last_approved: None, heartbeats: None })?.into())
110+
.unwrap())
111+
}
112+
113+
let state_root = approve_state.as_ref().expect("value should be present").msg.state_root.clone();
114+
let new_state = latest_new_state(&app.pool, &channel, &state_root).await?;
115+
if new_state.is_none() {
116+
return Ok(Response::builder()
117+
.header("Content-type", "application/json")
118+
.body(serde_json::to_string(&LastApprovedResponse{ last_approved: None, heartbeats: None })?.into())
119+
.unwrap())
120+
}
121+
122+
let query = serde_urlencoded::from_str::<LastApprovedQuery>(&req.uri().query().unwrap_or(""))?;
123+
let validators = channel.spec.validators;
124+
let channel_id = channel.id;
125+
let heartbeats = if query.with_heartbeat.is_some() {
126+
let result = try_join_all(
127+
validators.into_iter()
128+
.map(|validator| latest_heartbeats(&app.pool, &channel_id, validator.id.clone()))
129+
).await?;
130+
Some(result.into_iter().flatten().collect::<Vec<_>>())
131+
} else {
132+
None
133+
};
105134

106135
Ok(Response::builder()
107136
.header("Content-type", "application/json")
108-
.body(serde_json::to_string(&channel)?.into())
137+
.body(serde_json::to_string(&&LastApprovedResponse{ last_approved: Some(LastApproved { new_state, approve_state }), heartbeats })?.into())
109138
.unwrap())
110139
}
111140

@@ -206,6 +235,12 @@ mod channel_list {
206235
pub validator: Option<ValidatorId>,
207236
}
208237

238+
#[derive(Debug, Deserialize)]
239+
#[serde(rename_all = "camelCase")]
240+
pub(super) struct LastApprovedQuery {
241+
pub with_heartbeat: Option<String>
242+
}
243+
209244
fn default_page() -> u64 {
210245
0
211246
}

0 commit comments

Comments
 (0)