
Commit ef667ee

add: db/event_aggregate.rs, route impl
1 parent e330197 commit ef667ee

File tree: 7 files changed, +226 −6 lines

Cargo.lock

Lines changed: 66 additions & 0 deletions
(Generated file; diff not rendered by default.)

sentry/Cargo.toml

Lines changed: 2 additions & 1 deletion
@@ -7,6 +7,7 @@ edition = "2018"
 [dependencies]
 # Futures
 futures = "0.3.1"
+async-std = "1.4.0"
 # Primitives
 primitives = { path = "../primitives", features = ["postgres"] }
 adapter = { version = "0.1", path = "../adapter" }
@@ -25,7 +26,7 @@ bb8-postgres = { git = "https://github.com/khuey/bb8", features = ["with-chrono-
 # Migrations
 migrant_lib = { version = "0.27", features = ["d-postgres"] }
 # Logger
-slog = { version = "^2.2.3" , features = ["max_level_trace"] }
+slog = { version = "^2.2.3", features = ["max_level_trace"] }
 # Serde
 serde = { version = "^1.0", features = ['derive'] }
 serde_json = "^1.0"

sentry/migrations/20190806011140_initial-tables/up.sql

Lines changed: 5 additions & 2 deletions
@@ -28,8 +28,11 @@ CREATE INDEX idx_validator_messages_msg_state_root ON validator_messages ((msg -
 CREATE TABLE event_aggregates
 (
     channel_id VARCHAR(66) NOT NULL REFERENCES channels (id) ON DELETE RESTRICT,
-    created TIMESTAMP WITH TIME ZONE NOT NULL,
-    events JSONB NOT NULL
+    created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
+    event_type VARCHAR(255) NOT NULL,
+    earner VARCHAR(255) NOT NULL,
+    event_counts TEXT NOT NULL,
+    event_payouts TEXT NOT NULL
 );

 CREATE INDEX idx_event_aggregates_created ON event_aggregates (created);

sentry/src/db/event_aggregate.rs

Lines changed: 50 additions & 2 deletions
@@ -1,9 +1,9 @@
 use crate::db::DbPool;
 use bb8::RunError;
 use bb8_postgres::tokio_postgres::types::ToSql;
-use chrono::{DateTime, Utc};
+use chrono::{DateTime, Utc, Date};
 use primitives::sentry::EventAggregate;
-use primitives::ValidatorId;
+use primitives::{ValidatorId, ChannelId};

 pub async fn list_event_aggregates(
     pool: &DbPool,
@@ -52,3 +52,51 @@ pub async fn list_event_aggregates(

     Ok(event_aggregates)
 }
+
+pub async fn insert_event_aggregate(
+    pool: &DbPool,
+    channel_id: &ChannelId,
+    event: &EventAggregate
+) -> Result<bool, RunError<bb8_postgres::tokio_postgres::Error>> {
+    let mut inserts: Vec<String> = Vec::new();
+    let mut values = Vec::new();
+    let mut index = 0;
+    let id = channel_id.to_string();
+
+    for (event_type, aggr) in event.events {
+        if let Some(event_counts) = aggr.event_counts {
+            for (earner, value) in event_counts {
+                let data = vec![id.clone(), event_type.clone(), earner, value.to_string(), aggr.event_payouts[&earner].to_string()];
+                inserts.extend(data);
+                //
+                // this is a work around for bulk inserts:
+                // rust-postgres does not have native support for bulk inserts,
+                // so we have to manually build up a query string dynamically based on
+                // how many things we want to insert
+                //
+                values.push(format!("(${}, ${}, ${}, ${}, ${})", index + 1, index + 2, index + 3, index + 4, index + 5));
+                index += 5;
+            }
+        }
+    }
+
+    // the created field is supplied by the Postgres DEFAULT
+    pool
+        .run(move |connection| {
+            async move {
+                match connection.prepare(&format!("INSERT INTO event_aggregates (channel_id, event_type, earner, event_counts, event_payouts) VALUES {}", values.join(", "))).await {
+                    Ok(stmt) => match connection.execute(&stmt, &inserts.as_slice()).await {
+                        Ok(row) => {
+                            let inserted = row == (index / 5);
+                            Ok((inserted, connection))
+                        },
+                        Err(e) => Err((e, connection)),
+                    },
+                    Err(e) => Err((e, connection)),
+                }
+            }
+        })
+        .await
+}
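The comment in insert_event_aggregate explains the approach: rust-postgres has no bulk-insert helper, so the query builds one parameterized "($n, ...)" group per (event_type, earner) pair and binds every value through one flat list. As written, though, the loop moves fields out of the borrowed EventAggregate and `&inserts.as_slice()` is a `&&[String]` rather than a slice of ToSql trait objects, so below is a minimal compile-oriented sketch of the same technique on pre-flattened rows. The function name, the tuple layout, and the `&[&(dyn ToSql + Sync)]` parameter type are assumptions of this sketch, not part of the commit.

use bb8_postgres::tokio_postgres::{types::ToSql, Client, Error};

// Sketch only (not this commit's API): one "($1, $2, ...)" group per row,
// all values bound through a single flat parameter slice.
async fn bulk_insert_aggregates(
    client: &Client,
    // (channel_id, event_type, earner, event_count, event_payout), already stringified
    rows: &[(String, String, String, String, String)],
) -> Result<bool, Error> {
    let mut values: Vec<String> = Vec::with_capacity(rows.len());
    let mut params: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(rows.len() * 5);

    for (i, row) in rows.iter().enumerate() {
        let base = i * 5;
        values.push(format!(
            "(${}, ${}, ${}, ${}, ${})",
            base + 1, base + 2, base + 3, base + 4, base + 5
        ));
        params.push(&row.0);
        params.push(&row.1);
        params.push(&row.2);
        params.push(&row.3);
        params.push(&row.4);
    }

    let sql = format!(
        "INSERT INTO event_aggregates (channel_id, event_type, earner, event_counts, event_payouts) VALUES {}",
        values.join(", ")
    );
    let stmt = client.prepare(&sql).await?;
    let inserted = client.execute(&stmt, params.as_slice()).await?;

    // every row contributes exactly five parameters
    Ok(inserted == rows.len() as u64)
}

In the commit's own function the same shape could likely be reached by iterating `&event.events` and `&aggr.event_counts` and cloning `earner` before pushing it, so nothing is moved out of the borrow.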

sentry/src/event_aggregator.rs

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
+use crate::event_reducer;
+use primitives::{ChannelId, Channel};
+use primitives::adapter::Adapter;
+use primitives::sentry::{EventAggregate, Event};
+use std::collections::HashMap;
+use futures::future::BoxFuture;
+use crate::Session;
+use crate::access::check_access;
+use crate::Application;
+use chrono::{Utc, Duration};
+use async_std::stream;
+
+// use futures::
+use async_std::sync::RwLock;
+use std::sync::{Arc};
+
+pub struct EventAggregator {
+    // recorder: HashMap<ChannelId, FnMut(&Session, &str) -> BoxFuture>,
+    aggregate: Arc<RwLock<HashMap<ChannelId, EventAggregate>>>
+}
+
+fn persist(aggr_throttle: i64, channel_id: ChannelId, aggr: Arc<RwLock<HashMap<ChannelId, EventAggregate>>>) {
+    let mut interval = stream::interval(Duration::from_secs(aggr_throttle));
+    while let Some(_) = interval.next().await {
+        // loop through the keys and persist them in the
+        // database
+    }
+}
+
+impl EventAggregator {
+    pub async fn record<A: Adapter>(
+        &self,
+        app: &Application<A>,
+        channel: &Channel,
+        session: &Session,
+        events: &[Event]
+    )
+    {
+        // eventAggrCol
+        // channelsCol
+        // try getting aggr if none create and store new aggr
+        // redis: &MultiplexedConnection,
+        // session: &Session,
+        // rate_limit: &RateLimit,
+        // channel: &Channel,
+        // events: &[Event]
+        let has_access = check_access(&app.redis, session, &app.config.ip_rate_limit, channel, events).await;
+        if has_access.is_err() {
+            // return the error
+        }
+        let recorder = self.aggregate.write().await.expect("should acquire lock");
+        let mut aggr: EventAggregate = *recorder.get_mut(&channel.id);
+        // if aggr is none
+        // spawn a tokio task for saving to database
+        events.iter().for_each(|ev| event_reducer::reduce(channel, &mut aggr, ev));
+
+        if app.config.aggr_throttle > 0
+            && Utc::now() < (aggr.created + Duration::seconds(app.config.aggr_throttle as i64))
+        {
+
+        }
+    }
+}
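This module is clearly scaffolding: persist is a plain fn that awaits an async-std interval stream, record dereferences the aggregate straight out of the RwLock guard, and the throttle branch is empty. As a rough illustration of the periodic flush the comments describe, a compiling loop might look like the sketch below; persist_loop and save_aggregate are hypothetical names, chrono's Duration is swapped for std::time::Duration, and async_std::stream::interval needs the crate's "unstable" feature.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use async_std::prelude::*;
use async_std::stream;
use async_std::sync::RwLock;
use primitives::sentry::EventAggregate;
use primitives::ChannelId;

// Hypothetical flush loop: every `aggr_throttle` seconds, drain the
// in-memory aggregates and hand them to some persistence routine.
async fn persist_loop(
    aggr_throttle: u64,
    aggregates: Arc<RwLock<HashMap<ChannelId, EventAggregate>>>,
) {
    let mut interval = stream::interval(Duration::from_secs(aggr_throttle));
    while interval.next().await.is_some() {
        // take the current aggregates under the write lock, then release it
        // before touching the database so `record` is not blocked
        let drained: Vec<(ChannelId, EventAggregate)> =
            aggregates.write().await.drain().collect();
        for (channel_id, aggregate) in drained {
            // save_aggregate(&channel_id, &aggregate).await; // hypothetical DB call
            let _ = (&channel_id, &aggregate);
        }
    }
}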

sentry/src/lib.rs

Lines changed: 5 additions & 0 deletions
@@ -39,6 +39,8 @@ pub mod access;
 mod chain;
 pub mod db;
 pub mod event_reducer;
+pub mod event_aggregator;
+

 lazy_static! {
     static ref CHANNEL_GET_BY_ID: Regex =
@@ -50,6 +52,7 @@ lazy_static! {
     static ref CHANNEL_EVENTS_AGGREGATES: Regex = Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/events-aggregates/?$").expect("The regex should be valid");
     static ref ANALYTICS_BY_CHANNEL_ID: Regex = Regex::new(r"^/analytics/0x([a-zA-Z0-9]{64})/?$").expect("The regex should be valid");
     static ref ADVERTISER_ANALYTICS_BY_CHANNEL_ID: Regex = Regex::new(r"^/analytics/for-advertiser/0x([a-zA-Z0-9]{64})/?$").expect("The regex should be valid");
+    static ref CREATE_EVENTS_BY_CHANNEL_ID: Regex = Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/events(/.*)?$").expect("The regex should be valid");
 }

 fn auth_required_middleware<'a, A: Adapter>(
@@ -262,6 +265,8 @@ async fn channels_router<A: Adapter + 'static>(

         list_channel_event_aggregates(req, app).await
     } else {
+        // else if let (Some(caps), &Method::POST) = (CREATE_EVENTS_BY_CHANNEL_ID.captures(&path), method) {
+        //} else {
         Err(ResponseError::NotFound)
     }
 }
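The router change only registers the CREATE_EVENTS_BY_CHANNEL_ID regex and leaves the POST branch commented out. Filled in along the lines of the neighbouring routes it would presumably read roughly as below; the handler path and how the captured channel id gets threaded through (for example via RouteParams in the request extensions) are assumptions, not something this commit wires up.

} else if let (Some(_caps), &Method::POST) = (CREATE_EVENTS_BY_CHANNEL_ID.captures(&path), method) {
    // hypothetical dispatch to the handler added in routes/channel.rs;
    // passing the captured channel id through (e.g. via RouteParams in the
    // request extensions) is left out of this sketch
    channel::events(req, app).await
} else {
    Err(ResponseError::NotFound)
}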

sentry/src/routes/channel.rs

Lines changed: 29 additions & 1 deletion
@@ -7,7 +7,7 @@ use crate::RouteParams;
 use hex::FromHex;
 use hyper::{Body, Request, Response};
 use primitives::adapter::Adapter;
-use primitives::sentry::SuccessResponse;
+use primitives::sentry::{SuccessResponse, Event};
 use primitives::{Channel, ChannelId};
 use slog::error;

@@ -102,6 +102,34 @@ pub async fn last_approved<A: Adapter>(
         .unwrap())
 }

+pub async fn events<A: Adapter>(
+    req: Request<Body>,
+    app: &Application<A>,
+) -> Result<Response<Body>, ResponseError> {
+    let ip = if let Some(xforwardedfor) = req.headers().get("x-forwarded-for") {
+        let ip: Vec<&str> = xforwardedfor.to_str()?.split(',').collect();
+        Some(ip[0])
+    } else if let Some(trueip) = req.headers().get("true-client-ip") {
+        Some(trueip.to_str()?)
+    } else {
+        None
+    };
+
+    let body = hyper::body::to_bytes(req.into_body()).await?;
+    let events = serde_json::from_slice::<Vec<Event>>(&body)?;
+
+    Ok(Response::builder()
+        .header("Content-type", "application/json")
+        .body(serde_json::to_string(&events)?.into())
+        .unwrap())
+}
+
 mod channel_list {
     use chrono::serde::ts_seconds::deserialize as ts_seconds;
     use chrono::{DateTime, Utc};
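The new events handler pulls the client IP from x-forwarded-for (first entry) or true-client-ip, parses the body as Vec<Event>, and for now just echoes the events back; the ip value is not used yet. The extraction could be factored into a small helper along these lines (the helper name is ours, not part of the commit):

use hyper::HeaderMap;

// First address in x-forwarded-for is the original client; fall back to
// true-client-ip, otherwise report no IP.
fn get_request_ip(headers: &HeaderMap) -> Option<String> {
    if let Some(forwarded) = headers.get("x-forwarded-for") {
        forwarded
            .to_str()
            .ok()
            .and_then(|list| list.split(',').next())
            .map(|ip| ip.trim().to_string())
    } else if let Some(true_ip) = headers.get("true-client-ip") {
        true_ip.to_str().ok().map(|ip| ip.to_string())
    } else {
        None
    }
}

The handler body would then start with `let ip = get_request_ip(req.headers());`.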
