Skip to content

Commit f1c8fd4

Browse files
authored
Merge pull request #207 from AdExNetwork/issue-9-validator-messages
Issue #9 Get Channel's Validator messages
2 parents 89a2c1e + b0cc1b3 commit f1c8fd4

File tree

6 files changed

+199
-6
lines changed

6 files changed

+199
-6
lines changed

primitives/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub struct Config {
2323
pub heartbeat_time: u32, // in milliseconds
2424
pub channels_find_limit: u32,
2525
pub events_find_limit: u32,
26+
pub msgs_find_limit: u32,
2627
pub health_threshold_promilles: u32,
2728
pub health_unsignable_promilles: u32,
2829
pub propagation_timeout: u32,

primitives/src/sentry.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::validator::{ApproveState, Heartbeat, MessageTypes, NewState};
2-
use crate::{BigNum, Channel, ChannelId};
2+
use crate::{BigNum, Channel, ChannelId, ValidatorId};
33
use chrono::{DateTime, Utc};
44
use serde::{Deserialize, Serialize};
55
use std::collections::HashMap;
@@ -99,7 +99,7 @@ pub struct SuccessResponse {
9999

100100
#[derive(Serialize, Deserialize, Debug)]
101101
pub struct ValidatorMessage {
102-
pub from: String,
102+
pub from: ValidatorId,
103103
pub received: DateTime<Utc>,
104104
pub msg: MessageTypes,
105105
}
@@ -114,3 +114,36 @@ pub struct ValidatorMessageResponse {
114114
pub struct EventAggregateResponse {
115115
pub events: Vec<EventAggregate>,
116116
}
117+
118+
#[cfg(feature = "postgres")]
119+
mod postgres {
120+
use super::ValidatorMessage;
121+
use crate::validator::MessageTypes;
122+
use bytes::BytesMut;
123+
use postgres_types::{accepts, to_sql_checked, IsNull, Json, ToSql, Type};
124+
use std::error::Error;
125+
use tokio_postgres::Row;
126+
127+
impl From<&Row> for ValidatorMessage {
128+
fn from(row: &Row) -> Self {
129+
Self {
130+
from: row.get("from"),
131+
received: row.get("received"),
132+
msg: row.get::<_, Json<MessageTypes>>("msg").0,
133+
}
134+
}
135+
}
136+
137+
impl ToSql for MessageTypes {
138+
fn to_sql(
139+
&self,
140+
ty: &Type,
141+
w: &mut BytesMut,
142+
) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
143+
Json(self).to_sql(ty, w)
144+
}
145+
146+
accepts!(JSONB);
147+
to_sql_checked!();
148+
}
149+
}

sentry/src/db.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use std::env;
88
use lazy_static::lazy_static;
99

1010
mod channel;
11+
mod validator_message;
12+
1113
pub use self::channel::*;
14+
pub use self::validator_message::*;
1215

1316
pub type DbPool = Pool<PostgresConnectionManager<NoTls>>;
1417

sentry/src/db/validator_message.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use crate::db::DbPool;
2+
use bb8::RunError;
3+
use bb8_postgres::tokio_postgres::types::ToSql;
4+
use primitives::sentry::ValidatorMessage;
5+
use primitives::{ChannelId, ValidatorId};
6+
7+
pub async fn get_validator_messages(
8+
pool: &DbPool,
9+
channel_id: &ChannelId,
10+
validator_id: &Option<ValidatorId>,
11+
message_types: &[String],
12+
limit: u64,
13+
) -> Result<Vec<ValidatorMessage>, RunError<bb8_postgres::tokio_postgres::Error>> {
14+
let mut where_clauses: Vec<String> = vec!["channel_id = $1".to_string()];
15+
let mut params: Vec<&(dyn ToSql + Sync)> = vec![&channel_id];
16+
17+
if let Some(validator_id) = validator_id {
18+
where_clauses.push(format!(r#""from" = ${}"#, params.len() + 1));
19+
params.push(validator_id);
20+
}
21+
22+
add_message_types_params(&mut where_clauses, &mut params, message_types);
23+
24+
pool
25+
.run(move |connection| {
26+
async move {
27+
let statement = format!(r#"SELECT "from", msg, received FROM validator_messages WHERE {} ORDER BY received DESC LIMIT {}"#, where_clauses.join(" AND "), limit);
28+
match connection.prepare(&statement).await {
29+
Ok(select) => match connection.query(&select, params.as_slice()).await {
30+
Ok(results) => {
31+
let messages = results.iter().map(ValidatorMessage::from).collect();
32+
Ok((messages, connection))},
33+
Err(e) => Err((e, connection)),
34+
},
35+
Err(e) => Err((e, connection)),
36+
}
37+
}
38+
})
39+
.await
40+
}
41+
42+
fn add_message_types_params<'a>(
43+
where_clauses: &mut Vec<String>,
44+
params: &mut Vec<&'a (dyn ToSql + Sync)>,
45+
message_types: &'a [String],
46+
) {
47+
let mut msg_prep = vec![];
48+
for message_type in message_types.iter() {
49+
msg_prep.push(format!("${}", params.len() + 1));
50+
params.push(message_type);
51+
}
52+
53+
if !msg_prep.is_empty() {
54+
where_clauses.push(format!("msg->>'type' IN ({})", msg_prep.join(",")));
55+
}
56+
}

sentry/src/lib.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::middleware::auth;
77
use crate::middleware::channel::channel_load;
88
use crate::middleware::cors::{cors, Cors};
99
use crate::routes::channel::channel_status;
10+
use crate::routes::validator_message::{extract_params, list_validator_messages};
1011
use hyper::{Body, Method, Request, Response, StatusCode};
1112
use lazy_static::lazy_static;
1213
use primitives::adapter::Adapter;
@@ -27,6 +28,7 @@ pub mod middleware {
2728
pub mod routes {
2829
pub mod cfg;
2930
pub mod channel;
31+
pub mod validator_message;
3032
}
3133

3234
pub mod access;
@@ -39,6 +41,8 @@ lazy_static! {
3941
Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/?$").expect("The regex should be valid");
4042
static ref LAST_APPROVED_BY_CHANNEL_ID: Regex = Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/last-approved/?$").expect("The regex should be valid");
4143
static ref CHANNEL_STATUS_BY_CHANNEL_ID: Regex = Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/status/?$").expect("The regex should be valid");
44+
// Only the initial Regex to be matched.
45+
static ref CHANNEL_VALIDATOR_MESSAGES: Regex = Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/validator-messages(/.*)?$").expect("The regex should be valid");
4246
// @TODO define other regex routes
4347
}
4448

@@ -106,18 +110,19 @@ impl<A: Adapter + 'static> Application<A> {
106110
}
107111
};
108112

109-
let mut response = match (req.uri().path(), req.method()) {
113+
let path = req.uri().path().to_string();
114+
let mut response = match (path.as_ref(), req.method()) {
110115
("/cfg", &Method::GET) => config(req, &self).await,
111116
("/channel", &Method::POST) => create_channel(req, &self).await,
112117
("/channel/list", &Method::GET) => channel_list(req, &self).await,
113118
// This is important becuase it prevents us from doing
114119
// expensive regex matching for routes without /channel
115-
(route, method) if route.starts_with("/channel") => {
120+
(path, method) if path.starts_with("/channel") => {
116121
// example with
117122
// @TODO remove later
118123
// regex matching for routes with params
119124
if let (Some(caps), &Method::GET) =
120-
(LAST_APPROVED_BY_CHANNEL_ID.captures(route), method)
125+
(LAST_APPROVED_BY_CHANNEL_ID.captures(path), method)
121126
{
122127
let param = RouteParams(vec![caps
123128
.get(1)
@@ -135,7 +140,7 @@ impl<A: Adapter + 'static> Application<A> {
135140

136141
last_approved(req, &self).await
137142
} else if let (Some(caps), &Method::GET) =
138-
(CHANNEL_STATUS_BY_CHANNEL_ID.captures(route), method)
143+
(CHANNEL_STATUS_BY_CHANNEL_ID.captures(path), method)
139144
{
140145
let param = RouteParams(vec![caps
141146
.get(1)
@@ -150,6 +155,31 @@ impl<A: Adapter + 'static> Application<A> {
150155
};
151156

152157
channel_status(req, &self).await
158+
} else if let (Some(caps), &Method::GET) =
159+
(CHANNEL_VALIDATOR_MESSAGES.captures(path), method)
160+
{
161+
let param = RouteParams(vec![caps
162+
.get(1)
163+
.map_or("".to_string(), |m| m.as_str().to_string())]);
164+
req.extensions_mut().insert(param);
165+
166+
let req = match chain(req, &self, vec![channel_load]).await {
167+
Ok(req) => req,
168+
Err(error) => {
169+
return map_response_error(error);
170+
}
171+
};
172+
173+
// @TODO: Move this to a middleware?!
174+
let extract_params =
175+
match extract_params(caps.get(2).map_or("", |m| m.as_str())) {
176+
Ok(params) => params,
177+
Err(error) => {
178+
return map_response_error(error.into());
179+
}
180+
};
181+
182+
list_validator_messages(req, &self, &extract_params.0, &extract_params.1).await
153183
} else {
154184
Err(ResponseError::NotFound)
155185
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
use crate::db::get_validator_messages;
2+
use crate::{success_response, Application, ResponseError};
3+
use hyper::{Body, Request, Response};
4+
use primitives::adapter::Adapter;
5+
use primitives::sentry::ValidatorMessageResponse;
6+
use primitives::{Channel, DomainError, ValidatorId};
7+
use serde::Deserialize;
8+
use std::convert::TryFrom;
9+
10+
#[derive(Deserialize)]
11+
pub struct ValidatorMessagesListQuery {
12+
limit: Option<u64>,
13+
}
14+
15+
pub fn extract_params(from_path: &str) -> Result<(Option<ValidatorId>, Vec<String>), DomainError> {
16+
// trim the `/` at the beginning & end if there is one or more
17+
// and split the rest of the string at the `/`
18+
let split: Vec<&str> = from_path.trim_matches('/').split('/').collect();
19+
20+
if split.len() > 2 {
21+
return Err(DomainError::InvalidArgument(
22+
"Too many parameters".to_string(),
23+
));
24+
}
25+
26+
let validator_id = split
27+
.get(0)
28+
// filter an empty string
29+
.filter(|string| !string.is_empty())
30+
// then try to map it to ValidatorId
31+
.map(|string| ValidatorId::try_from(*string))
32+
// Transpose in order to check for an error from the conversion
33+
.transpose()?;
34+
35+
let message_types = split
36+
.get(1)
37+
.filter(|string| !string.is_empty())
38+
.map(|string| string.split('+').map(|s| s.to_string()).collect());
39+
40+
Ok((validator_id, message_types.unwrap_or_default()))
41+
}
42+
43+
pub async fn list_validator_messages<A: Adapter>(
44+
req: Request<Body>,
45+
app: &Application<A>,
46+
validator_id: &Option<ValidatorId>,
47+
message_types: &[String],
48+
) -> Result<Response<Body>, ResponseError> {
49+
let query =
50+
serde_urlencoded::from_str::<ValidatorMessagesListQuery>(&req.uri().query().unwrap_or(""))?;
51+
52+
let channel = req
53+
.extensions()
54+
.get::<Channel>()
55+
.expect("Request should have Channel");
56+
57+
let config_limit = app.config.msgs_find_limit as u64;
58+
let limit = query
59+
.limit
60+
.filter(|n| *n >= 1)
61+
.unwrap_or(config_limit)
62+
.min(config_limit);
63+
64+
let validator_messages =
65+
get_validator_messages(&app.pool, &channel.id, validator_id, message_types, limit).await?;
66+
67+
let response = ValidatorMessageResponse { validator_messages };
68+
69+
Ok(success_response(serde_json::to_string(&response)?))
70+
}

0 commit comments

Comments
 (0)