Skip to content

Commit a8eb0ba

Browse files
committed
fix: channels routing complexity
1 parent b15cfaa commit a8eb0ba

File tree

7 files changed

+102
-108
lines changed

7 files changed

+102
-108
lines changed

primitives/src/analytics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use serde::{Deserialize, Serialize};
22

3-
43
#[derive(Debug, Deserialize)]
54
pub struct AnalyticsQuery {
65
#[serde(default = "default_limit")]
@@ -21,6 +20,7 @@ pub struct AnalyticsResponse {
2120

2221
#[cfg(feature = "postgres")]
2322
pub mod postgres {
23+
use super::AnalyticsResponse;
2424
use tokio_postgres::Row;
2525

2626
impl From<&Row> for AnalyticsResponse {

primitives/src/channel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ pub mod postgres {
239239
use hex::FromHex;
240240
use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, Json, ToSql, Type};
241241
use std::error::Error;
242-
242+
use tokio_postgres::Row;
243243

244244
impl From<&Row> for Channel {
245245
fn from(row: &Row) -> Self {

primitives/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ pub mod util {
2323

2424
pub mod logging;
2525
}
26-
pub mod validator;
2726
pub mod analytics;
27+
pub mod validator;
2828

2929
pub use self::ad_unit::AdUnit;
3030
pub use self::balances_map::BalancesMap;

sentry/src/db.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,10 @@ use std::env;
77

88
use lazy_static::lazy_static;
99

10+
pub mod analytics;
1011
mod channel;
1112
mod validator_message;
1213

13-
14-
pub mod channel;
15-
// mod channel;
16-
pub mod analytics;
17-
1814
pub use self::channel::*;
1915
pub use self::validator_message::*;
2016

sentry/src/db/analytics.rs

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
use hyper::{Body, Request, Response};
21
use crate::db::DbPool;
3-
use bb8::RunError;
4-
use primitives::{Channel, ChannelId, ValidatorId};
5-
use primitives::analytics::{AnalyticsResponse, AnalyticsQuery};
62
use crate::RouteParams;
73
use crate::Session;
8-
use crate::ResponseError;
4+
use bb8::RunError;
95
use chrono::Utc;
6+
use primitives::analytics::{AnalyticsQuery, AnalyticsResponse};
107

118
pub async fn get_analytics(
129
query: AnalyticsQuery,
@@ -15,7 +12,7 @@ pub async fn get_analytics(
1512
pool: &DbPool,
1613
is_advertiser: bool,
1714
skip_publisher_filter: bool,
18-
) -> Result<Vec<AnalyticsResponse>, ResponseError> {
15+
) -> Result<Vec<AnalyticsResponse>, RunError<bb8_postgres::tokio_postgres::Error>> {
1916
let applied_limit = query.limit.min(200);
2017
let (interval, period) = get_time_frame(&query.timeframe);
2118
let time_limit = Utc::now().timestamp() - period;
@@ -67,26 +64,24 @@ pub async fn get_analytics(
6764
);
6865

6966
// execute query
70-
pool
71-
.run(move |connection| {
72-
async move {
73-
match connection.prepare(&sql_query).await {
74-
Ok(stmt) => match connection.query(&stmt, &[]).await {
75-
Ok(rows) => {
76-
let analytics: Vec<AnalyticsResponse> =
77-
rows.iter().map(AnalyticsResponse::from).collect();
78-
Ok((analytics, connection))
79-
}
80-
Err(e) => Err((e, connection)),
81-
},
67+
pool.run(move |connection| {
68+
async move {
69+
match connection.prepare(&sql_query).await {
70+
Ok(stmt) => match connection.query(&stmt, &[]).await {
71+
Ok(rows) => {
72+
let analytics: Vec<AnalyticsResponse> =
73+
rows.iter().map(AnalyticsResponse::from).collect();
74+
Ok((analytics, connection))
75+
}
8276
Err(e) => Err((e, connection)),
83-
}
77+
},
78+
Err(e) => Err((e, connection)),
8479
}
85-
})
86-
.await;
80+
}
81+
})
82+
.await
8783
}
8884

89-
9085
fn get_time_frame(timeframe: &str) -> (i64, i64) {
9186
let minute = 60 * 1000;
9287
let hour = 60 * minute;

sentry/src/lib.rs

Lines changed: 72 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ pub mod routes {
3131
pub mod cfg;
3232
pub mod channel;
3333
pub mod validator_message;
34-
pub mod analytics;
3534
}
3635

3736
pub mod access;
@@ -119,7 +118,7 @@ impl<A: Adapter + 'static> Application<A> {
119118
None => Default::default(),
120119
};
121120

122-
let mut req = match auth::for_request(req, &self.adapter, self.redis.clone()).await {
121+
let req = match auth::for_request(req, &self.adapter, self.redis.clone()).await {
123122
Ok(req) => req,
124123
Err(error) => {
125124
error!(&self.logger, "{}", &error; "module" => "middleware-auth");
@@ -156,73 +155,7 @@ impl<A: Adapter + 'static> Application<A> {
156155
(route, _) if route.starts_with("/analytics") => analytics_router(req, &self).await,
157156
// This is important becuase it prevents us from doing
158157
// expensive regex matching for routes without /channel
159-
(path, method) if path.starts_with("/channel") => {
160-
// example with
161-
// @TODO remove later
162-
// regex matching for routes with params
163-
if let (Some(caps), &Method::GET) =
164-
(LAST_APPROVED_BY_CHANNEL_ID.captures(path), method)
165-
{
166-
let param = RouteParams(vec![caps
167-
.get(1)
168-
.map_or("".to_string(), |m| m.as_str().to_string())]);
169-
req.extensions_mut().insert(param);
170-
171-
// example with middleware
172-
// @TODO remove later
173-
let req = match chain(req, &self, vec![config_middleware]).await {
174-
Ok(req) => req,
175-
Err(error) => {
176-
return map_response_error(error);
177-
}
178-
};
179-
180-
last_approved(req, &self).await
181-
} else if let (Some(caps), &Method::GET) =
182-
(CHANNEL_STATUS_BY_CHANNEL_ID.captures(path), method)
183-
{
184-
let param = RouteParams(vec![caps
185-
.get(1)
186-
.map_or("".to_string(), |m| m.as_str().to_string())]);
187-
req.extensions_mut().insert(param);
188-
189-
let req = match chain(req, &self, vec![channel_load]).await {
190-
Ok(req) => req,
191-
Err(error) => {
192-
return map_response_error(error);
193-
}
194-
};
195-
196-
channel_status(req, &self).await
197-
} else if let (Some(caps), &Method::GET) =
198-
(CHANNEL_VALIDATOR_MESSAGES.captures(path), method)
199-
{
200-
let param = RouteParams(vec![caps
201-
.get(1)
202-
.map_or("".to_string(), |m| m.as_str().to_string())]);
203-
req.extensions_mut().insert(param);
204-
205-
let req = match chain(req, &self, vec![channel_load]).await {
206-
Ok(req) => req,
207-
Err(error) => {
208-
return map_response_error(error);
209-
}
210-
};
211-
212-
// @TODO: Move this to a middleware?!
213-
let extract_params =
214-
match extract_params(caps.get(2).map_or("", |m| m.as_str())) {
215-
Ok(params) => params,
216-
Err(error) => {
217-
return map_response_error(error.into());
218-
}
219-
};
220-
221-
list_validator_messages(req, &self, &extract_params.0, &extract_params.1).await
222-
} else {
223-
Err(ResponseError::NotFound)
224-
}
225-
}
158+
(path, _) if path.starts_with("/channel") => channels_router(req, &self).await,
226159
_ => Err(ResponseError::NotFound),
227160
}
228161
.unwrap_or_else(map_response_error);
@@ -266,6 +199,76 @@ async fn analytics_router<A: Adapter>(
266199
}
267200
}
268201

202+
async fn channels_router<A: Adapter>(
203+
mut req: Request<Body>,
204+
app: &Application<A>,
205+
) -> Result<Response<Body>, ResponseError> {
206+
let (path, method) = (req.uri().path().to_owned(), req.method());
207+
208+
// example with
209+
// @TODO remove later
210+
// regex matching for routes with params
211+
if let (Some(caps), &Method::GET) = (LAST_APPROVED_BY_CHANNEL_ID.captures(&path), method) {
212+
let param = RouteParams(vec![caps
213+
.get(1)
214+
.map_or("".to_string(), |m| m.as_str().to_string())]);
215+
req.extensions_mut().insert(param);
216+
217+
// example with middleware
218+
// @TODO remove later
219+
let req = match chain(req, &app, vec![config_middleware]).await {
220+
Ok(req) => req,
221+
Err(error) => {
222+
return Err(error);
223+
}
224+
};
225+
226+
last_approved(req, &app).await
227+
} else if let (Some(caps), &Method::GET) =
228+
(CHANNEL_STATUS_BY_CHANNEL_ID.captures(&path), method)
229+
{
230+
let param = RouteParams(vec![caps
231+
.get(1)
232+
.map_or("".to_string(), |m| m.as_str().to_string())]);
233+
req.extensions_mut().insert(param);
234+
235+
let req = match chain(req, &app, vec![channel_load]).await {
236+
Ok(req) => req,
237+
Err(error) => {
238+
return Err(error);
239+
}
240+
};
241+
242+
channel_status(req, &app).await
243+
} else if let (Some(caps), &Method::GET) = (CHANNEL_VALIDATOR_MESSAGES.captures(&path), method)
244+
{
245+
let param = RouteParams(vec![caps
246+
.get(1)
247+
.map_or("".to_string(), |m| m.as_str().to_string())]);
248+
249+
req.extensions_mut().insert(param);
250+
251+
let req = match chain(req, &app, vec![channel_load]).await {
252+
Ok(req) => req,
253+
Err(error) => {
254+
return Err(error);
255+
}
256+
};
257+
258+
// @TODO: Move this to a middleware?!
259+
let extract_params = match extract_params(caps.get(2).map_or("", |m| m.as_str())) {
260+
Ok(params) => params,
261+
Err(error) => {
262+
return Err(error.into());
263+
}
264+
};
265+
266+
list_validator_messages(req, &app, &extract_params.0, &extract_params.1).await
267+
} else {
268+
Err(ResponseError::NotFound)
269+
}
270+
}
271+
269272
#[derive(Debug)]
270273
pub enum ResponseError {
271274
NotFound,

sentry/src/routes/analytics.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
1+
use crate::db::analytics::get_analytics;
12
use crate::success_response;
23
use crate::Application;
34
use crate::ResponseError;
45
use crate::RouteParams;
56
use crate::Session;
6-
use bb8_postgres::tokio_postgres::Row;
77
use hyper::{Body, Request, Response};
88
use primitives::adapter::Adapter;
9+
use primitives::analytics::AnalyticsQuery;
910
use redis::aio::MultiplexedConnection;
10-
use serde::{Deserialize, Serialize};
11-
use crate::db::analytics::get_analytics;
12-
1311

1412
pub async fn publisher_analytics<A: Adapter>(
1513
req: Request<Body>,
@@ -61,7 +59,10 @@ pub async fn process_analytics<A: Adapter>(
6159
skip_publisher_filter: bool,
6260
) -> Result<String, ResponseError> {
6361
let query = serde_urlencoded::from_str::<AnalyticsQuery>(&req.uri().query().unwrap_or(""))?;
64-
query.is_valid()?;
62+
query
63+
.is_valid()
64+
.map_err(|e| ResponseError::BadRequest(e.to_string()))?;
65+
6566
let sess = req.extensions().get::<Session>();
6667
let params = req.extensions().get::<RouteParams>();
6768

@@ -71,12 +72,12 @@ pub async fn process_analytics<A: Adapter>(
7172
sess,
7273
&app.pool,
7374
is_advertiser,
74-
skip_publisher_filter
75-
).await?;
75+
skip_publisher_filter,
76+
)
77+
.await?;
7678

7779
serde_json::to_string(&result)
7880
.map_err(|_| ResponseError::BadRequest("error occurred; try again later".to_string()))
79-
8081
}
8182

8283
async fn cache(redis: &MultiplexedConnection, key: String, value: &str, timeframe: i32) {
@@ -90,4 +91,3 @@ async fn cache(redis: &MultiplexedConnection, key: String, value: &str, timefram
9091
println!("{:?}", err);
9192
}
9293
}
93-

0 commit comments

Comments
 (0)