Skip to content

Commit b0947b7

Browse files
committed
add: spawn advanced analytics recorder
1 parent 97a2e78 commit b0947b7

File tree

6 files changed

+37
-31
lines changed

6 files changed

+37
-31
lines changed

primitives/src/sentry.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub struct ApproveStateValidatorMessage {
3030
}
3131

3232
#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")]
33-
#[derive(Serialize, Deserialize)]
33+
#[derive(Serialize, Deserialize, Clone)]
3434
pub enum Event {
3535
#[serde(rename_all = "camelCase")]
3636
Impression {
@@ -92,7 +92,7 @@ impl fmt::Display for Event {
9292
}
9393
}
9494

95-
#[derive(Serialize, Deserialize)]
95+
#[derive(Serialize, Deserialize, Clone)]
9696
pub struct Earner {
9797
#[serde(rename = "publisher")]
9898
pub address: String,

sentry/src/analytics_recorder.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ fn get_payout(channel: &Channel, event: &Event) -> BigNum {
2323

2424
pub async fn record(
2525
mut conn: MultiplexedConnection,
26-
channel: &Channel,
27-
session: &Session,
28-
events: &[Event],
29-
logger: &Logger,
26+
channel: Channel,
27+
session: Session,
28+
events: Vec<Event>,
29+
logger: Logger,
3030
) {
3131
let mut db = redis::pipe();
3232

@@ -47,7 +47,7 @@ pub async fn record(
4747
referrer,
4848
} => {
4949
let divisor = BigNum::from(10u64.pow(18));
50-
let pay_amount = get_payout(channel, event)
50+
let pay_amount = get_payout(&channel, event)
5151
.div_floor(&divisor)
5252
.to_u64()
5353
.expect("should always have a payout");

sentry/src/db/analytics.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,18 @@ pub enum AnalyticsType {
2525

2626
pub async fn advertiser_channel_ids(
2727
pool: &DbPool,
28-
creator: &ValidatorId
28+
creator: &ValidatorId,
2929
) -> Result<Vec<ChannelId>, RunError<bb8_postgres::tokio_postgres::Error>> {
3030
pool.run(move |connection| async move {
31-
match connection.prepare("SELECT id FROM channels WHERE creator = {}").await {
31+
match connection
32+
.prepare("SELECT id FROM channels WHERE creator = {}")
33+
.await
34+
{
3235
Ok(stmt) => match connection.query(&stmt, &[creator]).await {
3336
Ok(rows) => {
3437
let channel_ids: Vec<ChannelId> = rows.iter().map(ChannelId::from).collect();
3538
Ok((channel_ids, connection))
36-
},
39+
}
3740
Err(e) => Err((e, connection)),
3841
},
3942
Err(e) => Err((e, connection)),

sentry/src/event_aggregator.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::access::check_access;
2+
use crate::analytics_recorder;
23
use crate::db::event_aggregate::insert_event_aggregate;
34
use crate::db::get_channel_by_id;
45
use crate::db::DbPool;
@@ -8,17 +9,16 @@ use crate::ResponseError;
89
use crate::Session;
910
use async_std::sync::RwLock;
1011
use chrono::Utc;
12+
use lazy_static::lazy_static;
1113
use primitives::adapter::Adapter;
1214
use primitives::sentry::{Event, EventAggregate};
1315
use primitives::{Channel, ChannelId};
1416
use slog::{error, Logger};
1517
use std::collections::HashMap;
18+
use std::env;
1619
use std::sync::Arc;
1720
use std::time::Duration;
1821
use tokio::time::delay_for;
19-
use lazy_static::lazy_static;
20-
use std::env;
21-
use crate::analytics_recorder;
2222

2323
lazy_static! {
2424
pub static ref ANALYTICS_RECORDER: String =
@@ -74,6 +74,7 @@ impl EventAggregator {
7474
let recorder = self.recorder.clone();
7575
let aggr_throttle = app.config.aggr_throttle;
7676
let dbpool = app.pool.clone();
77+
let redis = app.redis.clone();
7778
let logger = app.logger.clone();
7879

7980
let mut channel_recorder = self.recorder.write().await;
@@ -135,14 +136,21 @@ impl EventAggregator {
135136
return Err(ResponseError::BadRequest(e.to_string()));
136137
}
137138

138-
if ANALYTICS_RECORDER.ne(&"false".to_string()) {
139-
analytics_recorder::record(app.redis.clone(), &record.channel, &session, &events, &app.logger).await
140-
}
141-
142139
events
143140
.iter()
144141
.for_each(|ev| event_reducer::reduce(&record.channel, &mut record.aggregate, ev));
145142

143+
if ANALYTICS_RECORDER.ne(&"false".to_string()) {
144+
let logger = app.logger.clone();
145+
tokio::spawn(analytics_recorder::record(
146+
redis.clone(),
147+
record.channel.clone(),
148+
session.clone(),
149+
events.to_owned().to_vec(),
150+
logger,
151+
));
152+
}
153+
146154
// drop write access to RwLock
147155
// this is required to prevent a deadlock in store
148156
drop(channel_recorder);

sentry/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use primitives::adapter::Adapter;
1818
use primitives::{Config, ValidatorId};
1919
use redis::aio::MultiplexedConnection;
2020
use regex::Regex;
21-
use routes::analytics::{advertiser_analytics, analytics, publisher_analytics, advanced_analytics};
21+
use routes::analytics::{advanced_analytics, advertiser_analytics, analytics, publisher_analytics};
2222
use routes::cfg::config;
2323
use routes::channel::{
2424
channel_list, create_channel, create_validator_messages, insert_events, last_approved,
@@ -141,7 +141,7 @@ impl<A: Adapter + 'static> Application<A> {
141141
("/channel/list", &Method::GET) => channel_list(req, &self).await,
142142

143143
("/analytics", &Method::GET) => analytics(req, &self).await,
144-
("/analytics/advanced", &Method::GET) => {
144+
("/analytics/advanced", &Method::GET) => {
145145
let req = match chain(req, &self, vec![Box::new(auth_required_middleware)]).await {
146146
Ok(req) => req,
147147
Err(error) => {
@@ -150,7 +150,7 @@ impl<A: Adapter + 'static> Application<A> {
150150
};
151151

152152
advanced_analytics(req, &self).await
153-
},
153+
}
154154
("/analytics/for-advertiser", &Method::GET) => {
155155
let req = match chain(req, &self, vec![Box::new(auth_required_middleware)]).await {
156156
Ok(req) => req,

sentry/src/routes/analytics.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::db::analytics::{get_analytics, AnalyticsType, advertiser_channel_ids, get_advanced_reports};
1+
use crate::db::analytics::{
2+
advertiser_channel_ids, get_advanced_reports, get_analytics, AnalyticsType,
3+
};
24
use crate::success_response;
35
use crate::Application;
46
use crate::ResponseError;
@@ -97,7 +99,6 @@ pub async fn process_analytics<A: Adapter>(
9799
.map_err(|_| ResponseError::BadRequest("error occurred; try again later".to_string()))
98100
}
99101

100-
101102
pub async fn advanced_analytics<A: Adapter>(
102103
req: Request<Body>,
103104
app: &Application<A>,
@@ -107,17 +108,11 @@ pub async fn advanced_analytics<A: Adapter>(
107108

108109
let event_type = serde_urlencoded::from_str::<String>(&req.uri().query().unwrap_or(""))?;
109110

110-
let response = get_advanced_reports(
111-
&app.redis,
112-
&event_type,
113-
&sess.uid,
114-
&advertiser_channels
115-
)
116-
.await
117-
.map_err(|_| ResponseError::BadRequest("error occurred; try again later".to_string()))?;
111+
let response = get_advanced_reports(&app.redis, &event_type, &sess.uid, &advertiser_channels)
112+
.await
113+
.map_err(|_| ResponseError::BadRequest("error occurred; try again later".to_string()))?;
118114

119115
Ok(success_response(serde_json::to_string(&response)?))
120-
121116
}
122117

123118
async fn cache(

0 commit comments

Comments
 (0)