@@ -5,6 +5,7 @@ use bb8::RunError;
5
5
use chrono:: Utc ;
6
6
use primitives:: analytics:: { AnalyticsData , AnalyticsQuery , ANALYTICS_QUERY_LIMIT } ;
7
7
use primitives:: sentry:: { AdvancedAnalyticsResponse , ChannelReport , PublisherReport } ;
8
+ use bb8_postgres:: tokio_postgres:: types:: { ToSql } ;
8
9
use primitives:: { ChannelId , ValidatorId } ;
9
10
use redis;
10
11
use redis:: aio:: MultiplexedConnection ;
@@ -40,18 +41,29 @@ pub async fn advertiser_channel_ids(
40
41
}
41
42
42
43
pub async fn get_analytics (
43
- query : AnalyticsQuery ,
44
+ mut query : AnalyticsQuery ,
44
45
pool : & DbPool ,
45
46
analytics_type : AnalyticsType ,
46
47
segment_by_channel : bool ,
47
48
channel_id : Option < & ChannelId > ,
48
49
) -> Result < Vec < AnalyticsData > , RunError < bb8_postgres:: tokio_postgres:: Error > > {
50
+ // converts metric to column
51
+ query. metric_to_column ( ) ;
52
+
53
+ let mut params = Vec :: < & ( dyn ToSql + Sync ) > :: new ( ) ;
49
54
let applied_limit = query. limit . min ( ANALYTICS_QUERY_LIMIT ) ;
50
55
let ( interval, period) = get_time_frame ( & query. timeframe ) ;
51
56
let time_limit = Utc :: now ( ) . timestamp ( ) - period;
52
57
53
58
let mut where_clauses = vec ! [ format!( "created > to_timestamp({})" , time_limit) ] ;
54
59
60
+ params. push ( & query. event_type ) ;
61
+
62
+ where_clauses. extend ( vec ! [
63
+ format!( "event_type = ${}" , params. len( ) ) ,
64
+ format!( "{} IS NOT NULL" , query. metric)
65
+ ] ) ;
66
+
55
67
if let Some ( id) = channel_id {
56
68
where_clauses. push ( format ! ( "channel_id = '{}'" , id) ) ;
57
69
}
@@ -60,26 +72,20 @@ pub async fn get_analytics(
60
72
let mut select_clause = match analytics_type {
61
73
AnalyticsType :: Advertiser { session } => {
62
74
if channel_id. is_none ( ) {
63
- where_clauses. push ( format ! (
64
- "channel_id IN (SELECT id FROM channels WHERE creator = '{}')" ,
65
- session. uid
66
- ) ) ;
75
+ where_clauses. push (
76
+ format ! (
77
+ "channel_id IN (SELECT id FROM channels WHERE creator = '{}')" ,
78
+ session. uid
79
+ )
80
+ ) ;
67
81
}
68
82
69
- where_clauses. push ( format ! ( "event_type = '{}'" , query. event_type) ) ;
70
-
71
- where_clauses. push ( format ! ( "{} IS NOT NULL" , query. metric) ) ;
72
-
73
83
format ! (
74
84
"SUM({}::numeric)::varchar as value, (extract(epoch from created) - (MOD( CAST (extract(epoch from created) AS NUMERIC), {}))) as time" ,
75
85
query. metric, interval
76
86
)
77
87
}
78
88
AnalyticsType :: Global => {
79
- where_clauses. push ( format ! ( "event_type = '{}'" , query. event_type) ) ;
80
-
81
- where_clauses. push ( format ! ( "{} IS NOT NULL" , query. metric) ) ;
82
-
83
89
where_clauses. push ( "earner IS NULL" . to_string ( ) ) ;
84
90
85
91
format ! (
@@ -88,10 +94,6 @@ pub async fn get_analytics(
88
94
)
89
95
}
90
96
AnalyticsType :: Publisher { session } => {
91
- where_clauses. push ( format ! ( "event_type = '{}'" , query. event_type) ) ;
92
-
93
- where_clauses. push ( format ! ( "{} IS NOT NULL" , query. metric) ) ;
94
-
95
97
where_clauses. push ( format ! ( "earner = '{}'" , session. uid) ) ;
96
98
97
99
format ! (
@@ -114,12 +116,10 @@ pub async fn get_analytics(
114
116
applied_limit,
115
117
) ;
116
118
117
- println ! ( "{}" , sql_query) ;
118
-
119
119
// execute query
120
120
pool. run ( move |connection| async move {
121
121
match connection. prepare ( & sql_query) . await {
122
- Ok ( stmt) => match connection. query ( & stmt, & [ ] ) . await {
122
+ Ok ( stmt) => match connection. query ( & stmt, & params ) . await {
123
123
Ok ( rows) => {
124
124
let analytics: Vec < AnalyticsData > =
125
125
rows. iter ( ) . map ( AnalyticsData :: from) . collect ( ) ;
0 commit comments