@@ -12,6 +12,7 @@ use uuid::Uuid;
1212use super :: {
1313 OBSERVATIONS_EXCHANGE , OBSERVATIONS_QUEUE , OBSERVATIONS_ROUTING_KEY ,
1414 summary:: push_to_trace_summary_queue,
15+ trigger:: { check_span_trigger, get_summary_trigger_spans_cached} ,
1516} ;
1617use crate :: {
1718 api:: v1:: traces:: RabbitMqSpanMessage ,
@@ -236,6 +237,13 @@ async fn process_batch(
236237 let mut span_usage_vec = Vec :: new ( ) ;
237238 let mut all_events = Vec :: new ( ) ;
238239
240+ // we get project id from the first span in the batch
241+ // because all spans in the batch have the same project id
242+ // batching is happening on the Otel SpanProcessor level
243+ // project_id can never be None, because batch is never empty
244+ // but we do unwrap_or_default to avoid Option<Uuid> in the rest of the code
245+ let project_id = spans. first ( ) . map ( |s| s. project_id ) . unwrap_or_default ( ) ;
246+
239247 for span in & mut spans {
240248 let span_usage =
241249 get_llm_usage_for_span ( & mut span. attributes , db. clone ( ) , cache. clone ( ) , & span. name )
@@ -330,21 +338,9 @@ async fn process_batch(
330338 }
331339 }
332340
333- // Check for completed traces (top-level spans) and push to trace summary queue
334- for span in & spans {
335- if span. parent_span_id . is_none ( ) {
336- if let Err ( e) =
337- push_to_trace_summary_queue ( span. trace_id , span. project_id , queue. clone ( ) ) . await
338- {
339- log:: error!(
340- "Failed to push trace completion to summary queue: trace_id={}, project_id={}, error={:?}" ,
341- span. trace_id,
342- span. project_id,
343- e
344- ) ;
345- }
346- }
347- }
341+ // Check for spans matching trigger conditions and push to trace summary queue
342+ check_and_push_trace_summaries ( project_id, & spans, db. clone ( ) , cache. clone ( ) , queue. clone ( ) )
343+ . await ;
348344
349345 // Send realtime messages directly to SSE connections after successful ClickHouse writes
350346 send_realtime_messages_to_sse ( & spans, & sse_connections) . await ;
@@ -379,26 +375,21 @@ async fn process_batch(
379375 . sum :: < usize > ( )
380376 + total_events_ingested_bytes;
381377
382- // we get project id from the first span in the batch
383- // because all spans in the batch have the same project id
384- // batching is happening on the Otel SpanProcessor level
385- if let Some ( project_id) = stripped_spans. first ( ) . map ( |s| s. project_id ) {
386- if is_feature_enabled ( Feature :: UsageLimit ) {
387- if let Err ( e) = update_workspace_limit_exceeded_by_project_id (
388- db. clone ( ) ,
389- clickhouse. clone ( ) ,
390- cache. clone ( ) ,
378+ if is_feature_enabled ( Feature :: UsageLimit ) {
379+ if let Err ( e) = update_workspace_limit_exceeded_by_project_id (
380+ db. clone ( ) ,
381+ clickhouse. clone ( ) ,
382+ cache. clone ( ) ,
383+ project_id,
384+ total_ingested_bytes,
385+ )
386+ . await
387+ {
388+ log:: error!(
389+ "Failed to update workspace limit exceeded for project [{}]: {:?}" ,
391390 project_id,
392- total_ingested_bytes,
393- )
394- . await
395- {
396- log:: error!(
397- "Failed to update workspace limit exceeded for project [{}]: {:?}" ,
398- project_id,
399- e
400- ) ;
401- }
391+ e
392+ ) ;
402393 }
403394 }
404395
@@ -513,3 +504,51 @@ fn span_to_realtime_span(span: &Span) -> Value {
513504 // Note: input and output fields are intentionally excluded for performance
514505 } )
515506}
507+
508+ /// Check spans against trigger conditions and push matching traces to summary queue
509+ /// This function groups spans by project to minimize database/cache queries
510+ async fn check_and_push_trace_summaries (
511+ project_id : Uuid ,
512+ spans : & [ Span ] ,
513+ db : Arc < DB > ,
514+ cache : Arc < Cache > ,
515+ queue : Arc < MessageQueue > ,
516+ ) {
517+ match get_summary_trigger_spans_cached ( db. clone ( ) , cache. clone ( ) , project_id) . await {
518+ Ok ( trigger_spans) => {
519+ // Check each span against its project's trigger spans
520+ for span in spans {
521+ // Check if this span name matches any trigger
522+ let matching_triggers = check_span_trigger ( & span. name , & trigger_spans) ;
523+
524+ // Send one message per matching trigger
525+ for trigger in matching_triggers {
526+ if let Err ( e) = push_to_trace_summary_queue (
527+ span. trace_id ,
528+ span. project_id ,
529+ span. span_id ,
530+ trigger. event_definition ,
531+ queue. clone ( ) ,
532+ )
533+ . await
534+ {
535+ log:: error!(
536+ "Failed to push trace completion to summary queue: trace_id={}, project_id={}, span_name={}, error={:?}" ,
537+ span. trace_id,
538+ span. project_id,
539+ span. name,
540+ e
541+ ) ;
542+ }
543+ }
544+ }
545+ }
546+ Err ( e) => {
547+ log:: error!(
548+ "Failed to get summary trigger spans for project {}: {:?}" ,
549+ project_id,
550+ e
551+ ) ;
552+ }
553+ }
554+ }
0 commit comments