Skip to content

Commit 80cde9c

Browse files
committed
fix: analytics db query
1 parent afcf824 commit 80cde9c

File tree

4 files changed

+31
-17
lines changed

4 files changed

+31
-17
lines changed

primitives/src/analytics.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
use crate::DomainError;
22
use serde::{Deserialize, Serialize};
3+
use crate::ChannelId;
34

45
pub const ANALYTICS_QUERY_LIMIT: u32 = 200;
56

67
#[derive(Debug, Serialize, Deserialize)]
8+
#[serde(rename_all = "camelCase")]
79
pub struct AnalyticsData {
810
pub time: f64,
911
pub value: String,
12+
#[serde(default, skip_serializing_if = "Option::is_none")]
13+
pub channel_id: Option<ChannelId>
1014
}
1115

1216
#[derive(Debug, Serialize, Deserialize)]
1317
pub struct AnalyticsResponse {
1418
pub aggr: Vec<AnalyticsData>,
15-
pub limit: u32
19+
pub limit: u32,
1620
}
1721

1822
#[cfg(feature = "postgres")]
@@ -25,6 +29,7 @@ pub mod postgres {
2529
Self {
2630
time: row.get("time"),
2731
value: row.get("value"),
32+
channel_id: row.try_get("channel_id").ok(),
2833
}
2934
}
3035
}

sentry/src/db/analytics.rs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub async fn advertiser_channel_ids(
2727
) -> Result<Vec<ChannelId>, RunError<bb8_postgres::tokio_postgres::Error>> {
2828
pool.run(move |connection| async move {
2929
match connection
30-
.prepare("SELECT id FROM channels WHERE creator = {}")
30+
.prepare("SELECT id FROM channels WHERE creator = $1")
3131
.await
3232
{
3333
Ok(stmt) => match connection.query(&stmt, &[creator]).await {
@@ -81,7 +81,7 @@ pub async fn get_analytics(
8181
));
8282

8383
format!(
84-
"SUM({}::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates",
84+
"SUM({}::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time",
8585
query.metric, interval
8686
)
8787
}
@@ -99,7 +99,7 @@ pub async fn get_analytics(
9999
where_clauses.push("earner IS NULL".to_string());
100100

101101
format!(
102-
"SUM({}::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates",
102+
"SUM({}::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time",
103103
query.metric, interval
104104
)
105105
}
@@ -119,26 +119,20 @@ pub async fn get_analytics(
119119
session.uid
120120
));
121121

122-
123-
// where_clauses.push(format!(
124-
// "events->'{}'->'{}'->'{}' IS NOT NULL",
125-
// query.event_type, query.metric, session.uid
126-
// ));
127-
128122
format!(
129-
"SUM({}::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time from event_aggregates",
123+
"SUM({}::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time",
130124
query.metric, interval
131125
)
132126
}
133127
};
134128

135129
if segment_by_channel {
136-
select_clause = format!("{}, channelId", select_clause);
137-
group_clause = format!("{}, channelId", group_clause);
130+
select_clause = format!("{}, channel_id", select_clause);
131+
group_clause = format!("{}, channel_id", group_clause);
138132
}
139133

140134
let sql_query = format!(
141-
"SELECT {} WHERE {} GROUP BY {} LIMIT {}",
135+
"SELECT {} FROM event_aggregates WHERE {} GROUP BY {} LIMIT {}",
142136
select_clause,
143137
where_clauses.join(" AND "),
144138
group_clause,
@@ -208,6 +202,7 @@ pub async fn get_advanced_reports(
208202
publisher: &ValidatorId,
209203
channel_ids: &[ChannelId],
210204
) -> Result<AdvancedAnalyticsResponse, Box<dyn Error>> {
205+
println!("get advnaces");
211206
let publisher_reports = [
212207
PublisherReport::ReportPublisherToAdUnit,
213208
PublisherReport::ReportPublisherToAdSlot,
@@ -216,7 +211,7 @@ pub async fn get_advanced_reports(
216211
PublisherReport::ReportPublisherToHostname,
217212
];
218213

219-
let mut publisher_stats = HashMap::new();
214+
let mut publisher_stats: HashMap<PublisherReport, HashMap<String, f64>> = HashMap::new();
220215

221216
for publisher_report in publisher_reports.iter() {
222217
let pair = match publisher_report {
@@ -255,6 +250,7 @@ pub async fn get_advanced_reports(
255250

256251
by_channel_stats.insert(channel_id.to_owned(), channel_stat);
257252
}
253+
258254

259255
Ok(AdvancedAnalyticsResponse {
260256
publisher_stats,

sentry/src/lib.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ lazy_static! {
5757
static ref CHANNEL_EVENTS_AGGREGATES: Regex = Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/events-aggregates/?$").expect("The regex should be valid");
5858
static ref ANALYTICS_BY_CHANNEL_ID: Regex = Regex::new(r"^/analytics/0x([a-zA-Z0-9]{64})/?$").expect("The regex should be valid");
5959
static ref ADVERTISER_ANALYTICS_BY_CHANNEL_ID: Regex = Regex::new(r"^/analytics/for-advertiser/0x([a-zA-Z0-9]{64})/?$").expect("The regex should be valid");
60+
static ref PUBLISHER_ANALYTICS_BY_CHANNEL_ID: Regex = Regex::new(r"^/analytics/for-publisher/0x([a-zA-Z0-9]{64})/?$").expect("The regex should be valid");
6061
static ref CREATE_EVENTS_BY_CHANNEL_ID: Regex = Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/events(/.*)?$").expect("The regex should be valid");
6162

6263
}
@@ -216,6 +217,18 @@ async fn analytics_router<A: Adapter + 'static>(
216217
]).await?;
217218

218219
advertiser_analytics(req, app).await
220+
} else if let Some(caps) = PUBLISHER_ANALYTICS_BY_CHANNEL_ID.captures(route) {
221+
let param = RouteParams(vec![caps
222+
.get(1)
223+
.map_or("".to_string(), |m| m.as_str().to_string())]);
224+
req.extensions_mut().insert(param);
225+
226+
let req = chain(req, app, vec![
227+
Box::new(auth_required_middleware),
228+
Box::new(get_channel_id)
229+
]).await?;
230+
231+
publisher_analytics(req, app).await
219232
} else {
220233
Err(ResponseError::NotFound)
221234
}

sentry/src/routes/analytics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,9 @@ pub async fn advanced_analytics<A: Adapter>(
115115
let sess = req.extensions().get::<Session>().expect("auth is required");
116116
let advertiser_channels = advertiser_channel_ids(&app.pool, &sess.uid).await?;
117117

118-
let event_type = serde_urlencoded::from_str::<String>(&req.uri().query().unwrap_or(""))?;
118+
let query = serde_urlencoded::from_str::<AnalyticsQuery>(&req.uri().query().unwrap_or(""))?;
119119

120-
let response = get_advanced_reports(&app.redis, &event_type, &sess.uid, &advertiser_channels)
120+
let response = get_advanced_reports(&app.redis, &query.event_type, &sess.uid, &advertiser_channels)
121121
.await
122122
.map_err(|_| ResponseError::BadRequest("error occurred; try again later".to_string()))?;
123123

0 commit comments

Comments
 (0)