Skip to content

Commit ea91432

Browse files
refactor(telemetry): migrate EnrichmentService to AsyncService base
- Extend AsyncService for standardized async/sync orchestration
- Replace custom worker checking with inherited implementation
- Simplify enqueue_enrichment using execute_with_fallback
- Use batch_execute for efficient multi-trace processing
1 parent 60b8f55 commit ea91432

File tree

1 file changed

+41
-70
lines changed
  • apps/backend/src/rhesis/backend/app/services/telemetry/enrichment

1 file changed

+41
-70
lines changed

apps/backend/src/rhesis/backend/app/services/telemetry/enrichment/service.py

Lines changed: 41 additions & 70 deletions
Original file line number | Diff line number | Diff line change
@@ -4,10 +4,11 @@
44
"""
55

66
import logging
7-
from typing import TYPE_CHECKING, List, Set
7+
from typing import TYPE_CHECKING, Any, List, Set
88

99
from sqlalchemy.orm import Session
1010

11+
from rhesis.backend.app.services.async_service import AsyncService
1112
from rhesis.backend.app.services.telemetry.enrichment.processor import TraceEnricher
1213

1314
if TYPE_CHECKING:
@@ -17,43 +18,51 @@
1718
logger = logging.getLogger(__name__)
1819

1920

20-
class EnrichmentService:
21+
class EnrichmentService(AsyncService[dict]):
2122
"""Service for orchestrating trace enrichment with async/sync fallback."""
2223

2324
def __init__(self, db: Session):
2425
"""Initialize the enrichment service."""
26+
super().__init__()
2527
self.db = db
2628

27-
def _check_workers_available(self) -> bool:
29+
def _execute_sync(self, trace_id: str, project_id: str, organization_id: str) -> dict | None:
2830
"""
29-
Check if Celery workers are available to process telemetry tasks.
31+
Synchronous enrichment (development fallback).
32+
33+
Args:
34+
trace_id: Trace ID to enrich
35+
project_id: Project ID for access control
36+
organization_id: Organization ID for multi-tenant security
3037
3138
Returns:
32-
True if workers are available, False otherwise
39+
Enriched trace data or None if enrichment failed
3340
"""
34-
try:
35-
from rhesis.backend.worker import app as celery_app
36-
37-
# Use ping with 3 second timeout - more reliable for solo pool workers
38-
# Solo pool workers process tasks sequentially, so stats() may timeout
39-
# while ping() is faster and gets prioritized
40-
inspect = celery_app.control.inspect(timeout=3.0)
41+
enricher = TraceEnricher(self.db)
42+
enriched_data = enricher.enrich_trace(trace_id, project_id, organization_id)
43+
if enriched_data:
44+
logger.info(f"Completed sync enrichment for trace {trace_id}")
45+
else:
46+
logger.warning(f"Sync enrichment returned no data for trace {trace_id}")
47+
return enriched_data
4148

42-
# Ping is faster and works better with solo pool
43-
ping_result = inspect.ping()
49+
def _enqueue_async(self, trace_id: str, project_id: str, organization_id: str) -> Any:
50+
"""
51+
Enqueue async enrichment task.
4452
45-
if not ping_result:
46-
return False
53+
Args:
54+
trace_id: Trace ID to enrich
55+
project_id: Project ID for access control
56+
organization_id: Organization ID for multi-tenant security
4757
48-
# If we can ping workers, they're available
49-
logger.debug(
50-
f"Found {len(ping_result)} available worker(s): {list(ping_result.keys())}"
51-
)
52-
return True
58+
Returns:
59+
Celery AsyncResult
60+
"""
61+
from rhesis.backend.tasks.telemetry.enrich import enrich_trace_async
5362

54-
except Exception as e:
55-
logger.debug(f"Worker availability check failed: {e}")
56-
return False
63+
result = enrich_trace_async.delay(trace_id, project_id, organization_id)
64+
logger.debug(f"Enqueued async enrichment for trace {trace_id} (task: {result.id})")
65+
return result
5766

5867
def enqueue_enrichment(
5968
self,
@@ -80,40 +89,14 @@ def enqueue_enrichment(
8089
Returns:
8190
True if async task was enqueued, False if sync fallback was used
8291
"""
83-
# Check if workers are available (use cached result if provided)
84-
if workers_available is None:
85-
workers_available = self._check_workers_available()
86-
87-
if workers_available:
88-
try:
89-
from rhesis.backend.tasks.telemetry.enrich import enrich_trace_async
90-
91-
# Try to enqueue async task
92-
result = enrich_trace_async.delay(trace_id, project_id, organization_id)
93-
logger.debug(f"Enqueued async enrichment for trace {trace_id} (task: {result.id})")
94-
return True
95-
96-
except Exception as e:
97-
logger.warning(
98-
f"Async enrichment failed for trace {trace_id}, using sync fallback: {e}"
99-
)
100-
else:
101-
logger.info(f"No Celery workers available, using sync enrichment for trace {trace_id}")
102-
103-
# Fall back to synchronous enrichment
10492
try:
105-
enricher = TraceEnricher(self.db)
106-
enriched_data = enricher.enrich_trace(trace_id, project_id, organization_id)
107-
if enriched_data:
108-
logger.info(f"Completed sync enrichment for trace {trace_id}")
109-
else:
110-
logger.warning(f"Sync enrichment returned no data for trace {trace_id}")
111-
return False
112-
except Exception as sync_error:
113-
# Log but don't fail the ingestion
114-
logger.error(
115-
f"Sync enrichment failed for trace {trace_id}: {sync_error}", exc_info=True
93+
was_async, _ = self.execute_with_fallback(
94+
trace_id, project_id, organization_id, workers_available=workers_available
11695
)
96+
return was_async
97+
except Exception as e:
98+
# Log but don't fail the ingestion
99+
logger.error(f"Enrichment failed for trace {trace_id}: {e}", exc_info=True)
117100
return False
118101

119102
def enrich_traces(
@@ -130,20 +113,8 @@ def enrich_traces(
130113
Returns:
131114
Tuple of (async_count, sync_count)
132115
"""
133-
async_count = 0
134-
sync_count = 0
135-
136-
# Check worker availability once before the loop to avoid N×3 second timeout
137-
# when workers are unavailable (prevents batch processing delays)
138-
workers_available = self._check_workers_available()
139-
140-
for trace_id in trace_ids:
141-
if self.enqueue_enrichment(trace_id, project_id, organization_id, workers_available):
142-
async_count += 1
143-
else:
144-
sync_count += 1
145-
146-
return async_count, sync_count
116+
items = [((trace_id, project_id, organization_id), {}) for trace_id in trace_ids]
117+
return self.batch_execute(items)
147118

148119
def create_and_enrich_spans(
149120
self,

0 commit comments

Comments
 (0)