|
| 1 | +use crate::db::DbPool; |
| 2 | +use bb8::RunError; |
| 3 | +use bb8_postgres::tokio_postgres::types::ToSql; |
| 4 | +use chrono::{DateTime, Utc}; |
| 5 | +use primitives::sentry::EventAggregate; |
| 6 | +use primitives::ValidatorId; |
| 7 | + |
| 8 | +pub async fn list_event_aggregates( |
| 9 | + pool: &DbPool, |
| 10 | + limit: u32, |
| 11 | + from: &Option<ValidatorId>, |
| 12 | + after: &Option<DateTime<Utc>>, |
| 13 | +) -> Result<Vec<EventAggregate>, RunError<bb8_postgres::tokio_postgres::Error>> { |
| 14 | + let (mut where_clauses, mut params) = (vec![], Vec::<&(dyn ToSql + Sync)>::new()); |
| 15 | + if let Some(from) = from { |
| 16 | + let key_counts = format!( |
| 17 | + "events->'IMPRESSION'->'eventPayouts'->'{}'", |
| 18 | + from.to_string() |
| 19 | + ); |
| 20 | + where_clauses.push(format!("{} IS NOT NULL", key_counts)); |
| 21 | + } |
| 22 | + if let Some(after) = after { |
| 23 | + params.push(after); |
| 24 | + where_clauses.push(format!("created > {}", params.len())); |
| 25 | + } |
| 26 | + |
| 27 | + let event_aggregates = pool |
| 28 | + .run(move |connection| { |
| 29 | + async move { |
| 30 | + let where_clause = if !where_clauses.is_empty() { |
| 31 | + format!("WHERE {}", where_clauses.join(" AND ")) |
| 32 | + } else { |
| 33 | + "".to_string() |
| 34 | + }; |
| 35 | + let statement = format!("SELECT channel_id, created, events FROM event_aggregates {} ORDER BY created DESC LIMIT {}", where_clause, limit); |
| 36 | + match connection.prepare(&statement).await { |
| 37 | + Ok(stmt) => { |
| 38 | + match connection.query(&stmt, params.as_slice()).await { |
| 39 | + Ok(rows) => { |
| 40 | + let event_aggregates = rows.iter().map(EventAggregate::from).collect(); |
| 41 | + |
| 42 | + Ok((event_aggregates, connection)) |
| 43 | + }, |
| 44 | + Err(e) => Err((e, connection)), |
| 45 | + } |
| 46 | + }, |
| 47 | + Err(e) => Err((e, connection)), |
| 48 | + } |
| 49 | + } |
| 50 | + }) |
| 51 | + .await?; |
| 52 | + |
| 53 | + Ok(event_aggregates) |
| 54 | +} |
0 commit comments