Skip to content

Commit 29e8930

Browse files
committed
fix: /:id/events POST route
1 parent ef667ee commit 29e8930

File tree

8 files changed

+133
-67
lines changed

8 files changed

+133
-67
lines changed

sentry/src/access.rs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,39 @@ use primitives::event_submission::{RateLimit, Rule};
66
use primitives::sentry::Event;
77
use primitives::Channel;
88
use std::cmp::PartialEq;
9-
9+
use std::error::Error;
1010
use crate::Session;
11+
use std::fmt;
1112

1213
#[derive(Debug, PartialEq, Eq)]
13-
pub enum Error {
14+
pub enum AccessError {
1415
OnlyCreatorCanCloseChannel,
1516
ChannelIsExpired,
1617
ChannelIsInWithdrawPeriod,
1718
RulesError(String),
1819
}
1920

21+
impl Error for AccessError {}
22+
23+
impl fmt::Display for AccessError {
24+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25+
match self {
26+
AccessError::OnlyCreatorCanCloseChannel => write!(f, "only creator can create channel"),
27+
AccessError::ChannelIsExpired => write!(f, "channel has expired"),
28+
AccessError::ChannelIsInWithdrawPeriod => write!(f, "channel is in withdraw period"),
29+
AccessError::RulesError(error) => write!(f, "{}", error),
30+
}
31+
}
32+
}
33+
2034
// @TODO: Make pub(crate)
2135
pub async fn check_access(
2236
redis: &MultiplexedConnection,
2337
session: &Session,
2438
rate_limit: &RateLimit,
2539
channel: &Channel,
2640
events: &[Event],
27-
) -> Result<(), Error> {
41+
) -> Result<(), AccessError> {
2842
let is_close_event = |e: &Event| match e {
2943
Event::Close => true,
3044
_ => false,
@@ -33,7 +47,7 @@ pub async fn check_access(
3347
let is_in_withdraw_period = current_time > channel.spec.withdraw_period_start;
3448

3549
if current_time > channel.valid_until {
36-
return Err(Error::ChannelIsExpired);
50+
return Err(AccessError::ChannelIsExpired);
3751
}
3852

3953
// We're only sending a CLOSE
@@ -46,11 +60,11 @@ pub async fn check_access(
4660

4761
// Only the creator can send a CLOSE
4862
if session.uid != channel.creator && events.iter().any(is_close_event) {
49-
return Err(Error::OnlyCreatorCanCloseChannel);
63+
return Err(AccessError::OnlyCreatorCanCloseChannel);
5064
}
5165

5266
if is_in_withdraw_period {
53-
return Err(Error::ChannelIsInWithdrawPeriod);
67+
return Err(AccessError::ChannelIsInWithdrawPeriod);
5468
}
5569

5670
let default_rules = [
@@ -92,7 +106,7 @@ pub async fn check_access(
92106
);
93107

94108
if let Err(rule_error) = apply_all_rules.await {
95-
Err(Error::RulesError(rule_error))
109+
Err(AccessError::RulesError(rule_error))
96110
} else {
97111
Ok(())
98112
}
@@ -225,7 +239,7 @@ mod test {
225239
let err_response =
226240
check_access(&redis, &session, &config.ip_rate_limit, &channel, &events).await;
227241
assert_eq!(
228-
Err(Error::RulesError(
242+
Err(AccessError::RulesError(
229243
"rateLimit: too many requests".to_string()
230244
)),
231245
err_response
@@ -261,7 +275,7 @@ mod test {
261275
.await;
262276

263277
assert_eq!(
264-
Err(Error::RulesError(
278+
Err(AccessError::RulesError(
265279
"rateLimit: only allows 1 event".to_string()
266280
)),
267281
err_response

sentry/src/db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use std::env;
88
use lazy_static::lazy_static;
99

1010
pub mod analytics;
11+
pub mod event_aggregate;
1112
mod channel;
12-
mod event_aggregate;
1313
mod validator_message;
1414

1515
pub use self::channel::*;

sentry/src/db/event_aggregate.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::db::DbPool;
22
use bb8::RunError;
33
use bb8_postgres::tokio_postgres::types::ToSql;
4-
use chrono::{DateTime, Utc, Date};
4+
use chrono::{DateTime, Utc};
55
use primitives::sentry::EventAggregate;
66
use primitives::{ValidatorId, ChannelId};
77

@@ -58,35 +58,44 @@ pub async fn insert_event_aggregate(
5858
channel_id: &ChannelId,
5959
event: &EventAggregate
6060
) -> Result<bool, RunError<bb8_postgres::tokio_postgres::Error>> {
61-
62-
let mut inserts : Vec<String> = Vec::new();
6361
let mut values = Vec::new();
6462
let mut index = 0;
6563
let id = channel_id.to_string();
6664

67-
for (event_type, aggr) in event.events {
68-
if let Some(event_counts) = aggr.event_counts {
65+
let mut data: Vec<String> = Vec::new();
66+
67+
for (event_type, aggr) in &event.events {
68+
if let Some(event_counts) = &aggr.event_counts {
6969
for (earner, value) in event_counts {
70-
let data = vec![id.clone(), event_type.clone(), earner, value.to_string(), aggr.event_payouts[&earner].to_string()];
71-
inserts.extend(data);
70+
let event_count = value.to_string();
71+
let event_payout = aggr.event_payouts[earner].to_string();
72+
73+
data.extend(vec![id.clone(), event_type.clone(), earner.clone(), event_count, event_payout]);
7274
//
7375
// this is a work around for bulk inserts
7476
// rust-postgres does not have native support for bulk inserts
7577
// so we have to manually build up a query string dynamically based on
7678
// how many things we want to insert
77-
//
79+
// i.e.
80+
// INSERT INTO event_aggregates (_, _) VALUES ($1, $2), ($3, $4), ($5, $6)
81+
7882
values.push(format!("(${}, ${}, ${}, ${}, ${})", index+1, index+2, index+3, index+4, index+5));
7983
index += 5;
8084
}
8185
}
8286
}
8387

88+
let inserts: Vec<&(dyn ToSql + Sync)> = data.iter().map(|x| x as &(dyn ToSql + Sync)).collect();
89+
90+
8491
// the created field is supplied by postgres Default
85-
pool
92+
let query = format!("INSERT INTO event_aggregates (channel_id, event_type, earner, event_counts, event_payouts) values {}", values.join(" ,"));
93+
94+
let result = pool
8695
.run(
8796
move |connection | {
8897
async move {
89-
match connection.prepare(&format!("INSERT INTO event_aggregates (channel_id, event_type, earner, event_counts, event_payouts) values {}", values.join(" ,"))).await {
98+
match connection.prepare(&query).await {
9099
Ok(stmt) => match connection.execute(&stmt, &inserts.as_slice()).await {
91100
Ok(row) => {
92101
let inserted = row == (index / 5);
@@ -98,5 +107,7 @@ pub async fn insert_event_aggregate(
98107
}
99108
}
100109
}
101-
).await
110+
).await?;
111+
112+
Ok(result)
102113
}

sentry/src/event_aggregator.rs

Lines changed: 63 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,67 +3,99 @@ use primitives::{ChannelId, Channel};
33
use primitives::adapter::Adapter;
44
use primitives::sentry::{EventAggregate, Event};
55
use std::collections::HashMap;
6-
use futures::future::BoxFuture;
6+
use crate::db::event_aggregate::insert_event_aggregate;
77
use crate::Session;
88
use crate::access::check_access;
99
use crate::Application;
1010
use chrono::{Utc, Duration};
11-
use async_std::stream;
11+
use crate::db::DbPool;
12+
use tokio::time::{delay_for};
13+
use std::time::Duration as TimeDuration;
14+
use crate::ResponseError;
1215

1316
// use futures::
1417
use async_std::sync::RwLock;
1518
use std::sync::{Arc};
1619

20+
#[derive(Default, Clone)]
1721
pub struct EventAggregator {
1822
// recorder: HashMap<ChannelId, FnMut(&Session, &str) -> BoxFuture>,
19-
aggregate: Arc<RwLock<HashMap<ChannelId, EventAggregate>>>
23+
aggregate: Arc<RwLock<HashMap<String, EventAggregate>>>
2024
}
2125

22-
fn persist(aggr_throttle: i64, channel_id: ChannelId, aggr: Arc<RwLock<HashMap<ChannelId, EventAggregate>>>) {
23-
let mut interval = stream::interval(Duration::from_secs(aggr_throttle));
24-
while let Some(_) = interval.next().await {
25-
// loop through the keys and persist them in the
26-
// database
26+
pub fn new_aggr(channel_id: &ChannelId) -> EventAggregate {
27+
EventAggregate {
28+
channel_id: channel_id.to_owned(),
29+
created: Utc::now(),
30+
events: HashMap::new(),
31+
}
32+
}
33+
34+
async fn store(db: &DbPool, channel_id: &ChannelId, aggr: Arc<RwLock<HashMap<String, EventAggregate>>>) {
35+
let recorder = aggr.write().await;
36+
let ev_aggr: Option<&EventAggregate> = recorder.get(&channel_id.to_string());
37+
if let Some(data) = ev_aggr {
38+
if let Err(e) = insert_event_aggregate(&db, &channel_id, data).await {
39+
eprintln!("{}", e);
40+
};
2741
}
2842
}
2943

3044
impl EventAggregator {
31-
pub async fn record<A: Adapter>(
32-
&self,
33-
app: &Application<A>,
34-
channel: &Channel,
35-
session: &Session,
36-
events: &[Event]
37-
)
45+
pub async fn record<'a, A: Adapter + 'static>(
46+
&'static self,
47+
app: &'static Application<A>,
48+
channel: Channel,
49+
session: Session,
50+
events: &'a [Event],
51+
) -> Result<(), ResponseError>
3852
{
3953
// eventAggrCol
4054
// channelsCol
4155
// try getting aggr if none create and store new aggr
42-
// redis: &MultiplexedConnection,
43-
// session: &Session,
44-
// rate_limit: &RateLimit,
45-
// channel: &Channel,
46-
// events: &[Event]
47-
let has_access = check_access(&app.redis, session, &app.config.ip_rate_limit, channel, events).await;
48-
if has_access.is_err() {
49-
// return the error
56+
// redis: &MultiplexedConnection,
57+
// session: &Session,
58+
// rate_limit: &RateLimit,
59+
// channel: &Channel,
60+
// events: &[Event]
61+
let has_access = check_access(&app.redis, &session, &app.config.ip_rate_limit, &channel, events).await;
62+
if let Err(e) = has_access {
63+
return Err(ResponseError::BadRequest(e.to_string()));
5064
}
51-
let recorder = self.aggregate.write().await.expect("should acquire lock");
52-
let mut aggr: EventAggregate = *recorder.get_mut(&channel.id);
65+
66+
let mut recorder = self.aggregate.write().await;
67+
let mut aggr: &mut EventAggregate = if let Some(aggr) = recorder.get_mut(&channel.id.to_string()) {
68+
aggr
69+
} else {
70+
// insert into
71+
recorder.insert(channel.id.to_string(), new_aggr(&channel.id));
72+
recorder.get_mut(&channel.id.to_string()).expect("should have aggr, we just inserted")
73+
};
74+
5375
// if aggr is none
5476
// spawn a tokio task for saving to database
55-
events.iter().for_each( | ev| event_reducer::reduce(channel, &mut aggr, ev));
77+
events.iter().for_each( | ev| event_reducer::reduce(&channel, &mut aggr, ev));
78+
let created = aggr.created;
79+
80+
// drop write access to RwLock and mut access to aggr
81+
// we don't need it anymore
82+
drop(recorder);
5683

5784
if app.config.aggr_throttle > 0
5885
&&
59-
Utc::now() < (aggr.created + Duration::seconds(app.config.aggr_throttle as i64))
86+
Utc::now() > (created + Duration::seconds(app.config.aggr_throttle as i64))
6087
{
6188

89+
tokio::spawn(
90+
async move {
91+
delay_for(TimeDuration::from_secs(app.config.aggr_throttle as u64)).await;
92+
store(&app.pool.clone(), &channel.id, self.aggregate.clone()).await;
93+
}
94+
);
95+
} else {
96+
store(&app.pool, &channel.id, self.aggregate.clone()).await;
6297
}
6398

64-
65-
66-
67-
99+
Ok(())
68100
}
69101
}

sentry/src/event_reducer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use primitives::sentry::{AggregateEvents, Event, EventAggregate};
2-
use primitives::{Channel, ValidatorId};
2+
use primitives::{Channel};
3+
34

45
// @TODO: Remove attribute once we use this function!
56
#[allow(dead_code)]

sentry/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ use routes::cfg::config;
2121
use routes::channel::{channel_list, create_channel, last_approved};
2222
use slog::{error, Logger};
2323
use std::collections::HashMap;
24+
use crate::event_aggregator::EventAggregator;
25+
26+
2427
pub mod middleware {
2528
pub mod auth;
2629
pub mod channel;
@@ -89,6 +92,7 @@ pub struct Application<A: Adapter> {
8992
pub redis: MultiplexedConnection,
9093
pub pool: DbPool,
9194
pub config: Config,
95+
pub event_aggregator: EventAggregator,
9296
__secret: (),
9397
}
9498

@@ -106,6 +110,7 @@ impl<A: Adapter + 'static> Application<A> {
106110
logger,
107111
redis,
108112
pool,
113+
event_aggregator: Default::default(),
109114
__secret: (),
110115
}
111116
}

sentry/src/middleware/auth.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ fn get_request_ip(req: &Request<Body>) -> Option<String> {
7575
.get("true-client-ip")
7676
.or_else(|| req.headers().get("x-forwarded-for"))
7777
.and_then(|hv| hv.to_str().map(ToString::to_string).ok())
78+
.map(|token| token.split(',').collect::<Vec<&str>>()[0].to_string())
7879
}
7980

8081
#[cfg(test)]

0 commit comments

Comments
 (0)