Skip to content

Commit ac7449b

Browse files
committed
add: /:id/validator-messages POST route
1 parent 8a93cc3 commit ac7449b

File tree

4 files changed

+64
-34
lines changed

4 files changed

+64
-34
lines changed

sentry/src/chain.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,21 @@ use primitives::adapter::Adapter;
77
//
88
// function signature
99
// fn middleware(mut req: Request) -> Result<Request, ResponseError>
10-
11-
pub async fn chain<'a, A: Adapter + 'static, M>(
10+
#[allow(clippy::type_complexity)]
11+
pub async fn chain<'a, A: Adapter + 'static>(
1212
req: Request<Body>,
1313
app: &'a Application<A>,
14-
middlewares: Vec<M>,
15-
) -> Result<Request<Body>, ResponseError>
16-
where
17-
M: FnMut(
18-
Request<Body>,
19-
&'a Application<A>,
20-
) -> BoxFuture<'a, Result<Request<Body>, ResponseError>>
21-
+ 'static,
22-
{
14+
middlewares: Vec<
15+
Box<
16+
dyn FnMut(
17+
Request<Body>,
18+
&'a Application<A>,
19+
) -> BoxFuture<'a, Result<Request<Body>, ResponseError>>
20+
+ 'static
21+
+ Send,
22+
>,
23+
>,
24+
) -> Result<Request<Body>, ResponseError> {
2325
let mut req = Ok(req);
2426

2527
for mut mw in middlewares.into_iter() {

sentry/src/db/channel.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::db::DbPool;
22
use bb8::RunError;
3-
use primitives::{Channel, ChannelId, ValidatorId,};
3+
use chrono::Utc;
44
use primitives::validator::MessageTypes;
5+
use primitives::{Channel, ChannelId, ValidatorId};
56
use std::str::FromStr;
6-
use chrono::Utc;
77

88
pub use list_channels::list_channels;
99

@@ -72,17 +72,16 @@ pub async fn insert_channel(
7272
.await
7373
}
7474

75-
7675
pub async fn insert_validator_messages(
7776
pool: &DbPool,
7877
channel: &Channel,
7978
from: &ValidatorId,
80-
validator_message: &MessageTypes
79+
validator_message: &MessageTypes,
8180
) -> Result<bool, RunError<bb8_postgres::tokio_postgres::Error>> {
8281
pool
8382
.run(move | connection| {
8483
async move {
85-
match connection.prepare("INSERT INTO validator_messages(channel_id, from, msg, received) values ($1, $2, $3, $4)").await {
84+
match connection.prepare("INSERT INTO validator_messages (channel_id, \"from\", msg, received) values ($1, $2, $3, $4)").await {
8685
Ok(stmt) => match connection.execute(&stmt, &[&channel.id, &from, &validator_message, &Utc::now()]).await {
8786
Ok(row) => {
8887
let inserted = row == 1;

sentry/src/lib.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ use redis::aio::MultiplexedConnection;
1919
use regex::Regex;
2020
use routes::analytics::{advertiser_analytics, analytics, publisher_analytics};
2121
use routes::cfg::config;
22-
use routes::channel::{channel_list, create_channel, insert_events, last_approved};
22+
use routes::channel::{
23+
channel_list, create_channel, create_validator_messages, insert_events, last_approved,
24+
};
2325
use slog::{error, Logger};
2426
use std::collections::HashMap;
2527

@@ -54,6 +56,7 @@ lazy_static! {
5456
static ref ANALYTICS_BY_CHANNEL_ID: Regex = Regex::new(r"^/analytics/0x([a-zA-Z0-9]{64})/?$").expect("The regex should be valid");
5557
static ref ADVERTISER_ANALYTICS_BY_CHANNEL_ID: Regex = Regex::new(r"^/analytics/for-advertiser/0x([a-zA-Z0-9]{64})/?$").expect("The regex should be valid");
5658
static ref CREATE_EVENTS_BY_CHANNEL_ID: Regex = Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/events(/.*)?$").expect("The regex should be valid");
59+
5760
}
5861

5962
fn auth_required_middleware<'a, A: Adapter>(
@@ -251,6 +254,28 @@ async fn channels_router<A: Adapter + 'static>(
251254
};
252255

253256
list_validator_messages(req, &app, &extract_params.0, &extract_params.1).await
257+
} else if let (Some(caps), &Method::POST) = (CHANNEL_VALIDATOR_MESSAGES.captures(&path), method)
258+
{
259+
let param = RouteParams(vec![caps
260+
.get(1)
261+
.map_or("".to_string(), |m| m.as_str().to_string())]);
262+
263+
req.extensions_mut().insert(param);
264+
265+
let req = match chain(
266+
req,
267+
app,
268+
vec![Box::new(auth_required_middleware), Box::new(channel_load)],
269+
)
270+
.await
271+
{
272+
Ok(req) => req,
273+
Err(error) => {
274+
return Err(error);
275+
}
276+
};
277+
278+
create_validator_messages(req, &app).await
254279
} else if let (Some(caps), &Method::GET) = (CHANNEL_EVENTS_AGGREGATES.captures(&path), method) {
255280
if req.extensions().get::<Session>().is_none() {
256281
return Err(ResponseError::Unauthorized);

sentry/src/routes/channel.rs

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,20 @@
11
use self::channel_list::ChannelListQuery;
2-
use crate::db::{get_channel_by_id, insert_channel, list_channels, insert_validator_messages};
2+
use crate::db::{get_channel_by_id, insert_channel, insert_validator_messages, list_channels};
33
use crate::success_response;
44
use crate::Application;
55
use crate::ResponseError;
66
use crate::RouteParams;
77
use crate::Session;
8+
use futures::future::try_join_all;
89
use hex::FromHex;
910
use hyper::{Body, Request, Response};
1011
use primitives::adapter::Adapter;
12+
use primitives::channel::SpecValidator;
1113
use primitives::sentry::{Event, SuccessResponse};
14+
use primitives::validator::MessageTypes;
1215
use primitives::{Channel, ChannelId};
1316
use slog::error;
1417
use std::collections::HashMap;
15-
use primitives::channel::SpecValidator;
16-
use primitives::validator::MessageTypes;
17-
use futures::future::try_join_all;
18-
1918

2019
pub async fn channel_status<A: Adapter>(
2120
req: Request<Body>,
@@ -142,9 +141,9 @@ pub async fn insert_events<A: Adapter + 'static>(
142141
.unwrap())
143142
}
144143

145-
pub async fn validator_messages<A: Adapter + 'static>(
144+
pub async fn create_validator_messages<A: Adapter + 'static>(
146145
req: Request<Body>,
147-
app: &Application<A>
146+
app: &Application<A>,
148147
) -> Result<Response<Body>, ResponseError> {
149148
let session = req
150149
.extensions()
@@ -157,22 +156,27 @@ pub async fn validator_messages<A: Adapter + 'static>(
157156
.get::<Channel>()
158157
.expect("Request should have Channel")
159158
.to_owned();
160-
159+
161160
let into_body = req.into_body();
162161
let body = hyper::body::to_bytes(into_body).await?;
163-
let messages = serde_json::from_slice::<Vec<MessageTypes>>(&body)?;
164-
162+
let request_body = serde_json::from_slice::<HashMap<String, Vec<MessageTypes>>>(&body)?;
163+
let messages = request_body
164+
.get("messages")
165+
.ok_or_else(|| ResponseError::BadRequest("missing messages body".to_string()))?;
166+
165167
match channel.spec.validators.find(&session.uid) {
166168
SpecValidator::None => Err(ResponseError::Unauthorized),
167-
_ => {
168-
try_join_all(messages.iter().map(
169-
|message| insert_validator_messages(&app.pool, &channel, &session.uid, &message)
170-
)).await?;
171-
172-
Ok(success_response(serde_json::to_string(&SuccessResponse { success: true })?))
169+
_ => {
170+
try_join_all(messages.iter().map(|message| {
171+
insert_validator_messages(&app.pool, &channel, &session.uid, &message)
172+
}))
173+
.await?;
174+
175+
Ok(success_response(serde_json::to_string(&SuccessResponse {
176+
success: true,
177+
})?))
173178
}
174179
}
175-
176180
}
177181

178182
mod channel_list {

0 commit comments

Comments
 (0)