44"""
55
66import logging
7- from typing import TYPE_CHECKING , List , Set
7+ from typing import TYPE_CHECKING , Any , List , Set
88
99from sqlalchemy .orm import Session
1010
11+ from rhesis .backend .app .services .async_service import AsyncService
1112from rhesis .backend .app .services .telemetry .enrichment .processor import TraceEnricher
1213
1314if TYPE_CHECKING :
1718logger = logging .getLogger (__name__ )
1819
1920
20- class EnrichmentService :
21+ class EnrichmentService ( AsyncService [ dict ]) :
2122 """Service for orchestrating trace enrichment with async/sync fallback."""
2223
2324 def __init__ (self , db : Session ):
2425 """Initialize the enrichment service."""
26+ super ().__init__ ()
2527 self .db = db
2628
27- def _check_workers_available (self ) -> bool :
29+ def _execute_sync (self , trace_id : str , project_id : str , organization_id : str ) -> dict | None :
2830 """
29- Check if Celery workers are available to process telemetry tasks.
31+ Synchronous enrichment (development fallback).
32+
33+ Args:
34+ trace_id: Trace ID to enrich
35+ project_id: Project ID for access control
36+ organization_id: Organization ID for multi-tenant security
3037
3138 Returns:
32- True if workers are available, False otherwise
39+ Enriched trace data or None if enrichment failed
3340 """
34- try :
35- from rhesis . backend . worker import app as celery_app
36-
37- # Use ping with 3 second timeout - more reliable for solo pool workers
38- # Solo pool workers process tasks sequentially, so stats() may timeout
39- # while ping() is faster and gets prioritized
40- inspect = celery_app . control . inspect ( timeout = 3.0 )
41+ enricher = TraceEnricher ( self . db )
42+ enriched_data = enricher . enrich_trace ( trace_id , project_id , organization_id )
43+ if enriched_data :
44+ logger . info ( f"Completed sync enrichment for trace { trace_id } " )
45+ else :
46+ logger . warning ( f"Sync enrichment returned no data for trace { trace_id } " )
47+ return enriched_data
4148
42- # Ping is faster and works better with solo pool
43- ping_result = inspect .ping ()
49+ def _enqueue_async (self , trace_id : str , project_id : str , organization_id : str ) -> Any :
50+ """
51+ Enqueue async enrichment task.
4452
45- if not ping_result :
46- return False
53+ Args:
54+ trace_id: Trace ID to enrich
55+ project_id: Project ID for access control
56+ organization_id: Organization ID for multi-tenant security
4757
48- # If we can ping workers, they're available
49- logger .debug (
50- f"Found { len (ping_result )} available worker(s): { list (ping_result .keys ())} "
51- )
52- return True
58+ Returns:
59+ Celery AsyncResult
60+ """
61+ from rhesis .backend .tasks .telemetry .enrich import enrich_trace_async
5362
54- except Exception as e :
55- logger .debug (f"Worker availability check failed : { e } " )
56- return False
63+ result = enrich_trace_async . delay ( trace_id , project_id , organization_id )
64+ logger .debug (f"Enqueued async enrichment for trace { trace_id } (task : { result . id } ) " )
65+ return result
5766
5867 def enqueue_enrichment (
5968 self ,
@@ -80,40 +89,14 @@ def enqueue_enrichment(
8089 Returns:
8190 True if async task was enqueued, False if sync fallback was used
8291 """
83- # Check if workers are available (use cached result if provided)
84- if workers_available is None :
85- workers_available = self ._check_workers_available ()
86-
87- if workers_available :
88- try :
89- from rhesis .backend .tasks .telemetry .enrich import enrich_trace_async
90-
91- # Try to enqueue async task
92- result = enrich_trace_async .delay (trace_id , project_id , organization_id )
93- logger .debug (f"Enqueued async enrichment for trace { trace_id } (task: { result .id } )" )
94- return True
95-
96- except Exception as e :
97- logger .warning (
98- f"Async enrichment failed for trace { trace_id } , using sync fallback: { e } "
99- )
100- else :
101- logger .info (f"No Celery workers available, using sync enrichment for trace { trace_id } " )
102-
103- # Fall back to synchronous enrichment
10492 try :
105- enricher = TraceEnricher (self .db )
106- enriched_data = enricher .enrich_trace (trace_id , project_id , organization_id )
107- if enriched_data :
108- logger .info (f"Completed sync enrichment for trace { trace_id } " )
109- else :
110- logger .warning (f"Sync enrichment returned no data for trace { trace_id } " )
111- return False
112- except Exception as sync_error :
113- # Log but don't fail the ingestion
114- logger .error (
115- f"Sync enrichment failed for trace { trace_id } : { sync_error } " , exc_info = True
93+ was_async , _ = self .execute_with_fallback (
94+ trace_id , project_id , organization_id , workers_available = workers_available
11695 )
96+ return was_async
97+ except Exception as e :
98+ # Log but don't fail the ingestion
99+ logger .error (f"Enrichment failed for trace { trace_id } : { e } " , exc_info = True )
117100 return False
118101
119102 def enrich_traces (
@@ -130,20 +113,8 @@ def enrich_traces(
130113 Returns:
131114 Tuple of (async_count, sync_count)
132115 """
133- async_count = 0
134- sync_count = 0
135-
136- # Check worker availability once before the loop to avoid N×3 second timeout
137- # when workers are unavailable (prevents batch processing delays)
138- workers_available = self ._check_workers_available ()
139-
140- for trace_id in trace_ids :
141- if self .enqueue_enrichment (trace_id , project_id , organization_id , workers_available ):
142- async_count += 1
143- else :
144- sync_count += 1
145-
146- return async_count , sync_count
116+ items = [((trace_id , project_id , organization_id ), {}) for trace_id in trace_ids ]
117+ return self .batch_execute (items )
147118
148119 def create_and_enrich_spans (
149120 self ,
0 commit comments