Skip to content

Commit d5edb78

Browse files
committed
add: aggr click event, pub get_payout
1 parent db1b128 commit d5edb78

File tree

5 files changed

+22
-14
lines changed

5 files changed

+22
-14
lines changed

primitives/src/sentry.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub struct HeartbeatValidatorMessage {
3737
}
3838

3939
#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")]
40-
#[derive(Serialize, Deserialize, Clone)]
40+
#[derive(Serialize, Deserialize, Clone, Debug)]
4141
pub enum Event {
4242
#[serde(rename_all = "camelCase")]
4343
Impression {
@@ -99,7 +99,7 @@ impl fmt::Display for Event {
9999
}
100100
}
101101

102-
#[derive(Serialize, Deserialize, Clone)]
102+
#[derive(Serialize, Deserialize, Clone, Debug)]
103103
pub struct Earner {
104104
#[serde(rename = "publisher")]
105105
pub address: String,

sentry/src/analytics_recorder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use redis;
77
use redis::aio::MultiplexedConnection;
88
use slog::{error, Logger};
99

10-
fn get_payout(channel: &Channel, event: &Event) -> BigNum {
10+
pub fn get_payout(channel: &Channel, event: &Event) -> BigNum {
1111
match event {
1212
Event::Impression { .. } => channel.spec.min_per_impression.clone(),
1313
Event::Click { .. } => {

sentry/src/db/analytics.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ pub async fn get_analytics(
114114
applied_limit,
115115
);
116116

117+
println!("{}", sql_query );
118+
117119
// execute query
118120
pool.run(move |connection| async move {
119121
match connection.prepare(&sql_query).await {

sentry/src/db/event_aggregate.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,16 +158,13 @@ struct EventData {
158158
earner: Option<ValidatorId>,
159159
event_count: BigNum,
160160
event_payout: BigNum,
161-
created: DateTime<Utc>,
162161
}
163162

164163
pub async fn insert_event_aggregate(
165164
pool: &DbPool,
166165
channel_id: &ChannelId,
167166
event: &EventAggregate,
168167
) -> Result<bool, RunError<bb8_postgres::tokio_postgres::Error>> {
169-
let created = Utc::now();
170-
171168
let mut data: Vec<EventData> = Vec::new();
172169

173170
for (event_type, aggr) in &event.events {
@@ -183,7 +180,6 @@ pub async fn insert_event_aggregate(
183180
earner: Some(earner.clone()),
184181
event_count: event_count.to_owned(),
185182
event_payout: event_payout.clone(),
186-
created,
187183
});
188184

189185
// total sum
@@ -197,7 +193,6 @@ pub async fn insert_event_aggregate(
197193
earner: None,
198194
event_count: total_event_counts,
199195
event_payout: total_event_payouts,
200-
created,
201196
});
202197
}
203198
}
@@ -211,10 +206,12 @@ pub async fn insert_event_aggregate(
211206
Err(e) => return Err((e, connection))
212207
};
213208

209+
let created = Utc::now(); // time discrepancy
210+
214211
let writer = BinaryCopyInWriter::new(sink, &[Type::VARCHAR, Type::TIMESTAMPTZ, Type::VARCHAR, Type::VARCHAR, Type::VARCHAR, Type::VARCHAR]);
215212
pin_mut!(writer);
216213
for item in data {
217-
if let Err(e) = writer.as_mut().write(&[&item.id, &item.created, &item.event_type, &item.event_count, &item.event_payout, &item.earner]).await {
214+
if let Err(e) = writer.as_mut().write(&[&item.id, &created, &item.event_type, &item.event_count, &item.event_payout, &item.earner]).await {
218215
err = Some(e);
219216
break;
220217
}

sentry/src/event_reducer.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
11
use primitives::sentry::{AggregateEvents, Event, EventAggregate};
2-
use primitives::{Channel, ValidatorId};
2+
use primitives::{Channel, ValidatorId, BigNum};
3+
use crate::analytics_recorder::get_payout;
34

45
// @TODO: Remove attribute once we use this function!
56
#[allow(dead_code)]
67
pub(crate) fn reduce(channel: &Channel, initial_aggr: &mut EventAggregate, ev: &Event) {
78
match ev {
89
Event::Impression { publisher, .. } => {
910
let impression = initial_aggr.events.get("IMPRESSION");
10-
11-
let merge = merge_impression_ev(impression, &publisher, &channel);
11+
let payout = get_payout(&channel, &ev);
12+
let merge = merge_impression_ev(impression, &publisher, &payout);
1213

1314
initial_aggr.events.insert("IMPRESSION".to_owned(), merge);
1415
}
16+
Event::Click { publisher, .. } => {
17+
let clicks = initial_aggr.events.get("CLICK");
18+
let payout = get_payout(&channel, &ev);
19+
let merge = merge_impression_ev(clicks, &publisher, &payout);
20+
21+
initial_aggr.events.insert("CLICK".to_owned(), merge);
22+
23+
}
1524
Event::Close => {
1625
let creator = channel.creator.clone();
1726
let close_event = AggregateEvents {
@@ -29,7 +38,7 @@ pub(crate) fn reduce(channel: &Channel, initial_aggr: &mut EventAggregate, ev: &
2938
fn merge_impression_ev(
3039
impression: Option<&AggregateEvents>,
3140
earner: &ValidatorId,
32-
channel: &Channel,
41+
payout: &BigNum
3342
) -> AggregateEvents {
3443
let mut impression = impression.map(Clone::clone).unwrap_or_default();
3544

@@ -45,7 +54,7 @@ fn merge_impression_ev(
4554
.event_payouts
4655
.entry(earner.clone())
4756
.or_insert_with(|| 0.into());
48-
*event_payouts += &channel.spec.min_per_impression;
57+
*event_payouts += payout;
4958

5059
impression
5160
}

0 commit comments

Comments (0)