
Commit 9c372ee

yonromai and claude authored
feat(iris): add request-level observability to controller RPCs (#3073)
Fixes #3071. Related: #3062 (incident), #3067 (retry fix), #2809 (threading survey), #2964 (lock contention).

## Problem

When the GPU canary ferry hit `DEADLINE_EXCEEDED` (#3062), the controller was alive but we had **no way to tell why `GetTaskLogs` took 30s+**. The retry fix (#3067) treats the symptom. We need observability to find the root cause next time. Today: zero RPC durations, zero storage latency measurements, zero heartbeat round timings, no periodic health snapshot.

## Changes

Five logging hooks — one new file (`interceptors.py`), edits to three existing files. Logging only, no metrics infra. All thresholds are hardcoded initial guesses.

| # | Change | Key output |
|---|--------|------------|
| 1 | RPC timing interceptor (`UnaryInterceptorSync`) | `WARN RPC GetTaskLogs completed in 31204ms (slow)` |
| 2 | Storage read timing in `get_task_logs` | `WARN Storage read for job/0 attempt 0: 28500ms (slow)` |
| 3 | Heartbeat + scheduling phase-level timing | `DEBUG Heartbeat round: 128 workers, 3 failed, 4200ms (snapshot: 3100ms)` |
| 4 | Periodic health summary (~30s) | `INFO Controller status: 128 workers (2 failed), 2 active jobs, 0 pending` |
| 5 | Uvicorn log level `"error"` → `"warning"` | Surfaces connection warnings from GH Actions |

The snapshot sub-timing in Change 3 disambiguates lock contention from slow RPCs: if `snapshot_ms` dominates the round time, the `ControllerState` RLock is contended. If `elapsed >> snapshot_ms`, slow worker RPCs are the cause.
**Diagnostic trail — slow storage:**

```
INFO Controller status: 128 workers (0 failed), 2 active jobs, 0 pending tasks
DEBUG Heartbeat round: 128 workers, 3 failed, 4200ms (snapshot: 45ms)
WARN Storage read for job/0/0 attempt 0: 28500ms (slow)
WARN RPC GetTaskLogs completed in 31204ms (slow)
```

**Diagnostic trail — lock contention:**

```
WARN Heartbeat round: 128 workers, 0 failed, 6200ms (snapshot: 5800ms)
WARN RPC GetTaskLogs completed in 31204ms (slow)
```

<details><summary>Scope boundaries</summary>

- **Logging only** — no Prometheus/OTel. A metrics stack is a separate effort (#2826).
- **Hardcoded thresholds** (RPC: 1s, storage: 2s, heartbeat: 5s) — tunable later once we have baseline data.
- **Phase-level timing as lock-contention proxy** — we time the lock-acquiring phases from outside rather than instrumenting `ControllerState`'s RLock (too invasive, too noisy).
- **`DEBUG` for normal, `WARNING` for slow/errored** — no `INFO`-level RPC logging (too noisy at 128 workers × 1s polling).
- **Not in scope**: performance fixes, K8s CPU allocation (per @rjpower in #3071), structured logging format.

</details>

## Test plan

- [x] Unit tests for `RequestTimingInterceptor` (pass-through + re-raise contract)
- [x] Existing controller tests pass (268 tests)
- [ ] Post-merge: verify `DEBUG RPC ...` on dashboard navigation, `DEBUG Heartbeat round:` every ~5s, `INFO Controller status:` every ~30s, no `WARNING` under normal load

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
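The triage rule behind these two trails can be written down directly. The helper below is illustrative only — it is not part of this diff; the function name and the 0.8 ratio are made up, while the 5000ms cutoff matches `_SLOW_HEARTBEAT_MS` from the diff:

```python
# Illustrative only (not part of this commit): encodes the triage rule for a
# slow heartbeat round. The 0.8 ratio is an arbitrary choice for the sketch.
def classify_heartbeat_round(elapsed_ms: int, snapshot_ms: int) -> str:
    """Guess the bottleneck from a round's total time and its snapshot sub-timing."""
    if elapsed_ms <= 5000:  # _SLOW_HEARTBEAT_MS in the diff
        return "healthy"
    if snapshot_ms >= 0.8 * elapsed_ms:
        # Phase 1 (lock-acquiring snapshot creation) dominates the round:
        # points at ControllerState RLock contention.
        return "lock contention"
    # Otherwise the time went to the dispatch/consume phases: slow worker RPCs.
    return "slow worker RPCs"


# The two diagnostic trails above:
print(classify_heartbeat_round(6200, 5800))  # lock contention
print(classify_heartbeat_round(4200, 45))    # healthy round; storage is the suspect
```
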
1 parent 82d1a79 commit 9c372ee

File tree

5 files changed (+128, −5 lines)


lib/iris/src/iris/cluster/controller/controller.py

Lines changed: 48 additions & 3 deletions
```diff
@@ -44,10 +44,13 @@
 from iris.managed_thread import ManagedThread, ThreadContainer, get_thread_container
 from iris.rpc import cluster_pb2
 from iris.rpc.cluster_connect import WorkerServiceClientSync
-from iris.time_utils import Duration, ExponentialBackoff
+from iris.time_utils import Duration, ExponentialBackoff, Timer
 
 logger = logging.getLogger(__name__)
 
+_SLOW_HEARTBEAT_MS = 5000
+_HEALTH_SUMMARY_INTERVAL = 6  # every ~30s at 5s heartbeat interval
+
 
 def job_requirements_from_job(job: ControllerJob) -> JobRequirements:
     """Convert a ControllerJob to scheduler-compatible JobRequirements."""
@@ -274,6 +277,8 @@ def __init__(
         # Autoscaler (passed in, configured in start() if provided)
         self._autoscaler: Autoscaler | None = autoscaler
 
+        self._heartbeat_iteration = 0
+
     def wake(self) -> None:
         """Signal the controller loop to run immediately.
@@ -298,7 +303,7 @@ def start(self) -> None:
             self._dashboard._app,
             host=self._config.host,
             port=self._config.port,
-            log_level="error",
+            log_level="warning",
             timeout_keep_alive=120,
         )
         self._server = uvicorn.Server(server_config)
@@ -381,8 +386,10 @@ def _run_scheduling(self) -> None:
         No lock is needed since only one scheduling thread exists. All state
         reads and writes go through ControllerState which has its own lock.
         """
+        timer = Timer()
         pending_tasks = self._state.peek_pending_tasks()
         workers = self._state.get_available_workers()
+        state_read_ms = timer.elapsed_ms()
 
         if not pending_tasks:
             return
@@ -418,6 +425,12 @@ def _run_scheduling(self) -> None:
         # Buffer assignments for heartbeat delivery (commits resources via TaskAssignedEvent)
         if result.assignments:
             self._buffer_assignments(result.assignments)
+        logger.debug(
+            "Scheduling cycle: %d assignments, %dms (state read: %dms)",
+            len(result.assignments),
+            timer.elapsed_ms(),
+            state_read_ms,
+        )
 
     def _buffer_assignments(
         self,
@@ -543,12 +556,17 @@ def _heartbeat_all_workers(self) -> None:
         _on_worker_failed prunes it from state. We detect this (worker no longer
         in state) and evict the cached stub + notify the autoscaler.
         """
-        # Phase 1: create snapshots for all healthy workers
+        round_timer = Timer()
+
+        # Phase 1: create snapshots for all healthy workers (lock-acquiring).
+        # Timing this phase separately gives a lock-contention signal.
+        snapshot_timer = Timer()
         snapshots: list[HeartbeatSnapshot] = []
         for w in self._state.get_available_workers():
             snapshot = self._state.begin_heartbeat(w.worker_id)
             if snapshot:
                 snapshots.append(snapshot)
+        snapshot_ms = snapshot_timer.elapsed_ms()
 
         if not snapshots:
             return
@@ -578,9 +596,11 @@ def _dispatch_worker() -> None:
         worker_futures = [self._dispatch_executor.submit(_dispatch_worker) for _ in range(worker_count)]
 
         # Phase 3: consume all responses; per-worker RPC timeout determines failures.
+        fail_count = 0
         for _ in snapshots:
             snapshot, response, error = result_queue.get()
             if error is not None:
+                fail_count += 1
                 logger.warning("Heartbeat error for %s: %s", snapshot.worker_id, error)
                 self._handle_heartbeat_failure(snapshot, error)
                 continue
@@ -590,6 +610,31 @@ def _dispatch_worker() -> None:
         for future in worker_futures:
             future.cancel()
 
+        elapsed = round_timer.elapsed_ms()
+        level = logging.WARNING if elapsed > _SLOW_HEARTBEAT_MS else logging.DEBUG
+        logger.log(
+            level,
+            "Heartbeat round: %d workers, %d failed, %dms (snapshot: %dms)",
+            len(snapshots),
+            fail_count,
+            elapsed,
+            snapshot_ms,
+        )
+
+        self._heartbeat_iteration += 1
+        if self._heartbeat_iteration % _HEALTH_SUMMARY_INTERVAL == 0:
+            workers = self._state.get_available_workers()
+            jobs = self._state.list_all_jobs()
+            active = sum(1 for j in jobs if j.state == cluster_pb2.JOB_STATE_RUNNING)
+            pending = len(self._state.peek_pending_tasks())
+            logger.info(
+                "Controller status: %d workers (%d failed), %d active jobs, %d pending tasks",
+                len(workers),
+                fail_count,
+                active,
+                pending,
+            )
+
     def _handle_heartbeat_failure(self, snapshot: HeartbeatSnapshot, error: str) -> None:
         """Process a heartbeat failure: update state, evict stub + notify autoscaler if worker died.
```
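All of the hooks above lean on `iris.time_utils.Timer`, whose implementation is not part of this diff. A minimal sketch of the interface the diff assumes — a stopwatch started at construction with an `elapsed_ms()` reader; this is an assumption, not the actual iris class:

```python
import time


class Timer:
    """Sketch of the assumed Timer interface (not the real iris.time_utils.Timer):
    starts on construction, reports elapsed whole milliseconds."""

    def __init__(self) -> None:
        # time.monotonic() is unaffected by wall-clock adjustments, which
        # matters when timing long RPCs and heartbeat rounds.
        self._start = time.monotonic()

    def elapsed_ms(self) -> int:
        return int((time.monotonic() - self._start) * 1000)
```

Under this reading, `snapshot_timer.elapsed_ms()` in Phase 1 and `round_timer.elapsed_ms()` at the end of the round are independent stopwatches, which is what lets the log line report both numbers.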

lib/iris/src/iris/cluster/controller/dashboard.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -27,6 +27,7 @@
 from iris.cluster.dashboard_common import html_shell, static_files_mount
 from iris.rpc import cluster_pb2
 from iris.rpc.cluster_connect import ControllerServiceWSGIApplication
+from iris.rpc.interceptors import RequestTimingInterceptor
 
 logger = logging.getLogger(__name__)
 
@@ -57,7 +58,7 @@ def port(self) -> int:
         return self._port
 
     def _create_app(self) -> Starlette:
-        rpc_wsgi_app = ControllerServiceWSGIApplication(service=self._service)
+        rpc_wsgi_app = ControllerServiceWSGIApplication(service=self._service, interceptors=[RequestTimingInterceptor()])
         rpc_app = WSGIMiddleware(rpc_wsgi_app)
 
         routes = [
```
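How `ControllerServiceWSGIApplication` applies the `interceptors=[...]` list is internal to iris, but interceptor lists with a `(call_next, request, ctx)` signature are conventionally folded around the handler outermost-first. The sketch below illustrates that convention only — `build_chain` and `Tagger` are made-up names, not iris APIs:

```python
from typing import Any, Callable

Handler = Callable[[Any, Any], Any]  # (request, ctx) -> response


def build_chain(handler: Handler, interceptors: list) -> Handler:
    """Wrap handler so interceptors run in list order, outermost first."""
    wrapped = handler
    for interceptor in reversed(interceptors):
        def make(inner: Handler, icpt: Any) -> Handler:
            # Each layer hands the next layer in as call_next.
            return lambda request, ctx: icpt.intercept_unary_sync(inner, request, ctx)
        wrapped = make(wrapped, interceptor)
    return wrapped


class Tagger:
    """Toy interceptor: tags the response so the nesting order is visible."""

    def __init__(self, tag: str) -> None:
        self.tag = tag

    def intercept_unary_sync(self, call_next, request, ctx):
        return f"{self.tag}({call_next(request, ctx)})"


chain = build_chain(lambda req, ctx: "handler", [Tagger("a"), Tagger("b")])
print(chain("req", None))  # a(b(handler))
```

With a single `RequestTimingInterceptor`, this reduces to the timer wrapping the whole handler call, so its duration covers the full request.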

lib/iris/src/iris/cluster/controller/service.py

Lines changed: 11 additions & 1 deletion
```diff
@@ -42,12 +42,13 @@
 from iris.rpc.cluster_connect import WorkerServiceClientSync
 from iris.rpc.errors import rpc_error_handler
 from iris.rpc.proto_utils import task_state_name
-from iris.time_utils import Timestamp
+from iris.time_utils import Timer, Timestamp
 
 logger = logging.getLogger(__name__)
 
 DEFAULT_TRANSACTION_LIMIT = 50
 DEFAULT_MAX_TOTAL_LINES = 10000
+_SLOW_STORAGE_READ_MS = 2000
 
 # Maximum bundle size in bytes (25 MB) - matches client-side limit
 MAX_BUNDLE_SIZE_BYTES = 25 * 1024 * 1024
@@ -919,12 +920,21 @@ def get_task_logs(
                 continue
 
             try:
+                storage_timer = Timer()
                 reader = task_logging.LogReader.from_log_directory(log_directory=attempt.log_directory)
                 log_entries = reader.read_logs(
                     source=None,  # All sources
                     regex_filter=request.regex if request.regex else None,
                     max_lines=max(0, max_lines - total_lines) if max_lines > 0 else 0,
                 )
+                storage_elapsed = storage_timer.elapsed_ms()
+                if storage_elapsed > _SLOW_STORAGE_READ_MS:
+                    logger.warning(
+                        "Storage read for %s attempt %d: %dms (slow)",
+                        task_id_wire,
+                        attempt.attempt_id,
+                        storage_elapsed,
+                    )
 
                 worker_logs = []
                 for entry in log_entries:
```
Lines changed: 29 additions & 0 deletions
```python
# Copyright 2025 The Marin Authors
# SPDX-License-Identifier: Apache-2.0

import logging

from iris.time_utils import Timer

logger = logging.getLogger(__name__)

_SLOW_RPC_THRESHOLD_MS = 1000


class RequestTimingInterceptor:
    """Logs method name + duration for every unary RPC."""

    def intercept_unary_sync(self, call_next, request, ctx):
        method = ctx.method().name
        timer = Timer()
        try:
            response = call_next(request, ctx)
            elapsed = timer.elapsed_ms()
            if elapsed > _SLOW_RPC_THRESHOLD_MS:
                logger.warning("RPC %s completed in %dms (slow)", method, elapsed)
            else:
                logger.debug("RPC %s completed in %dms", method, elapsed)
            return response
        except Exception as e:
            logger.warning("RPC %s failed after %dms: %s", method, timer.elapsed_ms(), e)
            raise
```
Lines changed: 38 additions & 0 deletions
```python
# Copyright 2025 The Marin Authors
# SPDX-License-Identifier: Apache-2.0

from dataclasses import dataclass
from unittest.mock import Mock

import pytest

from iris.rpc.interceptors import RequestTimingInterceptor


@dataclass(frozen=True)
class FakeMethodInfo:
    name: str


def _make_ctx(method_name: str):
    ctx = Mock()
    ctx.method.return_value = FakeMethodInfo(name=method_name)
    return ctx


def test_interceptor_passes_through_response():
    interceptor = RequestTimingInterceptor()
    ctx = _make_ctx("GetTaskLogs")
    result = interceptor.intercept_unary_sync(lambda req, ctx: "ok", "request", ctx)
    assert result == "ok"


def test_interceptor_reraises_exceptions():
    interceptor = RequestTimingInterceptor()
    ctx = _make_ctx("LaunchJob")

    def failing_handler(req, ctx):
        raise ValueError("boom")

    with pytest.raises(ValueError, match="boom"):
        interceptor.intercept_unary_sync(failing_handler, "request", ctx)
```
