Skip to content

Commit 8b8479e

Browse files
committed
feat: async context and task tracking
1 parent 73f33ea commit 8b8479e

File tree

5 files changed

+316
-141
lines changed

5 files changed

+316
-141
lines changed

hud/datasets/runner.py

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,33 +29,40 @@ async def run_dataset(
2929
auto_respond: bool = False,
3030
custom_system_prompt: str | None = None,
3131
) -> list[Any]:
32-
"""
33-
Run all tasks in a dataset with automatic job tracking.
32+
"""Run all tasks in a dataset with automatic job and telemetry tracking.
33+
34+
This function handles concurrent task execution with proper telemetry collection.
35+
All tasks are executed in parallel up to `max_concurrent`, with full telemetry
36+
automatically uploaded to the HUD platform.
3437
3538
Args:
3639
name: Name for the job
3740
dataset: HuggingFace dataset identifier (e.g. "hud-evals/SheetBench-50"),
3841
Dataset object, OR list of Task objects
3942
agent_class: Agent class to instantiate (e.g., ClaudeAgent)
4043
agent_config: Configuration/kwargs for agent (model, etc.)
41-
max_concurrent: Maximum parallel task execution
44+
max_concurrent: Maximum parallel task execution. Higher values improve throughput
45+
but may increase memory usage. Recommended: 30-200 depending on
46+
task complexity and available resources.
4247
metadata: Optional metadata for the job
4348
max_steps: Maximum steps per task
4449
split: Dataset split to use when loading from string (default: "train")
4550
auto_respond: Whether to use auto-response agent
4651
custom_system_prompt: Override system prompt for all tasks
4752
4853
Returns:
49-
List of results from agent.run() in dataset order
54+
List of results from agent.run() in dataset order. Telemetry is automatically
55+
collected and uploaded for all tasks.
5056
5157
Example:
5258
>>> from hud.agents import ClaudeAgent
53-
>>> # Option 1: From dataset string identifier
59+
>>> # Option 1: Basic usage with dataset string identifier
5460
>>> results = await run_dataset(
5561
... "SheetBench Eval",
5662
... "hud-evals/SheetBench-50",
5763
... ClaudeAgent,
5864
... {"model": "claude-3-5-sonnet-20241022"},
65+
... max_concurrent=100, # Adjust based on your needs
5966
... )
6067
>>> # Option 2: From HuggingFace dataset object
6168
>>> from datasets import load_dataset
@@ -64,10 +71,13 @@ async def run_dataset(
6471
>>> # Option 3: From list of dicts
6572
>>> tasks = [{"prompt": "...", "mcp_config": {...}, ...}, ...]
6673
>>> results = await run_dataset("browser_eval", tasks, ClaudeAgent)
74+
75+
Note:
76+
Telemetry collection and upload are handled automatically. Before returning, the
77+
function flushes pending telemetry, waiting up to 20 seconds per flush step.
6778
"""
6879
# Import here to avoid circular imports
69-
import hud
70-
from hud.telemetry.async_context import async_job, async_trace
80+
from hud.telemetry import async_job, async_trace
7181

7282
dataset_link = None
7383

@@ -126,24 +136,39 @@ async def _worker(index: int, task_dict: Any, max_steps: int = 10) -> None:
126136
return_exceptions=True, # Don't fail entire batch on one error
127137
)
128138

129-
# Wait for all tracked tasks to complete (job/trace exits already tried a short wait)
130-
from hud.utils.task_tracking import wait_all_tasks
131-
completed = await wait_all_tasks(timeout=20.0)
132-
if completed > 0:
133-
logger.info(f"Waited for {completed} telemetry tasks to complete")
139+
# Ensure all telemetry is uploaded before returning
140+
await _flush_telemetry()
141+
142+
return results
143+
144+
145+
async def _flush_telemetry() -> None:
146+
"""Flush all pending telemetry operations.
134147
135-
# Flush telemetry - this ensures BatchSpanProcessor exports pending spans
148+
Ensures complete telemetry upload by:
149+
1. Waiting for all async status updates to complete
150+
2. Forcing OpenTelemetry span processor to export remaining spans
151+
152+
This reduces telemetry loss at high concurrency (200+ tasks) by waiting, up to
153+
20 seconds per step, for pending operations to finish before returning.
154+
"""
136155
from hud.otel.config import is_telemetry_configured
156+
from hud.utils.task_tracking import wait_all_tasks
157+
158+
# Step 1: Wait for async status updates (job/trace status)
159+
completed_tasks = await wait_all_tasks(timeout=20.0)
160+
if completed_tasks > 0:
161+
logger.debug(f"Completed {completed_tasks} pending telemetry tasks")
162+
163+
# Step 2: Flush OpenTelemetry span exports
137164
if is_telemetry_configured():
138165
try:
139166
from opentelemetry import trace
140167
from opentelemetry.sdk.trace import TracerProvider
168+
141169
provider = trace.get_tracer_provider()
142-
# Check if it's an SDK TracerProvider (not the default no-op one)
143170
if isinstance(provider, TracerProvider):
144-
provider.force_flush(timeout_millis=20000) # 20 second timeout
145-
logger.info("Telemetry provider flushed successfully")
171+
provider.force_flush(timeout_millis=20000)
172+
logger.debug("OpenTelemetry spans flushed successfully")
146173
except Exception as e:
147-
logger.warning(f"Failed to flush telemetry: {e}")
148-
149-
return results
174+
logger.warning(f"Failed to flush OpenTelemetry: {e}")

hud/otel/exporters.py

Lines changed: 88 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1-
"""Custom OpenTelemetry exporter that sends spans to the existing HUD telemetry
2-
HTTP endpoint (/trace/<id>/telemetry-upload).
1+
"""Custom OpenTelemetry exporter for HUD telemetry backend.
32
4-
The exporter groups spans by ``hud.task_run_id`` baggage / attribute so we keep
5-
exactly the same semantics the old async worker in ``hud.telemetry.exporter``
6-
implemented.
3+
This exporter sends spans to the HUD telemetry HTTP endpoint, grouping them
4+
by task_run_id for efficient batch uploads.
75
8-
This exporter is *synchronous* (derives from :class:`SpanExporter`). We rely on
9-
``hud.shared.make_request_sync`` which already contains retry & auth logic.
6+
Performance optimizations:
7+
- Detects async contexts and runs exports in a thread pool to avoid blocking
8+
- Leaves HTTP connection handling to the shared request helper (the exporter holds no client of its own)
9+
- Tracks pending export futures to ensure completion during shutdown
10+
11+
The exporter derives from SpanExporter (synchronous interface) but handles
12+
async contexts intelligently to prevent event loop blocking during high-concurrency
13+
workloads.
1014
"""
1115

1216
from __future__ import annotations
@@ -21,7 +25,6 @@
2125
from concurrent.futures import ThreadPoolExecutor
2226
from datetime import UTC, datetime
2327
from typing import TYPE_CHECKING, Any
24-
import httpx
2528

2629
from mcp.types import ClientRequest, ServerResult
2730
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
@@ -35,18 +38,30 @@
3538

3639
logger = logging.getLogger(__name__)
3740

38-
# Global thread pool for span exports to avoid blocking event loop
41+
# Global singleton thread pool for span exports
3942
_export_executor: ThreadPoolExecutor | None = None
4043

44+
4145
def get_export_executor() -> ThreadPoolExecutor:
42-
"""Get or create the global export executor."""
46+
"""Get or create the global thread pool for span exports.
47+
48+
Returns a singleton ThreadPoolExecutor used for running span exports
49+
in a thread pool when called from async contexts, preventing event
50+
loop blocking during high-concurrency workloads.
51+
52+
The executor is automatically cleaned up on process exit via atexit.
53+
54+
Returns:
55+
ThreadPoolExecutor with 2 workers
56+
"""
4357
global _export_executor
4458
if _export_executor is None:
4559
_export_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="span-export")
46-
# Register cleanup
60+
4761
def cleanup():
4862
if _export_executor is not None:
4963
_export_executor.shutdown(wait=True)
64+
5065
atexit.register(cleanup)
5166
return _export_executor
5267

