Skip to content

Commit 7508712

Browse files
record tags in a batch (#977)
* record tags in a batch * structured tag instead of tuple --------- Co-authored-by: Din <[email protected]>
1 parent 1ecf65a commit 7508712

File tree

4 files changed

+89
-42
lines changed

4 files changed

+89
-42
lines changed

app-server/src/ch/tags.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ use anyhow::Result;
22
use chrono::Utc;
33
use clickhouse::Row;
44
use serde::{Deserialize, Serialize};
5+
use tracing::instrument;
56
use uuid::Uuid;
67

7-
use crate::db::tags::TagSource;
8+
use crate::db::tags::{SpanTag, TagSource};
89

910
use super::utils::chrono_to_nanoseconds;
1011

@@ -79,3 +80,44 @@ pub async fn insert_tag(
7980
}
8081
}
8182
}
83+
84+
#[instrument(skip(client, tags))]
85+
pub async fn insert_tags_batch(client: clickhouse::Client, tags: &[SpanTag]) -> Result<()> {
86+
if tags.is_empty() {
87+
return Ok(());
88+
}
89+
90+
let ch_insert = client.insert("tags");
91+
match ch_insert {
92+
Ok(mut ch_insert) => {
93+
for span_tag in tags {
94+
let id = Uuid::new_v4();
95+
let tag = CHTag::new(
96+
span_tag.project_id,
97+
id,
98+
span_tag.name.clone(),
99+
span_tag.source.clone(),
100+
span_tag.span_id,
101+
);
102+
ch_insert.write(&tag).await?;
103+
}
104+
105+
let ch_insert_end_res = ch_insert.end().await;
106+
match ch_insert_end_res {
107+
Ok(_) => Ok(()),
108+
Err(e) => {
109+
return Err(anyhow::anyhow!(
110+
"Clickhouse batch tag insertion failed: {:?}",
111+
e
112+
));
113+
}
114+
}
115+
}
116+
Err(e) => {
117+
return Err(anyhow::anyhow!(
118+
"Failed to insert tags batch into Clickhouse: {:?}",
119+
e
120+
));
121+
}
122+
}
123+
}

app-server/src/db/tags.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,30 @@
11
use serde::{Deserialize, Serialize};
2+
use uuid::Uuid;
23

3-
#[derive(sqlx::Type, Serialize, Deserialize, Clone, PartialEq)]
4+
#[derive(Debug, sqlx::Type, Serialize, Deserialize, Clone, PartialEq)]
45
#[sqlx(type_name = "tag_source")]
56
pub enum TagSource {
67
MANUAL,
78
AUTO,
89
CODE,
910
}
11+
12+
/// Structured representation of a span tag for batch operations
13+
#[derive(Debug, Clone)]
14+
pub struct SpanTag {
15+
pub project_id: Uuid,
16+
pub name: String,
17+
pub source: TagSource,
18+
pub span_id: Uuid,
19+
}
20+
21+
impl SpanTag {
22+
pub fn new(project_id: Uuid, name: String, source: TagSource, span_id: Uuid) -> Self {
23+
Self {
24+
project_id,
25+
name,
26+
source,
27+
span_id,
28+
}
29+
}
30+
}

app-server/src/traces/consumer.rs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@ use crate::{
2424
spans::CHSpan,
2525
traces::{CHTrace, TraceAggregation, upsert_traces_batch},
2626
},
27-
db::{DB, events::Event, spans::Span, trace::upsert_trace_statistics_batch},
27+
db::{
28+
DB,
29+
events::Event,
30+
spans::Span,
31+
tags::{SpanTag, TagSource},
32+
trace::upsert_trace_statistics_batch,
33+
},
2834
evaluators::{get_evaluators_by_path, push_to_evaluators_queue},
2935
features::{Feature, is_feature_enabled},
3036
mq::{
@@ -38,7 +44,7 @@ use crate::{
3844
events::record_events,
3945
limits::update_workspace_limit_exceeded_by_project_id,
4046
provider::convert_span_to_provider_format,
41-
utils::{get_llm_usage_for_span, prepare_span_for_recording, record_tags},
47+
utils::{get_llm_usage_for_span, prepare_span_for_recording},
4248
},
4349
};
4450

@@ -427,19 +433,22 @@ async fn process_batch(
427433
}
428434
}
429435

430-
for span in &stripped_spans {
431-
if let Err(e) = record_tags(
432-
clickhouse.clone(),
433-
&span.tags,
434-
&span.span_id,
435-
&span.project_id,
436-
)
437-
.await
438-
{
436+
// Collect all tags from all spans for batch insertion
437+
let tags_batch: Vec<SpanTag> = stripped_spans
438+
.iter()
439+
.flat_map(|span| {
440+
span.tags.iter().map(move |tag| {
441+
SpanTag::new(span.project_id, tag.clone(), TagSource::CODE, span.span_id)
442+
})
443+
})
444+
.collect();
445+
446+
// Record all tags in a single batch
447+
if !tags_batch.is_empty() {
448+
if let Err(e) = crate::ch::tags::insert_tags_batch(clickhouse.clone(), &tags_batch).await {
439449
log::error!(
440-
"Failed to record tags to DB. span_id [{}], project_id [{}]: {:?}",
441-
span.span_id,
442-
span.project_id,
450+
"Failed to record tags to DB for batch of {} tags: {:?}",
451+
tags_batch.len(),
443452
e
444453
);
445454
}

app-server/src/traces/utils.rs

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313

1414
use crate::{
1515
cache::Cache,
16-
db::{DB, spans::Span, tags::TagSource},
16+
db::{DB, spans::Span},
1717
language_model::costs::estimate_cost_by_provider_name,
1818
};
1919

@@ -97,31 +97,6 @@ pub async fn get_llm_usage_for_span(
9797
}
9898
}
9999

100-
#[instrument(skip(clickhouse, tags, span_id, project_id))]
101-
pub async fn record_tags(
102-
clickhouse: clickhouse::Client,
103-
tags: &[String],
104-
span_id: &Uuid,
105-
project_id: &Uuid,
106-
) -> anyhow::Result<()> {
107-
if tags.is_empty() {
108-
return Ok(());
109-
}
110-
111-
for tag_name in tags {
112-
crate::ch::tags::insert_tag(
113-
clickhouse.clone(),
114-
*project_id,
115-
tag_name.clone(),
116-
TagSource::CODE,
117-
*span_id,
118-
)
119-
.await?;
120-
}
121-
122-
Ok(())
123-
}
124-
125100
pub fn skip_span_name(name: &str) -> bool {
126101
SKIP_SPAN_NAME_REGEX.is_match(name)
127102
}

0 commit comments

Comments
 (0)