Skip to content

Commit b15cfaa

Browse files
committed
fix: refactor analytics.rs
1 parent a3b2e73 commit b15cfaa

File tree

6 files changed

+194
-155
lines changed

6 files changed

+194
-155
lines changed

primitives/src/analytics.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
use serde::{Deserialize, Serialize};
2+
3+
4+
#[derive(Debug, Deserialize)]
5+
pub struct AnalyticsQuery {
6+
#[serde(default = "default_limit")]
7+
pub limit: u32,
8+
#[serde(default = "default_event_type")]
9+
pub event_type: String,
10+
#[serde(default = "default_metric")]
11+
pub metric: String,
12+
#[serde(default = "default_timeframe")]
13+
pub timeframe: String,
14+
}
15+
16+
#[derive(Debug, Serialize, Deserialize)]
17+
pub struct AnalyticsResponse {
18+
time: u32,
19+
value: String,
20+
}
21+
22+
#[cfg(feature = "postgres")]
23+
pub mod postgres {
24+
use tokio_postgres::Row;
25+
26+
impl From<&Row> for AnalyticsResponse {
27+
fn from(row: &Row) -> Self {
28+
Self {
29+
time: row.get("time"),
30+
value: row.get("value"),
31+
}
32+
}
33+
}
34+
}
35+
36+
impl AnalyticsQuery {
37+
pub fn is_valid(&self) -> Result<(), String> {
38+
let valid_event_types = ["IMPRESSION"];
39+
let valid_metric = ["eventPayouts", "eventCounts"];
40+
let valid_timeframe = ["year", "month", "week", "day", "hour"];
41+
42+
if !valid_event_types.iter().any(|e| *e == &self.event_type[..]) {
43+
Err(format!(
44+
"invalid event_type, possible values are: {}",
45+
valid_event_types.join(" ,")
46+
))
47+
} else if !valid_metric.iter().any(|e| *e == &self.metric[..]) {
48+
Err(format!(
49+
"invalid metric, possible values are: {}",
50+
valid_metric.join(" ,")
51+
))
52+
} else if !valid_timeframe.iter().any(|e| *e == &self.timeframe[..]) {
53+
Err(format!(
54+
"invalid timeframe, possible values are: {}",
55+
valid_timeframe.join(" ,")
56+
))
57+
} else {
58+
Ok(())
59+
}
60+
}
61+
}
62+
63+
fn default_limit() -> u32 {
64+
100
65+
}
66+
67+
fn default_event_type() -> String {
68+
"IMPRESSION".into()
69+
}
70+
71+
fn default_metric() -> String {
72+
"eventCounts".into()
73+
}
74+
75+
fn default_timeframe() -> String {
76+
"hour".into()
77+
}

primitives/src/channel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ pub mod postgres {
239239
use hex::FromHex;
240240
use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, Json, ToSql, Type};
241241
use std::error::Error;
242-
use tokio_postgres::Row;
242+
243243

