Commit bd4044d

Merge pull request #209 from samparsky/analytics-route
Analytics route
2 parents: 14560fa + 4a07473; commit: bd4044d

10 files changed: +507 additions, -140 deletions

Cargo.lock

Lines changed: 29 additions & 63 deletions
Some generated files are not rendered by default.

primitives/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ rand = "^0.6"
 # postgres feature
 postgres-types = { version = "0.1.0-alpha.2", optional = true }
 bytes = { version = "0.5", optional = true }
-tokio-postgres = { version = "0.5.0-alpha.2", optional = true, features = ["with-chrono-0_4", "with-serde_json-1"] }
+tokio-postgres = { version = "0.5.1", optional = true, features = ["with-chrono-0_4", "with-serde_json-1"] }
 # Futures
 futures = "0.3.1"
 # Other

primitives/src/analytics.rs

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
use crate::DomainError;
use serde::{Deserialize, Serialize};

pub const ANALYTICS_QUERY_LIMIT: u32 = 200;

#[derive(Debug, Serialize, Deserialize)]
pub struct AnalyticsResponse {
    time: u32,
    value: String,
}

#[cfg(feature = "postgres")]
pub mod postgres {
    use super::AnalyticsResponse;
    use tokio_postgres::Row;

    impl From<&Row> for AnalyticsResponse {
        fn from(row: &Row) -> Self {
            Self {
                time: row.get("time"),
                value: row.get("value"),
            }
        }
    }
}

#[derive(Debug, Deserialize)]
pub struct AnalyticsQuery {
    #[serde(default = "default_limit")]
    pub limit: u32,
    #[serde(default = "default_event_type")]
    pub event_type: String,
    #[serde(default = "default_metric")]
    pub metric: String,
    #[serde(default = "default_timeframe")]
    pub timeframe: String,
}

impl AnalyticsQuery {
    pub fn is_valid(&self) -> Result<(), DomainError> {
        let valid_event_types = ["IMPRESSION", "CLICK"];
        let valid_metric = ["eventPayouts", "eventCounts"];
        let valid_timeframe = ["year", "month", "week", "day", "hour"];

        if !valid_event_types.contains(&self.event_type.as_str()) {
            Err(DomainError::InvalidArgument(format!(
                "invalid event_type, possible values are: {}",
                valid_event_types.join(" ,")
            )))
        } else if !valid_metric.contains(&self.metric.as_str()) {
            Err(DomainError::InvalidArgument(format!(
                "invalid metric, possible values are: {}",
                valid_metric.join(" ,")
            )))
        } else if !valid_timeframe.contains(&self.timeframe.as_str()) {
            Err(DomainError::InvalidArgument(format!(
                "invalid timeframe, possible values are: {}",
                valid_timeframe.join(" ,")
            )))
        } else if self.limit > ANALYTICS_QUERY_LIMIT {
            Err(DomainError::InvalidArgument(format!(
                "invalid limit {}, maximum value 200",
                self.limit
            )))
        } else {
            Ok(())
        }
    }
}

fn default_limit() -> u32 {
    100
}

fn default_event_type() -> String {
    "IMPRESSION".into()
}

fn default_metric() -> String {
    "eventCounts".into()
}

fn default_timeframe() -> String {
    "hour".into()
}
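
For context, a minimal sketch (not part of the commit) of how the serde defaults and is_valid() behave together. Deserializing from a JSON string via serde_json is an assumption made purely for illustration; the route layer that actually parses query parameters is outside this file:

use primitives::analytics::AnalyticsQuery;

fn main() {
    // Only `timeframe` is supplied; limit, event_type and metric fall back to
    // their declared defaults (100, "IMPRESSION", "eventCounts").
    let query: AnalyticsQuery =
        serde_json::from_str(r#"{ "timeframe": "week" }"#).unwrap();
    assert_eq!(query.limit, 100);
    assert_eq!(query.event_type, "IMPRESSION");
    assert!(query.is_valid().is_ok());

    // An unsupported metric is rejected with DomainError::InvalidArgument.
    let bad: AnalyticsQuery =
        serde_json::from_str(r#"{ "metric": "clicks" }"#).unwrap();
    assert!(bad.is_valid().is_err());
}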

primitives/src/lib.rs

Lines changed: 5 additions & 1 deletion
@@ -23,6 +23,7 @@ pub mod util {
 
     pub mod logging;
 }
+pub mod analytics;
 pub mod validator;
 
 pub use self::ad_unit::AdUnit;
@@ -42,7 +43,10 @@ pub enum DomainError {
 
 impl fmt::Display for DomainError {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "Domain error",)
+        match self {
+            DomainError::InvalidArgument(err) => write!(f, "{}", err),
+            DomainError::RuleViolation(err) => write!(f, "{}", err),
+        }
     }
 }
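
With the Display change above, formatting a DomainError now yields the wrapped message instead of the fixed string "Domain error". A small hypothetical illustration, not part of the commit:

use primitives::DomainError;

fn main() {
    let err = DomainError::InvalidArgument(
        "invalid timeframe, possible values are: year ,month ,week ,day ,hour".to_string(),
    );
    // Before this commit this printed "Domain error"; now it prints the message itself.
    println!("{}", err);
}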

primitives/src/util/tests/prep_db.rs

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@ lazy_static! {
         auth.insert("user".into(), "x8c9v1b2".into());
         auth.insert("publisher".into(), "testing".into());
         auth.insert("publisher2".into(), "testing2".into());
-        auth.insert("creator".into(), "awesomeCreator".into());
+        auth.insert("creator".into(), "0x033ed90e0fec3f3ea1c9b005c724d704501e0196".into());
         auth.insert("tester".into(), "AUTH_awesomeTester".into());
 
         auth

sentry/Cargo.toml

Lines changed: 2 additions & 2 deletions
@@ -20,8 +20,8 @@ hyper = { version = "0.13", features = ["stream"] }
 regex = "1"
 # Database
 redis = { version = "0.13.1-alpha.0", features = ["tokio-rt-core"] }
-bb8 = { git = "https://github.com/djc/bb8", branch = "async-await" }
-bb8-postgres = { git = "https://github.com/djc/bb8", branch = "async-await", features = ["with-chrono-0_4", "with-serde_json-1"] }
+bb8 = { git = "https://github.com/khuey/bb8" }
+bb8-postgres = { git = "https://github.com/khuey/bb8", features = ["with-chrono-0_4", "with-serde_json-1"] }
 # Migrations
 migrant_lib = { version = "0.27", features = ["d-postgres"] }
 # Logger

sentry/src/db.rs

Lines changed: 1 addition & 0 deletions
@@ -7,6 +7,7 @@ use std::env;
 
 use lazy_static::lazy_static;
 
+pub mod analytics;
 mod channel;
 mod validator_message;
 
sentry/src/db/analytics.rs

Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
use crate::db::DbPool;
use crate::Session;
use bb8::RunError;
use chrono::Utc;
use primitives::analytics::{AnalyticsQuery, AnalyticsResponse, ANALYTICS_QUERY_LIMIT};

pub enum AnalyticsType {
    Advertiser {
        session: Session,
        channel: Option<String>,
    },
    Global,
    Publisher {
        session: Session,
        channel: Option<String>,
    },
}

pub async fn get_analytics(
    query: AnalyticsQuery,
    pool: &DbPool,
    analytics_type: AnalyticsType,
) -> Result<Vec<AnalyticsResponse>, RunError<bb8_postgres::tokio_postgres::Error>> {
    let applied_limit = query.limit.min(ANALYTICS_QUERY_LIMIT);
    let (interval, period) = get_time_frame(&query.timeframe);
    let time_limit = Utc::now().timestamp() - period;

    let mut where_clauses = vec![format!("created > to_timestamp({})", time_limit)];

    let select_query = match analytics_type {
        AnalyticsType::Advertiser { session, channel } => {
            if let Some(id) = channel {
                where_clauses.push(format!("channel_id = {}", id));
            } else {
                where_clauses.push(format!(
                    "channel_id IN (SELECT id FROM channels WHERE creator = {})",
                    session.uid
                ));
            }

            where_clauses.push(format!(
                "events->'{}'->'{}' IS NOT NULL",
                query.event_type, query.metric
            ));

            format!(
                "select SUM(value::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates, jsonb_each_text(events->'{}'->'{}')",
                interval, query.event_type, query.metric
            )
        }
        AnalyticsType::Global => {
            where_clauses.push(format!(
                "events->'{}'->'{}' IS NOT NULL",
                query.event_type, query.metric
            ));
            format!(
                "select SUM(value::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates, jsonb_each_text(events->'{}'->'{}')",
                interval, query.event_type, query.metric
            )
        }
        AnalyticsType::Publisher { session, channel } => {
            if let Some(id) = channel {
                where_clauses.push(format!("channel_id = {}", id));
            }

            where_clauses.push(format!(
                "events->'{}'->'{}'->'{}' IS NOT NULL",
                query.event_type, query.metric, session.uid
            ));

            format!(
                "select SUM((events->'{}'->'{}'->>'{}')::numeric) as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates",
                query.event_type, query.metric, session.uid, interval
            )
        }
    };

    let sql_query = format!(
        "{} WHERE {} GROUP BY time LIMIT {}",
        select_query,
        where_clauses.join(" AND "),
        applied_limit
    );

    // execute query
    pool.run(move |connection| {
        async move {
            match connection.prepare(&sql_query).await {
                Ok(stmt) => match connection.query(&stmt, &[]).await {
                    Ok(rows) => {
                        let analytics: Vec<AnalyticsResponse> =
                            rows.iter().map(AnalyticsResponse::from).collect();
                        Ok((analytics, connection))
                    }
                    Err(e) => Err((e, connection)),
                },
                Err(e) => Err((e, connection)),
            }
        }
    })
    .await
}

fn get_time_frame(timeframe: &str) -> (i64, i64) {
    let minute = 60 * 1000;
    let hour = 60 * minute;
    let day = 24 * hour;

    match timeframe {
        "year" => (30 * day, 365 * day),
        "month" => (day, 30 * day),
        "week" => (6 * hour, 7 * day),
        "day" => (hour, day),
        "hour" => (minute, hour),
        _ => (hour, day),
    }
}
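
A sketch of a possible call site for get_analytics (hypothetical, not part of this diff; the actual route wiring lives in other files of the PR, and using serde_json for the output is an assumption for illustration):

use crate::db::analytics::{get_analytics, AnalyticsType};
use crate::db::DbPool;
use primitives::analytics::AnalyticsQuery;

async fn print_global_analytics(pool: &DbPool) {
    // "{}" deserializes to the defaults: limit 100, IMPRESSION / eventCounts / hour.
    let query: AnalyticsQuery =
        serde_json::from_str("{}").expect("defaults should deserialize");

    if let Err(e) = query.is_valid() {
        eprintln!("rejected analytics query: {}", e);
        return;
    }

    match get_analytics(query, pool, AnalyticsType::Global).await {
        Ok(rows) => println!("{}", serde_json::to_string(&rows).expect("serializable rows")),
        Err(_) => eprintln!("analytics query failed"),
    }
}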
