Skip to content

Commit 792a46c

Browse files
committed
perf: improve performance on parallel processing
1 parent 193abe4 commit 792a46c

File tree

2 files changed

+13
-11
lines changed

2 files changed

+13
-11
lines changed

hud/otel/exporters.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from concurrent.futures import ThreadPoolExecutor
2222
from datetime import UTC, datetime
2323
from typing import TYPE_CHECKING, Any
24+
import httpx
2425

2526
from mcp.types import ClientRequest, ServerResult
2627
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
@@ -323,6 +324,11 @@ def __init__(self, *, telemetry_url: str, api_key: str) -> None:
323324
self._api_key = api_key
324325
# Track pending export futures so we can force-flush on shutdown
325326
self._pending_futures: list[cf.Future[SpanExportResult]] = []
327+
# Persistent HTTP client to reuse connections
328+
self._client = httpx.Client(
329+
timeout=30.0,
330+
limits=httpx.Limits(max_connections=2000, max_keepalive_connections=512, keepalive_expiry=15.0),
331+
)
326332

327333
# ------------------------------------------------------------------
328334
# Core API
@@ -378,6 +384,7 @@ def _sync_export():
378384
url=url,
379385
json=payload,
380386
api_key=self._api_key,
387+
client=self._client,
381388
)
382389
except Exception as exc:
383390
logger.exception("HUD exporter failed to send spans for task %s: %s", run_id, exc)
@@ -435,6 +442,7 @@ def _cleanup_done(f: cf.Future[SpanExportResult]) -> None:
435442
url=url,
436443
json=payload,
437444
api_key=self._api_key,
445+
client=self._client,
438446
)
439447
except Exception as exc:
440448
logger.exception("HUD exporter failed to send spans for task %s: %s", run_id, exc)
@@ -452,6 +460,11 @@ def shutdown(self) -> None: # type: ignore[override]
452460
pass
453461
finally:
454462
self._pending_futures.clear()
463+
# Close persistent client
464+
try:
465+
self._client.close()
466+
except Exception:
467+
pass
455468

456469
def force_flush(self, timeout_millis: int | None = None) -> bool: # type: ignore[override]
457470
# Wait for pending export futures

hud/telemetry/async_context.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
from hud.telemetry.job import Job, _print_job_url, _print_job_complete_url, get_current_job
2424
from hud.telemetry.trace import Trace
2525
from hud.utils.task_tracking import track_task
26-
from hud.utils.task_tracking import wait_all_tasks
2726

2827
logger = logging.getLogger(__name__)
2928

@@ -119,11 +118,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
119118
if self._otel_trace:
120119
self._otel_trace.__exit__(exc_type, exc_val, exc_tb)
121120

122-
# Ensure any pending telemetry tasks complete quickly
123-
try:
124-
await wait_all_tasks(timeout=5.0)
125-
except Exception:
126-
pass
127121

128122
logger.debug(f"Ended async trace for task_run_id={self.task_run_id}")
129123

@@ -211,11 +205,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
211205
# Restore old job
212206
_current_job = self._old_job
213207

214-
# Ensure any pending telemetry tasks complete quickly
215-
try:
216-
await wait_all_tasks(timeout=5.0)
217-
except Exception:
218-
pass
219208

220209
logger.debug(f"Ended async job {self.job.id}")
221210

0 commit comments

Comments
 (0)