Skip to content

Commit 8a93cc3

Browse files
committed
add: /:id/validator-messages route impl
1 parent 8efc08c commit 8a93cc3

File tree

2 files changed

+66
-2
lines changed

2 files changed

+66
-2
lines changed

sentry/src/db/channel.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::db::DbPool;
22
use bb8::RunError;
3-
use primitives::{Channel, ChannelId, ValidatorId};
3+
use primitives::{Channel, ChannelId, ValidatorId,};
4+
use primitives::validator::MessageTypes;
45
use std::str::FromStr;
6+
use chrono::Utc;
57

68
pub use list_channels::list_channels;
79

@@ -70,6 +72,31 @@ pub async fn insert_channel(
7072
.await
7173
}
7274

75+
76+
pub async fn insert_validator_messages(
77+
pool: &DbPool,
78+
channel: &Channel,
79+
from: &ValidatorId,
80+
validator_message: &MessageTypes
81+
) -> Result<bool, RunError<bb8_postgres::tokio_postgres::Error>> {
82+
pool
83+
.run(move | connection| {
84+
async move {
85+
match connection.prepare("INSERT INTO validator_messages(channel_id, from, msg, received) values ($1, $2, $3, $4)").await {
86+
Ok(stmt) => match connection.execute(&stmt, &[&channel.id, &from, &validator_message, &Utc::now()]).await {
87+
Ok(row) => {
88+
let inserted = row == 1;
89+
Ok((inserted, connection))
90+
},
91+
Err(e) => Err((e, connection)),
92+
},
93+
Err(e) => Err((e, connection)),
94+
}
95+
}
96+
})
97+
.await
98+
}
99+
73100
mod list_channels {
74101
use crate::db::DbPool;
75102
use bb8::RunError;

sentry/src/routes/channel.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use self::channel_list::ChannelListQuery;
2-
use crate::db::{get_channel_by_id, insert_channel, list_channels};
2+
use crate::db::{get_channel_by_id, insert_channel, list_channels, insert_validator_messages};
33
use crate::success_response;
44
use crate::Application;
55
use crate::ResponseError;
@@ -12,6 +12,10 @@ use primitives::sentry::{Event, SuccessResponse};
1212
use primitives::{Channel, ChannelId};
1313
use slog::error;
1414
use std::collections::HashMap;
15+
use primitives::channel::SpecValidator;
16+
use primitives::validator::MessageTypes;
17+
use futures::future::try_join_all;
18+
1519

1620
pub async fn channel_status<A: Adapter>(
1721
req: Request<Body>,
@@ -138,6 +142,39 @@ pub async fn insert_events<A: Adapter + 'static>(
138142
.unwrap())
139143
}
140144

145+
pub async fn validator_messages<A: Adapter + 'static>(
146+
req: Request<Body>,
147+
app: &Application<A>
148+
) -> Result<Response<Body>, ResponseError> {
149+
let session = req
150+
.extensions()
151+
.get::<Session>()
152+
.expect("request session")
153+
.to_owned();
154+
155+
let channel = req
156+
.extensions()
157+
.get::<Channel>()
158+
.expect("Request should have Channel")
159+
.to_owned();
160+
161+
let into_body = req.into_body();
162+
let body = hyper::body::to_bytes(into_body).await?;
163+
let messages = serde_json::from_slice::<Vec<MessageTypes>>(&body)?;
164+
165+
match channel.spec.validators.find(&session.uid) {
166+
SpecValidator::None => Err(ResponseError::Unauthorized),
167+
_ => {
168+
try_join_all(messages.iter().map(
169+
|message| insert_validator_messages(&app.pool, &channel, &session.uid, &message)
170+
)).await?;
171+
172+
Ok(success_response(serde_json::to_string(&SuccessResponse { success: true })?))
173+
}
174+
}
175+
176+
}
177+
141178
mod channel_list {
142179
use chrono::serde::ts_seconds::deserialize as ts_seconds;
143180
use chrono::{DateTime, Utc};

0 commit comments

Comments
 (0)