Skip to content

Commit 6545379

Browse files
committed
fix: add /:id/events route
1 parent 7098d89 commit 6545379

File tree

7 files changed

+113
-78
lines changed

7 files changed

+113
-78
lines changed

sentry/src/access.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ use chrono::Utc;
22
use futures::future::try_join_all;
33
use redis::aio::MultiplexedConnection;
44

5+
use crate::Session;
56
use primitives::event_submission::{RateLimit, Rule};
67
use primitives::sentry::Event;
78
use primitives::Channel;
89
use std::cmp::PartialEq;
910
use std::error::Error;
10-
use crate::Session;
1111
use std::fmt;
1212

1313
#[derive(Debug, PartialEq, Eq)]

sentry/src/db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use std::env;
88
use lazy_static::lazy_static;
99

1010
pub mod analytics;
11-
pub mod event_aggregate;
1211
mod channel;
12+
pub mod event_aggregate;
1313
mod validator_message;
1414

1515
pub use self::channel::*;

sentry/src/db/event_aggregate.rs

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use bb8::RunError;
33
use bb8_postgres::tokio_postgres::types::ToSql;
44
use chrono::{DateTime, Utc};
55
use primitives::sentry::EventAggregate;
6-
use primitives::{ValidatorId, ChannelId};
6+
use primitives::{ChannelId, ValidatorId};
77

