Skip to content

Commit ac8cc6e

Browse files
authored
Create event definitions for new events from traces (#965)
1 parent aad5b6e commit ac8cc6e

File tree

5 files changed

+98
-4
lines changed

5 files changed

+98
-4
lines changed

app-server/src/cache/keys.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ pub const PROJECT_CACHE_KEY: &str = "project";
77
pub const WORKSPACE_LIMITS_CACHE_KEY: &str = "workspace_limits";
88
pub const PROJECT_EVALUATORS_BY_PATH_CACHE_KEY: &str = "project_evaluators_by_path";
99
pub const SUMMARY_TRIGGER_SPANS_CACHE_KEY: &str = "summary_trigger_spans";
10-
10+
pub const PROJECT_EVENT_NAMES_CACHE_KEY: &str = "project_event_names";
1111
pub const WORKSPACE_BYTES_USAGE_CACHE_KEY: &str = "workspace_bytes_usage";
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use anyhow::Result;
2+
use sqlx::PgPool;
3+
use uuid::Uuid;
4+
5+
pub async fn get_event_definition_names(pool: &PgPool, project_id: &Uuid) -> Result<Vec<String>> {
6+
let event_definitions =
7+
sqlx::query_scalar::<_, String>("SELECT name FROM event_definitions WHERE project_id = $1")
8+
.bind(project_id)
9+
.fetch_all(pool)
10+
.await?;
11+
12+
Ok(event_definitions)
13+
}
14+
15+
pub async fn insert_event_definition_names(
16+
pool: &PgPool,
17+
project_id: &Uuid,
18+
names: &Vec<String>,
19+
) -> Result<()> {
20+
sqlx::query(
21+
"INSERT INTO event_definitions (name, project_id)
22+
SELECT UNNEST($1::text[]) AS name, $2 AS project_id
23+
ON CONFLICT (name, project_id) DO NOTHING
24+
",
25+
)
26+
.bind(names)
27+
.bind(project_id)
28+
.execute(pool)
29+
.await?;
30+
31+
Ok(())
32+
}

app-server/src/db/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub mod agent_messages;
55
pub mod datasets;
66
pub mod evaluations;
77
pub mod evaluators;
8+
pub mod event_definitions;
89
pub mod events;
910
pub mod prices;
1011
pub mod project_api_keys;

app-server/src/traces/consumer.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,15 @@ async fn process_batch(
361361
log::error!("Failed to ack MQ delivery (batch): {:?}", e);
362362
});
363363

364-
let total_events_ingested_bytes = match record_events(clickhouse.clone(), &all_events).await {
364+
let total_events_ingested_bytes = match record_events(
365+
cache.clone(),
366+
db.clone(),
367+
project_id,
368+
clickhouse.clone(),
369+
&all_events,
370+
)
371+
.await
372+
{
365373
Ok(bytes) => bytes,
366374
Err(e) => {
367375
log::error!("Failed to record events: {:?}", e);

app-server/src/traces/events.rs

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,71 @@
1+
use std::collections::HashSet;
2+
use std::sync::Arc;
3+
14
use anyhow::Result;
5+
use uuid::Uuid;
26

37
use crate::{
8+
cache::{Cache, CacheTrait, keys::PROJECT_EVENT_NAMES_CACHE_KEY},
49
ch::{self, events::CHEvent},
5-
db::events::Event,
10+
db::{self, DB, events::Event},
611
};
712

8-
///
913
pub async fn record_events(
14+
cache: Arc<Cache>,
15+
db: Arc<DB>,
16+
project_id: Uuid,
1017
clickhouse: clickhouse::Client,
1118
event_payloads: &Vec<Event>,
1219
) -> Result<usize> {
1320
let ch_events = event_payloads
1421
.iter()
1522
.map(|e| CHEvent::from_db_event(e))
1623
.collect::<Vec<CHEvent>>();
24+
let event_names = ch_events
25+
.iter()
26+
.map(|e| e.name.clone())
27+
.collect::<Vec<String>>();
28+
tokio::spawn(async move {
29+
let _ = insert_event_definition_names(db.clone(), cache.clone(), &project_id, event_names)
30+
.await
31+
.map_err(|e| log::error!("Failed to insert event definition names: {:?}", e));
32+
});
1733
ch::events::insert_events(clickhouse, ch_events).await
1834
}
35+
36+
async fn insert_event_definition_names(
37+
db: Arc<DB>,
38+
cache: Arc<Cache>,
39+
project_id: &Uuid,
40+
names: Vec<String>,
41+
) -> Result<()> {
42+
let cache_key = format!("{PROJECT_EVENT_NAMES_CACHE_KEY}:{}", project_id);
43+
let cached_names = cache.get::<Vec<String>>(&cache_key).await;
44+
match cached_names {
45+
Ok(Some(cached_names)) => {
46+
let cached_names_set = HashSet::<String>::from_iter(cached_names);
47+
let names_set = HashSet::from_iter(names);
48+
if cached_names_set.is_superset(&names_set) {
49+
return Ok(());
50+
}
51+
let new_names = cached_names_set
52+
.union(&names_set)
53+
.cloned()
54+
.collect::<Vec<String>>();
55+
// Found in cache, but this event name is new, insert it into the database and cache
56+
db::event_definitions::insert_event_definition_names(&db.pool, project_id, &new_names)
57+
.await?;
58+
cache.insert(&cache_key, new_names).await?;
59+
Ok(())
60+
}
61+
Err(_) | Ok(None) => {
62+
// Not found in cache, insert it into the database, update the cache
63+
db::event_definitions::insert_event_definition_names(&db.pool, project_id, &names)
64+
.await?;
65+
let new_names =
66+
db::event_definitions::get_event_definition_names(&db.pool, project_id).await?;
67+
cache.insert(&cache_key, new_names).await?;
68+
Ok(())
69+
}
70+
}
71+
}

0 commit comments

Comments
 (0)