1+ use std:: collections:: HashSet ;
2+
13use chrono:: { DateTime , Utc } ;
24use clickhouse:: Row ;
35use serde:: { Deserialize , Serialize } ;
46use uuid:: Uuid ;
57
6- use super :: spans:: CHSpan ;
7- use super :: utils:: { chrono_to_nanoseconds, nanoseconds_to_chrono} ;
8- use crate :: db:: spans:: SpanType ;
8+ use super :: utils:: chrono_to_nanoseconds;
9+ use crate :: db:: spans:: { Span , SpanType } ;
910use crate :: db:: trace:: Trace ;
11+ use crate :: traces:: spans:: SpanUsage ;
1012
1113#[ derive( Debug , Clone , Serialize , Deserialize , Row ) ]
1214pub struct CHTrace {
@@ -92,7 +94,7 @@ pub struct TraceAggregation {
9294 pub user_id : Option < String > ,
9395 pub status : Option < String > ,
9496 pub metadata : Option < serde_json:: Value > ,
95- pub tags : Vec < String > ,
97+ pub tags : HashSet < String > ,
9698 pub num_spans : i32 ,
9799 pub top_span_id : Option < Uuid > ,
98100 pub top_span_name : Option < String > ,
@@ -101,13 +103,13 @@ pub struct TraceAggregation {
101103}
102104
103105impl TraceAggregation {
104- /// Aggregate statistics from a batch of CHSpans grouped by trace_id
105- pub fn from_ch_spans ( spans : & [ CHSpan ] ) -> Vec < Self > {
106+ /// Aggregate statistics from a batch of Spans and SpanUsage grouped by trace_id
107+ pub fn from_spans ( spans : & [ Span ] , span_usage_vec : & [ SpanUsage ] ) -> Vec < Self > {
106108 use std:: collections:: HashMap ;
107109
108110 let mut trace_aggregations: HashMap < Uuid , TraceAggregation > = HashMap :: new ( ) ;
109111
110- for span in spans {
112+ for ( span, span_usage ) in spans. iter ( ) . zip ( span_usage_vec . iter ( ) ) {
111113 let entry =
112114 trace_aggregations
113115 . entry ( span. trace_id )
@@ -126,70 +128,86 @@ impl TraceAggregation {
126128 user_id : None ,
127129 status : None ,
128130 metadata : None ,
129- tags : Vec :: new ( ) ,
131+ tags : HashSet :: new ( ) ,
130132 num_spans : 0 ,
131133 top_span_id : None ,
132134 top_span_name : None ,
133135 top_span_type : 0 ,
134136 trace_type : 0 ,
135137 } ) ;
136138
137- // Convert nanoseconds to DateTime<Utc>
138- let start_dt = nanoseconds_to_chrono ( span. start_time ) ;
139+ // Aggregate min start_time
139140 entry. start_time = Some ( match entry. start_time {
140- Some ( existing) => existing. min ( start_dt ) ,
141- None => start_dt ,
141+ Some ( existing) => existing. min ( span . start_time ) ,
142+ None => span . start_time ,
142143 } ) ;
143144
144145 // Aggregate max end_time
145- let end_dt = nanoseconds_to_chrono ( span. end_time ) ;
146146 entry. end_time = Some ( match entry. end_time {
147- Some ( existing) => existing. max ( end_dt ) ,
148- None => end_dt ,
147+ Some ( existing) => existing. max ( span . end_time ) ,
148+ None => span . end_time ,
149149 } ) ;
150150
151- // Sum tokens and costs
152- entry. input_tokens += span . input_tokens ;
153- entry. output_tokens += span . output_tokens ;
154- entry. total_tokens += span . total_tokens ;
155- entry. input_cost += span . input_cost ;
156- entry. output_cost += span . output_cost ;
157- entry. total_cost += span . total_cost ;
151+ // Sum tokens and costs from SpanUsage
152+ entry. input_tokens += span_usage . input_tokens ;
153+ entry. output_tokens += span_usage . output_tokens ;
154+ entry. total_tokens += span_usage . total_tokens ;
155+ entry. input_cost += span_usage . input_cost ;
156+ entry. output_cost += span_usage . output_cost ;
157+ entry. total_cost += span_usage . total_cost ;
158158
159159 // Use "any" strategy for these fields (take first non-empty value)
160- if entry. session_id . is_none ( ) && !span. session_id . is_empty ( ) {
161- entry. session_id = Some ( span. session_id . clone ( ) ) ;
160+ if entry. session_id . is_none ( ) {
161+ if let Some ( session_id) = span. attributes . session_id ( ) {
162+ if !session_id. is_empty ( ) {
163+ entry. session_id = Some ( session_id) ;
164+ }
165+ }
162166 }
163- if entry. user_id . is_none ( ) && !span. user_id . is_empty ( ) {
164- entry. user_id = Some ( span. user_id . clone ( ) ) ;
167+ if entry. user_id . is_none ( ) {
168+ if let Some ( user_id) = span. attributes . user_id ( ) {
169+ if !user_id. is_empty ( ) {
170+ entry. user_id = Some ( user_id) ;
171+ }
172+ }
165173 }
166- if entry. status . is_none ( ) && !span. status . is_empty ( ) {
167- entry. status = Some ( span. status . clone ( ) ) ;
174+ if entry. status . is_none ( ) {
175+ if let Some ( status) = & span. status {
176+ if !status. is_empty ( ) {
177+ entry. status = Some ( status. clone ( ) ) ;
178+ }
179+ }
168180 }
169- if entry. metadata . is_none ( ) && !span. trace_metadata . is_empty ( ) {
170- if let Ok ( parsed) = serde_json:: from_str ( & span. trace_metadata ) {
171- entry. metadata = Some ( parsed) ;
181+ if entry. metadata . is_none ( ) {
182+ if let Some ( metadata) = span. attributes . metadata ( ) {
183+ if let Ok ( metadata_value) = serde_json:: to_value ( & metadata) {
184+ entry. metadata = Some ( metadata_value) ;
185+ }
172186 }
173187 }
174- if span . trace_type != 0 {
175- entry. trace_type = span . trace_type ;
188+ if let Some ( trace_type) = span . attributes . trace_type ( ) {
189+ entry. trace_type = trace_type. clone ( ) . into ( ) ;
176190 }
177191
178- if SpanType :: from ( span. span_type ) == SpanType :: EVALUATION {
192+ if span. span_type == SpanType :: EVALUATION {
179193 entry. trace_type = 1 ;
180194 }
181195
182- if span. parent_span_id == Uuid :: nil ( ) {
196+ if span. parent_span_id . is_none ( ) {
183197 entry. top_span_id = Some ( span. span_id ) ;
184198 entry. top_span_name = Some ( span. name . clone ( ) ) ;
185- entry. top_span_type = span. span_type as u8 ;
199+ entry. top_span_type = span. span_type . clone ( ) . into ( ) ;
200+ }
201+
202+ if entry. top_span_name . is_none ( ) {
203+ let path = span. attributes . path ( ) . unwrap_or_default ( ) ;
204+ path. first ( )
205+ . map ( |name| entry. top_span_name = Some ( name. clone ( ) ) ) ;
186206 }
187207
188208 // Collect unique tags
189- for tag in & span. tags_array {
190- if !entry. tags . contains ( tag) {
191- entry. tags . push ( tag. clone ( ) ) ;
192- }
209+ for tag in span. attributes . tags ( ) {
210+ entry. tags . insert ( tag) ;
193211 }
194212
195213 entry. num_spans += 1 ;
0 commit comments