Skip to content

Commit d107bd5

Browse files
committed
add: AIP#6 impl
1 parent 2fb3645 commit d107bd5

File tree

13 files changed

+284
-97
lines changed

13 files changed

+284
-97
lines changed

primitives/src/channel.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,17 +161,17 @@ pub struct ChannelSpec {
161161
#[serde(rename_all = "camelCase")]
162162
pub struct PriceMultiplicationRules {
163163
#[serde(default, skip_serializing_if = "Option::is_none")]
164-
multiplier: Option<f64>,
164+
pub multiplier: Option<f64>,
165165
#[serde(default, skip_serializing_if = "Option::is_none")]
166-
amount: Option<BigNum>,
166+
pub amount: Option<BigNum>,
167167
#[serde(default, skip_serializing_if = "Option::is_none")]
168-
ev_type: Option<Vec<Event>>,
168+
pub ev_type: Option<Vec<Event>>,
169169
#[serde(default, skip_serializing_if = "Option::is_none")]
170-
publisher: Option<Vec<ValidatorId>>,
170+
pub publisher: Option<Vec<ValidatorId>>,
171171
#[serde(default, skip_serializing_if = "Option::is_none")]
172-
os_type: Option<Vec<String>>,
172+
pub os_type: Option<Vec<String>>,
173173
#[serde(default, skip_serializing_if = "Option::is_none")]
174-
country: Option<Vec<String>>,
174+
pub country: Option<Vec<String>>,
175175
}
176176

177177
#[derive(Serialize, Deserialize, Debug, Clone)]

primitives/src/sentry.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub struct HeartbeatValidatorMessage {
3737
}
3838

3939
#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")]
40-
#[derive(Serialize, Deserialize, Clone, Debug)]
40+
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
4141
pub enum Event {
4242
#[serde(rename_all = "camelCase")]
4343
Impression {
@@ -99,7 +99,7 @@ impl fmt::Display for Event {
9999
}
100100
}
101101

102-
#[derive(Serialize, Deserialize, Clone, Debug)]
102+
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
103103
pub struct Earner {
104104
#[serde(rename = "publisher")]
105105
pub address: String,

sentry/src/access.rs

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use chrono::Utc;
22
use futures::future::try_join_all;
33
use redis::aio::MultiplexedConnection;
44

5-
use crate::Session;
5+
use crate::{AuthSession, Session};
66
use primitives::event_submission::{RateLimit, Rule};
77
use primitives::sentry::Event;
88
use primitives::Channel;
@@ -38,7 +38,8 @@ impl fmt::Display for Error {
3838
// @TODO: Make pub(crate)
3939
pub async fn check_access(
4040
redis: &MultiplexedConnection,
41-
session: Option<&Session>,
41+
auth_session: Option<&AuthSession>,
42+
session: &Session,
4243
rate_limit: &RateLimit,
4344
channel: &Channel,
4445
events: &[Event],
@@ -56,19 +57,22 @@ pub async fn check_access(
5657
return Ok(());
5758
}
5859

59-
let session = session.ok_or_else(|| Error::UnAuthenticated)?;
6060
if current_time > channel.valid_until {
6161
return Err(Error::ChannelIsExpired);
6262
}
6363

64+
let (is_creator, auth_uid) = match auth_session.as_ref() {
65+
Some(auth) => (auth.uid == channel.creator, auth.uid.to_string()),
66+
None => (false, Default::default()),
67+
};
6468
// We're only sending a CLOSE
6569
// That's allowed for the creator normally, and for everyone during the withdraw period
66-
if has_close_event && session.uid == channel.creator {
70+
if has_close_event && is_creator {
6771
return Ok(());
6872
}
6973

7074
// Only the creator can send a CLOSE
71-
if session.uid != channel.creator && events.iter().any(is_close_event) {
75+
if is_creator && events.iter().any(is_close_event) {
7276
return Err(Error::OnlyCreatorCanCloseChannel);
7377
}
7478

@@ -91,6 +95,7 @@ pub async fn check_access(
9195
rate_limit: Some(rate_limit.clone()),
9296
},
9397
];
98+
9499
// Enforce access limits
95100
let allow_rules = channel
96101
.spec
@@ -103,7 +108,7 @@ pub async fn check_access(
103108
let rules = allow_rules
104109
.iter()
105110
.filter(|r| match &r.uids {
106-
Some(uids) => uids.iter().any(|uid| uid.eq(&session.uid.to_string())),
111+
Some(uids) => uids.iter().any(|uid| uid.eq(&auth_uid)),
107112
None => true,
108113
})
109114
.collect::<Vec<_>>();
@@ -116,7 +121,7 @@ pub async fn check_access(
116121
let apply_all_rules = try_join_all(
117122
rules
118123
.iter()
119-
.map(|rule| apply_rule(redis.clone(), &rule, &events, &channel, &session)),
124+
.map(|rule| apply_rule(redis.clone(), &rule, &events, &channel, &auth_uid, &session)),
120125
);
121126

122127
if let Err(rule_error) = apply_all_rules.await {
@@ -131,16 +136,13 @@ async fn apply_rule(
131136
rule: &Rule,
132137
events: &[Event],
133138
channel: &Channel,
139+
uid: &str,
134140
session: &Session,
135141
) -> Result<(), String> {
136142
match &rule.rate_limit {
137143
Some(rate_limit) => {
138144
let key = if &rate_limit.limit_type == "sid" {
139-
Ok(format!(
140-
"adexRateLimit:{}:{}",
141-
hex::encode(channel.id),
142-
session.uid
143-
))
145+
Ok(format!("adexRateLimit:{}:{}", hex::encode(channel.id), uid))
144146
} else if &rate_limit.limit_type == "ip" {
145147
if events.len() != 1 {
146148
Err("rateLimit: only allows 1 event".to_string())
@@ -257,11 +259,16 @@ mod test {
257259
let (config, redis) = setup().await;
258260

259261
let session = Session {
260-
era: 0,
261-
uid: IDS["follower"].clone(),
262262
ip: Default::default(),
263263
referrer_header: None,
264264
country: None,
265+
os: None,
266+
};
267+
268+
let auth = AuthSession {
269+
era: 0,
270+
uid: IDS["follower"].clone(),
271+
session: session.clone(),
265272
};
266273

267274
let rule = Rule {
@@ -276,7 +283,8 @@ mod test {
276283

277284
let response = check_access(
278285
&redis,
279-
Some(&session),
286+
Some(&auth),
287+
&session,
280288
&config.ip_rate_limit,
281289
&channel,
282290
&events,
@@ -285,8 +293,9 @@ mod test {
285293
assert_eq!(Ok(()), response);
286294

287295
let err_response = check_access(
288-
&redis,
289-
Some(&session),
296+
&&redis,
297+
Some(&auth),
298+
&session,
290299
&config.ip_rate_limit,
291300
&channel,
292301
&events,
@@ -305,11 +314,16 @@ mod test {
305314
let (config, redis) = setup().await;
306315

307316
let session = Session {
308-
era: 0,
309-
uid: IDS["follower"].clone(),
310317
ip: Default::default(),
311-
country: None,
312318
referrer_header: None,
319+
country: None,
320+
os: None,
321+
};
322+
323+
let auth = AuthSession {
324+
era: 0,
325+
uid: IDS["follower"].clone(),
326+
session: session.clone(),
313327
};
314328

315329
let rule = Rule {
@@ -323,7 +337,8 @@ mod test {
323337

324338
let err_response = check_access(
325339
&redis,
326-
Some(&session),
340+
Some(&auth),
341+
&session,
327342
&config.ip_rate_limit,
328343
&channel,
329344
&get_impression_events(2),
@@ -339,7 +354,8 @@ mod test {
339354

340355
let response = check_access(
341356
&redis,
342-
Some(&session),
357+
Some(&auth),
358+
&session,
343359
&config.ip_rate_limit,
344360
&channel,
345361
&get_impression_events(1),

sentry/src/analytics_recorder.rs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,7 @@ use primitives::{BigNum, Channel};
66
use redis::aio::MultiplexedConnection;
77
use redis::pipe;
88
use slog::{error, Logger};
9-
10-
pub fn get_payout(channel: &Channel, event: &Event) -> BigNum {
11-
match event {
12-
Event::Impression { .. } => channel.spec.min_per_impression.clone(),
13-
Event::Click { .. } => channel
14-
.spec
15-
.pricing_bounds
16-
.as_ref()
17-
.and_then(|pricing_bound| pricing_bound.click.as_ref())
18-
.map(|click| click.min.clone())
19-
.unwrap_or_default(),
20-
_ => Default::default(),
21-
}
22-
}
9+
use crate::payout::get_payout;
2310

2411
pub async fn record(
2512
mut conn: MultiplexedConnection,
@@ -47,7 +34,7 @@ pub async fn record(
4734
referrer,
4835
} => {
4936
let divisor = BigNum::from(10u64.pow(18));
50-
let pay_amount = get_payout(&channel, event)
37+
let pay_amount = get_payout(&channel, event, &session)
5138
.div_floor(&divisor)
5239
.to_f64()
5340
.expect("should always have a payout");

sentry/src/db/analytics.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::db::DbPool;
22
use crate::epoch;
3-
use crate::Session;
3+
use crate::AuthSession;
44
use bb8::RunError;
55
use bb8_postgres::tokio_postgres::types::ToSql;
66
use chrono::Utc;
@@ -13,9 +13,9 @@ use std::collections::HashMap;
1313
use std::error::Error;
1414

1515
pub enum AnalyticsType {
16-
Advertiser { session: Session },
16+
Advertiser { auth: AuthSession },
1717
Global,
18-
Publisher { session: Session },
18+
Publisher { auth: AuthSession },
1919
}
2020

2121
pub async fn advertiser_channel_ids(
@@ -78,11 +78,11 @@ pub async fn get_analytics(
7878

7979
let mut group_clause = "time".to_string();
8080
let mut select_clause = match analytics_type {
81-
AnalyticsType::Advertiser { session } => {
81+
AnalyticsType::Advertiser { auth } => {
8282
if channel_id.is_none() {
8383
where_clauses.push(format!(
8484
"channel_id IN (SELECT id FROM channels WHERE creator = '{}')",
85-
session.uid
85+
auth.uid
8686
));
8787
}
8888

@@ -99,8 +99,8 @@ pub async fn get_analytics(
9999
metric, interval
100100
)
101101
}
102-
AnalyticsType::Publisher { session } => {
103-
where_clauses.push(format!("earner = '{}'", session.uid));
102+
AnalyticsType::Publisher { auth } => {
103+
where_clauses.push(format!("earner = '{}'", auth.uid));
104104

105105
format!(
106106
"SUM({}::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time",

sentry/src/event_aggregator.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::db::DbPool;
77
use crate::event_reducer;
88
use crate::Application;
99
use crate::ResponseError;
10-
use crate::Session;
10+
use crate::{AuthSession, Session};
1111
use async_std::sync::RwLock;
1212
use chrono::Utc;
1313
use lazy_static::lazy_static;
@@ -68,7 +68,8 @@ impl EventAggregator {
6868
&self,
6969
app: &'a Application<A>,
7070
channel_id: &ChannelId,
71-
session: Option<&Session>,
71+
auth_session: Option<&AuthSession>,
72+
session: &Session,
7273
events: &'a [Event],
7374
) -> Result<(), ResponseError> {
7475
let recorder = self.recorder.clone();
@@ -126,6 +127,7 @@ impl EventAggregator {
126127

127128
check_access(
128129
&app.redis,
130+
auth_session,
129131
session,
130132
&app.config.ip_rate_limit,
131133
&record.channel,
@@ -141,13 +143,13 @@ impl EventAggregator {
141143
_ => ResponseError::BadRequest(e.to_string()),
142144
})?;
143145

144-
events
145-
.iter()
146-
.for_each(|ev| event_reducer::reduce(&record.channel, &mut record.aggregate, ev));
146+
events.iter().for_each(|ev| {
147+
event_reducer::reduce(&record.channel, &mut record.aggregate, ev, &session)
148+
});
147149

148150
// only time we don't have session is during
149151
// an unauthenticated close event
150-
if let (true, Some(session)) = (ANALYTICS_RECORDER.is_some(), session) {
152+
if ANALYTICS_RECORDER.is_some() {
151153
tokio::spawn(analytics_recorder::record(
152154
redis.clone(),
153155
record.channel.clone(),

sentry/src/event_reducer.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,27 @@
1-
use crate::analytics_recorder::get_payout;
1+
use crate::payout::get_payout;
2+
use crate::Session;
23
use primitives::sentry::{AggregateEvents, Event, EventAggregate};
34
use primitives::{BigNum, Channel, ValidatorId};
45

56
// @TODO: Remove attribute once we use this function!
67
#[allow(dead_code)]
7-
pub(crate) fn reduce(channel: &Channel, initial_aggr: &mut EventAggregate, ev: &Event) {
8+
pub(crate) fn reduce(
9+
channel: &Channel,
10+
initial_aggr: &mut EventAggregate,
11+
ev: &Event,
12+
session: &Session,
13+
) {
814
match ev {
915
Event::Impression { publisher, .. } => {
1016
let impression = initial_aggr.events.get("IMPRESSION");
11-
let payout = get_payout(&channel, &ev);
17+
let payout = get_payout(&channel, &ev, session);
1218
let merge = merge_impression_ev(impression, &publisher, &payout);
1319

1420
initial_aggr.events.insert("IMPRESSION".to_owned(), merge);
1521
}
1622
Event::Click { publisher, .. } => {
1723
let clicks = initial_aggr.events.get("CLICK");
18-
let payout = get_payout(&channel, &ev);
24+
let payout = get_payout(&channel, &ev, session);
1925
let merge = merge_impression_ev(clicks, &publisher, &payout);
2026

2127
initial_aggr.events.insert("CLICK".to_owned(), merge);
@@ -85,8 +91,15 @@ mod test {
8591
referrer: None,
8692
};
8793

94+
let session = Session {
95+
ip: Default::default(),
96+
country: None,
97+
referrer_header: None,
98+
os: None,
99+
};
100+
88101
for _ in 0..101 {
89-
reduce(&channel, &mut event_aggr, &event);
102+
reduce(&channel, &mut event_aggr, &event, &session);
90103
}
91104

92105
assert_eq!(event_aggr.channel_id, channel.id);

0 commit comments

Comments
 (0)