Skip to content

Commit 5d81854

Browse files
authored
refactor(iris): replace ContainerStatus bool with ContainerPhase enum (#3105)
- Replace `ContainerStatus.running: bool` with `ContainerStatus.phase: ContainerPhase`, where `ContainerPhase` is a `StrEnum` with `PENDING`, `RUNNING`, `STOPPED`.
- The old boolean conflated "not yet started" with "finished" — callers had to inspect `exit_code` to disambiguate. The enum makes the state machine explicit and enables the upcoming BUILDING→RUNNING backpressure fix to distinguish K8s pods in the Pending vs Running phase.
- All three runtime implementations (process, docker, kubernetes) and all tests updated.

<details>
<summary>Context</summary>

Extracted from the [BUILDING semantics investigation](#3090). The next PR in the stack (BUILDING→RUNNING backpressure) depends on `ContainerPhase.PENDING` vs `RUNNING` to gate task state transitions.

Related: #3102 (controller RBAC/scheduling), #3103 (worker kubectl saturation)
</details>
1 parent 78fd102 commit 5d81854

File tree

10 files changed

+73
-40
lines changed

10 files changed

+73
-40
lines changed

lib/iris/src/iris/cluster/runtime/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from iris.cluster.runtime.types import (
1616
ContainerConfig,
1717
ContainerHandle,
18+
ContainerPhase,
1819
ContainerResult,
1920
ContainerRuntime,
2021
ContainerStats,
@@ -26,6 +27,7 @@
2627
__all__ = [
2728
"ContainerConfig",
2829
"ContainerHandle",
30+
"ContainerPhase",
2931
"ContainerResult",
3032
"ContainerRuntime",
3133
"ContainerStats",

lib/iris/src/iris/cluster/runtime/docker.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from iris.cluster.runtime.types import (
3737
ContainerConfig,
3838
ContainerErrorKind,
39+
ContainerPhase,
3940
ContainerStats,
4041
ContainerStatus,
4142
ImageInfo,
@@ -283,7 +284,7 @@ def build(self) -> list[LogLine]:
283284
build_logs.extend(new_logs)
284285
last_log_time = Timestamp.from_seconds(new_logs[-1].timestamp.timestamp()).add_ms(1)
285286

286-
if not status.running:
287+
if status.phase == ContainerPhase.STOPPED:
287288
break
288289
time.sleep(0.5)
289290

@@ -368,7 +369,7 @@ def stop(self, force: bool = False) -> None:
368369
def status(self) -> ContainerStatus:
369370
"""Check container status (running, exit code, error)."""
370371
if not self._run_container_id:
371-
return ContainerStatus(running=False, error="Container not started")
372+
return ContainerStatus(phase=ContainerPhase.STOPPED, error="Container not started")
372373
return self._docker_inspect(self._run_container_id)
373374

374375
def log_reader(self) -> DockerLogReader:
@@ -589,7 +590,7 @@ def _docker_inspect(self, container_id: str) -> ContainerStatus:
589590

590591
if result.returncode != 0:
591592
return ContainerStatus(
592-
running=False,
593+
phase=ContainerPhase.STOPPED,
593594
error=f"Container not found: id={container_id}",
594595
error_kind=ContainerErrorKind.INFRA_NOT_FOUND,
595596
)
@@ -602,15 +603,15 @@ def _docker_inspect(self, container_id: str) -> ContainerStatus:
602603
oom_killed = state.get("OOMKilled", False)
603604

604605
return ContainerStatus(
605-
running=running,
606+
phase=ContainerPhase.RUNNING if running else ContainerPhase.STOPPED,
606607
exit_code=exit_code if not running else None,
607608
error=error_msg,
608609
error_kind=ContainerErrorKind.USER_CODE if error_msg else ContainerErrorKind.NONE,
609610
oom_killed=oom_killed,
610611
)
611612
except (json.JSONDecodeError, KeyError) as e:
612613
return ContainerStatus(
613-
running=False,
614+
phase=ContainerPhase.STOPPED,
614615
error=f"Failed to parse inspect output: {e}",
615616
error_kind=ContainerErrorKind.RUNTIME_ERROR,
616617
)
@@ -661,7 +662,7 @@ def _docker_stats(self, container_id: str) -> ContainerStats:
661662
def _docker_kill(self, container_id: str, force: bool = False) -> None:
662663
"""Kill container."""
663664
status = self._docker_inspect(container_id)
664-
if not status.running:
665+
if status.phase == ContainerPhase.STOPPED:
665666
return
666667

667668
signal = "SIGKILL" if force else "SIGTERM"

lib/iris/src/iris/cluster/runtime/kubernetes.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,13 @@
3030
resolve_cpu_spec,
3131
resolve_memory_spec,
3232
)
33-
from iris.cluster.runtime.types import ContainerConfig, ContainerErrorKind, ContainerStats, ContainerStatus
33+
from iris.cluster.runtime.types import (
34+
ContainerConfig,
35+
ContainerErrorKind,
36+
ContainerPhase,
37+
ContainerStats,
38+
ContainerStatus,
39+
)
3440
from iris.cluster.worker.worker_types import LogLine
3541
from iris.rpc import cluster_pb2
3642
from iris.time_utils import Deadline, Duration
@@ -334,7 +340,7 @@ def stop(self, force: bool = False) -> None:
334340

335341
def status(self) -> ContainerStatus:
336342
if not self._pod_name:
337-
return ContainerStatus(running=False, error="Pod not started")
343+
return ContainerStatus(phase=ContainerPhase.STOPPED, error="Pod not started")
338344

339345
pod = self.kubectl.get_json("pod", self._pod_name)
340346
if pod is None:
@@ -350,11 +356,11 @@ def status(self) -> ContainerStatus:
350356
self._pod_not_found_count,
351357
_POD_NOT_FOUND_RETRY_COUNT,
352358
)
353-
return ContainerStatus(running=True)
359+
return ContainerStatus(phase=ContainerPhase.PENDING)
354360

355361
attempt = self.config.attempt_id if self.config.attempt_id is not None else -1
356362
return ContainerStatus(
357-
running=False,
363+
phase=ContainerPhase.STOPPED,
358364
error=(
359365
"Task pod not found after retry window: "
360366
f"name={self._pod_name}, namespace={self.kubectl.namespace}, "
@@ -369,8 +375,10 @@ def status(self) -> ContainerStatus:
369375
self._pod_not_found_deadline = None
370376

371377
phase = pod.get("status", {}).get("phase", "")
372-
if phase in ("Pending", "Running"):
373-
return ContainerStatus(running=True)
378+
if phase == "Pending":
379+
return ContainerStatus(phase=ContainerPhase.PENDING)
380+
if phase == "Running":
381+
return ContainerStatus(phase=ContainerPhase.RUNNING)
374382

375383
statuses = pod.get("status", {}).get("containerStatuses", [])
376384
terminated = {}
@@ -388,7 +396,7 @@ def status(self) -> ContainerStatus:
388396
error = message or reason or None
389397
oom_killed = reason == "OOMKilled"
390398
return ContainerStatus(
391-
running=False,
399+
phase=ContainerPhase.STOPPED,
392400
exit_code=exit_code if isinstance(exit_code, int) else 1,
393401
error=error,
394402
oom_killed=oom_killed,

lib/iris/src/iris/cluster/runtime/process.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
resolve_cpu_spec,
4343
resolve_memory_spec,
4444
)
45-
from iris.cluster.runtime.types import ContainerConfig, ContainerStats, ContainerStatus, RuntimeLogReader
45+
from iris.cluster.runtime.types import ContainerConfig, ContainerPhase, ContainerStats, ContainerStatus, RuntimeLogReader
4646
from iris.cluster.worker.worker_types import LogLine
4747
from iris.managed_thread import ManagedThread, get_thread_container
4848
from iris.rpc import cluster_pb2
@@ -515,9 +515,9 @@ def stop(self, force: bool = False) -> None:
515515
def status(self) -> ContainerStatus:
516516
"""Check container status (running, exit code, error)."""
517517
if not self._container:
518-
return ContainerStatus(running=False, error="Container not started")
518+
return ContainerStatus(phase=ContainerPhase.STOPPED, error="Container not started")
519519
return ContainerStatus(
520-
running=self._container._running,
520+
phase=ContainerPhase.RUNNING if self._container._running else ContainerPhase.STOPPED,
521521
exit_code=self._container._exit_code,
522522
error=self._container._error,
523523
)

lib/iris/src/iris/cluster/runtime/types.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,19 @@ class ContainerErrorKind(StrEnum):
3535
RUNTIME_ERROR = "runtime_error"
3636

3737

38+
class ContainerPhase(StrEnum):
39+
"""Lifecycle phase of a container from the runtime's perspective.
40+
41+
PENDING: container created but not yet executing (K8s pod scheduling, image pull).
42+
RUNNING: container is executing the main command.
43+
STOPPED: container has exited (check exit_code/error for details).
44+
"""
45+
46+
PENDING = "pending"
47+
RUNNING = "running"
48+
STOPPED = "stopped"
49+
50+
3851
@dataclass
3952
class ContainerConfig:
4053
"""Configuration for running a container."""
@@ -91,7 +104,7 @@ class ContainerStats:
91104
class ContainerStatus:
92105
"""Container state from runtime inspection."""
93106

94-
running: bool
107+
phase: ContainerPhase
95108
exit_code: int | None = None
96109
error: str | None = None
97110
error_kind: ContainerErrorKind = ContainerErrorKind.NONE

lib/iris/src/iris/cluster/worker/task_attempt.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
ContainerConfig,
2323
ContainerErrorKind,
2424
ContainerHandle,
25+
ContainerPhase,
2526
ContainerRuntime,
2627
RuntimeLogReader,
2728
)
@@ -624,7 +625,7 @@ def _monitor(self) -> None:
624625

625626
# Check container status
626627
status = handle.status()
627-
if not status.running:
628+
if status.phase == ContainerPhase.STOPPED:
628629
logger.info(
629630
"Container exited for task %s (container_id=%s, exit_code=%s, error=%s)",
630631
self.task_id,

lib/iris/tests/cluster/runtime/test_kubernetes_runtime.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515

1616
from iris.cluster.runtime.kubernetes import KubernetesRuntime
17-
from iris.cluster.runtime.types import ContainerConfig, ContainerErrorKind
17+
from iris.cluster.runtime.types import ContainerConfig, ContainerErrorKind, ContainerPhase
1818
from iris.rpc import cluster_pb2
1919

2020

@@ -233,11 +233,11 @@ def fake_get_json(resource: str, name: str):
233233
second = handle.status()
234234
third = handle.status()
235235

236-
assert first.running is True
236+
assert first.phase == ContainerPhase.PENDING
237237
assert first.error is None
238-
assert second.running is True
238+
assert second.phase == ContainerPhase.PENDING
239239
assert second.error is None
240-
assert third.running is True
240+
assert third.phase == ContainerPhase.RUNNING
241241

242242

243243
def test_status_returns_structured_error_after_persistent_pod_not_found(monkeypatch):
@@ -259,9 +259,9 @@ def fake_get_json(resource: str, name: str):
259259
second = handle.status()
260260
third = handle.status()
261261

262-
assert first.running is True
263-
assert second.running is True
264-
assert third.running is False
262+
assert first.phase == ContainerPhase.PENDING
263+
assert second.phase == ContainerPhase.PENDING
264+
assert third.phase == ContainerPhase.STOPPED
265265
assert third.error_kind == ContainerErrorKind.INFRA_NOT_FOUND
266266
assert third.error is not None
267267
assert "Task pod not found after retry window" in third.error

lib/iris/tests/cluster/worker/test_dashboard.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from iris.cluster.worker.bundle_cache import BundleCache
1616
from iris.cluster.worker.dashboard import WorkerDashboard
1717
from iris.cluster.runtime.docker import DockerRuntime
18-
from iris.cluster.runtime.types import ContainerStats, ContainerStatus
18+
from iris.cluster.runtime.types import ContainerPhase, ContainerStats, ContainerStatus
1919
from iris.cluster.worker.service import WorkerServiceImpl
2020
from iris.cluster.worker.worker import Worker, WorkerConfig
2121
from iris.rpc import cluster_pb2
@@ -48,7 +48,7 @@ def create_mock_container_handle():
4848
handle.container_id = "container123"
4949
handle.build = Mock(return_value=[])
5050
handle.run = Mock()
51-
handle.status = Mock(return_value=ContainerStatus(running=False, exit_code=0))
51+
handle.status = Mock(return_value=ContainerStatus(phase=ContainerPhase.STOPPED, exit_code=0))
5252
handle.stop = Mock()
5353
handle.logs = Mock(return_value=[])
5454
handle.stats = Mock(return_value=ContainerStats(memory_mb=100, cpu_percent=50, process_count=1, available=True))

lib/iris/tests/cluster/worker/test_worker.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from iris.cluster.types import Entrypoint, JobName
1616
from iris.cluster.worker.bundle_cache import BundleCache
1717
from iris.cluster.runtime.docker import DockerRuntime
18-
from iris.cluster.runtime.types import ContainerErrorKind, ContainerStats, ContainerStatus
18+
from iris.cluster.runtime.types import ContainerErrorKind, ContainerPhase, ContainerStats, ContainerStatus
1919
from iris.cluster.worker.port_allocator import PortAllocator
2020
from iris.cluster.worker.service import WorkerServiceImpl
2121
from iris.cluster.worker.worker import Worker, WorkerConfig
@@ -111,8 +111,8 @@ def create_mock_container_handle(
111111

112112
if status_sequence is None:
113113
status_sequence = [
114-
ContainerStatus(running=True),
115-
ContainerStatus(running=False, exit_code=0),
114+
ContainerStatus(phase=ContainerPhase.RUNNING),
115+
ContainerStatus(phase=ContainerPhase.STOPPED, exit_code=0),
116116
]
117117

118118
call_count = [0]
@@ -260,7 +260,9 @@ def test_task_with_ports(worker):
260260
def test_task_failure_on_nonzero_exit(worker, mock_runtime):
261261
"""Test task fails when container exits with non-zero code."""
262262
# Update the mock handle's status to return failure immediately
263-
mock_handle = create_mock_container_handle(status_sequence=[ContainerStatus(running=False, exit_code=1)])
263+
mock_handle = create_mock_container_handle(
264+
status_sequence=[ContainerStatus(phase=ContainerPhase.STOPPED, exit_code=1)]
265+
)
264266
mock_runtime.create_container = Mock(return_value=mock_handle)
265267

266268
request = create_run_task_request()
@@ -280,8 +282,8 @@ def test_task_failure_on_error(worker, mock_runtime):
280282
# Update the mock handle's status to return error after first poll
281283
mock_handle = create_mock_container_handle(
282284
status_sequence=[
283-
ContainerStatus(running=True),
284-
ContainerStatus(running=False, exit_code=1, error="Container crashed"),
285+
ContainerStatus(phase=ContainerPhase.RUNNING),
286+
ContainerStatus(phase=ContainerPhase.STOPPED, exit_code=1, error="Container crashed"),
285287
]
286288
)
287289
mock_runtime.create_container = Mock(return_value=mock_handle)
@@ -302,7 +304,7 @@ def test_task_infra_not_found_error_maps_to_worker_failed(worker, mock_runtime):
302304
mock_handle = create_mock_container_handle(
303305
status_sequence=[
304306
ContainerStatus(
305-
running=False,
307+
phase=ContainerPhase.STOPPED,
306308
exit_code=1,
307309
error="Task pod not found after retry window: name=iris-task-abc, namespace=iris",
308310
error_kind=ContainerErrorKind.INFRA_NOT_FOUND,
@@ -351,7 +353,9 @@ def test_list_tasks(worker):
351353
def test_kill_running_task(worker, mock_runtime):
352354
"""Test killing a running task with graceful timeout."""
353355
# Create a handle that stays running until killed
354-
mock_handle = create_mock_container_handle(status_sequence=[ContainerStatus(running=True)] * 100) # Stay running
356+
mock_handle = create_mock_container_handle(
357+
status_sequence=[ContainerStatus(phase=ContainerPhase.RUNNING)] * 100
358+
) # Stay running
355359
mock_runtime.create_container = Mock(return_value=mock_handle)
356360

357361
request = create_run_task_request()
@@ -379,7 +383,9 @@ def test_kill_running_task(worker, mock_runtime):
379383
def test_new_attempt_supersedes_old(worker, mock_runtime):
380384
"""New attempt for same task_id kills the old attempt and starts a new one."""
381385
# Create a handle that stays running until killed
382-
mock_handle = create_mock_container_handle(status_sequence=[ContainerStatus(running=True)] * 100) # Stay running
386+
mock_handle = create_mock_container_handle(
387+
status_sequence=[ContainerStatus(phase=ContainerPhase.RUNNING)] * 100
388+
) # Stay running
383389
mock_runtime.create_container = Mock(return_value=mock_handle)
384390

385391
request_0 = create_run_task_request(task_id=JobName.root("retry-task").task(0).to_wire(), attempt_id=0)
@@ -419,7 +425,9 @@ def test_new_attempt_supersedes_old(worker, mock_runtime):
419425
def test_duplicate_attempt_rejected(worker, mock_runtime):
420426
"""Same attempt_id for an existing non-terminal task is rejected."""
421427
# Create a handle that stays running until killed
422-
mock_handle = create_mock_container_handle(status_sequence=[ContainerStatus(running=True)] * 100) # Stay running
428+
mock_handle = create_mock_container_handle(
429+
status_sequence=[ContainerStatus(phase=ContainerPhase.RUNNING)] * 100
430+
) # Stay running
423431
mock_runtime.create_container = Mock(return_value=mock_handle)
424432

425433
request = create_run_task_request(task_id=JobName.root("dup-task").task(0).to_wire(), attempt_id=0)

lib/iris/tests/e2e/test_coreweave_live_kubernetes_runtime.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from iris.cluster.config import load_config
2020
import fsspec.core
2121
from iris.cluster.runtime.kubernetes import KubernetesRuntime
22-
from iris.cluster.runtime.types import ContainerConfig
22+
from iris.cluster.runtime.types import ContainerConfig, ContainerPhase
2323
from iris.rpc import cluster_pb2
2424

2525
pytestmark = [pytest.mark.e2e, pytest.mark.slow]
@@ -35,7 +35,7 @@ def _wait_finished(handle, timeout_seconds: float) -> cluster_pb2.TaskState:
3535
deadline = time.monotonic() + timeout_seconds
3636
while time.monotonic() < deadline:
3737
status = handle.status()
38-
if not status.running:
38+
if status.phase == ContainerPhase.STOPPED:
3939
return cluster_pb2.TASK_STATE_SUCCEEDED if status.exit_code == 0 else cluster_pb2.TASK_STATE_FAILED
4040
time.sleep(2.0)
4141
raise TimeoutError(f"pod {handle.container_id} did not finish in {timeout_seconds}s")
@@ -265,7 +265,7 @@ def test_incremental_log_reader_no_duplicates(coreweave_runtime: KubernetesRunti
265265
for line in new_lines:
266266
collected.append(line.data)
267267
status = handle.status()
268-
if not status.running:
268+
if status.phase == ContainerPhase.STOPPED:
269269
# One final read to drain remaining
270270
for line in reader.read():
271271
collected.append(line.data)

0 commit comments

Comments
 (0)