@@ -316,41 +331,62 @@ def _span_to_dict(span: ReadableSpan) -> dict[str, Any]:
316331

317332

318333
class HudSpanExporter(SpanExporter):
319-
"""Exporter that forwards spans to HUD backend using existing endpoint."""
334+
"""OpenTelemetry span exporter for the HUD backend.
335+
336+
This exporter groups spans by task_run_id and sends them to the HUD
337+
telemetry endpoint. Performance optimizations include:
338+
339+
- Auto-detects async contexts and runs exports in thread pool (non-blocking)
340+
- Tracks pending export futures for proper shutdown coordination
341+
342+
Handles high-concurrency scenarios (200+ parallel tasks) by offloading
343+
synchronous HTTP operations to a thread pool when called from async
344+
contexts, preventing event loop blocking.
345+
"""
320346

321347
def __init__(self, *, telemetry_url: str, api_key: str) -> None:
348+
"""Initialize the HUD span exporter.
349+
350+
Args:
351+
telemetry_url: Base URL for the HUD telemetry backend
352+
api_key: API key for authentication
353+
"""
322354
super().__init__()
323355
self._telemetry_url = telemetry_url.rstrip("/")
324356
self._api_key = api_key
325-
# Track pending export futures so we can force-flush on shutdown
357+
358+
# Track pending export futures for shutdown coordination
326359
self._pending_futures: list[cf.Future[SpanExportResult]] = []
327-
# Persistent HTTP client to reuse connections
328-
self._client = httpx.Client(
329-
timeout=30.0,
330-
limits=httpx.Limits(max_connections=2000, max_keepalive_connections=512, keepalive_expiry=15.0),
331-
)
332-
333-
# ------------------------------------------------------------------
334-
# Core API
335-
# ------------------------------------------------------------------
360+
336361
def export(self, spans: list[ReadableSpan]) -> SpanExportResult: # type: ignore[override]
362+
"""Export spans to HUD backend.
363+
364+
Auto-detects async contexts: if called from an async event loop, runs
365+
the export in a thread pool to avoid blocking. Otherwise runs synchronously.
366+
367+
Args:
368+
spans: List of ReadableSpan objects to export
369+
370+
Returns:
371+
SpanExportResult.SUCCESS (returns immediately in async contexts)
372+
"""
337373
if not spans:
338374
return SpanExportResult.SUCCESS
339375

