Skip to content

Commit cf2fd1e

Browse files
committed
add: impl /analytics/advanced route
1 parent b4913b0 commit cf2fd1e

File tree

5 files changed

+68
-10
lines changed

5 files changed

+68
-10
lines changed

primitives/src/channel.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,12 @@ pub mod postgres {
285285
accepts!(TEXT, VARCHAR);
286286
}
287287

288+
impl From<&Row> for ChannelId {
289+
fn from(row: &Row) -> Self {
290+
row.get("id")
291+
}
292+
}
293+
288294
impl ToSql for ChannelId {
289295
fn to_sql(
290296
&self,

sentry/src/analytics_recorder.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use primitives::sentry::{ChannelReport, PublisherReport};
55
use primitives::{BigNum, Channel};
66
use redis;
77
use redis::aio::MultiplexedConnection;
8-
98
use slog::{error, Logger};
109

1110
fn get_payout(channel: &Channel, event: &Event) -> BigNum {
@@ -47,10 +46,12 @@ pub async fn record(
4746
ad_slot,
4847
referrer,
4948
} => {
50-
let payout = get_payout(channel, event)
49+
let divisor = BigNum::from(10u64.pow(18));
50+
let pay_amount = get_payout(channel, event)
51+
.div_floor(&divisor)
5152
.to_u64()
5253
.expect("should always have a payout");
53-
let pay_amount = payout / 10u64.pow(18);
54+
// let pay_amount = payout / 10u64.pow(18);
5455

5556
if let Some(ad_unit) = ad_unit {
5657
db.zincr(

sentry/src/db/analytics.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::Session;
44
use bb8::RunError;
55
use chrono::Utc;
66
use primitives::analytics::{AnalyticsQuery, AnalyticsResponse, ANALYTICS_QUERY_LIMIT};
7-
use primitives::sentry::{AdvancedAnalyticsResponse, ChannelReport, Event, PublisherReport};
7+
use primitives::sentry::{AdvancedAnalyticsResponse, ChannelReport, PublisherReport};
88
use primitives::{ChannelId, ValidatorId};
99
use redis;
1010
use redis::aio::MultiplexedConnection;
@@ -23,6 +23,24 @@ pub enum AnalyticsType {
2323
},
2424
}
2525

26+
pub async fn advertiser_channel_ids(
27+
pool: &DbPool,
28+
creator: &ValidatorId
29+
) -> Result<Vec<ChannelId>, RunError<bb8_postgres::tokio_postgres::Error>> {
30+
pool.run(move |connection| async move {
31+
match connection.prepare("SELECT id FROM channels WHERE creator = {}").await {
32+
Ok(stmt) => match connection.query(&stmt, &[creator]).await {
33+
Ok(rows) => {
34+
let channel_ids: Vec<ChannelId> = rows.iter().map(ChannelId::from).collect();
35+
Ok((channel_ids, connection))
36+
},
37+
Err(e) => Err((e, connection)),
38+
},
39+
Err(e) => Err((e, connection)),
40+
}
41+
})
42+
.await
43+
}
2644
pub async fn get_analytics(
2745
query: AnalyticsQuery,
2846
pool: &DbPool,
@@ -146,7 +164,7 @@ async fn stat_pair(
146164

147165
pub async fn get_advanced_reports(
148166
redis: &MultiplexedConnection,
149-
event: &Event,
167+
event_type: &str,
150168
publisher: &ValidatorId,
151169
channel_ids: &[ChannelId],
152170
) -> Result<AdvancedAnalyticsResponse, Box<dyn Error>> {
@@ -166,10 +184,10 @@ pub async fn get_advanced_reports(
166184
"{}:{}:{}:{}",
167185
epoch().floor(),
168186
publisher_report,
169-
event,
187+
event_type,
170188
publisher
171189
),
172-
_ => format!("{}:{}:{}", publisher_report, event, publisher),
190+
_ => format!("{}:{}:{}", publisher_report, event_type, publisher),
173191
};
174192
let result = stat_pair(redis.clone(), &pair).await?;
175193
publisher_stats.insert(publisher_report.clone(), result);
@@ -189,7 +207,7 @@ pub async fn get_advanced_reports(
189207
for channel_report in channel_reports.iter() {
190208
let result = stat_pair(
191209
redis.clone(),
192-
&format!("{}:{}:{}", channel_report, event, channel_id),
210+
&format!("{}:{}:{}", channel_report, event_type, channel_id),
193211
)
194212
.await?;
195213
channel_stat.insert(channel_report.clone(), result);

sentry/src/lib.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use primitives::adapter::Adapter;
1818
use primitives::{Config, ValidatorId};
1919
use redis::aio::MultiplexedConnection;
2020
use regex::Regex;
21-
use routes::analytics::{advertiser_analytics, analytics, publisher_analytics};
21+
use routes::analytics::{advertiser_analytics, analytics, publisher_analytics, advanced_analytics};
2222
use routes::cfg::config;
2323
use routes::channel::{
2424
channel_list, create_channel, create_validator_messages, insert_events, last_approved,
@@ -141,6 +141,16 @@ impl<A: Adapter + 'static> Application<A> {
141141
("/channel/list", &Method::GET) => channel_list(req, &self).await,
142142

143143
("/analytics", &Method::GET) => analytics(req, &self).await,
144+
("/analytics/advanced", &Method::GET) => {
145+
let req = match chain(req, &self, vec![Box::new(auth_required_middleware)]).await {
146+
Ok(req) => req,
147+
Err(error) => {
148+
return map_response_error(error);
149+
}
150+
};
151+
152+
advanced_analytics(req, &self).await
153+
},
144154
("/analytics/for-advertiser", &Method::GET) => {
145155
let req = match chain(req, &self, vec![Box::new(auth_required_middleware)]).await {
146156
Ok(req) => req,

sentry/src/routes/analytics.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::db::analytics::{get_analytics, AnalyticsType};
1+
use crate::db::analytics::{get_analytics, AnalyticsType, advertiser_channel_ids, get_advanced_reports};
22
use crate::success_response;
33
use crate::Application;
44
use crate::ResponseError;
@@ -97,6 +97,29 @@ pub async fn process_analytics<A: Adapter>(
9797
.map_err(|_| ResponseError::BadRequest("error occurred; try again later".to_string()))
9898
}
9999

100+
101+
pub async fn advanced_analytics<A: Adapter>(
102+
req: Request<Body>,
103+
app: &Application<A>,
104+
) -> Result<Response<Body>, ResponseError> {
105+
let sess = req.extensions().get::<Session>().expect("auth is required");
106+
let advertiser_channels = advertiser_channel_ids(&app.pool, &sess.uid).await?;
107+
108+
let event_type = serde_urlencoded::from_str::<String>(&req.uri().query().unwrap_or(""))?;
109+
110+
let response = get_advanced_reports(
111+
&app.redis,
112+
&event_type,
113+
&sess.uid,
114+
&advertiser_channels
115+
)
116+
.await
117+
.map_err(|_| ResponseError::BadRequest("error occurred; try again later".to_string()))?;
118+
119+
Ok(success_response(serde_json::to_string(&response)?))
120+
121+
}
122+
100123
async fn cache(
101124
redis: &MultiplexedConnection,
102125
key: String,

0 commit comments

Comments
 (0)