Skip to content

fix(test/cpu_monitor): use psutil to get more accurate CPU measurements #5363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 13, 2025
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 34 additions & 26 deletions tests/framework/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import typing
from collections import defaultdict, namedtuple
from contextlib import contextmanager
from pathlib import Path
from typing import Dict

import psutil
Expand All @@ -29,7 +30,6 @@
FLUSH_CMD = 'screen -S {session} -X colon "logfile flush 0^M"'
CommandReturn = namedtuple("CommandReturn", "returncode stdout stderr")
CMDLOG = logging.getLogger("commands")
GET_CPU_LOAD = "top -bn1 -H -p {} -w512 | tail -n+8"


def get_threads(pid: int) -> dict:
Expand All @@ -56,30 +56,43 @@ def set_cpu_affinity(pid: int, cpulist: list) -> list:
return psutil.Process(pid).cpu_affinity(real_cpulist)


def get_cpu_utilization(pid: int) -> Dict[str, float]:
"""Return current process per thread CPU utilization."""
_, stdout, _ = check_output(GET_CPU_LOAD.format(pid))
cpu_utilization = {}
def get_thread_name(pid: int, tid: int) -> str:
"""Return thread name from pid and tid pair."""
return Path("/proc", str(pid), "task", str(tid), "comm").read_text("utf-8").strip()


# Take all except the last line
lines = stdout.strip().split(sep="\n")
for line in lines:
# sometimes the firecracker process will have gone away, in which case top does not return anything
if not line:
continue
CpuTimes = namedtuple("CpuTimes", ["user", "system"])

info = line.strip().split()
# We need at least CPU utilization and threads names cols (which
# might be two cols e.g `fc_vcpu 0`).
info_len = len(info)
assert info_len > 11, line

cpu_percent = float(info[8])
def get_cpu_times(pid: int) -> Dict[str, CpuTimes]:
"""Return a dict mapping thread name to CPU usage (in seconds) since start."""
cpu_times = {}
for thread in psutil.Process(pid).threads():
thread_name = get_thread_name(pid, thread.id)
cpu_times[thread_name] = CpuTimes(thread.user_time, thread.system_time)
return cpu_times

# Handles `fc_vcpu 0` case as well.
thread_name = info[11] + (" " + info[12] if info_len > 12 else "")
cpu_utilization[thread_name] = cpu_percent

def get_cpu_utilization(
pid: int,
interval: int = 1,
split_user_system: bool = False,
) -> Dict[str, float | CpuTimes]:
"""Return current process per thread CPU utilization over the interval (seconds)."""
cpu_utilization = {}
cpu_times_before = get_cpu_times(pid)
time.sleep(interval)
cpu_times_after = get_cpu_times(pid)
threads = set(cpu_times_before.keys()) & set(cpu_times_after.keys())
for thread_name in threads:
before = cpu_times_before[thread_name]
after = cpu_times_after[thread_name]
user = (after.user - before.user) / interval * 100
system = (after.system - before.system) / interval * 100
if split_user_system:
cpu_utilization[thread_name] = CpuTimes(user, system)
else:
cpu_utilization[thread_name] = user + system
return cpu_utilization


Expand All @@ -94,18 +107,13 @@ def track_cpu_utilization(
# Sleep first `omit` secconds
time.sleep(omit)

cpu_utilization = {}
cpu_utilization = defaultdict(list)
for _ in range(iterations):
current_cpu_utilization = get_cpu_utilization(pid)
assert len(current_cpu_utilization) > 0

for thread_name, value in current_cpu_utilization.items():
if not cpu_utilization.get(thread_name):
cpu_utilization[thread_name] = []
cpu_utilization[thread_name].append(value)

# 1 second granularity
time.sleep(1)
return cpu_utilization


Expand Down