340-
# Group spans by hud.task_run_id attribute
376+
# Group spans by task_run_id for batched uploads
341377
grouped: dict[str, list[ReadableSpan]] = defaultdict(list)
342378
for span in spans:
343379
run_id = span.attributes.get("hud.task_run_id") if span.attributes else None
344380
if not run_id:
345-
# Skip spans that are outside HUD traces
381+
# Skip spans outside HUD traces
346382
continue
347383
grouped[str(run_id)].append(span)
348384

349-
# Try to run export in background if we're in an event loop
385+
# Detect async context to avoid event loop blocking
350386
import asyncio
351387
try:
352388
loop = asyncio.get_running_loop()
353-
# We're in an async context - schedule export in thread to avoid blocking
389+
# In async context - offload to thread pool
354390
executor = get_export_executor()
355391

356392
def _sync_export():
@@ -384,7 +420,6 @@ def _sync_export():
384420
url=url,
385421
json=payload,
386422
api_key=self._api_key,
387-
client=self._client,
388423
)
389424
except Exception as exc:
390425
logger.exception("HUD exporter failed to send spans for task %s: %s", run_id, exc)
@@ -442,7 +477,6 @@ def _cleanup_done(f: cf.Future[SpanExportResult]) -> None:
442477
url=url,
443478
json=payload,
444479
api_key=self._api_key,
445-
client=self._client,
446480
)
447481
except Exception as exc:
448482
logger.exception("HUD exporter failed to send spans for task %s: %s", run_id, exc)
@@ -452,39 +486,52 @@ def _cleanup_done(f: cf.Future[SpanExportResult]) -> None:
452486
return SpanExportResult.SUCCESS
453487

