Skip to content

Commit 5fe3194

Browse files
committed
fix: formatting /analytics.rs
1 parent 8a05845 commit 5fe3194

File tree

3 files changed

+105
-67
lines changed

3 files changed

+105
-67
lines changed

sentry/src/db/channel.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,19 +75,21 @@ pub async fn get_channel_by_creator(
7575
pool: &DbPool,
7676
id: &ChannelId,
7777
) -> Result<Vec<Channel>, RunError<bb8_postgres::tokio_postgres::Error>> {
78-
pool
79-
.run(move |connection| {
80-
async move {
81-
match connection.prepare("SELECT id FROM channels WHERE creator = $1").await {
82-
Ok(select) => match connection.query(&select, &[&id]).await {
83-
Ok(results) => Ok((results.iter().map(Channel::from).collect(), connection)),
84-
Err(e) => Err((e, connection)),
85-
},
78+
pool.run(move |connection| {
79+
async move {
80+
match connection
81+
.prepare("SELECT id FROM channels WHERE creator = $1")
82+
.await
83+
{
84+
Ok(select) => match connection.query(&select, &[&id]).await {
85+
Ok(results) => Ok((results.iter().map(Channel::from).collect(), connection)),
8686
Err(e) => Err((e, connection)),
87-
}
87+
},
88+
Err(e) => Err((e, connection)),
8889
}
89-
})
90-
.await
90+
}
91+
})
92+
.await
9193
}
9294

9395

sentry/src/lib.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use primitives::adapter::Adapter;
1414
use primitives::Config;
1515
use redis::aio::MultiplexedConnection;
1616
use regex::Regex;
17+
use routes::analytics::{advertiser_analytics, analytics, publisher_analytics};
1718
use routes::cfg::config;
1819
use routes::channel::{channel_list, create_channel, last_approved};
1920
use routes::analytics::{publisher_analytics, analytics, advertiser_analytics};
@@ -27,6 +28,7 @@ pub mod middleware {
2728
}
2829

2930
pub mod routes {
31+
pub mod analytics;
3032
pub mod cfg;
3133
pub mod channel;
3234
pub mod validator_message;
@@ -60,7 +62,7 @@ async fn config_middleware<A: Adapter>(
6062

6163
async fn auth_required_middleware<A: Adapter>(
6264
req: Request<Body>,
63-
_: &Application<A>
65+
_: &Application<A>,
6466
) -> Result<Request<Body>, ResponseError> {
6567
if req.extensions().get::<Session>().is_some() {
6668
Ok(req)
@@ -69,8 +71,6 @@ async fn auth_required_middleware<A: Adapter>(
6971
}
7072
}
7173

72-
73-
7474
#[derive(Debug)]
7575
pub struct RouteParams(Vec<String>);
7676

@@ -143,17 +143,17 @@ impl<A: Adapter + 'static> Application<A> {
143143
}
144144
};
145145
advertiser_analytics(req, &self).await
146-
},
146+
}
147147
("/analytics/for-publisher", &Method::GET) => {
148148
let req = match chain(req, &self, vec![auth_required_middleware]).await {
149149
Ok(req) => req,
150150
Err(error) => {
151151
return map_response_error(error);
152152
}
153153
};
154-
154+
155155
publisher_analytics(req, &self).await
156-
},
156+
}
157157
(route, _) if route.starts_with("/analytics") => analytics_router(req, &self).await,
158158
// This is important becuase it prevents us from doing
159159
// expensive regex matching for routes without /channel
@@ -234,21 +234,25 @@ impl<A: Adapter + 'static> Application<A> {
234234
}
235235
}
236236

