Skip to content

Commit 77f12c5

Browse files
yonromai and claude
authored
fix(iris): keep tasks in BUILDING until container reaches Running phase (#3106)
## Summary - Tasks now stay in `BUILDING` until the container runtime reports `ContainerPhase.RUNNING`, instead of transitioning immediately when `run()` is called - For process/docker runtimes this is instant (no behavioral change). For K8s, tasks wait in BUILDING while pods are in `Pending`/`ContainerCreating` phase. - This makes `max_building_tasks_per_worker` actually effective — it now gates concurrent pod creation instead of just counting tasks that instantly flip to RUNNING <details> <summary>Context</summary> Stacked on #3105 (ContainerPhase enum). Extracted from the [BUILDING semantics investigation](#3090). Without this fix, submitting 20+ tasks to a K8s worker creates all pods simultaneously. The pod creation storm overwhelmed the controller (4 GiB memory limit), causing it to crash ~15s after job submission. With backpressure, only `max_building_tasks_per_worker` (default 4) pods are created concurrently. Validated on CW: [workflow run 22498485767](https://github.com/marin-community/marin/actions/runs/22498485767) — 70+ task transitions observed with 5.2–10.4s BUILDING durations, controller survived the entire run. See #3090 comment for full findings. Related: #3102 (controller RBAC/scheduling), #3103 (worker kubectl saturation) </details> ## Test plan - [x] `uv run pytest lib/iris/tests/cluster/worker/ lib/iris/tests/cluster/runtime/test_kubernetes_runtime.py` — 77 passed - [x] Pre-commit clean - [ ] CI - [x] `test_building_backpressure.py` e2e test — passed (20 tasks, peak building within limit) 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 801fac0 commit 77f12c5

File tree

2 files changed

+128
-7
lines changed

2 files changed

+128
-7
lines changed

lib/iris/src/iris/cluster/worker/task_attempt.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,7 @@ def _download_bundle(self) -> None:
433433
"""
434434
self.transition_to(cluster_pb2.TASK_STATE_BUILDING, message="downloading bundle")
435435
self.started_at = Timestamp.now()
436+
self._building_start_monotonic = time.monotonic()
436437
self._report_state() # Report BUILDING state to controller
437438

438439
download_start = time.monotonic()
@@ -562,15 +563,9 @@ def _build_container(self) -> None:
562563
logger.info("Build phase completed for task %s", self.task_id)
563564

564565
def _run_container(self) -> None:
565-
"""Start the main command during RUNNING state.
566-
567-
Non-blocking - returns immediately after starting.
568-
"""
566+
"""Start the container. Task stays in BUILDING until _monitor() confirms readiness."""
569567
assert self._container_handle is not None
570568

571-
self.transition_to(cluster_pb2.TASK_STATE_RUNNING)
572-
self._report_state()
573-
574569
self._container_handle.run()
575570
logger.info(
576571
"Container started for task %s (container_id=%s, ports=%s)",
@@ -625,6 +620,13 @@ def _monitor(self) -> None:
625620

626621
# Check container status
627622
status = handle.status()
623+
624+
if self.status == cluster_pb2.TASK_STATE_BUILDING and status.phase == ContainerPhase.RUNNING:
625+
building_duration = time.monotonic() - self._building_start_monotonic
626+
logger.info("Task %s BUILDING→RUNNING after %.1fs", self.task_id, building_duration)
627+
self.transition_to(cluster_pb2.TASK_STATE_RUNNING)
628+
self._report_state()
629+
628630
if status.phase == ContainerPhase.STOPPED:
629631
logger.info(
630632
"Container exited for task %s (container_id=%s, exit_code=%s, error=%s)",
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
# Copyright 2025 The Marin Authors
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Backpressure test: slow-starting containers must not exceed max_building_tasks_per_worker."""
5+
6+
import time
7+
from pathlib import Path
8+
9+
import pytest
10+
from iris.client.client import IrisClient
11+
from iris.cluster.config import load_config, make_local_config
12+
from iris.cluster.manager import connect_cluster
13+
from iris.cluster.runtime.process import ProcessContainerHandle
14+
from iris.cluster.runtime.types import ContainerPhase, ContainerStatus
15+
from iris.rpc import cluster_pb2, config_pb2
16+
from iris.rpc.cluster_connect import ControllerServiceClientSync
17+
18+
from .conftest import TestCluster
19+
from .helpers import _quick
20+
21+
pytestmark = pytest.mark.e2e

# Repository root for the iris package: three directory levels above this test file.
IRIS_ROOT = Path(__file__).resolve().parents[2]

# How long (seconds) the patched status() keeps reporting PENDING after run(),
# simulating a slow K8s pod startup.
READY_DELAY = 3.0
# Size of the submission burst.
NUM_TASKS = 20
MAX_BUILDING = 4  # default max_building_tasks_per_worker
27+
28+
29+
@pytest.fixture
def single_worker_cluster():
    """Yield a TestCluster backed by exactly one local CPU worker.

    With a single worker, every submitted task competes for the same
    per-worker building limit, making backpressure directly observable.
    """
    cfg = load_config(IRIS_ROOT / "examples" / "demo.yaml")

    # Replace whatever scale groups the demo config defines with one
    # single-slice local CPU group.
    cfg.scale_groups.clear()
    group = cfg.scale_groups["local-cpu"]
    group.name = "local-cpu"
    group.accelerator_type = config_pb2.ACCELERATOR_TYPE_CPU
    group.num_vms = 1
    group.min_slices = 1
    group.max_slices = 1
    group.resources.cpu_millicores = 8000
    group.resources.memory_bytes = 16 * 1024**3
    group.resources.disk_bytes = 50 * 1024**3
    group.slice_template.local.SetInParent()

    cfg = make_local_config(cfg)
    with connect_cluster(cfg) as address:
        iris_client = IrisClient.remote(address, workspace=IRIS_ROOT)
        ctrl = ControllerServiceClientSync(address=address, timeout_ms=30000)
        test_cluster = TestCluster(url=address, client=iris_client, controller_client=ctrl)
        test_cluster.wait_for_workers(1, timeout=30)
        yield test_cluster
        ctrl.close()
52+
53+
54+
def test_building_backpressure_with_slow_starting_containers(single_worker_cluster, monkeypatch):
    """Verify that slow-starting containers never push more than
    max_building_tasks_per_worker tasks into BUILDING/ASSIGNED at once.

    ProcessContainerHandle.status() is monkeypatched to report
    ContainerPhase.PENDING for READY_DELAY seconds after run(), emulating the
    K8s pod Pending/ContainerCreating window on the process runtime.
    """
    cluster = single_worker_cluster

    real_run = ProcessContainerHandle.run
    real_status = ProcessContainerHandle.status
    # id(handle) -> monotonic timestamp of its run() call.
    started_at: dict[int, float] = {}

    def patched_run(self):
        # Record when this handle was started so status() can fake PENDING.
        started_at[id(self)] = time.monotonic()
        return real_run(self)

    def patched_status(self) -> ContainerStatus:
        actual = real_status(self)
        launch = started_at.get(id(self))
        if launch is not None and actual.phase == ContainerPhase.RUNNING:
            # Within the delay window, mask RUNNING as PENDING to simulate
            # slow pod startup; all other fields pass through untouched.
            if time.monotonic() - launch < READY_DELAY:
                actual = ContainerStatus(
                    phase=ContainerPhase.PENDING,
                    exit_code=actual.exit_code,
                    error=actual.error,
                    error_kind=actual.error_kind,
                    oom_killed=actual.oom_killed,
                )
        return actual

    monkeypatch.setattr(ProcessContainerHandle, "run", patched_run)
    monkeypatch.setattr(ProcessContainerHandle, "status", patched_status)

    jobs = [cluster.submit(_quick, f"burst-{i}", cpu=0) for i in range(NUM_TASKS)]

    building_states = (cluster_pb2.TASK_STATE_BUILDING, cluster_pb2.TASK_STATE_ASSIGNED)
    active_job_states = (cluster_pb2.JOB_STATE_PENDING, cluster_pb2.JOB_STATE_RUNNING)

    peak = 0
    finished = False
    cutoff = time.monotonic() + 120

    # Poll until every job leaves PENDING/RUNNING (or the deadline passes),
    # tracking the peak number of concurrently building/assigned tasks.
    while not finished and time.monotonic() < cutoff:
        snapshot = [cluster.status(job) for job in jobs]
        in_flight = sum(
            1 for st in snapshot for task in st.tasks if task.state in building_states
        )
        finished = all(st.state and st.state not in active_job_states for st in snapshot)

        peak = max(peak, in_flight)
        time.sleep(0.3)

    # Allow +1 slack for race between scheduling and heartbeat reporting
    assert peak <= MAX_BUILDING + 1, (
        f"Peak building count {peak} exceeded limit {MAX_BUILDING}. Backpressure is not working."
    )

    # All jobs should eventually succeed
    for job in jobs:
        final = cluster.wait(job, timeout=120)
        assert final.state == cluster_pb2.JOB_STATE_SUCCEEDED

0 commit comments

Comments
 (0)