88
pub async fn list_event_aggregates(
99
pool: &DbPool,
@@ -56,8 +56,8 @@ pub async fn list_event_aggregates(
5656
pub async fn insert_event_aggregate(
5757
pool: &DbPool,
5858
channel_id: &ChannelId,
59-
event: &EventAggregate
60-
) -> Result<bool, RunError<bb8_postgres::tokio_postgres::Error>> {
59+
event: &EventAggregate,
60+
) -> Result<bool, RunError<bb8_postgres::tokio_postgres::Error>> {
6161
let mut values = Vec::new();
6262
let mut index = 0;
6363
let id = channel_id.to_string();
@@ -70,7 +70,13 @@ pub async fn insert_event_aggregate(
7070
let event_count = value.to_string();
7171
let event_payout = aggr.event_payouts[earner].to_string();
7272

73-
data.extend(vec![id.clone(), event_type.clone(), earner.clone(), event_count, event_payout]);
73+
data.extend(vec![
74+
id.clone(),
75+
event_type.clone(),
76+
earner.clone(),
77+
event_count,
78+
event_payout,
79+
]);
7480
//
7581
// this is a work around for bulk inserts
7682
// rust-postgres does not have native support for bulk inserts
@@ -79,35 +85,40 @@ pub async fn insert_event_aggregate(
7985
// i.e.
8086
// INSERT INTO event_aggregates (_, _) VALUES ($1, $2), ($3, $4), ($5, $6)
8187

82-
values.push(format!("(${}, ${}, ${}, ${}, ${})", index+1, index+2, index+3, index+4, index+5));
88+
values.push(format!(
89+
"(${}, ${}, ${}, ${}, ${})",
90+
index + 1,
91+
index + 2,
92+
index + 3,
93+
index + 4,
94+
index + 5
95+
));
8396
index += 5;
8497
}
8598
}
8699
}
87100

88101
let inserts: Vec<&(dyn ToSql + Sync)> = data.iter().map(|x| x as &(dyn ToSql + Sync)).collect();
89102

90-
91103
// the created field is supplied by postgres Default
92104
let query = format!("INSERT INTO event_aggregates (channel_id, event_type, earner, event_counts, event_payouts) values {}", values.join(" ,"));
93105

94106
let result = pool
95-
.run(
96-
move |connection | {
97-
async move {
98-
match connection.prepare(&query).await {
99-
Ok(stmt) => match connection.execute(&stmt, &inserts.as_slice()).await {
100-
Ok(row) => {
101-
let inserted = row == (index / 5);
102-
Ok((inserted, connection))
103-
},
104-
Err(e) => Err((e, connection)),
105-
},
107+
.run(move |connection| {
108+
async move {
109+
match connection.prepare(&query).await {
110+
Ok(stmt) => match connection.execute(&stmt, &inserts.as_slice()).await {
111+
Ok(row) => {
112+
let inserted = row == (index / 5);
113+
Ok((inserted, connection))
114+
}
106115
Err(e) => Err((e, connection)),
107-
}
116+
},
117+
Err(e) => Err((e, connection)),
108118
}
109119
}
110-
).await?;
120+
})
121+
.await?;
111122

112123
Ok(result)
113124
}

sentry/src/event_aggregator.rs

Lines changed: 54 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,25 @@
1+
use crate::access::check_access;
2+
use crate::db::event_aggregate::insert_event_aggregate;
3+
use crate::db::DbPool;
14
use crate::event_reducer;
2-
use primitives::{ChannelId, Channel};
5+
use crate::Application;
6+
use crate::ResponseError;
7+
use crate::Session;
8+
use chrono::{Duration, Utc};
39
use primitives::adapter::Adapter;
4-
use primitives::sentry::{EventAggregate, Event};
10+
use primitives::sentry::{Event, EventAggregate};
11+
use primitives::{Channel, ChannelId};
512
use std::collections::HashMap;
6-
use crate::db::event_aggregate::insert_event_aggregate;
7-
use crate::Session;
8-
use crate::access::check_access;
9-
use crate::Application;
10-
use chrono::{Utc, Duration};
11-
use crate::db::DbPool;
12-
use tokio::time::{delay_for};
1313
use std::time::Duration as TimeDuration;
14-
use crate::ResponseError;
14+
use tokio::time::delay_for;
1515

1616
// use futures::
1717
use async_std::sync::RwLock;
18-
use std::sync::{Arc};
18+
use std::sync::Arc;
1919

2020
#[derive(Default, Clone)]
2121
pub struct EventAggregator {
22-
aggregate: Arc<RwLock<HashMap<String, EventAggregate>>>
22+
aggregate: Arc<RwLock<HashMap<String, EventAggregate>>>,
2323
}
2424

2525
pub fn new_aggr(channel_id: &ChannelId) -> EventAggregate {
@@ -30,7 +30,11 @@ pub fn new_aggr(channel_id: &ChannelId) -> EventAggregate {
3030
}
3131
}
3232

33-
async fn store(db: &DbPool, channel_id: &ChannelId, aggr: Arc<RwLock<HashMap<String, EventAggregate>>>) {
33+
async fn store(
34+
db: &DbPool,
35+
channel_id: &ChannelId,
36+
aggr: Arc<RwLock<HashMap<String, EventAggregate>>>,
37+
) {
3438
let mut recorder = aggr.write().await;
3539
let ev_aggr: Option<&EventAggregate> = recorder.get(&channel_id.to_string());
3640
if let Some(data) = ev_aggr {
@@ -44,58 +48,67 @@ async fn store(db: &DbPool, channel_id: &ChannelId, aggr: Arc<RwLock<HashMap<Str
4448
}
4549

4650
impl EventAggregator {
47-
pub async fn record<'a, A: Adapter + 'static>(
48-
&'static self,
49-
app: &'static Application<A>,
51+
pub async fn record<'a, A: Adapter>(
52+
&self,
53+
app: &'a Application<A>,
5054
channel: Channel,
5155
session: Session,
5256
events: &'a [Event],
53-
) -> Result<(), ResponseError>
54-
{
55-
let has_access = check_access(&app.redis, &session, &app.config.ip_rate_limit, &channel, events).await;
57+
) -> Result<(), ResponseError> {
58+
let has_access = check_access(
59+
&app.redis,
60+
&session,
61+
&app.config.ip_rate_limit,
62+
&channel,
63+
events,
64+
)
65+
.await;
5666
if let Err(e) = has_access {
5767
return Err(ResponseError::BadRequest(e.to_string()));
5868
}
5969

6070
let mut recorder = self.aggregate.write().await;
61-
let mut aggr: &mut EventAggregate = if let Some(aggr) = recorder.get_mut(&channel.id.to_string()) {
62-
aggr
63-
} else {
64-
// insert into
65-
recorder.insert(channel.id.to_string(), new_aggr(&channel.id));
66-
recorder.get_mut(&channel.id.to_string()).expect("should have aggr, we just inserted")
67-
};
71+
let mut aggr: &mut EventAggregate =
72+
if let Some(aggr) = recorder.get_mut(&channel.id.to_string()) {
73+
aggr
74+
} else {
75+
// insert into
76+
recorder.insert(channel.id.to_string(), new_aggr(&channel.id));
77+
recorder
78+
.get_mut(&channel.id.to_string())
79+
.expect("should have aggr, we just inserted")
80+
};
6881

6982
// if aggr is none
70-
events.iter().for_each( | ev| event_reducer::reduce(&channel, &mut aggr, ev));
83+
events
84+
.iter()
85+
.for_each(|ev| event_reducer::reduce(&channel, &mut aggr, ev));
7186
let created = aggr.created;
87+
let dbpool = app.pool.clone();
88+
let aggr_throttle = app.config.aggr_throttle;
89+
let aggregate = self.aggregate.clone();
7290

7391
// drop write access to RwLock
7492
// this is required to prevent a deadlock in store
7593
drop(recorder);
7694

7795
// Checks if aggr_throttle is set
7896
// and if current time is greater than aggr.created plus throttle seconds
79-
//
97+
//
8098
// This approach spawns an async task every > AGGR_THROTTLE seconds
8199
// Each spawned task resolves after AGGR_THROTTLE seconds
82-
//
100+
//
83101

84-
if app.config.aggr_throttle > 0
85-
&&
86-
Utc::now() > (created + Duration::seconds(app.config.aggr_throttle as i64))
87-
{
102+
if aggr_throttle > 0 && Utc::now() > (created + Duration::seconds(aggr_throttle as i64)) {
88103
// spawn a tokio task for saving to database
89-
tokio::spawn(
90-
async move {
91-
delay_for(TimeDuration::from_secs(app.config.aggr_throttle as u64)).await;
92-
store(&app.pool.clone(), &channel.id, self.aggregate.clone()).await;
93-
}
94-
);
104+
tokio::spawn(async move {
105+
delay_for(TimeDuration::from_secs(aggr_throttle as u64)).await;
106+
store(&dbpool, &channel.id, aggregate).await;
107+
});
95108
} else {
96-
store(&app.pool, &channel.id, self.aggregate.clone()).await;
109+
store(&app.pool, &channel.id, aggregate).await;
97110
}
98111

99112
Ok(())
100113
}
101-
}
114+
}

sentry/src/event_reducer.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use primitives::sentry::{AggregateEvents, Event, EventAggregate};
2-
use primitives::{Channel};
3-
2+
use primitives::Channel;
43

54
// @TODO: Remove attribute once we use this function!
65
#[allow(dead_code)]

sentry/src/lib.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use crate::chain::chain;
55
use crate::db::DbPool;
6+
use crate::event_aggregator::EventAggregator;
67
use crate::middleware::auth;
78
use crate::middleware::channel::channel_load;
89
use crate::middleware::cors::{cors, Cors};
@@ -18,11 +19,9 @@ use redis::aio::MultiplexedConnection;
1819
use regex::Regex;
1920
use routes::analytics::{advertiser_analytics, analytics, publisher_analytics};
2021
use routes::cfg::config;
21-
use routes::channel::{channel_list, create_channel, last_approved};
22+
use routes::channel::{channel_list, create_channel, insert_events, last_approved};
2223
use slog::{error, Logger};
2324
use std::collections::HashMap;
24-
use crate::event_aggregator::EventAggregator;
25-
2625

2726
pub mod middleware {
2827
pub mod auth;
@@ -41,9 +40,8 @@ pub mod routes {
4140
pub mod access;
4241
mod chain;
4342
pub mod db;
44-
pub mod event_reducer;
4543
pub mod event_aggregator;
46-
44+
pub mod event_reducer;
4745

4846
lazy_static! {
4947
static ref CHANNEL_GET_BY_ID: Regex =
@@ -269,9 +267,23 @@ async fn channels_router<A: Adapter + 'static>(
269267
let req = chain(req, app, vec![Box::new(channel_load)]).await?;
270268

271269
list_channel_event_aggregates(req, app).await
270+
} else if let (Some(caps), &Method::POST) =
271+
(CREATE_EVENTS_BY_CHANNEL_ID.captures(&path), method)
272+
{
273+
if req.extensions().get::<Session>().is_none() {
274+
return Err(ResponseError::Unauthorized);
275+
}
276+
277+
let param = RouteParams(vec![caps
278+
.get(1)
279+
.map_or("".to_string(), |m| m.as_str().to_string())]);
280+
281+
req.extensions_mut().insert(param);
282+
283+
let req = chain(req, app, vec![Box::new(channel_load)]).await?;
284+
285+
insert_events(req, app).await
272286
} else {
273-
// else if let (Some(caps), &Method::POST) = (CREATE_EVENTS_BY_CHANNEL_ID.captures(&path) ,method) {
274-
//} else {
275287
Err(ResponseError::NotFound)
276288
}
277289
}

sentry/src/routes/channel.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::Session;
88
use hex::FromHex;
99
use hyper::{Body, Request, Response};
1010
use primitives::adapter::Adapter;
11-
use primitives::sentry::{SuccessResponse, Event};
11+
use primitives::sentry::{Event, SuccessResponse};
1212
use primitives::{Channel, ChannelId};
1313
use slog::error;
1414

@@ -103,9 +103,9 @@ pub async fn last_approved<A: Adapter>(
103103
.unwrap())
104104
}
105105

106-
pub async fn events<A: Adapter>(
106+
pub async fn insert_events<A: Adapter + 'static>(
107107
req: Request<Body>,
108-
app: &'static Application<A>,
108+
app: &Application<A>,
109109
) -> Result<Response<Body>, ResponseError> {
110110
let session = req
111111
.extensions()
@@ -122,14 +122,14 @@ pub async fn events<A: Adapter>(
122122
let body = hyper::body::to_bytes(into_body).await?;
123123
let events = serde_json::from_slice::<Vec<Event>>(&body)?;
124124

125-
126-
app.event_aggregator.record(app, channel, session, &events.as_slice()).await?;
125+
app.event_aggregator
126+
.record(app, channel, session, &events.as_slice())
127+
.await?;
127128

128129
Ok(Response::builder()
129130
.header("Content-type", "application/json")
130-
.body(serde_json::to_string(&SuccessResponse { success: true})?.into())
131+
.body(serde_json::to_string(&SuccessResponse { success: true })?.into())
131132
.unwrap())
132-
133133
}
134134

135135
mod channel_list {

0 commit comments

Comments
 (0)