Skip to content

Commit 3e95c79

Browse files
fregata and claude authored
fix(BA-5297): exclude unmeasurable metrics from utilization idle check (#10316)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 345b197 commit 3e95c79

File tree

3 files changed

+129
-23
lines changed

3 files changed

+129
-23
lines changed

changes/10316.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Exclude unmeasurable metrics from utilization idle check instead of treating stat collection failures as 0% usage

src/ai/backend/manager/idle.py

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -138,15 +138,17 @@ class UtilizationResourceReport(UserDict[str, UtilizationExtraInfo]):
138138
@classmethod
139139
def from_avg_threshold(
140140
cls,
141-
avg_utils: Mapping[str, float],
141+
avg_utils: Mapping[str, float | None],
142142
thresholds: ResourceThresholds,
143143
exclusions: set[str],
144144
) -> UtilizationResourceReport:
145145
data: dict[str, UtilizationExtraInfo] = {}
146146
for metric_key, val in thresholds.items():
147147
if val.average is None or metric_key in exclusions:
148148
continue
149-
avg_util = avg_utils.get(metric_key, 0)
149+
avg_util = avg_utils.get(metric_key)
150+
if avg_util is None:
151+
continue
150152
data[metric_key] = UtilizationExtraInfo(float(avg_util), float(val.average))
151153
return cls(data)
152154

@@ -1119,13 +1121,15 @@ async def check_idleness(
11191121
# Update utilization time-series data.
11201122
raw_util_series = await self._redis_live.get_live_data(util_series_key)
11211123

1122-
def default_util_series() -> dict[str, list[float]]:
1124+
def default_util_series() -> dict[str, list[float | None]]:
11231125
return {resource: [] for resource in current_utilizations.keys()}
11241126

11251127
if raw_util_series is not None:
11261128
try:
1127-
raw_data: dict[str, list[float]] = msgpack.unpackb(raw_util_series, use_list=True)
1128-
util_series: dict[str, list[float]] = {
1129+
raw_data: dict[str, list[float | None]] = msgpack.unpackb(
1130+
raw_util_series, use_list=True
1131+
)
1132+
util_series: dict[str, list[float | None]] = {
11291133
metric_key: v for metric_key, v in raw_data.items()
11301134
}
11311135
except TypeError:
@@ -1159,13 +1163,13 @@ def default_util_series() -> dict[str, list[float]]:
11591163
ex=max(86400, int(self.time_window.total_seconds() * 2)),
11601164
)
11611165

1162-
def _avg(util_list: list[float]) -> float:
1163-
try:
1164-
return sum(util_list) / len(util_list)
1165-
except ZeroDivisionError:
1166-
return 0.0
1166+
def _avg(util_list: list[float | None]) -> float | None:
1167+
filtered = [v for v in util_list if v is not None]
1168+
if not filtered:
1169+
return None
1170+
return sum(filtered) / len(filtered)
11671171

1168-
avg_utils: Mapping[str, float] = {k: _avg(v) for k, v in util_series.items()}
1172+
avg_utils: Mapping[str, float | None] = {k: _avg(v) for k, v in util_series.items()}
11691173

11701174
util_avg_thresholds = UtilizationResourceReport.from_avg_threshold(
11711175
avg_utils, self.resource_thresholds, excluded_resources
@@ -1208,14 +1212,20 @@ async def get_current_utilization(
12081212
self,
12091213
kernel_ids: Sequence[KernelId],
12101214
occupied_slots: Mapping[str, Any],
1211-
) -> Mapping[str, float] | None:
1215+
) -> Mapping[str, float | None] | None:
12121216
"""
12131217
Return the current utilization key-value pairs of multiple kernels, possibly the
12141218
components of a cluster session. If there are multiple kernel_ids, this method
12151219
will return the averaged values over the kernels for each utilization.
1220+
1221+
When a metric is missing from some kernels' stats (e.g., CUDA plugin failure),
1222+
the metric is averaged only over the kernels that reported it. If no kernel
1223+
reported a metric, it is returned as None (not 0.0) so that the idle checker
1224+
can exclude it from the idle decision rather than treating it as idle.
12161225
"""
12171226
try:
1218-
utilizations: defaultdict[str, float] = defaultdict(float)
1227+
utilization_sums: defaultdict[str, float] = defaultdict(float)
1228+
utilization_counts: defaultdict[str, int] = defaultdict(int)
12191229
live_stat = {}
12201230
kernel_counter = 0
12211231
for kernel_id in kernel_ids:
@@ -1227,28 +1237,35 @@ async def get_current_utilization(
12271237
continue
12281238
live_stat = raw_live_stat
12291239
kernel_utils = {
1230-
k: float(nmget(live_stat, f"{k}.pct", 0.0))
1231-
for k in self.resource_names_to_check
1240+
k: nmget(live_stat, f"{k}.pct") for k in self.resource_names_to_check
12321241
}
12331242

12341243
for resource, val in kernel_utils.items():
1235-
utilizations[resource] = utilizations[resource] + val
1244+
if val is None:
1245+
continue
1246+
utilization_sums[resource] += float(val)
1247+
utilization_counts[resource] += 1
12361248

12371249
# NOTE: Manual calculation of mem utilization.
12381250
# mem.capacity does not report total amount of memory allocated to
12391251
# the container, and mem.pct always report >90% even when nothing is
12401252
# executing. So, we just replace it with the value of occupied slot.
12411253
mem_slots = float(occupied_slots.get("mem", 0))
12421254
mem_current = float(nmget(live_stat, "mem.current", 0.0))
1243-
utilizations["mem"] = (
1244-
utilizations["mem"] + mem_current / mem_slots * 100 if mem_slots > 0 else 0
1245-
)
1255+
if mem_slots > 0:
1256+
utilization_sums["mem"] += mem_current / mem_slots * 100
12461257

12471258
kernel_counter += 1
12481259
if kernel_counter == 0:
12491260
return None
1250-
divider = kernel_counter
1251-
return {k: v / divider for k, v in utilizations.items()}
1261+
result: dict[str, float | None] = {}
1262+
for resource in self.resource_names_to_check:
1263+
count = utilization_counts.get(resource, 0)
1264+
if count > 0:
1265+
result[resource] = utilization_sums[resource] / count
1266+
else:
1267+
result[resource] = None
1268+
return result
12521269
except Exception as e:
12531270
_msg = f"Unable to collect utilization for idleness check (kernels:{kernel_ids})"
12541271
log.warning(_msg, exc_info=e)

tests/unit/manager/test_idle_checker.py

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -804,8 +804,8 @@ async def test_utilization_current(
804804
expected_utilization = {
805805
"cpu_util": current_test_config.expected_cpu_util,
806806
"mem": current_test_config.expected_mem_util,
807-
"cuda_mem": 0.0,
808-
"cuda_util": 0.0,
807+
"cuda_mem": None,
808+
"cuda_util": None,
809809
}
810810

811811
# When
@@ -1235,3 +1235,91 @@ def mock_get_live_data_side_effect(key: str) -> bytes | None:
12351235
assert should_alive is insufficient_test_config.expected_alive
12361236
assert remaining == insufficient_test_config.expected_remaining
12371237
assert util_info is not None
1238+
1239+
# Test 5: Missing metrics should be excluded from idle decision
1240+
@pytest.fixture
1241+
async def missing_cpu_stat_checker(
1242+
self,
1243+
base_time: datetime,
1244+
valkey_live: AsyncMock,
1245+
valkey_stat: AsyncMock,
1246+
event_producer: AsyncMock,
1247+
mocker: Any,
1248+
) -> UtilizationIdleChecker:
1249+
"""UtilizationIdleChecker where cpu_util stat is missing from live_stat.
1250+
1251+
Simulates a scenario where stat collection fails for cpu_util
1252+
but memory is collected normally with sufficient utilization.
1253+
Uses OR operator so that if cpu_util were treated as 0.0
1254+
(old behavior), the session would be falsely terminated.
1255+
"""
1256+
elapsed_seconds = 50
1257+
time_window_seconds = 15
1258+
now = base_time + timedelta(seconds=elapsed_seconds)
1259+
valkey_live.get_server_time.return_value = now.timestamp()
1260+
mocker.patch("ai.backend.manager.idle.get_db_now", return_value=now)
1261+
1262+
# live_stat has Memory but NO cpu_util key
1263+
live_stat = {
1264+
"mem": {"current": "5.0", "pct": "10.0"},
1265+
}
1266+
valkey_stat.get_kernel_statistics.return_value = live_stat
1267+
1268+
util_first_collected = now.timestamp() - time_window_seconds
1269+
1270+
def get_live_data_side_effect(key: str) -> bytes | None:
1271+
if ".util_first_collected" in key:
1272+
return f"{util_first_collected:.06f}".encode()
1273+
if ".util_series" in key:
1274+
return msgpack.packb({"cpu_util": [], "mem": []})
1275+
if ".utilization_extra" in key:
1276+
return msgpack.packb({"resources": {}})
1277+
if ".utilization" in key:
1278+
return msgpack.packb(-1)
1279+
return None
1280+
1281+
valkey_live.get_live_data.side_effect = get_live_data_side_effect
1282+
1283+
checker = UtilizationIdleChecker(
1284+
IdleCheckerArgs(
1285+
event_producer=event_producer,
1286+
redis_live=valkey_live,
1287+
valkey_stat_client=valkey_stat,
1288+
)
1289+
)
1290+
await checker.populate_config({
1291+
"initial-grace-period": "0",
1292+
"resource-thresholds": {
1293+
"cpu_util": {"average": "10"},
1294+
"mem": {"average": "10"},
1295+
},
1296+
"thresholds-check-operator": "or",
1297+
"time-window": str(time_window_seconds),
1298+
})
1299+
return checker
1300+
1301+
async def test_missing_metrics_excluded_from_idle_decision(
1302+
self,
1303+
missing_cpu_stat_checker: UtilizationIdleChecker,
1304+
utilization_kernel_row: Any,
1305+
session_id: SessionId,
1306+
db_connection: AsyncMock,
1307+
) -> None:
1308+
"""Test that missing metrics (stat collection failure) are excluded from idle check.
1309+
1310+
With OR operator, ALL configured metrics must exceed their thresholds for
1311+
the session to stay alive. If missing cpu_util were treated as 0.0
1312+
(old behavior), the session would be falsely terminated because
1313+
0.0 < threshold (10%). With the fix, missing metrics are excluded from
1314+
the decision, so only memory (above 10%) is checked.
1315+
"""
1316+
# When
1317+
should_alive = await missing_cpu_stat_checker.check_idleness(
1318+
utilization_kernel_row,
1319+
db_connection,
1320+
mock_row(idle_timeout=15),
1321+
)
1322+
1323+
# Then - session should stay alive because cpu_util is excluded,
1324+
# not treated as 0.0
1325+
assert should_alive is True

0 commit comments

Comments (0)