Skip to content

Commit c29fdf6

Browse files
authored
Merge pull request #246 from samparsky/validator-messages
Validator messages
2 parents 1003e75 + f8e26c0 commit c29fdf6

File tree

4 files changed

+107
-13
lines changed

4 files changed

+107
-13
lines changed

sentry/src/chain.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,21 @@ use primitives::adapter::Adapter;
77
//
88
// function signature
99
// fn middleware(mut req: Request) -> Result<Request, ResponseError>
10-
11-
pub async fn chain<'a, A: Adapter + 'static, M>(
10+
#[allow(clippy::type_complexity)]
11+
pub async fn chain<'a, A: Adapter + 'static>(
1212
req: Request<Body>,
1313
app: &'a Application<A>,
14-
middlewares: Vec<M>,
15-
) -> Result<Request<Body>, ResponseError>
16-
where
17-
M: FnMut(
18-
Request<Body>,
19-
&'a Application<A>,
20-
) -> BoxFuture<'a, Result<Request<Body>, ResponseError>>
21-
+ 'static,
22-
{
14+
middlewares: Vec<
15+
Box<
16+
dyn FnMut(
17+
Request<Body>,
18+
&'a Application<A>,
19+
) -> BoxFuture<'a, Result<Request<Body>, ResponseError>>
20+
+ 'static
21+
+ Send,
22+
>,
23+
>,
24+
) -> Result<Request<Body>, ResponseError> {
2325
let mut req = Ok(req);
2426

2527
for mut mw in middlewares.into_iter() {

sentry/src/db/channel.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use crate::db::DbPool;
22
use bb8::RunError;
3+
use chrono::Utc;
4+
use primitives::validator::MessageTypes;
35
use primitives::{Channel, ChannelId, ValidatorId};
46
use std::str::FromStr;
57

@@ -70,6 +72,30 @@ pub async fn insert_channel(
7072
.await
7173
}
7274

75+
pub async fn insert_validator_messages(
76+
pool: &DbPool,
77+
channel: &Channel,
78+
from: &ValidatorId,
79+
validator_message: &MessageTypes,
80+
) -> Result<bool, RunError<bb8_postgres::tokio_postgres::Error>> {
81+
pool
82+
.run(move | connection| {
83+
async move {
84+
match connection.prepare("INSERT INTO validator_messages (channel_id, \"from\", msg, received) values ($1, $2, $3, $4)").await {
85+
Ok(stmt) => match connection.execute(&stmt, &[&channel.id, &from, &validator_message, &Utc::now()]).await {
86+
Ok(row) => {
87+
let inserted = row == 1;
88+
Ok((inserted, connection))
89+
},
90+
Err(e) => Err((e, connection)),
91+
},
92+
Err(e) => Err((e, connection)),
93+
}
94+
}
95+
})
96+
.await
97+
}
98+
7399
mod list_channels {
74100
use crate::db::DbPool;
75101
use bb8::RunError;

sentry/src/lib.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ use redis::aio::MultiplexedConnection;
1919
use regex::Regex;
2020
use routes::analytics::{advertiser_analytics, analytics, publisher_analytics};
2121
use routes::cfg::config;
22-
use routes::channel::{channel_list, create_channel, insert_events, last_approved};
22+
use routes::channel::{
23+
channel_list, create_channel, create_validator_messages, insert_events, last_approved,
24+
};
2325
use slog::{error, Logger};
2426
use std::collections::HashMap;
2527

@@ -54,6 +56,7 @@ lazy_static! {
5456
static ref ANALYTICS_BY_CHANNEL_ID: Regex = Regex::new(r"^/analytics/0x([a-zA-Z0-9]{64})/?$").expect("The regex should be valid");
5557
static ref ADVERTISER_ANALYTICS_BY_CHANNEL_ID: Regex = Regex::new(r"^/analytics/for-advertiser/0x([a-zA-Z0-9]{64})/?$").expect("The regex should be valid");
5658
static ref CREATE_EVENTS_BY_CHANNEL_ID: Regex = Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/events(/.*)?$").expect("The regex should be valid");
59+
5760
}
5861

5962
fn auth_required_middleware<'a, A: Adapter>(
@@ -251,6 +254,28 @@ async fn channels_router<A: Adapter + 'static>(
251254
};
252255

253256
list_validator_messages(req, &app, &extract_params.0, &extract_params.1).await
257+
} else if let (Some(caps), &Method::POST) = (CHANNEL_VALIDATOR_MESSAGES.captures(&path), method)
258+
{
259+
let param = RouteParams(vec![caps
260+
.get(1)
261+
.map_or("".to_string(), |m| m.as_str().to_string())]);
262+
263+
req.extensions_mut().insert(param);
264+
265+
let req = match chain(
266+
req,
267+
app,
268+
vec![Box::new(auth_required_middleware), Box::new(channel_load)],
269+
)
270+
.await
271+
{
272+
Ok(req) => req,
273+
Err(error) => {
274+
return Err(error);
275+
}
276+
};
277+
278+
create_validator_messages(req, &app).await
254279
} else if let (Some(caps), &Method::GET) = (CHANNEL_EVENTS_AGGREGATES.captures(&path), method) {
255280
if req.extensions().get::<Session>().is_none() {
256281
return Err(ResponseError::Unauthorized);

sentry/src/routes/channel.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
use self::channel_list::ChannelListQuery;
2-
use crate::db::{get_channel_by_id, insert_channel, list_channels};
2+
use crate::db::{get_channel_by_id, insert_channel, insert_validator_messages, list_channels};
33
use crate::success_response;
44
use crate::Application;
55
use crate::ResponseError;
66
use crate::RouteParams;
77
use crate::Session;
8+
use futures::future::try_join_all;
89
use hex::FromHex;
910
use hyper::{Body, Request, Response};
1011
use primitives::adapter::Adapter;
12+
use primitives::channel::SpecValidator;
1113
use primitives::sentry::{Event, SuccessResponse};
14+
use primitives::validator::MessageTypes;
1215
use primitives::{Channel, ChannelId};
1316
use slog::error;
1417
use std::collections::HashMap;
@@ -138,6 +141,44 @@ pub async fn insert_events<A: Adapter + 'static>(
138141
.unwrap())
139142
}
140143

144+
pub async fn create_validator_messages<A: Adapter + 'static>(
145+
req: Request<Body>,
146+
app: &Application<A>,
147+
) -> Result<Response<Body>, ResponseError> {
148+
let session = req
149+
.extensions()
150+
.get::<Session>()
151+
.expect("request session")
152+
.to_owned();
153+
154+
let channel = req
155+
.extensions()
156+
.get::<Channel>()
157+
.expect("Request should have Channel")
158+
.to_owned();
159+
160+
let into_body = req.into_body();
161+
let body = hyper::body::to_bytes(into_body).await?;
162+
let request_body = serde_json::from_slice::<HashMap<String, Vec<MessageTypes>>>(&body)?;
163+
let messages = request_body
164+
.get("messages")
165+
.ok_or_else(|| ResponseError::BadRequest("missing messages body".to_string()))?;
166+
167+
match channel.spec.validators.find(&session.uid) {
168+
SpecValidator::None => Err(ResponseError::Unauthorized),
169+
_ => {
170+
try_join_all(messages.iter().map(|message| {
171+
insert_validator_messages(&app.pool, &channel, &session.uid, &message)
172+
}))
173+
.await?;
174+
175+
Ok(success_response(serde_json::to_string(&SuccessResponse {
176+
success: true,
177+
})?))
178+
}
179+
}
180+
}
181+
141182
mod channel_list {
142183
use chrono::serde::ts_seconds::deserialize as ts_seconds;
143184
use chrono::{DateTime, Utc};

0 commit comments

Comments
 (0)