237-
async fn analytics_router<A: Adapter>(mut req: Request<Body>, app: &Application<A>) -> Result<Response<Body>, ResponseError> {
237+
async fn analytics_router<A: Adapter>(
238+
mut req: Request<Body>,
239+
app: &Application<A>,
240+
) -> Result<Response<Body>, ResponseError> {
238241
let (route, method) = (req.uri().path(), req.method());
239-
242+
240243
match *method {
241244
Method::GET => {
242245
if let Some(caps) = ANALYTICS_BY_CHANNEL_ID.captures(route) {
243-
let param = RouteParams(vec![caps.get(1)
246+
let param = RouteParams(vec![caps
247+
.get(1)
244248
.map_or("".to_string(), |m| m.as_str().to_string())]);
245249
req.extensions_mut().insert(param);
246250

247251
let req = chain(req, app, vec![channel_load]).await?;
248252
analytics(req, app).await
249-
250253
} else if let Some(caps) = PUBLISHER_ANALYTICS_BY_CHANNEL_ID.captures(route) {
251-
let param = RouteParams(vec![caps.get(1)
254+
let param = RouteParams(vec![caps
255+
.get(1)
252256
.map_or("".to_string(), |m| m.as_str().to_string())]);
253257
req.extensions_mut().insert(param);
254258

@@ -258,8 +262,8 @@ async fn analytics_router<A: Adapter>(mut req: Request<Body>, app: &Application<
258262
} else {
259263
Err(ResponseError::NotFound)
260264
}
261-
},
262-
_ => Err(ResponseError::NotFound)
265+
}
266+
_ => Err(ResponseError::NotFound),
263267
}
264268
}
265269

sentry/src/routes/analytics.rs

Lines changed: 75 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,27 @@
1+
use crate::success_response;
12
use crate::Application;
23
use crate::ResponseError;
3-
use hyper::{Body, Request, Response};
4-
use primitives::adapter::Adapter;
54
use crate::RouteParams;
6-
use chrono::{Utc};
75
use crate::Session;
8-
use serde::{Serialize, Deserialize};
96
use bb8_postgres::tokio_postgres::Row;
10-
use crate::success_response;
11-
use std::cmp;
7+
use chrono::Utc;
8+
use hyper::{Body, Request, Response};
9+
use primitives::adapter::Adapter;
1210
use redis::aio::MultiplexedConnection;
13-
11+
use serde::{Deserialize, Serialize};
12+
use std::cmp;
1413

1514
#[derive(Debug, Serialize, Deserialize)]
1615
pub(crate) struct AnalyticsResponse {
1716
time: u32,
18-
value: String
17+
value: String,
1918
}
2019

2120
impl From<&Row> for AnalyticsResponse {
2221
fn from(row: &Row) -> Self {
2322
Self {
2423
time: row.get("time"),
25-
value: row.get("value")
24+
value: row.get("value"),
2625
}
2726
}
2827
}
@@ -36,7 +35,7 @@ struct AnalyticsQuery {
3635
#[serde(default = "default_metric")]
3736
pub metric: String,
3837
#[serde(default = "default_timeframe")]
39-
pub timeframe: String,
38+
pub timeframe: String,
4039
}
4140

4241
fn default_limit() -> u32 {
@@ -58,43 +57,52 @@ fn default_timeframe() -> String {
5857
pub async fn publisher_analytics<A: Adapter>(
5958
req: Request<Body>,
6059
app: &Application<A>,
61-
) -> Result<Response<Body>, ResponseError> {
62-
process_analytics(req, app, false, false).await.map(success_response)
60+
) -> Result<Response<Body>, ResponseError> {
61+
process_analytics(req, app, false, false)
62+
.await
63+
.map(success_response)
6364
}
6465

6566
pub async fn analytics<A: Adapter>(
6667
req: Request<Body>,
6768
app: &Application<A>,
68-
) -> Result<Response<Body>, ResponseError> {
69+
) -> Result<Response<Body>, ResponseError> {
6970
let request_uri = req.uri().to_string();
7071
let redis = app.redis.clone();
7172

72-
match redis::cmd("EXISTS").arg(&request_uri)
73+
match redis::cmd("EXISTS")
74+
.arg(&request_uri)
7375
.query_async::<_, String>(&mut redis.clone())
74-
.await
76+
.await
7577
{
7678
Ok(response) => Ok(success_response(response)),
7779
_ => {
7880
let cache_timeframe = match req.extensions().get::<RouteParams>() {
7981
Some(_) => 600,
80-
None => 300
82+
None => 300,
8183
};
8284
let response = process_analytics(req, app, false, true).await?;
83-
cache(&redis.clone(), request_uri, &response, cache_timeframe).await;
85+
cache(&redis.clone(), request_uri, &response, cache_timeframe).await;
8486
Ok(success_response(response))
8587
}
8688
}
87-
8889
}
8990

9091
pub async fn advertiser_analytics<A: Adapter>(
9192
req: Request<Body>,
92-
app: &Application<A>
93-
) -> Result<Response<Body>, ResponseError> {
94-
process_analytics(req, app, true, true).await.map(success_response)
93+
app: &Application<A>,
94+
) -> Result<Response<Body>, ResponseError> {
95+
process_analytics(req, app, true, true)
96+
.await
97+
.map(success_response)
9598
}
9699

97-
pub async fn process_analytics<A: Adapter>(req: Request<Body>, app: &Application<A>, is_advertiser: bool, skip_publisher: bool) -> Result<String, ResponseError> {
100+
pub async fn process_analytics<A: Adapter>(
101+
req: Request<Body>,
102+
app: &Application<A>,
103+
is_advertiser: bool,
104+
skip_publisher: bool,
105+
) -> Result<String, ResponseError> {
98106
let query = serde_urlencoded::from_str::<AnalyticsQuery>(&req.uri().query().unwrap_or(""))?;
99107
let applied_limit = cmp::min(query.limit, 200);
100108
let (interval, period) = get_time_frame(&query.timeframe);
@@ -106,8 +114,11 @@ pub async fn process_analytics<A: Adapter>(req: Request<Body>, app: &Applicatio
106114
if is_advertiser {
107115
match req.extensions().get::<RouteParams>() {
108116
Some(params) => where_clauses.push(format!("channel_id IN ({})", params.index(0))),
109-
None => where_clauses.push(format!("channel_id IN (SELECT id FROM channels WHERE creator = {})", sess.unwrap().uid.to_string()))
110-
};
117+
None => where_clauses.push(format!(
118+
"channel_id IN (SELECT id FROM channels WHERE creator = {})",
119+
sess.unwrap().uid.to_string()
120+
)),
121+
};
111122
} else if let Some(params) = req.extensions().get::<RouteParams>() {
112123
if let Some(id) = params.get(0) {
113124
where_clauses.push(format!("channel_id = {}", id));
@@ -116,38 +127,59 @@ pub async fn process_analytics<A: Adapter>(req: Request<Body>, app: &Applicatio
116127

117128
let select_query = match (skip_publisher, sess) {
118129
(false, Some(session)) => {
119-
where_clauses.push(format!("events->'{}'->'{}'->'{}' IS NOT NULL", query.event_type, query.metric, session.uid));
120-
format!("select SUM((events->'{}'->'{}'->>'{}')::numeric) as value, extract({} from created) as time from event_aggregates", query.event_type, query.metric, session.uid, interval)
130+
where_clauses.push(format!(
131+
"events->'{}'->'{}'->'{}' IS NOT NULL",
132+
query.event_type, query.metric, session.uid
133+
));
134+
format!(
135+
"select SUM((events->'{}'->'{}'->>'{}')::numeric) as value, extract({} from created) as time from event_aggregates",
136+
query.event_type, query.metric, session.uid, interval
137+
)
121138
}
122139
_ => {
123-
where_clauses.push(format!("events->'{}'->'{}' IS NOT NULL", query.event_type, query.metric));
124-
format!("select SUM(value::numeric)::varchar as value, extract({} from created) as time from event_aggregates, jsonb_each_text(events->'{}'->'{}')", interval, query.event_type, query.metric)
140+
where_clauses.push(format!(
141+
"events->'{}'->'{}' IS NOT NULL",
142+
query.event_type, query.metric
143+
));
144+
format!(
145+
"select SUM(value::numeric)::varchar as value, extract({} from created) as time from event_aggregates, jsonb_each_text(events->'{}'->'{}')",
146+
interval, query.event_type, query.metric
147+
)
125148
}
126149
};
127150

128-
let sql_query = format!("{} WHERE {} GROUP BY time LIMIT {}", select_query, where_clauses.join(" AND "), applied_limit);
151+
let sql_query = format!(
152+
"{} WHERE {} GROUP BY time LIMIT {}",
153+
select_query,
154+
where_clauses.join(" AND "),
155+
applied_limit
156+
);
129157

130158
// log the query here
131159
println!("{}", sql_query);
132160

133161
// execute query
134-
let result = app.pool
162+
let result = app
163+
.pool
135164
.run(move |connection| {
136165
async move {
137166
match connection.prepare(&sql_query).await {
138167
Ok(stmt) => match connection.query(&stmt, &[]).await {
139168
Ok(rows) => {
140-
let analytics: Vec<AnalyticsResponse> = rows.iter().map(AnalyticsResponse::from).collect();
169+
let analytics: Vec<AnalyticsResponse> =
170+
rows.iter().map(AnalyticsResponse::from).collect();
141171
Ok((analytics, connection))
142-
},
172+
}
143173
Err(e) => Err((e, connection)),
144174
},
145175
Err(e) => Err((e, connection)),
146176
}
147177
}
148-
}).await?;
149-
150-
serde_json::to_string(&result).map_err(|_| ResponseError::BadRequest("error occurred; try again later".to_string()))
178+
})
179+
.await?;
180+
181+
serde_json::to_string(&result)
182+
.map_err(|_| ResponseError::BadRequest("error occurred; try again later".to_string()))
151183
}
152184

153185
async fn cache(redis: &MultiplexedConnection, key: String, value: &str, timeframe: i32) {
@@ -156,7 +188,7 @@ async fn cache(redis: &MultiplexedConnection, key: String, value: &str, timefram
156188
.arg(timeframe)
157189
.arg(value)
158190
.query_async::<_, ()>(&mut redis.clone())
159-
.await
191+
.await
160192
{
161193
println!("{:?}", err);
162194
}
@@ -166,13 +198,13 @@ fn get_time_frame(timeframe: &str) -> (String, i64) {
166198
let minute = 60 * 1000;
167199
let hour = 60 * minute;
168200
let day = 24 * hour;
169-
201+
170202
match timeframe {
171-
"year" => ("month".into(), 365 * day),
172-
"month" => ("day".into(), 30 * day),
173-
"week" => ("week".into(), 7 * day),
174-
"day" => ("hour".into(), day),
175-
"hour" => ("minute".into(), hour),
176-
_ => ("hour".into(), day),
203+
"year" => ("month".into(), 365 * day),
204+
"month" => ("day".into(), 30 * day),
205+
"week" => ("week".into(), 7 * day),
206+
"day" => ("hour".into(), day),
207+
"hour" => ("minute".into(), hour),
208+
_ => ("hour".into(), day),
177209
}
178210
}

0 commit comments

Comments
 (0)