Skip to content

Commit 9b7209c

Browse files
authored
Merge pull request #982 from lmnr-ai/dev
Fix CH migrations, fix console logs, optimize tag writing
2 parents f145d07 + 38726b6 commit 9b7209c

File tree

19 files changed

+243
-133
lines changed

19 files changed

+243
-133
lines changed

app-server/src/ch/tags.rs

Lines changed: 43 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,9 +2,10 @@ use anyhow::Result;
22
use chrono::Utc;
33
use clickhouse::Row;
44
use serde::{Deserialize, Serialize};
5+
use tracing::instrument;
56
use uuid::Uuid;
67

7-
use crate::db::tags::TagSource;
8+
use crate::db::tags::{SpanTag, TagSource};
89

910
use super::utils::chrono_to_nanoseconds;
1011

@@ -79,3 +80,44 @@ pub async fn insert_tag(
7980
}
8081
}
8182
}
83+
84+
#[instrument(skip(client, tags))]
85+
pub async fn insert_tags_batch(client: clickhouse::Client, tags: &[SpanTag]) -> Result<()> {
86+
if tags.is_empty() {
87+
return Ok(());
88+
}
89+
90+
let ch_insert = client.insert("tags");
91+
match ch_insert {
92+
Ok(mut ch_insert) => {
93+
for span_tag in tags {
94+
let id = Uuid::new_v4();
95+
let tag = CHTag::new(
96+
span_tag.project_id,
97+
id,
98+
span_tag.name.clone(),
99+
span_tag.source.clone(),
100+
span_tag.span_id,
101+
);
102+
ch_insert.write(&tag).await?;
103+
}
104+
105+
let ch_insert_end_res = ch_insert.end().await;
106+
match ch_insert_end_res {
107+
Ok(_) => Ok(()),
108+
Err(e) => {
109+
return Err(anyhow::anyhow!(
110+
"Clickhouse batch tag insertion failed: {:?}",
111+
e
112+
));
113+
}
114+
}
115+
}
116+
Err(e) => {
117+
return Err(anyhow::anyhow!(
118+
"Failed to insert tags batch into Clickhouse: {:?}",
119+
e
120+
));
121+
}
122+
}
123+
}

app-server/src/db/prices.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ pub struct DBPriceEntry {
1111
pub additional_prices: Value,
1212
}
1313