244244
impl From<&Row> for Channel {
245245
fn from(row: &Row) -> Self {

primitives/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub mod util {
2424
pub mod logging;
2525
}
2626
pub mod validator;
27+
pub mod analytics;
2728

2829
pub use self::ad_unit::AdUnit;
2930
pub use self::balances_map::BalancesMap;

sentry/src/db.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ mod validator_message;
1313

1414
pub mod channel;
1515
// mod channel;
16+
pub mod analytics;
1617

1718
pub use self::channel::*;
1819
pub use self::validator_message::*;

sentry/src/db/analytics.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
use hyper::{Body, Request, Response};
2+
use crate::db::DbPool;
3+
use bb8::RunError;
4+
use primitives::{Channel, ChannelId, ValidatorId};
5+
use primitives::analytics::{AnalyticsResponse, AnalyticsQuery};
6+
use crate::RouteParams;
7+
use crate::Session;
8+
use crate::ResponseError;
9+
use chrono::Utc;
10+
11+
pub async fn get_analytics(
12+
query: AnalyticsQuery,
13+
route_params: Option<&RouteParams>,
14+
sess: Option<&Session>,
15+
pool: &DbPool,
16+
is_advertiser: bool,
17+
skip_publisher_filter: bool,
18+
) -> Result<Vec<AnalyticsResponse>, ResponseError> {
19+
let applied_limit = query.limit.min(200);
20+
let (interval, period) = get_time_frame(&query.timeframe);
21+
let time_limit = Utc::now().timestamp() - period;
22+
23+
let mut where_clauses = vec![format!("created > to_timestamp({})", time_limit)];
24+
25+
if is_advertiser {
26+
match route_params {
27+
Some(params) => where_clauses.push(format!("channel_id IN ({})", params.index(0))),
28+
None => where_clauses.push(format!(
29+
"channel_id IN (SELECT id FROM channels WHERE creator = {})",
30+
sess.unwrap().uid.to_string()
31+
)),
32+
};
33+
} else if let Some(params) = route_params {
34+
if let Some(id) = params.get(0) {
35+
where_clauses.push(format!("channel_id = {}", id));
36+
};
37+
}
38+
39+
let select_query = match (skip_publisher_filter, sess) {
40+
(false, Some(session)) => {
41+
where_clauses.push(format!(
42+
"events->'{}'->'{}'->'{}' IS NOT NULL",
43+
query.event_type, query.metric, session.uid
44+
));
45+
format!(
46+
"select SUM((events->'{}'->'{}'->>'{}')::numeric) as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates",
47+
query.event_type, query.metric, session.uid, interval
48+
)
49+
}
50+
_ => {
51+
where_clauses.push(format!(
52+
"events->'{}'->'{}' IS NOT NULL",
53+
query.event_type, query.metric
54+
));
55+
format!(
56+
"select SUM(value::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates, jsonb_each_text(events->'{}'->'{}')",
57+
interval, query.event_type, query.metric
58+
)
59+
}
60+
};
61+
62+
let sql_query = format!(
63+
"{} WHERE {} GROUP BY time LIMIT {}",
64+
select_query,
65+
where_clauses.join(" AND "),
66+
applied_limit
67+
);
68+
69+
// execute query
70+
pool
71+
.run(move |connection| {
72+
async move {
73+
match connection.prepare(&sql_query).await {
74+
Ok(stmt) => match connection.query(&stmt, &[]).await {
75+
Ok(rows) => {
76+
let analytics: Vec<AnalyticsResponse> =
77+
rows.iter().map(AnalyticsResponse::from).collect();
78+
Ok((analytics, connection))
79+
}
80+
Err(e) => Err((e, connection)),
81+
},
82+
Err(e) => Err((e, connection)),
83+
}
84+
}
85+
})
86+
.await;
87+
}
88+
89+
90+
fn get_time_frame(timeframe: &str) -> (i64, i64) {
91+
let minute = 60 * 1000;
92+
let hour = 60 * minute;
93+
let day = 24 * hour;
94+
95+
match timeframe {
96+
"year" => (30 * day, 365 * day),
97+
"month" => (day, 30 * day),
98+
"week" => (6 * hour, 7 * day),
99+
"day" => (hour, day),
100+
"hour" => (minute, hour),
101+
_ => (hour, day),
102+
}
103+
}

sentry/src/routes/analytics.rs

Lines changed: 11 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -4,82 +4,12 @@ use crate::ResponseError;
44
use crate::RouteParams;
55
use crate::Session;
66
use bb8_postgres::tokio_postgres::Row;
7-
use chrono::Utc;
87
use hyper::{Body, Request, Response};
98
use primitives::adapter::Adapter;
109
use redis::aio::MultiplexedConnection;
1110
use serde::{Deserialize, Serialize};
12-
use std::cmp;
11+
use crate::db::analytics::get_analytics;
1312

14-
#[derive(Debug, Serialize, Deserialize)]
15-
pub(crate) struct AnalyticsResponse {
16-
time: u32,
17-
value: String,
18-
}
19-
20-
impl From<&Row> for AnalyticsResponse {
21-
fn from(row: &Row) -> Self {
22-
Self {
23-
time: row.get("time"),
24-
value: row.get("value"),
25-
}
26-
}
27-
}
28-
29-
#[derive(Debug, Deserialize)]
30-
struct AnalyticsQuery {
31-
#[serde(default = "default_limit")]
32-
pub limit: u32,
33-
#[serde(default = "default_event_type")]
34-
pub event_type: String,
35-
#[serde(default = "default_metric")]
36-
pub metric: String,
37-
#[serde(default = "default_timeframe")]
38-
pub timeframe: String,
39-
}
40-
41-
impl AnalyticsQuery {
42-
pub fn is_valid(&self) -> Result<(), ResponseError> {
43-
let valid_event_types = ["IMPRESSION"];
44-
let valid_metric = ["eventPayouts", "eventCounts"];
45-
let valid_timeframe = ["year", "month", "week", "day", "hour"];
46-
47-
if !valid_event_types.iter().any(|e| *e == &self.event_type[..]) {
48-
Err(ResponseError::BadRequest(format!(
49-
"invalid event_type, possible values are: {}",
50-
valid_event_types.join(" ,")
51-
)))
52-
} else if !valid_metric.iter().any(|e| *e == &self.metric[..]) {
53-
Err(ResponseError::BadRequest(format!(
54-
"invalid metric, possible values are: {}",
55-
valid_metric.join(" ,")
56-
)))
57-
} else if !valid_timeframe.iter().any(|e| *e == &self.timeframe[..]) {
58-
Err(ResponseError::BadRequest(format!(
59-
"invalid timeframe, possible values are: {}",
60-
valid_timeframe.join(" ,")
61-
)))
62-
} else {
63-
Ok(())
64-
}
65-
}
66-
}
67-
68-
fn default_limit() -> u32 {
69-
100
70-
}
71-
72-
fn default_event_type() -> String {
73-
"IMPRESSION".into()
74-
}
75-
76-
fn default_metric() -> String {
77-
"eventCounts".into()
78-
}
79-
80-
fn default_timeframe() -> String {
81-
"hour".into()
82-
}
8313

8414
pub async fn publisher_analytics<A: Adapter>(
8515
req: Request<Body>,
@@ -132,80 +62,21 @@ pub async fn process_analytics<A: Adapter>(
13262
) -> Result<String, ResponseError> {
13363
let query = serde_urlencoded::from_str::<AnalyticsQuery>(&req.uri().query().unwrap_or(""))?;
13464
query.is_valid()?;
135-
136-
let applied_limit = cmp::min(query.limit, 200);
137-
let (interval, period) = get_time_frame(&query.timeframe);
138-
let time_limit = Utc::now().timestamp() - period;
13965
let sess = req.extensions().get::<Session>();
66+
let params = req.extensions().get::<RouteParams>();
14067

141-
let mut where_clauses = vec![format!("created > to_timestamp({})", time_limit)];
142-
143-
if is_advertiser {
144-
match req.extensions().get::<RouteParams>() {
145-
Some(params) => where_clauses.push(format!("channel_id IN ({})", params.index(0))),
146-
None => where_clauses.push(format!(
147-
"channel_id IN (SELECT id FROM channels WHERE creator = {})",
148-
sess.unwrap().uid.to_string()
149-
)),
150-
};
151-
} else if let Some(params) = req.extensions().get::<RouteParams>() {
152-
if let Some(id) = params.get(0) {
153-
where_clauses.push(format!("channel_id = {}", id));
154-
};
155-
}
156-
157-
let select_query = match (skip_publisher_filter, sess) {
158-
(false, Some(session)) => {
159-
where_clauses.push(format!(
160-
"events->'{}'->'{}'->'{}' IS NOT NULL",
161-
query.event_type, query.metric, session.uid
162-
));
163-
format!(
164-
"select SUM((events->'{}'->'{}'->>'{}')::numeric) as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates",
165-
query.event_type, query.metric, session.uid, interval
166-
)
167-
}
168-
_ => {
169-
where_clauses.push(format!(
170-
"events->'{}'->'{}' IS NOT NULL",
171-
query.event_type, query.metric
172-
));
173-
format!(
174-
"select SUM(value::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates, jsonb_each_text(events->'{}'->'{}')",
175-
interval, query.event_type, query.metric
176-
)
177-
}
178-
};
179-
180-
let sql_query = format!(
181-
"{} WHERE {} GROUP BY time LIMIT {}",
182-
select_query,
183-
where_clauses.join(" AND "),
184-
applied_limit
185-
);
186-
187-
// execute query
188-
let result = app
189-
.pool
190-
.run(move |connection| {
191-
async move {
192-
match connection.prepare(&sql_query).await {
193-
Ok(stmt) => match connection.query(&stmt, &[]).await {
194-
Ok(rows) => {
195-
let analytics: Vec<AnalyticsResponse> =
196-
rows.iter().map(AnalyticsResponse::from).collect();
197-
Ok((analytics, connection))
198-
}
199-
Err(e) => Err((e, connection)),
200-
},
201-
Err(e) => Err((e, connection)),
202-
}
203-
}
204-
})
205-
.await?;
68+
let result = get_analytics(
69+
query,
70+
params,
71+
sess,
72+
&app.pool,
73+
is_advertiser,
74+
skip_publisher_filter
75+
).await?;
20676

20777
serde_json::to_string(&result)
20878
.map_err(|_| ResponseError::BadRequest("error occurred; try again later".to_string()))
79+
20980
}
21081

21182
async fn cache(redis: &MultiplexedConnection, key: String, value: &str, timeframe: i32) {
@@ -220,17 +91,3 @@ async fn cache(redis: &MultiplexedConnection, key: String, value: &str, timefram
22091
}
22192
}
22293

223-
fn get_time_frame(timeframe: &str) -> (i64, i64) {
224-
let minute = 60 * 1000;
225-
let hour = 60 * minute;
226-
let day = 24 * hour;
227-
228-
match timeframe {
229-
"year" => (30 * day, 365 * day),
230-
"month" => (day, 30 * day),
231-
"week" => (6 * hour, 7 * day),
232-
"day" => (hour, day),
233-
"hour" => (minute, hour),
234-
_ => (hour, day),
235-
}
236-
}

0 commit comments

Comments
 (0)