Skip to content

Commit c0cfccf

Browse files
committed
fix: refactor analytics.rs
1 parent 812c97d commit c0cfccf

File tree

8 files changed

+179
-165
lines changed

8 files changed

+179
-165
lines changed

Cargo.lock

Lines changed: 78 additions & 111 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

primitives/src/analytics.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use crate::DomainError;
22
use serde::{Deserialize, Serialize};
33

4+
pub const ANALYTICS_QUERY_LIMIT: u32 = 200;
5+
46
#[derive(Debug, Serialize, Deserialize)]
57
pub struct AnalyticsResponse {
68
time: u32,
@@ -36,7 +38,7 @@ pub struct AnalyticsQuery {
3638

3739
impl AnalyticsQuery {
3840
pub fn is_valid(&self) -> Result<(), DomainError> {
39-
let valid_event_types = ["IMPRESSION"];
41+
let valid_event_types = ["IMPRESSION", "CLICK"];
4042
let valid_metric = ["eventPayouts", "eventCounts"];
4143
let valid_timeframe = ["year", "month", "week", "day", "hour"];
4244

@@ -55,6 +57,11 @@ impl AnalyticsQuery {
5557
"invalid timeframe, possible values are: {}",
5658
valid_timeframe.join(" ,")
5759
)))
60+
} else if self.limit > ANALYTICS_QUERY_LIMIT {
61+
Err(DomainError::InvalidArgument(format!(
62+
"invalid limit {}, maximum value 200",
63+
self.limit
64+
)))
5865
} else {
5966
Ok(())
6067
}

primitives/src/util/tests/prep_db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ lazy_static! {
3535
auth.insert("user".into(), "x8c9v1b2".into());
3636
auth.insert("publisher".into(), "testing".into());
3737
auth.insert("publisher2".into(), "testing2".into());
38-
auth.insert("creator".into(), "awesomeCreator".into());
38+
auth.insert("creator".into(), "0x033ed90e0fec3f3ea1c9b005c724d704501e0196".into());
3939
auth.insert("tester".into(), "AUTH_awesomeTester".into());
4040

4141
auth

rust-toolchain

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
stable
1+
nightly

sentry/src/db/analytics.rs

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,45 +4,51 @@ use bb8::RunError;
44
use chrono::Utc;
55
use primitives::analytics::{AnalyticsQuery, AnalyticsResponse, ANALYTICS_QUERY_LIMIT};
66

7+
pub enum AnalyticsType {
8+
Advertiser {
9+
session: Session,
10+
channel: Option<String>,
11+
},
12+
Global,
13+
Publisher {
14+
session: Session,
15+
channel: Option<String>,
16+
},
17+
}
18+
719
pub async fn get_analytics(
820
query: AnalyticsQuery,
9-
channel: Option<String>,
10-
sess: Option<&Session>,
1121
pool: &DbPool,
12-
is_advertiser: bool,
13-
filter_publisher: bool,
22+
analytics_type: AnalyticsType,
1423
) -> Result<Vec<AnalyticsResponse>, RunError<bb8_postgres::tokio_postgres::Error>> {
1524
let applied_limit = query.limit.min(ANALYTICS_QUERY_LIMIT);
1625
let (interval, period) = get_time_frame(&query.timeframe);
1726
let time_limit = Utc::now().timestamp() - period;
1827

1928
let mut where_clauses = vec![format!("created > to_timestamp({})", time_limit)];
2029

21-
if is_advertiser {
22-
match (channel, sess) {
23-
(Some(id), _) => where_clauses.push(format!("channel_id IN ({})", id)),
24-
(None, Some(session)) => where_clauses.push(format!(
25-
"channel_id IN (SELECT id FROM channels WHERE creator = {})",
26-
session.uid
27-
)),
28-
_ => {}
29-
};
30-
} else if let Some(id) = channel {
31-
where_clauses.push(format!("channel_id = {}", id));
32-
}
30+
let select_query = match analytics_type {
31+
AnalyticsType::Advertiser { session, channel } => {
32+
if let Some(id) = channel {
33+
where_clauses.push(format!("channel_id = {}", id));
34+
} else {
35+
where_clauses.push(format!(
36+
"channel_id IN (SELECT id FROM channels WHERE creator = {})",
37+
session.uid
38+
));
39+
}
3340

34-
let select_query = match (filter_publisher, sess) {
35-
(true, Some(session)) => {
3641
where_clauses.push(format!(
37-
"events->'{}'->'{}'->'{}' IS NOT NULL",
38-
query.event_type, query.metric, session.uid
42+
"events->'{}'->'{}' IS NOT NULL",
43+
query.event_type, query.metric
3944
));
45+
4046
format!(
41-
"select SUM((events->'{}'->'{}'->>'{}')::numeric) as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates",
42-
query.event_type, query.metric, session.uid, interval
47+
"select SUM(value::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates, jsonb_each_text(events->'{}'->'{}')",
48+
interval, query.event_type, query.metric
4349
)
4450
}
45-
_ => {
51+
AnalyticsType::Global => {
4652
where_clauses.push(format!(
4753
"events->'{}'->'{}' IS NOT NULL",
4854
query.event_type, query.metric
@@ -52,6 +58,21 @@ pub async fn get_analytics(
5258
interval, query.event_type, query.metric
5359
)
5460
}
61+
AnalyticsType::Publisher { session, channel } => {
62+
if let Some(id) = channel {
63+
where_clauses.push(format!("channel_id = {}", id));
64+
}
65+
66+
where_clauses.push(format!(
67+
"events->'{}'->'{}'->'{}' IS NOT NULL",
68+
query.event_type, query.metric, session.uid
69+
));
70+
71+
format!(
72+
"select SUM((events->'{}'->'{}'->>'{}')::numeric) as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates",
73+
query.event_type, query.metric, session.uid, interval
74+
)
75+
}
5576
};
5677

5778
let sql_query = format!(

sentry/src/lib.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ async fn analytics_router<A: Adapter>(
182182

183183
let req = chain(req, app, vec![channel_load]).await?;
184184
analytics(req, app).await
185-
186185
} else if let Some(caps) = ADVERTISER_ANALYTICS_BY_CHANNEL_ID.captures(route) {
187186
let param = RouteParams(vec![caps
188187
.get(1)
@@ -291,7 +290,10 @@ pub fn map_response_error(error: ResponseError) -> Response<Body> {
291290
match error {
292291
ResponseError::NotFound => not_found(),
293292
ResponseError::BadRequest(e) => bad_response(e, StatusCode::BAD_REQUEST),
294-
ResponseError::UnAuthorized => bad_response("invalid authorization".to_string(), StatusCode::UNAUTHORIZED),
293+
ResponseError::UnAuthorized => bad_response(
294+
"invalid authorization".to_string(),
295+
StatusCode::UNAUTHORIZED,
296+
),
295297
}
296298
}
297299

@@ -314,11 +316,10 @@ pub fn bad_response(response_body: String, status_code: StatusCode) -> Response<
314316
.insert("Content-type", "application/json".parse().unwrap());
315317

316318
*response.status_mut() = status_code;
317-
319+
318320
response
319321
}
320322

321-
322323
pub fn success_response(response_body: String) -> Response<Body> {
323324
let body = Body::from(response_body);
324325

sentry/src/middleware/auth.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ pub(crate) async fn for_request(
5353
.arg(serde_json::to_string(&adapter_session)?)
5454
.query_async(&mut redis.clone())
5555
.await?;
56-
56+
5757
adapter_session
5858
}
5959
};

sentry/src/routes/analytics.rs

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::db::analytics::get_analytics;
1+
use crate::db::analytics::{get_analytics, AnalyticsType};
22
use crate::success_response;
33
use crate::Application;
44
use crate::ResponseError;
@@ -14,7 +14,17 @@ pub async fn publisher_analytics<A: Adapter>(
1414
req: Request<Body>,
1515
app: &Application<A>,
1616
) -> Result<Response<Body>, ResponseError> {
17-
process_analytics(req, app, false, true)
17+
let sess = req.extensions().get::<Session>();
18+
let channel = match req.extensions().get::<RouteParams>() {
19+
Some(param) => param.get(0),
20+
None => None,
21+
};
22+
let analytics_type = AnalyticsType::Publisher {
23+
session: sess.cloned().ok_or(ResponseError::UnAuthorized)?,
24+
channel,
25+
};
26+
27+
process_analytics(req, app, analytics_type)
1828
.await
1929
.map(success_response)
2030
}
@@ -38,8 +48,15 @@ pub async fn analytics<A: Adapter>(
3848
Some(_) => 600,
3949
None => 300,
4050
};
41-
let response = process_analytics(req, app, false, false).await?;
42-
cache(&redis.clone(), request_uri, &response, cache_timeframe, &app.logger).await;
51+
let response = process_analytics(req, app, AnalyticsType::Global).await?;
52+
cache(
53+
&redis.clone(),
54+
request_uri,
55+
&response,
56+
cache_timeframe,
57+
&app.logger,
58+
)
59+
.await;
4360
Ok(success_response(response))
4461
}
4562
}
@@ -49,43 +66,44 @@ pub async fn advertiser_analytics<A: Adapter>(
4966
req: Request<Body>,
5067
app: &Application<A>,
5168
) -> Result<Response<Body>, ResponseError> {
52-
process_analytics(req, app, true, false)
69+
let sess = req.extensions().get::<Session>();
70+
let channel = match req.extensions().get::<RouteParams>() {
71+
Some(param) => param.get(0),
72+
None => None,
73+
};
74+
let analytics_type = AnalyticsType::Advertiser {
75+
session: sess.ok_or(ResponseError::UnAuthorized)?.to_owned(),
76+
channel,
77+
};
78+
79+
process_analytics(req, app, analytics_type)
5380
.await
5481
.map(success_response)
5582
}
5683

5784
pub async fn process_analytics<A: Adapter>(
5885
req: Request<Body>,
5986
app: &Application<A>,
60-
is_advertiser: bool,
61-
filter_publisher: bool,
87+
analytics_type: AnalyticsType,
6288
) -> Result<String, ResponseError> {
6389
let query = serde_urlencoded::from_str::<AnalyticsQuery>(&req.uri().query().unwrap_or(""))?;
6490
query
6591
.is_valid()
6692
.map_err(|e| ResponseError::BadRequest(e.to_string()))?;
6793

68-
let sess = req.extensions().get::<Session>();
69-
let channels = match req.extensions().get::<RouteParams>() {
70-
Some(param) => param.get(0),
71-
None => None,
72-
};
73-
74-
let result = get_analytics(
75-
query,
76-
channels,
77-
sess,
78-
&app.pool,
79-
is_advertiser,
80-
filter_publisher,
81-
)
82-
.await?;
94+
let result = get_analytics(query, &app.pool, analytics_type).await?;
8395

8496
serde_json::to_string(&result)
8597
.map_err(|_| ResponseError::BadRequest("error occurred; try again later".to_string()))
8698
}
8799

88-
async fn cache(redis: &MultiplexedConnection, key: String, value: &str, timeframe: i32, logger: &Logger) {
100+
async fn cache(
101+
redis: &MultiplexedConnection,
102+
key: String,
103+
value: &str,
104+
timeframe: i32,
105+
logger: &Logger,
106+
) {
89107
if let Err(err) = redis::cmd("SETEX")
90108
.arg(&key)
91109
.arg(timeframe)

0 commit comments

Comments
 (0)