Skip to content

Commit de1da60

Browse files
committed
sentry - dummy event_aggregate route
sentry - `Session` and fix `Channel.creator` usage
1 parent 441ad60 commit de1da60

File tree

6 files changed

+109
-23
lines changed

6 files changed

+109
-23
lines changed

sentry/src/access.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use redis::aio::MultiplexedConnection;
55
use primitives::event_submission::{RateLimit, Rule};
66
use primitives::sentry::Event;
77
use primitives::Channel;
8+
use std::cmp::PartialEq;
89

910
use crate::Session;
1011

@@ -54,7 +55,7 @@ pub async fn check_access(
5455

5556
let default_rules = [
5657
Rule {
57-
uids: Some(vec![channel.creator.clone()]),
58+
uids: Some(vec![channel.creator.to_string()]),
5859
rate_limit: None,
5960
},
6061
Rule {
@@ -74,7 +75,7 @@ pub async fn check_access(
7475
let rules = allow_rules
7576
.iter()
7677
.filter(|r| match &r.uids {
77-
Some(uids) => uids.iter().any(|uid| &session.uid == uid),
78+
Some(uids) => uids.iter().any(|uid| uid.eq(&session.uid.to_string())),
7879
None => true,
7980
})
8081
.collect::<Vec<_>>();
@@ -107,16 +108,11 @@ async fn apply_rule(
107108
match &rule.rate_limit {
108109
Some(rate_limit) => {
109110
let key = if &rate_limit.limit_type == "sid" {
110-
// @TODO: Is this really necessary?
111-
if session.uid.is_empty() {
112-
Err("rateLimit: unauthenticated request".to_string())
113-
} else {
114-
Ok(format!(
115-
"adexRateLimit:{}:{}",
116-
hex::encode(channel.id),
117-
session.uid
118-
))
119-
}
111+
Ok(format!(
112+
"adexRateLimit:{}:{}",
113+
hex::encode(channel.id),
114+
session.uid
115+
))
120116
} else if &rate_limit.limit_type == "ip" {
121117
if events.len() != 1 {
122118
Err("rateLimit: only allows 1 event".to_string())
@@ -163,7 +159,7 @@ mod test {
163159
use primitives::config::configuration;
164160
use primitives::event_submission::{RateLimit, Rule};
165161
use primitives::sentry::Event;
166-
use primitives::util::tests::prep_db::DUMMY_CHANNEL;
162+
use primitives::util::tests::prep_db::{DUMMY_CHANNEL, IDS};
167163
use primitives::{Channel, Config, EventSubmission};
168164

169165
use crate::db::redis_connection;
@@ -207,7 +203,7 @@ mod test {
207203

208204
let session = Session {
209205
era: 0,
210-
uid: "response".to_string(),
206+
uid: IDS["follower"].clone(),
211207
ip: Default::default(),
212208
};
213209

@@ -241,7 +237,7 @@ mod test {
241237

242238
let session = Session {
243239
era: 0,
244-
uid: "response".to_string(),
240+
uid: IDS["follower"].clone(),
245241
ip: Default::default(),
246242
};
247243

sentry/src/event_reducer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub(crate) fn reduce(channel: &Channel, initial_aggr: &mut EventAggregate, ev: &
1313
initial_aggr.events.insert("IMPRESSION".to_owned(), merge);
1414
}
1515
Event::Close => {
16-
let creator = channel.creator.clone();
16+
let creator = channel.creator.to_string();
1717
let close_event = AggregateEvents {
1818
event_counts: Some(vec![(creator.clone(), 1.into())].into_iter().collect()),
1919
event_payouts: vec![(creator, channel.deposit_amount.clone())]
@@ -36,14 +36,14 @@ fn merge_impression_ev(
3636
let event_count = impression
3737
.event_counts
3838
.get_or_insert_with(Default::default)
39-
.entry(earner.into())
39+
.entry(earner.to_string())
4040
.or_insert_with(|| 0.into());
4141

4242
*event_count += &1.into();
4343

4444
let event_payouts = impression
4545
.event_payouts
46-
.entry(earner.into())
46+
.entry(earner.to_string())
4747
.or_insert_with(|| 0.into());
4848
*event_payouts += &channel.spec.min_per_impression;
4949

sentry/src/lib.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ use crate::middleware::auth;
77
use crate::middleware::channel::channel_load;
88
use crate::middleware::cors::{cors, Cors};
99
use crate::routes::channel::channel_status;
10+
use crate::routes::event_aggregate::list_channel_event_aggregates;
1011
use crate::routes::validator_message::{extract_params, list_validator_messages};
1112
use hyper::{Body, Method, Request, Response, StatusCode};
1213
use lazy_static::lazy_static;
1314
use primitives::adapter::Adapter;
14-
use primitives::Config;
15+
use primitives::{Config, ValidatorId};
1516
use redis::aio::MultiplexedConnection;
1617
use regex::Regex;
1718
use routes::cfg::config;
@@ -23,11 +24,13 @@ pub mod middleware {
2324
pub mod auth;
2425
pub mod channel;
2526
pub mod cors;
27+
pub mod event_aggregate;
2628
}
2729

2830
pub mod routes {
2931
pub mod cfg;
3032
pub mod channel;
33+
pub mod event_aggregate;
3134
pub mod validator_message;
3235
}
3336

@@ -43,6 +46,7 @@ lazy_static! {
4346
static ref CHANNEL_STATUS_BY_CHANNEL_ID: Regex = Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/status/?$").expect("The regex should be valid");
4447
// Only the initial Regex to be matched.
4548
static ref CHANNEL_VALIDATOR_MESSAGES: Regex = Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/validator-messages(/.*)?$").expect("The regex should be valid");
49+
static ref CHANNEL_EVENT_AGGREGATES: Regex = Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/event-aggregates(/|/0x[a-zA-Z0-9]{40}/?)?$").expect("The regex should be valid");
4650
// @TODO define other regex routes
4751
}
4852

@@ -180,6 +184,29 @@ impl<A: Adapter + 'static> Application<A> {
180184
};
181185

182186
list_validator_messages(req, &self, &extract_params.0, &extract_params.1).await
187+
} else if let (Some(caps), &Method::GET) =
188+
(CHANNEL_EVENT_AGGREGATES.captures(path), method)
189+
{
190+
if req.extensions().get::<Session>().is_none() {
191+
return map_response_error(ResponseError::Unauthorized);
192+
}
193+
194+
let param = RouteParams(vec![
195+
caps.get(1)
196+
.map_or("".to_string(), |m| m.as_str().to_string()),
197+
caps.get(2)
198+
.map_or("".to_string(), |m| m.as_str().trim_matches('/').to_string()),
199+
]);
200+
req.extensions_mut().insert(param);
201+
202+
let req = match chain(req, &self, vec![channel_load]).await {
203+
Ok(req) => req,
204+
Err(error) => {
205+
return map_response_error(error);
206+
}
207+
};
208+
209+
list_channel_event_aggregates(req, &self).await
183210
} else {
184211
Err(ResponseError::NotFound)
185212
}
@@ -198,6 +225,7 @@ impl<A: Adapter + 'static> Application<A> {
198225
pub enum ResponseError {
199226
NotFound,
200227
BadRequest(String),
228+
Unauthorized,
201229
}
202230

203231
impl<T> From<T> for ResponseError
@@ -215,6 +243,7 @@ pub fn map_response_error(error: ResponseError) -> Response<Body> {
215243
match error {
216244
ResponseError::NotFound => not_found(),
217245
ResponseError::BadRequest(e) => bad_response(e),
246+
ResponseError::Unauthorized => unauthorized(),
218247
}
219248
}
220249

@@ -242,6 +271,13 @@ pub fn bad_response(response_body: String) -> Response<Body> {
242271
response
243272
}
244273

274+
pub fn unauthorized() -> Response<Body> {
275+
let mut response = Response::new(Body::from("Unauthorized"));
276+
let status = response.status_mut();
277+
*status = StatusCode::UNAUTHORIZED;
278+
response
279+
}
280+
245281
pub fn success_response(response_body: String) -> Response<Body> {
246282
let body = Body::from(response_body);
247283

@@ -260,6 +296,6 @@ pub fn success_response(response_body: String) -> Response<Body> {
260296
#[derive(Debug, Clone)]
261297
pub struct Session {
262298
pub era: i64,
263-
pub uid: String,
299+
pub uid: ValidatorId,
264300
pub ip: Option<String>,
265301
}

sentry/src/middleware/auth.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ pub(crate) async fn for_request(
6060

6161
let session = Session {
6262
era: adapter_session.era,
63-
uid: adapter_session.uid.to_hex_non_prefix_string(),
63+
uid: adapter_session.uid,
6464
ip: get_request_ip(&req),
6565
};
6666

@@ -165,8 +165,8 @@ mod test {
165165
.extensions()
166166
.get::<Session>()
167167
.expect("There should be a Session set inside the request");
168-
let leader_id = IDS["leader"].to_hex_non_prefix_string();
169-
assert_eq!(leader_id, session.uid);
168+
169+
assert_eq!(IDS["leader"], session.uid);
170170
assert!(session.ip.is_none());
171171
}
172172
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use crate::{Application, ResponseError, RouteParams};
2+
use hyper::{Body, Request};
3+
use primitives::adapter::Adapter;
4+
use primitives::sentry::EarnerAddress;
5+
6+
/// channel_load & channel_if_exist
7+
pub async fn earner_load<A: Adapter>(
8+
mut req: Request<Body>,
9+
_app: &Application<A>,
10+
) -> Result<Request<Body>, ResponseError> {
11+
let earner = req
12+
.extensions()
13+
.get::<RouteParams>()
14+
.ok_or_else(|| ResponseError::BadRequest("Route params not found".to_string()))?
15+
.get(1)
16+
.ok_or_else(|| ResponseError::BadRequest("No earner param".to_string()))?;
17+
18+
let earner_option: Option<EarnerAddress> = if earner.is_empty() {
19+
None
20+
} else {
21+
Some(
22+
serde_json::from_str(&earner)
23+
.map_err(|_| ResponseError::BadRequest("Invalid earner param".to_string()))?,
24+
)
25+
};
26+
27+
req.extensions_mut().insert(earner_option);
28+
29+
Ok(req)
30+
}

sentry/src/routes/event_aggregate.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use crate::{Application, ResponseError, Session};
2+
use hyper::{Body, Request, Response};
3+
use primitives::adapter::Adapter;
4+
use primitives::Channel;
5+
6+
pub async fn list_channel_event_aggregates<A: Adapter>(
7+
req: Request<Body>,
8+
_app: &Application<A>,
9+
) -> Result<Response<Body>, ResponseError> {
10+
let channel = req
11+
.extensions()
12+
.get::<Channel>()
13+
.expect("Request should have Channel");
14+
15+
// TODO: Auth required middleware
16+
let session = req
17+
.extensions()
18+
.get::<Session>()
19+
.ok_or_else(|| ResponseError::Unauthorized)?;
20+
21+
let _is_superuser = channel.spec.validators.find(&session.uid).is_some();
22+
23+
unimplemented!("Still need to finish it")
24+
}

0 commit comments

Comments
 (0)