14-
pub async fn get_price(pool: &PgPool, provider: &str, model: &str) -> anyhow::Result<DBPriceEntry> {
14+
pub async fn get_price(
15+
pool: &PgPool,
16+
provider: &str,
17+
model: &str,
18+
) -> anyhow::Result<Option<DBPriceEntry>> {
1519
let price = sqlx::query_as::<_, DBPriceEntry>(
1620
"SELECT
1721
provider,
@@ -32,7 +36,7 @@ pub async fn get_price(pool: &PgPool, provider: &str, model: &str) -> anyhow::Re
3236
)
3337
.bind(provider)
3438
.bind(model)
35-
.fetch_one(pool)
39+
.fetch_optional(pool)
3640
.await?;
3741

3842
Ok(price)

app-server/src/db/tags.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,30 @@
11
use serde::{Deserialize, Serialize};
2+
use uuid::Uuid;
23

3-
#[derive(sqlx::Type, Serialize, Deserialize, Clone, PartialEq)]
4+
#[derive(Debug, sqlx::Type, Serialize, Deserialize, Clone, PartialEq)]
45
#[sqlx(type_name = "tag_source")]
56
pub enum TagSource {
67
MANUAL,
78
AUTO,
89
CODE,
910
}
11+
12+
/// Structured representation of a span tag for batch operations
13+
#[derive(Debug, Clone)]
14+
pub struct SpanTag {
15+
pub project_id: Uuid,
16+
pub name: String,
17+
pub source: TagSource,
18+
pub span_id: Uuid,
19+
}
20+
21+
impl SpanTag {
22+
pub fn new(project_id: Uuid, name: String, source: TagSource, span_id: Uuid) -> Self {
23+
Self {
24+
project_id,
25+
name,
26+
source,
27+
span_id,
28+
}
29+
}
30+
}

app-server/src/instrumentation/mod.rs

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,30 +9,35 @@ use opentelemetry::trace::TracerProvider;
99
use opentelemetry_sdk::trace::SdkTracerProvider;
1010
use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
1111

12-
pub fn setup_tracing() {
12+
pub fn setup_tracing(enable_otel: bool) {
1313
let env_filter = if std::env::var("RUST_LOG").is_ok_and(|s| !s.is_empty()) {
1414
EnvFilter::from_default_env()
1515
} else {
1616
EnvFilter::new("info")
1717
};
1818

19-
let mut provider_builder = SdkTracerProvider::builder();
19+
let registry = tracing_subscriber::registry()
20+
.with(env_filter)
21+
.with(tracing_subscriber::fmt::layer());
2022

21-
if std::env::var("SENTRY_DSN").is_ok() {
22-
provider_builder = provider_builder
23-
.with_span_processor(sentry::integrations::opentelemetry::SentrySpanProcessor::new());
24-
}
23+
if enable_otel {
24+
let mut provider_builder = SdkTracerProvider::builder();
2525

26-
let tracer_provider = provider_builder.build();
27-
let tracer = tracer_provider.tracer("app-server");
26+
if std::env::var("SENTRY_DSN").is_ok() {
27+
provider_builder = provider_builder.with_span_processor(
28+
sentry::integrations::opentelemetry::SentrySpanProcessor::new(),
29+
);
30+
}
2831

29-
global::set_tracer_provider(tracer_provider);
32+
let tracer_provider = provider_builder.build();
33+
let tracer = tracer_provider.tracer("app-server");
3034

31-
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
35+
global::set_tracer_provider(tracer_provider);
3236

33-
tracing_subscriber::registry()
34-
.with(env_filter)
35-
.with(otel_layer)
36-
.with(tracing_subscriber::fmt::layer())
37-
.init();
37+
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
38+
39+
registry.with(otel_layer).init();
40+
} else {
41+
registry.init();
42+
}
3843
}

app-server/src/language_model/costs/mod.rs

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ use utils::calculate_cost;
1313

1414
mod utils;
1515

16+
const LLM_PRICES_CACHE_TTL_SECONDS: u64 = 60 * 60 * 24; // 24 hours
17+
1618
#[derive(Clone, Deserialize, Serialize)]
1719
pub struct LLMPriceEntry {
1820
_provider: String,
@@ -44,15 +46,30 @@ pub async fn estimate_output_cost(
4446
num_tokens: i64,
4547
) -> Option<f64> {
4648
let cache_key = format!("{LLM_PRICES_CACHE_KEY}:{provider}:{model}");
47-
let cache_res = cache.get::<LLMPriceEntry>(&cache_key).await.ok()?;
49+
let cache_res = cache.get::<LLMPriceEntry>(&cache_key).await;
4850

4951
let price_per_million_tokens = match cache_res {
50-
Some(price) => price.output_price_per_million,
51-
None => {
52-
let price = get_price(&db.pool, provider, model).await.ok()?;
52+
Ok(Some(price)) => price.output_price_per_million,
53+
Ok(None) | Err(_) => {
54+
let price = get_price(&db.pool, provider, model)
55+
.await
56+
.map_err(|e| {
57+
log::error!(
58+
"Error getting price from DB for provider: {}, model: {}: {:?}",
59+
provider,
60+
model,
61+
e
62+
);
63+
e
64+
})
65+
.unwrap_or_default()?;
5366
let price = LLMPriceEntry::from(price);
5467
let _ = cache
55-
.insert::<LLMPriceEntry>(&cache_key, price.clone())
68+
.insert_with_ttl::<LLMPriceEntry>(
69+
&cache_key,
70+
price.clone(),
71+
LLM_PRICES_CACHE_TTL_SECONDS,
72+
)
5673
.await;
5774
price.output_price_per_million
5875
}
@@ -68,16 +85,30 @@ pub async fn estimate_input_cost(
6885
input_tokens: InputTokens,
6986
) -> Option<f64> {
7087
let cache_key = format!("{LLM_PRICES_CACHE_KEY}:{provider}:{model}");
71-
// let cache_res = cache.get::<LLMPriceEntry>(&cache_key).await.ok()?;
72-
let cache_res = None;
88+
let cache_res = cache.get::<LLMPriceEntry>(&cache_key).await;
7389

7490
let price = match cache_res {
75-
Some(price) => price,
76-
None => {
77-
let price = get_price(&db.pool, provider, model).await.ok()?;
91+
Ok(Some(price)) => price,
92+
Ok(None) | Err(_) => {
93+
let price = get_price(&db.pool, provider, model)
94+
.await
95+
.map_err(|e| {
96+
log::error!(
97+
"Error getting price from DB for provider: {}, model: {}: {:?}",
98+
provider,
99+
model,
100+
e
101+
);
102+
e
103+
})
104+
.unwrap_or_default()?;
78105
let price = LLMPriceEntry::from(price);
79106
let _ = cache
80-
.insert::<LLMPriceEntry>(&cache_key, price.clone())
107+
.insert_with_ttl::<LLMPriceEntry>(
108+
&cache_key,
109+
price.clone(),
110+
LLM_PRICES_CACHE_TTL_SECONDS,
111+
)
81112
.await;
82113
price
83114
}

app-server/src/main.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,7 @@ fn main() -> anyhow::Result<()> {
115115
drop(_sentry_guard);
116116
}
117117

118-
if is_feature_enabled(Feature::Tracing) {
119-
instrumentation::setup_tracing();
120-
}
118+
instrumentation::setup_tracing(is_feature_enabled(Feature::Tracing));
121119

122120
let http_payload_limit: usize = env::var("HTTP_PAYLOAD_LIMIT")
123121
.unwrap_or(String::from("5242880")) // default to 5MB

app-server/src/traces/consumer.rs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@ use crate::{
2424
spans::CHSpan,
2525
traces::{CHTrace, TraceAggregation, upsert_traces_batch},
2626
},
27-
db::{DB, events::Event, spans::Span, trace::upsert_trace_statistics_batch},
27+
db::{
28+
DB,
29+
events::Event,
30+
spans::Span,
31+
tags::{SpanTag, TagSource},
32+
trace::upsert_trace_statistics_batch,
33+
},
2834
evaluators::{get_evaluators_by_path, push_to_evaluators_queue},
2935
features::{Feature, is_feature_enabled},
3036
mq::{
@@ -38,7 +44,7 @@ use crate::{
3844
events::record_events,
3945
limits::update_workspace_limit_exceeded_by_project_id,
4046
provider::convert_span_to_provider_format,
41-
utils::{get_llm_usage_for_span, prepare_span_for_recording, record_tags},
47+
utils::{get_llm_usage_for_span, prepare_span_for_recording},
4248
},
4349
};
4450

@@ -427,19 +433,22 @@ async fn process_batch(
427433
}
428434
}
429435

430-
for span in &stripped_spans {
431-
if let Err(e) = record_tags(
432-
clickhouse.clone(),
433-
&span.tags,
434-
&span.span_id,
435-
&span.project_id,
436-
)
437-
.await
438-
{
436+
// Collect all tags from all spans for batch insertion
437+
let tags_batch: Vec<SpanTag> = stripped_spans
438+
.iter()
439+
.flat_map(|span| {
440+
span.tags.iter().map(move |tag| {
441+
SpanTag::new(span.project_id, tag.clone(), TagSource::CODE, span.span_id)
442+
})
443+
})
444+
.collect();
445+
446+
// Record all tags in a single batch
447+
if !tags_batch.is_empty() {
448+
if let Err(e) = crate::ch::tags::insert_tags_batch(clickhouse.clone(), &tags_batch).await {
439449
log::error!(
440-
"Failed to record tags to DB. span_id [{}], project_id [{}]: {:?}",
441-
span.span_id,
442-
span.project_id,
450+
"Failed to record tags to DB for batch of {} tags: {:?}",
451+
tags_batch.len(),
443452
e
444453
);
445454
}

app-server/src/traces/utils.rs

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313

1414
use crate::{
1515
cache::Cache,
16-
db::{DB, spans::Span, tags::TagSource},
16+
db::{DB, spans::Span},
1717
language_model::costs::estimate_cost_by_provider_name,
1818
};
1919

@@ -97,31 +97,6 @@ pub async fn get_llm_usage_for_span(
9797
}
9898
}
9999

100-
#[instrument(skip(clickhouse, tags, span_id, project_id))]
101-
pub async fn record_tags(
102-
clickhouse: clickhouse::Client,
103-
tags: &[String],
104-
span_id: &Uuid,
105-
project_id: &Uuid,
106-
) -> anyhow::Result<()> {
107-
if tags.is_empty() {
108-
return Ok(());
109-
}
110-
111-
for tag_name in tags {
112-
crate::ch::tags::insert_tag(
113-
clickhouse.clone(),
114-
*project_id,
115-
tag_name.clone(),
116-
TagSource::CODE,
117-
*span_id,
118-
)
119-
.await?;
120-
}
121-
122-
Ok(())
123-
}
124-
125100
pub fn skip_span_name(name: &str) -> bool {
126101
SKIP_SPAN_NAME_REGEX.is_match(name)
127102
}

0 commit comments

Comments
 (0)