Skip to content

Commit 7c5f971

Browse files
authored
Feat/bg summary (#925)
* feat: summary WIP
* feat: remove eligibility check from FE, rename vars
* feat: remove test
* feat: remove cache
* Revert "feat: remove cache" This reverts commit 42bc3b2.
1 parent 4b9e30b commit 7c5f971

File tree

8 files changed

+215
-141
lines changed

8 files changed

+215
-141
lines changed

app-server/src/db/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub mod evaluators;
88
pub mod events;
99
pub mod prices;
1010
pub mod project_api_keys;
11+
pub mod project_settings;
1112
pub mod projects;
1213
pub mod provider_api_keys;
1314
pub mod spans;
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use anyhow::Result;
2+
use serde::{Deserialize, Serialize};
3+
use sqlx::{FromRow, PgPool};
4+
use uuid::Uuid;
5+
6+
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
7+
pub struct ProjectSetting {
8+
pub id: Uuid,
9+
pub name: Option<String>,
10+
pub value: Option<String>,
11+
pub project_id: Uuid,
12+
}
13+
14+
/// Get a specific project setting by name
15+
pub async fn get_project_setting(
16+
pool: &PgPool,
17+
project_id: &Uuid,
18+
setting_name: &str,
19+
) -> Result<Option<ProjectSetting>> {
20+
let setting = sqlx::query_as::<_, ProjectSetting>(
21+
"SELECT id, name, value, project_id
22+
FROM project_settings
23+
WHERE project_id = $1 AND name = $2
24+
LIMIT 1",
25+
)
26+
.bind(project_id)
27+
.bind(setting_name)
28+
.fetch_optional(pool)
29+
.await?;
30+
31+
Ok(setting)
32+
}
33+
34+
/// Check if a project setting is enabled (value = 'true')
35+
pub async fn is_project_setting_enabled(
36+
pool: &PgPool,
37+
project_id: &Uuid,
38+
setting_name: &str,
39+
) -> Result<bool> {
40+
let setting = get_project_setting(pool, project_id, setting_name).await?;
41+
42+
Ok(setting
43+
.and_then(|s| s.value)
44+
.map(|v| v.trim().to_lowercase() == "true")
45+
.unwrap_or(false))
46+
}

app-server/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,8 @@ fn main() -> anyhow::Result<()> {
588588

589589
for _ in 0..num_trace_summary_workers_per_thread {
590590
tokio::spawn(process_trace_summaries(
591+
db_for_http.clone(),
592+
cache_for_http.clone(),
591593
mq_for_http.clone(),
592594
));
593595
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
use std::sync::Arc;
2+
3+
use anyhow::Result;
4+
use uuid::Uuid;
5+
6+
use crate::{
7+
cache::{Cache, CacheTrait, keys::PROJECT_CACHE_KEY},
8+
db::{DB, project_settings::is_project_setting_enabled, projects::get_project_and_workspace_billing_info},
9+
};
10+
11+
/// Outcome of a trace-summary eligibility check for one project.
#[derive(Clone, Debug)]
pub struct TraceEligibilityResult {
    /// `true` when every eligibility check passed.
    pub is_eligible: bool,
    /// Human-readable explanation of why the project was rejected; `None` when eligible.
    pub reason: Option<String>,
    /// Tier name that was resolved for the project, or `None` when none was resolved.
    pub tier_name: Option<String>,
    /// Whether trace analysis is available for the project.
    pub has_trace_analysis: bool,
}
18+
19+
pub async fn check_trace_eligibility(
20+
db: Arc<DB>,
21+
cache: Arc<Cache>,
22+
project_id: Uuid,
23+
) -> Result<TraceEligibilityResult> {
24+
let cache_key = format!("{}:{}", PROJECT_CACHE_KEY, project_id);
25+
let project_info = cache.get::<serde_json::Value>(&cache_key).await;
26+
27+
let tier_name = match project_info {
28+
Ok(Some(info)) => {
29+
info.get("tier_name")
30+
.and_then(|v| v.as_str())
31+
.map(|s| s.to_string())
32+
}
33+
_ => {
34+
// Fallback: query database if not in cache
35+
match get_project_and_workspace_billing_info(&db.pool, &project_id).await {
36+
Ok(project) => Some(project.tier_name),
37+
Err(_) => {
38+
return Ok(TraceEligibilityResult {
39+
is_eligible: false,
40+
reason: Some("project not found".to_string()),
41+
tier_name: None,
42+
has_trace_analysis: false,
43+
});
44+
}
45+
}
46+
}
47+
};
48+
49+
let is_paid_tier = tier_name
50+
.as_ref()
51+
.map(|name| name.trim().to_lowercase() != "free")
52+
.unwrap_or(false);
53+
54+
if !is_paid_tier {
55+
return Ok(TraceEligibilityResult {
56+
is_eligible: false,
57+
reason: Some("workspace is on free tier".to_string()),
58+
tier_name,
59+
has_trace_analysis: false,
60+
});
61+
}
62+
63+
let is_trace_analysis_enabled =
64+
is_project_setting_enabled(&db.pool, &project_id, "enable_trace_analysis").await?;
65+
66+
if !is_trace_analysis_enabled {
67+
return Ok(TraceEligibilityResult {
68+
is_eligible: false,
69+
reason: Some("trace analysis not enabled in project settings".to_string()),
70+
tier_name,
71+
has_trace_analysis: false,
72+
});
73+
}
74+
75+
Ok(TraceEligibilityResult {
76+
is_eligible: true,
77+
reason: None,
78+
tier_name,
79+
has_trace_analysis: true,
80+
})
81+
}

app-server/src/traces/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod consumer;
2+
pub mod eligibility;
23
pub mod events;
34
pub mod grpc_service;
45
pub mod limits;

app-server/src/traces/summary.rs

Lines changed: 67 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
1-
//! This module handles trace summary generation
2-
//! It reads trace completion messages from RabbitMQ and generates summaries via internal API
3-
41
use std::env;
52
use std::sync::Arc;
63

74
use backoff::ExponentialBackoffBuilder;
85
use serde::{Deserialize, Serialize};
96
use uuid::Uuid;
107

11-
use super::{TRACE_SUMMARY_EXCHANGE, TRACE_SUMMARY_QUEUE, TRACE_SUMMARY_ROUTING_KEY};
12-
use crate::mq::{
13-
MessageQueue, MessageQueueAcker, MessageQueueDeliveryTrait, MessageQueueReceiverTrait,
14-
MessageQueueTrait,
8+
use super::{
9+
TRACE_SUMMARY_EXCHANGE, TRACE_SUMMARY_QUEUE, TRACE_SUMMARY_ROUTING_KEY,
10+
eligibility::check_trace_eligibility,
11+
};
12+
use crate::{
13+
cache::Cache,
14+
db::DB,
15+
mq::{
16+
MessageQueue, MessageQueueAcker, MessageQueueDeliveryTrait, MessageQueueReceiverTrait,
17+
MessageQueueTrait,
18+
},
1519
};
1620

1721
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -51,14 +55,14 @@ pub async fn push_to_trace_summary_queue(
5155
}
5256

5357
/// Main worker function to process trace summary messages
54-
pub async fn process_trace_summaries(queue: Arc<MessageQueue>) {
58+
pub async fn process_trace_summaries(db: Arc<DB>, cache: Arc<Cache>, queue: Arc<MessageQueue>) {
5559
loop {
56-
inner_process_trace_summaries(queue.clone()).await;
60+
inner_process_trace_summaries(db.clone(), cache.clone(), queue.clone()).await;
5761
log::warn!("Trace summary listener exited. Rebinding queue connection...");
5862
}
5963
}
6064

61-
async fn inner_process_trace_summaries(queue: Arc<MessageQueue>) {
65+
async fn inner_process_trace_summaries(db: Arc<DB>, cache: Arc<Cache>, queue: Arc<MessageQueue>) {
6266
// Add retry logic with exponential backoff for connection failures
6367
let get_receiver = || async {
6468
queue
@@ -119,7 +123,15 @@ async fn inner_process_trace_summaries(queue: Arc<MessageQueue>) {
119123
};
120124

121125
// Process the trace summary generation
122-
if let Err(e) = process_single_trace_summary(&client, trace_summary_message, acker).await {
126+
if let Err(e) = process_single_trace_summary(
127+
&client,
128+
db.clone(),
129+
cache.clone(),
130+
trace_summary_message,
131+
acker,
132+
)
133+
.await
134+
{
123135
log::error!("Failed to process trace summary: {:?}", e);
124136
}
125137
}
@@ -129,43 +141,70 @@ async fn inner_process_trace_summaries(queue: Arc<MessageQueue>) {
129141

130142
async fn process_single_trace_summary(
131143
client: &reqwest::Client,
144+
db: Arc<DB>,
145+
cache: Arc<Cache>,
132146
message: TraceSummaryMessage,
133147
acker: MessageQueueAcker,
134148
) -> anyhow::Result<()> {
135-
// Get the internal API base URL - this should be the internal service URL
136-
let internal_api_base_url =
137-
env::var("NEXT_BACKEND_URL").unwrap_or_else(|_| "http://localhost:3000".to_string());
149+
let eligibility_result = check_trace_eligibility(db, cache, message.project_id).await?;
150+
151+
if !eligibility_result.is_eligible {
152+
log::info!(
153+
"Skipping trace summary generation: trace_id={}, project_id={}, reason={}",
154+
message.trace_id,
155+
message.project_id,
156+
eligibility_result.reason.unwrap_or_default()
157+
);
158+
if let Err(e) = acker.ack().await {
159+
log::error!("Failed to ack trace summary message: {:?}", e);
160+
}
161+
return Ok(());
162+
}
163+
164+
let summarizer_service_url = env::var("TRACE_SUMMARIZER_URL")
165+
.map_err(|_| anyhow::anyhow!("TRACE_SUMMARIZER_URL environment variable not set"))?;
138166

139-
let url = format!("{}/api/traces/summary", internal_api_base_url);
167+
let auth_token = env::var("TRACE_SUMMARIZER_SECRET_KEY")
168+
.map_err(|_| anyhow::anyhow!("TRACE_SUMMARIZER_SECRET_KEY environment variable not set"))?;
140169

141170
let request_body = serde_json::json!({
142171
"projectId": message.project_id.to_string(),
143172
"traceId": message.trace_id.to_string(),
173+
"maxRetries": 5
144174
});
145175

146-
let call_internal_api = || async {
176+
let call_summarizer_service = || async {
147177
let response = client
148-
.post(&url)
178+
.post(&summarizer_service_url)
179+
.header("Authorization", format!("Bearer {}", auth_token))
180+
.header("Content-Type", "application/json")
149181
.json(&request_body)
150182
.send()
151183
.await
152184
.map_err(|e| {
153-
log::warn!("Failed to call internal API for trace summary: {:?}", e);
185+
log::warn!("Failed to call summarizer service for trace summary: {:?}", e);
154186
backoff::Error::transient(anyhow::Error::from(e))
155187
})?;
156188

157189
if response.status().is_success() {
158-
Ok(response)
190+
let response_text = response.text().await.unwrap_or_default();
191+
log::debug!(
192+
"Summarizer service response for trace_id={}, project_id={}: {}",
193+
message.trace_id,
194+
message.project_id,
195+
response_text
196+
);
197+
Ok(())
159198
} else {
160199
let status = response.status();
161200
let response_text = response.text().await.unwrap_or_default();
162201
log::warn!(
163-
"Internal API returned error status for trace summary: {}, Response: {}",
202+
"Summarizer service returned error status for trace summary: {}, Response: {}",
164203
status,
165204
response_text
166205
);
167206
Err(backoff::Error::transient(anyhow::anyhow!(
168-
"Internal API error: {}, Response: {}",
207+
"Summarizer service error: {}, Response: {}",
169208
status,
170209
response_text
171210
)))
@@ -178,8 +217,13 @@ async fn process_single_trace_summary(
178217
.with_max_elapsed_time(Some(std::time::Duration::from_secs(60 * 5))) // 5 minutes max
179218
.build();
180219

181-
match backoff::future::retry(backoff, call_internal_api).await {
182-
Ok(_response) => {
220+
match backoff::future::retry(backoff, call_summarizer_service).await {
221+
Ok(_) => {
222+
log::info!(
223+
"Successfully generated trace summary: trace_id={}, project_id={}",
224+
message.trace_id,
225+
message.project_id
226+
);
183227
if let Err(e) = acker.ack().await {
184228
log::error!("Failed to ack trace summary message: {:?}", e);
185229
}

frontend/app/api/traces/summary/route.ts

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
import { observe } from '@lmnr-ai/lmnr';
2-
import { prettifyError } from 'zod/v4';
1+
import { observe } from "@lmnr-ai/lmnr";
2+
import { prettifyError } from "zod/v4";
33

4-
import { checkTraceEligibility } from '@/lib/actions/project/trace-eligibility';
5-
import { executeQuery } from '@/lib/actions/sql';
6-
import { generateTraceSummary } from '@/lib/actions/trace/agent';
7-
import { GenerateTraceSummaryRequestSchema } from '@/lib/actions/trace/agent/summary';
4+
import { executeQuery } from "@/lib/actions/sql";
5+
import { generateTraceSummary } from "@/lib/actions/trace/agent";
6+
import { GenerateTraceSummaryRequestSchema } from "@/lib/actions/trace/agent/summary";
87

98
/**
109
* Internal endpoint for trace summary generation.
@@ -16,7 +15,7 @@ export async function POST(req: Request) {
1615
const traceSummaryResult = GenerateTraceSummaryRequestSchema.safeParse(body);
1716

1817
if (!traceSummaryResult.success) {
19-
console.error('Validation error for trace summary request:', prettifyError(traceSummaryResult.error));
18+
console.error("Validation error for trace summary request:", prettifyError(traceSummaryResult.error));
2019
return Response.json({ error: prettifyError(traceSummaryResult.error) }, { status: 400 });
2120
}
2221

@@ -26,16 +25,6 @@ export async function POST(req: Request) {
2625
const { projectId, traceId } = traceSummaryResult.data;
2726

2827
try {
29-
// Check if project is eligible for trace summary generation
30-
const eligibilityResult = await checkTraceEligibility({ projectId });
31-
32-
if (!eligibilityResult.isEligible) {
33-
return Response.json({
34-
success: true,
35-
message: `Skipped - ${eligibilityResult.reason}`
36-
});
37-
}
38-
3928
// check if the trace contains at least one LLM span
4029
const llmSpanCheckQuery = `
4130
SELECT COUNT(*) as llm_span_count
@@ -50,28 +39,32 @@ export async function POST(req: Request) {
5039
query: llmSpanCheckQuery,
5140
parameters: {
5241
traceId,
53-
}
42+
},
5443
});
5544

5645
const hasLlmSpans = llmSpanResult.length > 0 && llmSpanResult[0].llm_span_count > 0;
5746

5847
if (!hasLlmSpans) {
5948
return Response.json({
6049
success: true,
61-
message: "Skipped - trace contains no LLM spans"
50+
message: "Skipped - trace contains no LLM spans",
6251
});
6352
}
6453

6554
// Generate the trace summary since all requirements are met
6655
// Disable retries for this call since we want to fail fast if the summary generation fails
67-
await observe({ name: "generateTraceSummaryIfNeeded" }, async () => await generateTraceSummary({
68-
...traceSummaryResult.data,
69-
maxRetries: 0,
70-
}));
56+
await observe(
57+
{ name: "generateTraceSummaryIfNeeded" },
58+
async () =>
59+
await generateTraceSummary({
60+
...traceSummaryResult.data,
61+
maxRetries: 0,
62+
})
63+
);
7164

7265
return Response.json({ success: true });
7366
} catch (error) {
74-
console.error('Failed to generate trace summary:', error);
67+
console.error("Failed to generate trace summary:", error);
7568
return Response.json(
7669
{ error: error instanceof Error ? error.message : "Failed to generate trace summary." },
7770
{ status: 500 }

0 commit comments

Comments
 (0)