Skip to content

Commit 4a392d3

Browse files
authored
only write tags to clickhouse (#903)
* only write tags to clickhouse * fix build
1 parent 5eefca1 commit 4a392d3

File tree

25 files changed

+4122
-1596
lines changed

25 files changed

+4122
-1596
lines changed

app-server/src/api/v1/evaluators.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use actix_web::{
24
HttpResponse, post,
35
web::{Data, Json},
@@ -12,9 +14,10 @@ use crate::{
1214
DB,
1315
evaluators::{EvaluatorScoreSource, insert_evaluator_score},
1416
project_api_keys::ProjectApiKey,
15-
spans::{get_root_span_id, is_span_in_project},
1617
},
18+
query_engine::QueryEngine,
1719
routes::types::ResponseResult,
20+
sql::{self, ClickhouseReadonlyClient},
1821
};
1922

2023
#[derive(Deserialize)]
@@ -55,9 +58,14 @@ pub async fn create_evaluator_score(
5558
req: Json<CreateEvaluatorScoreRequest>,
5659
db: Data<DB>,
5760
clickhouse: Data<clickhouse::Client>,
61+
clickhouse_ro: Data<Option<Arc<ClickhouseReadonlyClient>>>,
62+
query_engine: Data<Arc<QueryEngine>>,
5863
project_api_key: ProjectApiKey,
5964
) -> ResponseResult {
6065
let req = req.into_inner();
66+
let clickhouse_ro = clickhouse_ro.as_ref().clone().unwrap();
67+
let query_engine = query_engine.as_ref().clone();
68+
let clickhouse = clickhouse.as_ref().clone();
6169

6270
// Extract common fields from both variants
6371
let (name, metadata, score, source) = match &req {
@@ -85,11 +93,21 @@ pub async fn create_evaluator_score(
8593

8694
let span_id = match &req {
8795
CreateEvaluatorScoreRequest::WithTraceId(req) => {
88-
get_root_span_id(&db.pool, &req.trace_id, &project_api_key.project_id).await?
96+
sql::queries::get_top_span_id(
97+
clickhouse_ro,
98+
query_engine,
99+
req.trace_id,
100+
project_api_key.project_id,
101+
)
102+
.await?
89103
}
90104
CreateEvaluatorScoreRequest::WithSpanId(req) => {
91-
let exists =
92-
is_span_in_project(&db.pool, &req.span_id, &project_api_key.project_id).await?;
105+
let exists = crate::ch::spans::is_span_in_project(
106+
clickhouse.clone(),
107+
req.span_id,
108+
project_api_key.project_id,
109+
)
110+
.await?;
93111
if !exists {
94112
return Ok(HttpResponse::NotFound().body("No matching spans found"));
95113
}
@@ -118,7 +136,7 @@ pub async fn create_evaluator_score(
118136
.await?;
119137

120138
let _ = insert_evaluator_score_ch(
121-
clickhouse.into_inner().as_ref().clone(),
139+
clickhouse.clone(),
122140
score_id,
123141
project_id,
124142
name,

app-server/src/api/v1/tag.rs

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
use std::sync::Arc;
2+
13
use crate::{
24
ch::spans::append_tags_to_span,
3-
db::{self, DB, project_api_keys::ProjectApiKey, tags::TagSource},
5+
db::{DB, project_api_keys::ProjectApiKey, tags::TagSource},
6+
query_engine::QueryEngine,
47
routes::types::ResponseResult,
5-
tags::insert_or_update_tag,
8+
sql::{self, ClickhouseReadonlyClient},
9+
tags::create_tag,
610
};
711
use actix_web::{
812
HttpResponse, post,
@@ -39,6 +43,8 @@ pub async fn tag_trace(
3943
req: Json<TagRequest>,
4044
db: web::Data<DB>,
4145
clickhouse: web::Data<clickhouse::Client>,
46+
clickhouse_ro: web::Data<Option<Arc<ClickhouseReadonlyClient>>>,
47+
query_engine: web::Data<Arc<QueryEngine>>,
4248
project_api_key: ProjectApiKey,
4349
) -> ResponseResult {
4450
let req = req.into_inner();
@@ -49,15 +55,27 @@ pub async fn tag_trace(
4955
if names.is_empty() {
5056
return Ok(HttpResponse::BadRequest().body("No names provided"));
5157
}
58+
let clickhouse_ro = clickhouse_ro.as_ref().clone().unwrap();
59+
let query_engine = query_engine.as_ref().clone();
60+
let clickhouse = clickhouse.as_ref().clone();
61+
5262
let span_id = match &req {
5363
TagRequest::WithTraceId(req) => {
54-
db::spans::get_root_span_id(&db.pool, &req.trace_id, &project_api_key.project_id)
55-
.await?
64+
sql::queries::get_top_span_id(
65+
clickhouse_ro,
66+
query_engine,
67+
req.trace_id,
68+
project_api_key.project_id,
69+
)
70+
.await?
5671
}
5772
TagRequest::WithSpanId(req) => {
58-
let exists =
59-
db::spans::is_span_in_project(&db.pool, &req.span_id, &project_api_key.project_id)
60-
.await?;
73+
let exists = crate::ch::spans::is_span_in_project(
74+
clickhouse.clone(),
75+
req.span_id,
76+
project_api_key.project_id,
77+
)
78+
.await?;
6179
if !exists {
6280
return Ok(HttpResponse::NotFound().body("No matching spans found"));
6381
}
@@ -69,26 +87,21 @@ pub async fn tag_trace(
6987
return Ok(HttpResponse::NotFound().body("No matching spans found"));
7088
};
7189

72-
let clickhouse = clickhouse.as_ref().clone();
73-
7490
let futures = names
7591
.iter()
7692
.map(|name| {
77-
insert_or_update_tag(
93+
create_tag(
7894
&db.pool,
7995
clickhouse.clone(),
8096
project_api_key.project_id,
81-
Uuid::new_v4(),
8297
span_id,
83-
None,
84-
None,
8598
name.clone(),
8699
TagSource::CODE,
87100
)
88101
})
89102
.collect::<Vec<_>>();
90103

91-
let tags = futures_util::future::try_join_all(futures).await?;
104+
let tag_ids = futures_util::future::try_join_all(futures).await?;
92105

93106
append_tags_to_span(
94107
clickhouse.clone(),
@@ -98,14 +111,12 @@ pub async fn tag_trace(
98111
)
99112
.await?;
100113

101-
let response = tags
114+
let response = tag_ids
102115
.iter()
103-
.map(|tag| {
116+
.map(|id| {
104117
serde_json::json!({
105-
"id": tag.id,
106-
"spanId": tag.span_id,
107-
"createdAt": tag.created_at,
108-
"updatedAt": tag.updated_at,
118+
"id": id,
119+
"spanId": span_id,
109120
})
110121
})
111122
.collect::<Vec<_>>();

app-server/src/ch/spans.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,3 +218,18 @@ pub async fn append_tags_to_span(
218218

219219
Ok(())
220220
}
221+
222+
pub async fn is_span_in_project(
223+
clickhouse: clickhouse::Client,
224+
span_id: Uuid,
225+
project_id: Uuid,
226+
) -> Result<bool> {
227+
let result = clickhouse
228+
.query("SELECT count(*) FROM spans WHERE span_id = ? AND project_id = ?")
229+
.bind(span_id)
230+
.bind(project_id)
231+
.fetch_one::<u64>()
232+
.await?;
233+
234+
Ok(result > 0)
235+
}

app-server/src/ch/tags.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,11 @@ pub struct CHTag {
3636
}
3737

3838
impl CHTag {
39-
pub fn new(
40-
project_id: Uuid,
41-
class_id: Uuid,
42-
id: Uuid,
43-
name: String,
44-
source: TagSource,
45-
span_id: Uuid,
46-
) -> Self {
39+
pub fn new(project_id: Uuid, id: Uuid, name: String, source: TagSource, span_id: Uuid) -> Self {
4740
Self {
4841
project_id,
49-
class_id,
42+
// TODO: Remove this once we drop the class_id column
43+
class_id: Uuid::nil(),
5044
created_at: chrono_to_nanoseconds(Utc::now()),
5145
id,
5246
name,
@@ -59,13 +53,12 @@ impl CHTag {
5953
pub async fn insert_tag(
6054
client: clickhouse::Client,
6155
project_id: Uuid,
62-
class_id: Uuid,
6356
id: Uuid,
6457
name: String,
6558
source: TagSource,
6659
span_id: Uuid,
6760
) -> Result<()> {
68-
let tag = CHTag::new(project_id, class_id, id, name, source, span_id);
61+
let tag = CHTag::new(project_id, id, name, source, span_id);
6962
let ch_insert = client.insert("tags");
7063
match ch_insert {
7164
Ok(mut ch_insert) => {

app-server/src/db/spans.rs

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use anyhow::Result;
44
use chrono::{DateTime, Utc};
55
use serde::{Deserialize, Serialize};
66
use serde_json::Value;
7-
use sqlx::PgPool;
87
use uuid::Uuid;
98

109
use crate::traces::spans::SpanAttributes;
@@ -63,39 +62,6 @@ pub struct Span {
6362
pub output_url: Option<String>,
6463
}
6564

66-
pub async fn get_root_span_id(
67-
pool: &PgPool,
68-
trace_id: &Uuid,
69-
project_id: &Uuid,
70-
) -> Result<Option<Uuid>> {
71-
let span_id = sqlx::query_scalar::<_, Uuid>(
72-
"SELECT span_id FROM spans
73-
WHERE trace_id = $1
74-
AND project_id = $2
75-
AND parent_span_id IS NULL
76-
ORDER BY start_time ASC
77-
LIMIT 1",
78-
)
79-
.bind(trace_id)
80-
.bind(project_id)
81-
.fetch_optional(pool)
82-
.await?;
83-
84-
Ok(span_id)
85-
}
86-
87-
pub async fn is_span_in_project(pool: &PgPool, span_id: &Uuid, project_id: &Uuid) -> Result<bool> {
88-
let exists = sqlx::query_scalar::<_, bool>(
89-
"SELECT EXISTS(SELECT 1 FROM spans WHERE span_id = $1 AND project_id = $2)",
90-
)
91-
.bind(span_id)
92-
.bind(project_id)
93-
.fetch_one(pool)
94-
.await?;
95-
96-
Ok(exists)
97-
}
98-
9965
#[cfg(test)]
10066
mod tests {
10167
use super::*;

0 commit comments

Comments
 (0)