454488
def shutdown(self) -> None: # type: ignore[override]
455-
# Best effort: wait for pending exports to complete
489+
"""Shutdown the exporter and wait for pending exports.
490+
491+
Waits up to 10 seconds for any in-flight exports to complete.
492+
"""
456493
try:
457494
if self._pending_futures:
458495
cf.wait(self._pending_futures, timeout=10.0)
459496
except Exception:
460497
pass
461498
finally:
462499
self._pending_futures.clear()
463-
# Close persistent client
464-
try:
465-
self._client.close()
466-
except Exception:
467-
pass
468500

469501
def force_flush(self, timeout_millis: int | None = None) -> bool: # type: ignore[override]
470-
# Wait for pending export futures
502+
"""Force flush all pending span exports.
503+
504+
Waits for all pending export futures to complete before returning.
505+
This is called by the OpenTelemetry SDK during shutdown to ensure
506+
all telemetry is uploaded.
507+
508+
Args:
509+
timeout_millis: Maximum time to wait in milliseconds (None or 0 falls back to 30000)
510+
511+
Returns:
512+
True if all exports completed, False otherwise
513+
"""
471514
try:
472515
if not self._pending_futures:
473516
return True
517+
474518
timeout = (timeout_millis or 30000) / 1000.0
475519
done, not_done = cf.wait(self._pending_futures, timeout=timeout)
476-
# Consume exceptions to avoid warnings
520+
521+
# Consume exceptions to avoid "exception was never retrieved" warnings
477522
for f in list(done):
478523
try:
479524
_ = f.exception()
480525
except Exception:
481526
pass
482-
# Remove finished futures
483-
try:
484-
for f in list(done):
527+
528+
# Remove completed futures
529+
for f in list(done):
530+
try:
485531
self._pending_futures.remove(f)
486-
except ValueError:
487-
pass
532+
except ValueError:
533+
pass
534+
488535
return len(not_done) == 0
489536
except Exception:
490537
return False

hud/telemetry/__init__.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,31 @@
1-
"""HUD Telemetry - User-facing APIs for tracing and job management.
1+
"""HUD Telemetry - Tracing and job management for agent execution.
22
3-
This module provides the main telemetry APIs that users interact with:
4-
- trace: Context manager for tracing code execution
5-
- job: Context manager and utilities for job management
6-
- instrument: Decorator for instrumenting functions
7-
- get_trace: Retrieve collected traces for replay/analysis
3+
This module provides telemetry APIs for tracking agent execution:
4+
5+
Standard Usage (for most users):
6+
- trace(): Context manager for tracing code execution
7+
- job(): Context manager for grouping related tasks
8+
- instrument(): Decorator for instrumenting functions
9+
- get_trace(): Retrieve collected traces for replay/analysis
10+
11+
High-Concurrency Usage (200+ parallel tasks):
12+
- async_trace(): Async context manager for traces (prevents event loop blocking)
13+
- async_job(): Async context manager for jobs (prevents event loop blocking)
14+
15+
The async versions are automatically used by run_dataset() and other high-concurrency
16+
functions. Most users don't need to use them directly.
817
"""
918

1019
from __future__ import annotations
1120

21+
from .async_context import async_job, async_trace
1222
from .instrument import instrument
1323
from .job import Job, create_job, job
1424
from .replay import clear_trace, get_trace
1525
from .trace import Trace, trace
1626

1727
__all__ = [
28+
# Standard synchronous APIs (for typical usage)
1829
"Job",
1930
"Trace",
2031
"clear_trace",
@@ -23,4 +34,7 @@
2334
"instrument",
2435
"job",
2536
"trace",
37+
# Async APIs (for high-concurrency scenarios)
38+
"async_job",
39+
"async_trace",
2640
]

0 commit comments

Comments
 (0)