Skip to content

Commit 0e25509

Browse files
authored
Merge branch 'dev' into remove-list-timeout
2 parents f021ffd + bc09c10 commit 0e25509

File tree

13 files changed

+576
-100
lines changed

13 files changed

+576
-100
lines changed

primitives/src/channel.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use serde::{Deserialize, Deserializer, Serialize};
77
use serde_hex::{SerHex, StrictPfx};
88

99
use crate::big_num::BigNum;
10-
use crate::sentry::Event;
1110
use crate::{AdUnit, EventSubmission, TargetingTag, ValidatorDesc, ValidatorId};
1211
use hex::{FromHex, FromHexError};
1312
use std::ops::Deref;
@@ -161,17 +160,17 @@ pub struct ChannelSpec {
161160
#[serde(rename_all = "camelCase")]
162161
pub struct PriceMultiplicationRules {
163162
#[serde(default, skip_serializing_if = "Option::is_none")]
164-
multiplier: Option<f64>,
163+
pub multiplier: Option<f64>,
165164
#[serde(default, skip_serializing_if = "Option::is_none")]
166-
amount: Option<BigNum>,
165+
pub amount: Option<BigNum>,
167166
#[serde(default, skip_serializing_if = "Option::is_none")]
168-
ev_type: Option<Vec<Event>>,
167+
pub ev_type: Option<Vec<String>>,
169168
#[serde(default, skip_serializing_if = "Option::is_none")]
170-
publisher: Option<Vec<ValidatorId>>,
169+
pub publisher: Option<Vec<ValidatorId>>,
171170
#[serde(default, skip_serializing_if = "Option::is_none")]
172-
os_type: Option<Vec<String>>,
171+
pub os_type: Option<Vec<String>>,
173172
#[serde(default, skip_serializing_if = "Option::is_none")]
174-
country: Option<Vec<String>>,
173+
pub country: Option<Vec<String>>,
175174
}
176175

177176
#[derive(Serialize, Deserialize, Debug, Clone)]

primitives/src/sentry.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub struct HeartbeatValidatorMessage {
3737
}
3838

3939
#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")]
40-
#[derive(Serialize, Deserialize, Clone, Debug)]
40+
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
4141
pub enum Event {
4242
#[serde(rename_all = "camelCase")]
4343
Impression {
@@ -99,7 +99,7 @@ impl fmt::Display for Event {
9999
}
100100
}
101101

102-
#[derive(Serialize, Deserialize, Clone, Debug)]
102+
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
103103
pub struct Earner {
104104
#[serde(rename = "publisher")]
105105
pub address: String,

sentry/src/access.rs

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use chrono::Utc;
22
use futures::future::try_join_all;
33
use redis::aio::MultiplexedConnection;
44

5-
use crate::Session;
5+
use crate::{Auth, Session};
66
use primitives::event_submission::{RateLimit, Rule};
77
use primitives::sentry::Event;
88
use primitives::Channel;
@@ -38,7 +38,8 @@ impl fmt::Display for Error {
3838
// @TODO: Make pub(crate)
3939
pub async fn check_access(
4040
redis: &MultiplexedConnection,
41-
session: Option<&Session>,
41+
session: &Session,
42+
auth: Option<&Auth>,
4243
rate_limit: &RateLimit,
4344
channel: &Channel,
4445
events: &[Event],
@@ -56,19 +57,22 @@ pub async fn check_access(
5657
return Ok(());
5758
}
5859

59-
let session = session.ok_or_else(|| Error::UnAuthenticated)?;
6060
if current_time > channel.valid_until {
6161
return Err(Error::ChannelIsExpired);
6262
}
6363

64+
let (is_creator, auth_uid) = match auth {
65+
Some(auth) => (auth.uid == channel.creator, auth.uid.to_string()),
66+
None => (false, Default::default()),
67+
};
6468
// We're only sending a CLOSE
6569
// That's allowed for the creator normally, and for everyone during the withdraw period
66-
if has_close_event && session.uid == channel.creator {
70+
if has_close_event && is_creator {
6771
return Ok(());
6872
}
6973

7074
// Only the creator can send a CLOSE
71-
if session.uid != channel.creator && events.iter().any(is_close_event) {
75+
if is_creator && events.iter().any(is_close_event) {
7276
return Err(Error::OnlyCreatorCanCloseChannel);
7377
}
7478

@@ -91,6 +95,7 @@ pub async fn check_access(
9195
rate_limit: Some(rate_limit.clone()),
9296
},
9397
];
98+
9499
// Enforce access limits
95100
let allow_rules = channel
96101
.spec
@@ -103,7 +108,7 @@ pub async fn check_access(
103108
let rules = allow_rules
104109
.iter()
105110
.filter(|r| match &r.uids {
106-
Some(uids) => uids.iter().any(|uid| uid.eq(&session.uid.to_string())),
111+
Some(uids) => uids.iter().any(|uid| uid.eq(&auth_uid)),
107112
None => true,
108113
})
109114
.collect::<Vec<_>>();
@@ -116,7 +121,7 @@ pub async fn check_access(
116121
let apply_all_rules = try_join_all(
117122
rules
118123
.iter()
119-
.map(|rule| apply_rule(redis.clone(), &rule, &events, &channel, &session)),
124+
.map(|rule| apply_rule(redis.clone(), &rule, &events, &channel, &auth_uid, &session)),
120125
);
121126

122127
if let Err(rule_error) = apply_all_rules.await {
@@ -131,16 +136,13 @@ async fn apply_rule(
131136
rule: &Rule,
132137
events: &[Event],
133138
channel: &Channel,
139+
uid: &str,
134140
session: &Session,
135141
) -> Result<(), String> {
136142
match &rule.rate_limit {
137143
Some(rate_limit) => {
138144
let key = if &rate_limit.limit_type == "sid" {
139-
Ok(format!(
140-
"adexRateLimit:{}:{}",
141-
hex::encode(channel.id),
142-
session.uid
143-
))
145+
Ok(format!("adexRateLimit:{}:{}", hex::encode(channel.id), uid))
144146
} else if &rate_limit.limit_type == "ip" {
145147
if events.len() != 1 {
146148
Err("rateLimit: only allows 1 event".to_string())
@@ -256,12 +258,16 @@ mod test {
256258
async fn session_uid_rate_limit() {
257259
let (config, redis) = setup().await;
258260

259-
let session = Session {
261+
let auth = Auth {
260262
era: 0,
261263
uid: IDS["follower"].clone(),
264+
};
265+
266+
let session = Session {
262267
ip: Default::default(),
263268
referrer_header: None,
264269
country: None,
270+
os: None,
265271
};
266272

267273
let rule = Rule {
@@ -276,7 +282,8 @@ mod test {
276282

277283
let response = check_access(
278284
&redis,
279-
Some(&session),
285+
&session,
286+
Some(&auth),
280287
&config.ip_rate_limit,
281288
&channel,
282289
&events,
@@ -285,8 +292,9 @@ mod test {
285292
assert_eq!(Ok(()), response);
286293

287294
let err_response = check_access(
288-
&redis,
289-
Some(&session),
295+
&&redis,
296+
&session,
297+
Some(&auth),
290298
&config.ip_rate_limit,
291299
&channel,
292300
&events,
@@ -304,12 +312,16 @@ mod test {
304312
async fn ip_rate_limit() {
305313
let (config, redis) = setup().await;
306314

307-
let session = Session {
315+
let auth = Auth {
308316
era: 0,
309317
uid: IDS["follower"].clone(),
318+
};
319+
320+
let session = Session {
310321
ip: Default::default(),
311-
country: None,
312322
referrer_header: None,
323+
country: None,
324+
os: None,
313325
};
314326

315327
let rule = Rule {
@@ -323,7 +335,8 @@ mod test {
323335

324336
let err_response = check_access(
325337
&redis,
326-
Some(&session),
338+
&session,
339+
Some(&auth),
327340
&config.ip_rate_limit,
328341
&channel,
329342
&get_impression_events(2),
@@ -339,7 +352,8 @@ mod test {
339352

340353
let response = check_access(
341354
&redis,
342-
Some(&session),
355+
&session,
356+
Some(&auth),
343357
&config.ip_rate_limit,
344358
&channel,
345359
&get_impression_events(1),

sentry/src/analytics_recorder.rs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::epoch;
2+
use crate::payout::get_payout;
23
use crate::Session;
34
use primitives::sentry::Event;
45
use primitives::sentry::{ChannelReport, PublisherReport};
@@ -7,20 +8,6 @@ use redis::aio::MultiplexedConnection;
78
use redis::pipe;
89
use slog::{error, Logger};
910

10-
pub fn get_payout(channel: &Channel, event: &Event) -> BigNum {
11-
match event {
12-
Event::Impression { .. } => channel.spec.min_per_impression.clone(),
13-
Event::Click { .. } => channel
14-
.spec
15-
.pricing_bounds
16-
.as_ref()
17-
.and_then(|pricing_bound| pricing_bound.click.as_ref())
18-
.map(|click| click.min.clone())
19-
.unwrap_or_default(),
20-
_ => Default::default(),
21-
}
22-
}
23-
2411
pub async fn record(
2512
mut conn: MultiplexedConnection,
2613
channel: Channel,
@@ -47,7 +34,7 @@ pub async fn record(
4734
referrer,
4835
} => {
4936
let divisor = BigNum::from(10u64.pow(18));
50-
let pay_amount = get_payout(&channel, event)
37+
let pay_amount = get_payout(&channel, event, &session)
5138
.div_floor(&divisor)
5239
.to_f64()
5340
.expect("should always have a payout");

sentry/src/db/analytics.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::db::DbPool;
22
use crate::epoch;
3-
use crate::Session;
3+
use crate::Auth;
44
use bb8::RunError;
55
use bb8_postgres::tokio_postgres::types::ToSql;
66
use chrono::Utc;
@@ -13,9 +13,9 @@ use std::collections::HashMap;
1313
use std::error::Error;
1414

1515
pub enum AnalyticsType {
16-
Advertiser { session: Session },
16+
Advertiser { auth: Auth },
1717
Global,
18-
Publisher { session: Session },
18+
Publisher { auth: Auth },
1919
}
2020

2121
pub async fn advertiser_channel_ids(
@@ -78,11 +78,11 @@ pub async fn get_analytics(
7878

7979
let mut group_clause = "time".to_string();
8080
let mut select_clause = match analytics_type {
81-
AnalyticsType::Advertiser { session } => {
81+
AnalyticsType::Advertiser { auth } => {
8282
if channel_id.is_none() {
8383
where_clauses.push(format!(
8484
"channel_id IN (SELECT id FROM channels WHERE creator = '{}')",
85-
session.uid
85+
auth.uid
8686
));
8787
}
8888

@@ -99,8 +99,8 @@ pub async fn get_analytics(
9999
metric, interval
100100
)
101101
}
102-
AnalyticsType::Publisher { session } => {
103-
where_clauses.push(format!("earner = '{}'", session.uid));
102+
AnalyticsType::Publisher { auth } => {
103+
where_clauses.push(format!("earner = '{}'", auth.uid));
104104

105105
format!(
106106
"SUM({}::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time",

sentry/src/event_aggregator.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use crate::access::check_access;
22
use crate::access::Error as AccessError;
3-
use crate::analytics_recorder;
43
use crate::db::event_aggregate::insert_event_aggregate;
54
use crate::db::get_channel_by_id;
65
use crate::db::DbPool;
76
use crate::event_reducer;
87
use crate::Application;
98
use crate::ResponseError;
109
use crate::Session;
10+
use crate::{analytics_recorder, Auth};
1111
use async_std::sync::RwLock;
1212
use chrono::Utc;
1313
use lazy_static::lazy_static;
@@ -68,7 +68,8 @@ impl EventAggregator {
6868
&self,
6969
app: &'a Application<A>,
7070
channel_id: &ChannelId,
71-
session: Option<&Session>,
71+
session: &Session,
72+
auth: Option<&Auth>,
7273
events: &'a [Event],
7374
) -> Result<(), ResponseError> {
7475
let recorder = self.recorder.clone();
@@ -127,6 +128,7 @@ impl EventAggregator {
127128
check_access(
128129
&app.redis,
129130
session,
131+
auth,
130132
&app.config.ip_rate_limit,
131133
&record.channel,
132134
events,
@@ -141,13 +143,13 @@ impl EventAggregator {
141143
_ => ResponseError::BadRequest(e.to_string()),
142144
})?;
143145

144-
events
145-
.iter()
146-
.for_each(|ev| event_reducer::reduce(&record.channel, &mut record.aggregate, ev));
146+
events.iter().for_each(|ev| {
147+
event_reducer::reduce(&record.channel, &mut record.aggregate, ev, &session)
148+
});
147149

148150
// only time we don't have session is during
149151
// an unauthenticated close event
150-
if let (true, Some(session)) = (ANALYTICS_RECORDER.is_some(), session) {
152+
if ANALYTICS_RECORDER.is_some() {
151153
tokio::spawn(analytics_recorder::record(
152154
redis.clone(),
153155
record.channel.clone(),

sentry/src/event_reducer.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,27 @@
1-
use crate::analytics_recorder::get_payout;
1+
use crate::payout::get_payout;
2+
use crate::Session;
23
use primitives::sentry::{AggregateEvents, Event, EventAggregate};
34
use primitives::{BigNum, Channel, ValidatorId};
45

56
// @TODO: Remove attribute once we use this function!
67
#[allow(dead_code)]
7-
pub(crate) fn reduce(channel: &Channel, initial_aggr: &mut EventAggregate, ev: &Event) {
8+
pub(crate) fn reduce(
9+
channel: &Channel,
10+
initial_aggr: &mut EventAggregate,
11+
ev: &Event,
12+
session: &Session,
13+
) {
814
match ev {
915
Event::Impression { publisher, .. } => {
1016
let impression = initial_aggr.events.get("IMPRESSION");
11-
let payout = get_payout(&channel, &ev);
17+
let payout = get_payout(&channel, &ev, session);
1218
let merge = merge_impression_ev(impression, &publisher, &payout);
1319

1420
initial_aggr.events.insert("IMPRESSION".to_owned(), merge);
1521
}
1622
Event::Click { publisher, .. } => {
1723
let clicks = initial_aggr.events.get("CLICK");
18-
let payout = get_payout(&channel, &ev);
24+
let payout = get_payout(&channel, &ev, session);
1925
let merge = merge_impression_ev(clicks, &publisher, &payout);
2026

2127
initial_aggr.events.insert("CLICK".to_owned(), merge);
@@ -85,8 +91,15 @@ mod test {
8591
referrer: None,
8692
};
8793

94+
let session = Session {
95+
ip: Default::default(),
96+
country: None,
97+
referrer_header: None,
98+
os: None,
99+
};
100+
88101
for _ in 0..101 {
89-
reduce(&channel, &mut event_aggr, &event);
102+
reduce(&channel, &mut event_aggr, &event, &session);
90103
}
91104

92105
assert_eq!(event_aggr.channel_id, channel.id);

0 commit comments

Comments
 (0)