@@ -5,17 +5,10 @@ use backoff::ExponentialBackoffBuilder;
55use serde:: { Deserialize , Serialize } ;
66use uuid:: Uuid ;
77
8- use super :: {
9- TRACE_SUMMARY_EXCHANGE , TRACE_SUMMARY_QUEUE , TRACE_SUMMARY_ROUTING_KEY ,
10- eligibility:: check_trace_eligibility,
11- } ;
12- use crate :: {
13- cache:: Cache ,
14- db:: DB ,
15- mq:: {
16- MessageQueue , MessageQueueAcker , MessageQueueDeliveryTrait , MessageQueueReceiverTrait ,
17- MessageQueueTrait ,
18- } ,
8+ use super :: { TRACE_SUMMARY_EXCHANGE , TRACE_SUMMARY_QUEUE , TRACE_SUMMARY_ROUTING_KEY } ;
9+ use crate :: mq:: {
10+ MessageQueue , MessageQueueAcker , MessageQueueDeliveryTrait , MessageQueueReceiverTrait ,
11+ MessageQueueTrait ,
1912} ;
2013
2114#[ derive( Serialize , Deserialize , Debug , Clone ) ]
@@ -63,14 +56,14 @@ pub async fn push_to_trace_summary_queue(
6356}
6457
6558/// Main worker function to process trace summary messages
66- pub async fn process_trace_summaries ( db : Arc < DB > , cache : Arc < Cache > , queue : Arc < MessageQueue > ) {
59+ pub async fn process_trace_summaries ( queue : Arc < MessageQueue > ) {
6760 loop {
68- inner_process_trace_summaries ( db . clone ( ) , cache . clone ( ) , queue. clone ( ) ) . await ;
61+ inner_process_trace_summaries ( queue. clone ( ) ) . await ;
6962 log:: warn!( "Trace summary listener exited. Rebinding queue connection..." ) ;
7063 }
7164}
7265
73- async fn inner_process_trace_summaries ( db : Arc < DB > , cache : Arc < Cache > , queue : Arc < MessageQueue > ) {
66+ async fn inner_process_trace_summaries ( queue : Arc < MessageQueue > ) {
7467 // Add retry logic with exponential backoff for connection failures
7568 let get_receiver = || async {
7669 queue
@@ -131,15 +124,7 @@ async fn inner_process_trace_summaries(db: Arc<DB>, cache: Arc<Cache>, queue: Ar
131124 } ;
132125
133126 // Process the trace summary generation
134- if let Err ( e) = process_single_trace_summary (
135- & client,
136- db. clone ( ) ,
137- cache. clone ( ) ,
138- trace_summary_message,
139- acker,
140- )
141- . await
142- {
127+ if let Err ( e) = process_single_trace_summary ( & client, trace_summary_message, acker) . await {
143128 log:: error!( "Failed to process trace summary: {:?}" , e) ;
144129 }
145130 }
@@ -149,20 +134,9 @@ async fn inner_process_trace_summaries(db: Arc<DB>, cache: Arc<Cache>, queue: Ar
149134
150135async fn process_single_trace_summary (
151136 client : & reqwest:: Client ,
152- db : Arc < DB > ,
153- cache : Arc < Cache > ,
154137 message : TraceSummaryMessage ,
155138 acker : MessageQueueAcker ,
156139) -> anyhow:: Result < ( ) > {
157- let eligibility_result = check_trace_eligibility ( db, cache, message. project_id ) . await ?;
158-
159- if !eligibility_result. is_eligible {
160- if let Err ( e) = acker. ack ( ) . await {
161- log:: error!( "Failed to ack trace summary message: {:?}" , e) ;
162- }
163- return Ok ( ( ) ) ;
164- }
165-
166140 let summarizer_service_url = if let Ok ( url) = env:: var ( "TRACE_SUMMARIZER_URL" ) {
167141 url
168142 } else {
0 commit comments