Skip to content

Commit c431036

Browse files
authored
[Feat] Zeus daemon major refactor, bump to v0.3.0 (#211)
1 parent d406cec commit c431036

File tree

27 files changed

+1940
-1473
lines changed

27 files changed

+1940
-1473
lines changed

tests/optimizer/test_power_limit_optimizer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ def test_power_limit_optimizer(
127127
# Mock away the atexit hook, which raises an NVML error when testing finishes.
128128
mocker.patch("zeus.optimizer.power_limit.atexit", autospec=True)
129129

130+
# Disable missing `SYS_ADMIN` capability warning during testing.
131+
for gpu in get_gpus().gpus:
132+
gpu._disable_sys_admin_warning = True
133+
130134
monitor = ReplayZeusMonitor(
131135
log_file=replay_log.log_file,
132136
ignore_sync_execution=True,

zeus/device/cpu/rapl.py

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -286,40 +286,54 @@ def __init__(
286286
self.zeusd_sock_path = zeusd_sock_path
287287

288288
self._client = httpx.Client(transport=httpx.HTTPTransport(uds=zeusd_sock_path))
289-
self._url_prefix = f"http://zeusd/cpu/{cpu_index}"
290289

291290
self.dram_available = self._supports_get_dram_energy_consumption()
292291

293292
def _supports_get_dram_energy_consumption(self) -> bool:
294-
"""Calls zeusd to return if the specified CPU supports DRAM energy monitoring."""
295-
resp = self._client.get(
296-
self._url_prefix + "/supports_dram_energy",
297-
)
293+
"""Query the /discover endpoint to check DRAM energy support for this CPU."""
294+
resp = self._client.get("http://zeusd/discover")
298295
if resp.status_code != 200:
299-
raise ZeusdError(f"Failed to query Zeusd whether DRAM energy is supported: {resp.text}")
296+
raise ZeusdError(f"Failed to query Zeusd discovery endpoint: {resp.text}")
300297
data = resp.json()
301298
dram_available = data.get("dram_available")
302299
if dram_available is None:
303-
raise ZeusdError("Failed to get whether DRAM energy is supported.")
304-
return dram_available
300+
raise ZeusdError("Discovery response missing 'dram_available' field.")
301+
cpu_ids = data.get("cpu_ids", [])
302+
try:
303+
idx = cpu_ids.index(self.cpu_index)
304+
except ValueError as e:
305+
raise ZeusdError(f"CPU {self.cpu_index} not found in discovery response (available: {cpu_ids})") from e
306+
if len(cpu_ids) != len(dram_available):
307+
raise ZeusdError(
308+
f"Discovery response has mismatched lengths: "
309+
f"{len(cpu_ids)} cpu_ids vs {len(dram_available)} dram_available entries"
310+
)
311+
return dram_available[idx]
305312

306313
def get_total_energy_consumption(self) -> CpuDramMeasurement:
307314
"""Returns the total energy consumption of the specified powerzone. Units: mJ."""
308-
resp = self._client.post(
309-
self._url_prefix + "/get_index_energy",
310-
json={
311-
"cpu": True,
312-
"dram": True,
315+
resp = self._client.get(
316+
"http://zeusd/cpu/get_cumulative_energy",
317+
params={
318+
"cpu_ids": str(self.cpu_index),
319+
"cpu": "true",
320+
"dram": "true",
313321
},
314322
)
315323
if resp.status_code != 200:
316324
raise ZeusdError(f"Failed to get total energy consumption: {resp.text}")
317325

318326
data = resp.json()
319-
cpu_mj = data["cpu_energy_uj"] / 1000
327+
cpu_data = data.get(str(self.cpu_index))
328+
if cpu_data is None:
329+
raise ZeusdError(f"CPU {self.cpu_index} not found in response")
330+
cpu_uj = cpu_data.get("cpu_energy_uj")
331+
if cpu_uj is None:
332+
raise ZeusdError(f"CPU {self.cpu_index}: cpu_energy_uj is null in response")
333+
cpu_mj = cpu_uj / 1000
320334

321335
dram_mj = None
322-
dram_uj = data.get("dram_energy_uj")
336+
dram_uj = cpu_data.get("dram_energy_uj")
323337
if dram_uj is None:
324338
if self.dram_available:
325339
raise ZeusdError("DRAM energy should be available but no measurement was found")

zeus/device/gpu/nvidia.py

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ def __init__(
304304
self.zeusd_sock_path = zeusd_sock_path
305305

306306
self._client = httpx.Client(transport=httpx.HTTPTransport(uds=zeusd_sock_path))
307-
self._url_prefix = f"http://zeusd/gpu/{gpu_index}"
307+
self._gpu_index = gpu_index
308308

309309
@property
310310
def supports_nonblocking_setters(self) -> bool:
@@ -318,8 +318,12 @@ def set_power_management_limit(self, power_limit_mw: int, block: bool = True) ->
318318
return
319319

320320
resp = self._client.post(
321-
self._url_prefix + "/set_power_limit",
322-
json=dict(power_limit_mw=power_limit_mw, block=block),
321+
"http://zeusd/gpu/set_power_limit",
322+
params={
323+
"gpu_ids": str(self._gpu_index),
324+
"power_limit_mw": str(power_limit_mw),
325+
"block": "true" if block else "false",
326+
},
323327
)
324328
if resp.status_code != 200:
325329
raise ZeusdError(f"Failed to set power management limit: {resp.text}")
@@ -336,8 +340,12 @@ def reset_power_management_limit(self, block: bool = True) -> None:
336340
def set_persistence_mode(self, enabled: bool, block: bool = True) -> None:
337341
"""Set persistence mode."""
338342
resp = self._client.post(
339-
self._url_prefix + "/set_persistence_mode",
340-
json=dict(enabled=enabled, block=block),
343+
"http://zeusd/gpu/set_persistence_mode",
344+
params={
345+
"gpu_ids": str(self._gpu_index),
346+
"enabled": "true" if enabled else "false",
347+
"block": "true" if block else "false",
348+
},
341349
)
342350
if resp.status_code != 200:
343351
raise ZeusdError(f"Failed to set persistence mode: {resp.text}")
@@ -346,31 +354,53 @@ def set_persistence_mode(self, enabled: bool, block: bool = True) -> None:
346354
def set_memory_locked_clocks(self, min_clock_mhz: int, max_clock_mhz: int, block: bool = True) -> None:
347355
"""Lock the memory clock to a specified range. Units: MHz."""
348356
resp = self._client.post(
349-
self._url_prefix + "/set_mem_locked_clocks",
350-
json=dict(min_clock_mhz=min_clock_mhz, max_clock_mhz=max_clock_mhz, block=block),
357+
"http://zeusd/gpu/set_mem_locked_clocks",
358+
params={
359+
"gpu_ids": str(self._gpu_index),
360+
"min_clock_mhz": str(min_clock_mhz),
361+
"max_clock_mhz": str(max_clock_mhz),
362+
"block": "true" if block else "false",
363+
},
351364
)
352365
if resp.status_code != 200:
353366
raise ZeusdError(f"Failed to set memory locked clocks: {resp.text}")
354367
logger.debug("Took %s ms to set memory locked clocks", resp.elapsed.microseconds / 1000)
355368

356369
def reset_memory_locked_clocks(self, block: bool = True) -> None:
357370
"""Reset the locked memory clocks to the default."""
358-
resp = self._client.post(self._url_prefix + "/reset_mem_locked_clocks", json=dict(block=block))
371+
resp = self._client.post(
372+
"http://zeusd/gpu/reset_mem_locked_clocks",
373+
params={
374+
"gpu_ids": str(self._gpu_index),
375+
"block": "true" if block else "false",
376+
},
377+
)
359378
if resp.status_code != 200:
360379
raise ZeusdError(f"Failed to reset memory locked clocks: {resp.text}")
361380

362381
def set_gpu_locked_clocks(self, min_clock_mhz: int, max_clock_mhz: int, block: bool = True) -> None:
363382
"""Lock the GPU clock to a specified range. Units: MHz."""
364383
resp = self._client.post(
365-
self._url_prefix + "/set_gpu_locked_clocks",
366-
json=dict(min_clock_mhz=min_clock_mhz, max_clock_mhz=max_clock_mhz, block=block),
384+
"http://zeusd/gpu/set_gpu_locked_clocks",
385+
params={
386+
"gpu_ids": str(self._gpu_index),
387+
"min_clock_mhz": str(min_clock_mhz),
388+
"max_clock_mhz": str(max_clock_mhz),
389+
"block": "true" if block else "false",
390+
},
367391
)
368392
if resp.status_code != 200:
369393
raise ZeusdError(f"Failed to set GPU locked clocks: {resp.text}")
370394

371395
def reset_gpu_locked_clocks(self, block: bool = True) -> None:
372396
"""Reset the locked GPU clocks to the default."""
373-
resp = self._client.post(self._url_prefix + "/reset_gpu_locked_clocks", json=dict(block=block))
397+
resp = self._client.post(
398+
"http://zeusd/gpu/reset_gpu_locked_clocks",
399+
params={
400+
"gpu_ids": str(self._gpu_index),
401+
"block": "true" if block else "false",
402+
},
403+
)
374404
if resp.status_code != 200:
375405
raise ZeusdError(f"Failed to reset GPU locked clocks: {resp.text}")
376406

zeus/monitor/power_streaming.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ def _check_server_reachable(self, server: ZeusdTcpConfig | ZeusdUdsConfig) -> No
315315
ConnectionError: If the server is not reachable.
316316
ValueError: If requested GPU or CPU indices are not available.
317317
"""
318-
url = self._url(server, "/gpu/power")
318+
url = self._url(server, "/discover")
319319
try:
320320
with self._make_http_client(server, timeout=5.0) as client:
321321
response = client.get(url)
@@ -329,7 +329,7 @@ def _check_server_reachable(self, server: ZeusdTcpConfig | ZeusdUdsConfig) -> No
329329
raise ConnectionError(f"zeusd at {server.key} returned HTTP {e.response.status_code}") from e
330330

331331
if server.gpu_indices is not None:
332-
available = {int(k) for k in data.get("power_mw", {})}
332+
available = set(data.get("gpu_ids", []))
333333
requested = set(server.gpu_indices)
334334
missing = requested - available
335335
if missing:
@@ -344,16 +344,16 @@ def _check_cpu_available(self, server: ZeusdTcpConfig | ZeusdUdsConfig) -> bool:
344344
Raises:
345345
ValueError: If requested CPU indices are not available on the server.
346346
"""
347-
url = self._url(server, "/cpu/power")
347+
url = self._url(server, "/discover")
348348
try:
349349
with self._make_http_client(server, timeout=5.0) as client:
350350
response = client.get(url)
351351
response.raise_for_status()
352352
data = response.json()
353-
power_mw = data.get("power_mw", {})
354-
if power_mw:
353+
cpu_ids = data.get("cpu_ids", [])
354+
if cpu_ids:
355355
if server.cpu_indices is not None:
356-
available = {int(k) for k in power_mw}
356+
available = set(cpu_ids)
357357
requested = set(server.cpu_indices)
358358
missing = requested - available
359359
if missing:
@@ -365,12 +365,12 @@ def _check_cpu_available(self, server: ZeusdTcpConfig | ZeusdUdsConfig) -> bool:
365365
return True
366366
return False
367367
except (httpx.RequestError, httpx.HTTPStatusError):
368-
logger.warning("Failed to probe CPU power endpoint on %s", server.key, exc_info=True)
368+
logger.warning("Failed to probe discovery endpoint on %s", server.key, exc_info=True)
369369
return False
370370

371371
def _gpu_stream_loop(self, server: ZeusdTcpConfig | ZeusdUdsConfig) -> None:
372372
"""Background thread: stream GPU power from a single server."""
373-
base_url = self._url(server, "/gpu/power/stream")
373+
base_url = self._url(server, "/gpu/stream_power")
374374
# User specified specific indices to stream
375375
if server.gpu_indices is not None:
376376
ids_param = ",".join(str(i) for i in server.gpu_indices)
@@ -382,7 +382,7 @@ def _gpu_stream_loop(self, server: ZeusdTcpConfig | ZeusdUdsConfig) -> None:
382382

383383
def _cpu_stream_loop(self, server: ZeusdTcpConfig | ZeusdUdsConfig) -> None:
384384
"""Background thread: stream CPU power from a single server."""
385-
base_url = self._url(server, "/cpu/power/stream")
385+
base_url = self._url(server, "/cpu/stream_power")
386386
# User specified specific indices to stream
387387
if server.cpu_indices is not None:
388388
ids_param = ",".join(str(i) for i in server.cpu_indices)

0 commit comments

Comments
 (0)