Skip to content

Commit da0dda9

Browse files
committed
fix: /analytics query, add get_channel_id middleware
1 parent e071625 commit da0dda9

File tree

5 files changed

+108
-47
lines changed

5 files changed

+108
-47
lines changed

sentry/src/db/analytics.rs

Lines changed: 55 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::epoch;
33
use crate::Session;
44
use bb8::RunError;
55
use chrono::Utc;
6-
use primitives::analytics::{AnalyticsQuery, AnalyticsResponse, ANALYTICS_QUERY_LIMIT};
6+
use primitives::analytics::{AnalyticsQuery, AnalyticsData, ANALYTICS_QUERY_LIMIT};
77
use primitives::sentry::{AdvancedAnalyticsResponse, ChannelReport, PublisherReport};
88
use primitives::{ChannelId, ValidatorId};
99
use redis;
@@ -14,12 +14,10 @@ use std::error::Error;
1414
pub enum AnalyticsType {
1515
Advertiser {
1616
session: Session,
17-
channel: Option<String>,
1817
},
1918
Global,
2019
Publisher {
2120
session: Session,
22-
channel: Option<String>,
2321
},
2422
}
2523

@@ -50,57 +48,85 @@ pub async fn get_analytics(
5048
pool: &DbPool,
5149
analytics_type: AnalyticsType,
5250
segment_by_channel: bool,
53-
) -> Result<Vec<AnalyticsResponse>, RunError<bb8_postgres::tokio_postgres::Error>> {
51+
channel_id: Option<&ChannelId>,
52+
) -> Result<Vec<AnalyticsData>, RunError<bb8_postgres::tokio_postgres::Error>> {
5453
let applied_limit = query.limit.min(ANALYTICS_QUERY_LIMIT);
5554
let (interval, period) = get_time_frame(&query.timeframe);
5655
let time_limit = Utc::now().timestamp() - period;
5756

5857
let mut where_clauses = vec![format!("created > to_timestamp({})", time_limit)];
58+
if let Some(id) = channel_id {
59+
where_clauses.push(format!("channel_id = {}", id));
60+
}
61+
5962
let mut group_clause = "time".to_string();
6063
let mut select_clause = match analytics_type {
61-
AnalyticsType::Advertiser { session, channel } => {
62-
if let Some(id) = channel {
63-
where_clauses.push(format!("channel_id = {}", id));
64-
} else {
64+
AnalyticsType::Advertiser { session } => {
65+
if channel_id.is_none() {
6566
where_clauses.push(format!(
6667
"channel_id IN (SELECT id FROM channels WHERE creator = {})",
6768
session.uid
6869
));
6970
}
7071

7172
where_clauses.push(format!(
72-
"events->'{}'->'{}' IS NOT NULL",
73-
query.event_type, query.metric
73+
"event_type = '{}'",
74+
query.event_type
75+
));
76+
77+
where_clauses.push(format!(
78+
"{} IS NOT NULL",
79+
query.metric
7480
));
7581

7682
format!(
77-
"SUM(value::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates, jsonb_each_text(events->'{}'->'{}')",
78-
interval, query.event_type, query.metric
83+
"SUM({}::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates",
84+
query.metric, interval
7985
)
8086
}
81-
AnalyticsType::Global => {
87+
AnalyticsType::Global => {
8288
where_clauses.push(format!(
83-
"events->'{}'->'{}' IS NOT NULL",
84-
query.event_type, query.metric
89+
"event_type = '{}'",
90+
query.event_type
8591
));
92+
93+
where_clauses.push(format!(
94+
"{} IS NOT NULL",
95+
query.metric
96+
));
97+
98+
where_clauses.push("earner IS NULL".to_string());
99+
86100
format!(
87-
"SUM(value::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates, jsonb_each_text(events->'{}'->'{}')",
88-
interval, query.event_type, query.metric
101+
"SUM({}::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates",
102+
query.metric, interval
89103
)
90104
}
91-
AnalyticsType::Publisher { session, channel } => {
92-
if let Some(id) = channel {
93-
where_clauses.push(format!("channel_id = {}", id));
94-
}
105+
AnalyticsType::Publisher { session } => {
106+
where_clauses.push(format!(
107+
"event_type = '{}'",
108+
query.event_type
109+
));
95110

96111
where_clauses.push(format!(
97-
"events->'{}'->'{}'->'{}' IS NOT NULL",
98-
query.event_type, query.metric, session.uid
112+
"{} IS NOT NULL",
113+
query.metric
99114
));
100115

116+
where_clauses.push(format!(
117+
"earner = '{}'",
118+
session.uid
119+
));
120+
121+
122+
// where_clauses.push(format!(
123+
// "events->'{}'->'{}'->'{}' IS NOT NULL",
124+
// query.event_type, query.metric, session.uid
125+
// ));
126+
101127
format!(
102-
"SUM((events->'{}'->'{}'->>'{}')::numeric) as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates",
103-
query.event_type, query.metric, session.uid, interval
128+
"SUM(({}::numeric) as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates",
129+
query.metric, interval
104130
)
105131
}
106132
};
@@ -118,13 +144,15 @@ pub async fn get_analytics(
118144
applied_limit,
119145
);
120146

147+
println!("{}", sql_query);
148+
121149
// execute query
122150
pool.run(move |connection| async move {
123151
match connection.prepare(&sql_query).await {
124152
Ok(stmt) => match connection.query(&stmt, &[]).await {
125153
Ok(rows) => {
126-
let analytics: Vec<AnalyticsResponse> =
127-
rows.iter().map(AnalyticsResponse::from).collect();
154+
let analytics: Vec<AnalyticsData> =
155+
rows.iter().map(AnalyticsData::from).collect();
128156
Ok((analytics, connection))
129157
}
130158
Err(e) => Err((e, connection)),

sentry/src/lib.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::chain::chain;
55
use crate::db::DbPool;
66
use crate::event_aggregator::EventAggregator;
77
use crate::middleware::auth;
8-
use crate::middleware::channel::channel_load;
8+
use crate::middleware::channel::{channel_load, get_channel_id};
99
use crate::middleware::cors::{cors, Cors};
1010
use crate::routes::channel::channel_status;
1111
use crate::routes::event_aggregate::list_channel_event_aggregates;
@@ -198,15 +198,23 @@ async fn analytics_router<A: Adapter + 'static>(
198198
.map_or("".to_string(), |m| m.as_str().to_string())]);
199199
req.extensions_mut().insert(param);
200200

201-
let req = chain(req, app, vec![Box::new(channel_load)]).await?;
201+
let req = chain(req, app, vec![
202+
Box::new(channel_load),
203+
Box::new(get_channel_id)
204+
]).await?;
205+
202206
analytics(req, app).await
203207
} else if let Some(caps) = ADVERTISER_ANALYTICS_BY_CHANNEL_ID.captures(route) {
204208
let param = RouteParams(vec![caps
205209
.get(1)
206210
.map_or("".to_string(), |m| m.as_str().to_string())]);
207211
req.extensions_mut().insert(param);
208212

209-
let req = auth_required_middleware(req, app).await?;
213+
let req = chain(req, app, vec![
214+
Box::new(auth_required_middleware),
215+
Box::new(get_channel_id)
216+
]).await?;
217+
210218
advertiser_analytics(req, app).await
211219
} else {
212220
Err(ResponseError::NotFound)

sentry/src/middleware/auth.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub(crate) async fn for_request(
2323
.and_then(|hv| {
2424
hv.to_str()
2525
.map(|token_str| {
26+
println!("token str {}", token_str );
2627
if token_str.starts_with(prefix) {
2728
Some(token_str[prefix.len()..].to_string())
2829
} else {

sentry/src/middleware/channel.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,24 @@ pub fn channel_if_active<'a, A: Adapter + 'static>(
6666
}
6767
.boxed()
6868
}
69+
70+
71+
pub fn get_channel_id<'a, A: Adapter + 'static>(
72+
mut req: Request<Body>,
73+
_: &'a Application<A>
74+
) -> BoxFuture<'a, Result<Request<Body>, ResponseError>> {
75+
async move {
76+
match req.extensions().get::<RouteParams>() {
77+
Some(param) => {
78+
let id = param.get(0).expect("should have channel id");
79+
let channel_id = ChannelId::from_hex(id)
80+
.map_err(|_| ResponseError::BadRequest("Invalid Channel Id".to_string()))?;
81+
req.extensions_mut().insert(channel_id);
82+
83+
Ok(req)
84+
},
85+
None => Ok(req),
86+
}
87+
}
88+
.boxed()
89+
}

sentry/src/routes/analytics.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,27 @@
11
use crate::db::analytics::{
2-
advertiser_channel_ids, get_advanced_reports, get_analytics, AnalyticsType,
2+
advertiser_channel_ids, get_advanced_reports, get_analytics, AnalyticsType
33
};
44
use crate::success_response;
55
use crate::Application;
66
use crate::ResponseError;
77
use crate::RouteParams;
88
use crate::Session;
99
use hyper::{Body, Request, Response};
10+
use primitives::ChannelId;
1011
use primitives::adapter::Adapter;
11-
use primitives::analytics::AnalyticsQuery;
12+
use primitives::analytics::{AnalyticsQuery, AnalyticsResponse};
1213
use redis::aio::MultiplexedConnection;
1314
use slog::{error, Logger};
1415

16+
1517
pub async fn publisher_analytics<A: Adapter>(
1618
req: Request<Body>,
1719
app: &Application<A>,
1820
) -> Result<Response<Body>, ResponseError> {
1921
let sess = req.extensions().get::<Session>();
20-
let channel = match req.extensions().get::<RouteParams>() {
21-
Some(param) => param.get(0),
22-
None => None,
23-
};
22+
2423
let analytics_type = AnalyticsType::Publisher {
2524
session: sess.cloned().ok_or(ResponseError::Unauthorized)?,
26-
channel,
2725
};
2826

2927
process_analytics(req, app, analytics_type)
@@ -46,8 +44,8 @@ pub async fn analytics<A: Adapter>(
4644
Ok(Some(response)) => Ok(success_response(response)),
4745
_ => {
4846
// checks if /:id route param is present
49-
let cache_timeframe = match req.extensions().get::<RouteParams>() {
50-
Some(_) => 600,
47+
let cache_timeframe= match req.extensions().get::<RouteParams>() {
48+
Some(param) => 600,
5149
None => 300,
5250
};
5351
let response = process_analytics(req, app, AnalyticsType::Global).await?;
@@ -69,13 +67,8 @@ pub async fn advertiser_analytics<A: Adapter>(
6967
app: &Application<A>,
7068
) -> Result<Response<Body>, ResponseError> {
7169
let sess = req.extensions().get::<Session>();
72-
let channel = match req.extensions().get::<RouteParams>() {
73-
Some(param) => param.get(0),
74-
None => None,
75-
};
7670
let analytics_type = AnalyticsType::Advertiser {
7771
session: sess.ok_or(ResponseError::Unauthorized)?.to_owned(),
78-
channel,
7972
};
8073

8174
process_analytics(req, app, analytics_type)
@@ -88,20 +81,30 @@ pub async fn process_analytics<A: Adapter>(
8881
app: &Application<A>,
8982
analytics_type: AnalyticsType,
9083
) -> Result<String, ResponseError> {
91-
let query = serde_urlencoded::from_str::<AnalyticsQuery>(&req.uri().query().unwrap_or(""))?;
84+
let mut query = serde_urlencoded::from_str::<AnalyticsQuery>(&req.uri().query().unwrap_or(""))?;
9285
query
9386
.is_valid()
9487
.map_err(|e| ResponseError::BadRequest(e.to_string()))?;
88+
89+
query.metric_to_column();
90+
91+
let channel_id = req.extensions().get::<ChannelId>();
9592

9693
let segment_channel = query
9794
.segment_by_channel
9895
.clone()
9996
.map(|_| true)
10097
.unwrap_or_else(|| false);
98+
let limit = query.limit;
99+
100+
let aggr = get_analytics(query, &app.pool, analytics_type, segment_channel, channel_id).await?;
101101

102-
let result = get_analytics(query, &app.pool, analytics_type, segment_channel).await?;
102+
let response = AnalyticsResponse {
103+
limit,
104+
aggr
105+
};
103106

104-
serde_json::to_string(&result)
107+
serde_json::to_string(&response)
105108
.map_err(|_| ResponseError::BadRequest("error occurred; try again later".to_string()))
106109
}
107110

0 commit comments

Comments
 (0)