Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
41918b5
Add design doc for async load-generator
nv-alicheng Apr 2, 2026
1eb7f3d
feat: add async load strategies, sample ordering, and delay functions…
nv-alicheng Apr 2, 2026
4be9129
feat: async BenchmarkSession with multi-phase support, PhaseIssuer, a…
nv-alicheng Apr 2, 2026
da061b9
feat: rewrite HttpClientSampleIssuer for async session, add HTTPEndpo…
nv-alicheng Apr 2, 2026
09aad43
refactor: remove legacy load generator (Sample, Scheduler, SampleEven…
nv-alicheng Apr 2, 2026
b5de33c
Fix test_end_to_end_oracle.py to use new loadgen
nv-alicheng Apr 2, 2026
21b2f8e
Integrate pubsub into execute.py
nv-alicheng Apr 2, 2026
b1d899f
Fix integration with accuracy tests and Scorer
nv-alicheng Apr 2, 2026
be0cf98
PR fixes, add full e2e accuracy integration test
nv-alicheng Apr 2, 2026
3199f0b
Fix for sockets in some docker contexts
nv-alicheng Apr 3, 2026
05d53f1
Fix race condition bug where workers were closing+reopening sockets a…
nv-alicheng Apr 4, 2026
1443814
Remove hardcoded 1-min drain timeout
nv-alicheng Apr 4, 2026
3851887
Remove lazy imports, fix bug with accuracy datasets discarding prepro…
nv-alicheng Apr 4, 2026
8acf5d7
Resolve report at end of session rather than end of phase
nv-alicheng Apr 6, 2026
6a84619
Fix name mismatch in metrics KVStore using str(enum) rather than enum…
nv-alicheng Apr 7, 2026
90dde8d
Add report.txt detailed report back in
nv-alicheng Apr 7, 2026
601dd90
Show error if tmpfs is used on ARM systems. Remove StreamChunk(is_com…
nv-alicheng Apr 7, 2026
ec87036
Remove unused is_complete flag from StreamChunk
nv-alicheng Apr 7, 2026
1770d88
PR Comments - documentation fixes, ZMQ context scoping
nv-alicheng Apr 8, 2026
53746e2
Remove lazy imports from tests
nv-alicheng Apr 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,086 changes: 1,086 additions & 0 deletions docs/load_generator/design.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Filesystem type detection for mmap ordering decisions.

On tmpfs (/dev/shm), msync() is a no-op because there is no backing store.
On a real on-disk filesystem, msync() flushes dirty pages to the shared page
cache, which provides write ordering for cross-process mmap readers.

On ARM (weak memory model), we need msync() to act as an ordering mechanism
between the value write and the count update in _SeriesItem.append(). This
only works on a real filesystem — not tmpfs. Detecting the filesystem type
lets us:
- Skip the useless msync() syscall on tmpfs (any architecture)
- Warn if ARM code is running on tmpfs (msync won't provide ordering)
"""

from __future__ import annotations

import ctypes
import ctypes.util
import logging
import platform
from pathlib import Path

logger = logging.getLogger(__name__)

_TMPFS_MAGIC = 0x01021994
"""Special tmpfs filesystem header value."""


def _is_tmpfs_via_statfs(path: str) -> bool | None:
"""Check filesystem type via libc statfs(2). Returns None if unavailable."""
try:
lib_name = ctypes.util.find_library("c")
if lib_name is None:
return None
libc = ctypes.CDLL(lib_name, use_errno=True)

# Allocate a large buffer to account for differently sized statfs
# structs across architectures. f_type is always the first field
# (__SWORD_TYPE / long) at offset 0 on all Linux archs.
buf = ctypes.create_string_buffer(256)
if libc.statfs(path.encode(), buf) != 0:
return None
# f_type is a native-endian long at offset 0
f_type = ctypes.c_long.from_buffer(buf, 0).value
return f_type == _TMPFS_MAGIC
except (OSError, AttributeError, ValueError):
return None


def _is_tmpfs_via_proc_mounts(path: str) -> bool | None:
"""Check filesystem type via /proc/mounts. Returns None if unavailable."""
try:
resolved = str(Path(path).resolve())
best_match = ""
best_fstype = ""
with open("/proc/mounts") as f:
for line in f:
parts = line.split()
if len(parts) < 3:
continue
mount_point, fstype = parts[1], parts[2]
if resolved.startswith(mount_point) and len(mount_point) > len(
best_match
):
best_match = mount_point
best_fstype = fstype
if not best_match:
return None
return best_fstype == "tmpfs"
except OSError:
return None


def is_tmpfs(path: str | Path) -> bool:
    """Return True if *path* lives on a tmpfs filesystem.

    Detection is attempted via statfs(2) through ctypes first, then via
    /proc/mounts. If neither method can answer, assumes non-tmpfs (the
    safe default: msync will be called on every append).
    """
    target = str(path)

    # Try each detection strategy in order; the first definitive
    # (non-None) answer wins.
    for probe in (_is_tmpfs_via_statfs, _is_tmpfs_via_proc_mounts):
        verdict = probe(target)
        if verdict is not None:
            return verdict

    logger.warning(
        "Could not determine filesystem type for %s "
        "(statfs and /proc/mounts both unavailable). "
        "Assuming non-tmpfs (msync will be called on every series append).",
        target,
    )
    return False


def needs_msync(path: str | Path) -> bool:
    """Decide whether msync() is required for mmap write ordering at *path*.

    Returns True when msync must be called between the value write and the
    count update in series append — i.e. on non-x86-64 (ARM) platforms
    backed by a real on-disk filesystem.

    On x86-64 the hardware's total store ordering already guarantees the
    writes become visible in program order, so msync is never needed.

    On ARM with tmpfs, msync is a no-op and cannot provide ordering; a
    warning is logged telling the caller to use an on-disk directory.
    """
    # TSO on x86-64 makes the explicit flush a pointless syscall.
    if platform.machine() == "x86_64":
        return False

    if not is_tmpfs(path):
        # Weak memory model + real filesystem: msync acts as the barrier.
        return True

    logger.warning(
        "ARM platform with tmpfs-backed metrics at %s. "
        "Python does not support memory fences. "
        "Use an on-disk metrics directory for correct cross-process reads.",
        path,
    )
    return False
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@
import math
import mmap
import os
import platform
import shutil
import struct
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Literal

from .fs_check import needs_msync

# ---------------------------------------------------------------------------
# Series rollup stats (computed on read)
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -257,6 +258,7 @@ class _SeriesItem:
"_dtype",
"_char",
"_fmt",
"_needs_msync",
)

def __init__(
Expand All @@ -272,6 +274,7 @@ def __init__(
self._dtype = dtype
self._char = _STRUCT_CHAR[dtype]
self._fmt = f"{_ENDIAN}{self._char}"
self._needs_msync = needs_msync(path.parent)
total = _HEADER_BYTES + capacity * _VALUE_BYTES
fd = os.open(str(path), os.O_CREAT | os.O_RDWR, _DEFAULT_FILE_MODE)
try:
Expand All @@ -285,33 +288,17 @@ def append(self, value: int | float) -> None:
if self._closed:
logger.warning("append() called on closed series: %s", self._path)
return
if type(value) != self._dtype:
if not isinstance(value, self._dtype):
raise TypeError(
f"Expected {self._dtype.__name__}, got {type(value).__name__}"
)
if self._count >= self._capacity:
self._grow()
offset = _HEADER_BYTES + self._count * _VALUE_BYTES
struct.pack_into(self._fmt, self._mm, offset, value)
# NOTE: This flush() calls msync(), which is a no-op on tmpfs (/dev/shm)
# and does NOT act as a CPU memory barrier. On x86-64 (TSO), store ordering
# is guaranteed — the value write above is visible before the count update
# below without any explicit barrier. On ARM (weak memory model), a reader
# could observe the count update before the value write. To support ARM
# properly, Python's mmap doesn't expose memory fences; you would need
# ctypes to call libc's __sync_synchronize() or use atomic operations via
# a C extension.
# The primary safety guarantee is the single-writer protocol:
# readers only read up to the count they observed, and on the target
# platform (x86-64 Linux), TSO provides the required ordering.
#
# For ARM platforms: Prometheus integration is planned as a replacement
# for mmap-backed metrics. As a temporary workaround, an on-disk metrics
# directory can be used instead of tmpfs — msync will then act as a real
# flush, providing ordering at the cost of performance.
if platform.machine() != "x86_64":
# Do not flush on x86-64 to avoid a no-op syscall on every append()
# For ARM, flush() and use an on-disk metrics directory instead of tmpfs.
# Flush between value write and count update for cross-process ordering.
# See fs_check.needs_msync() for when this is needed and why.
if self._needs_msync:
self._mm.flush()
self._count += 1
struct.pack_into("<Q", self._mm, 0, self._count)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ def __init__(
requires: tuple[str, ...] = (),
dtype: type = int,
):
self.metric_name = metric_name
# Resolve enum to its value string so KVStore filenames match
# what the reader expects (e.g. "ttft_ns" not "MetricSeriesKey.TTFT_NS").
self.metric_name = (
metric_name.value if isinstance(metric_name, Enum) else metric_name
)
self.kv_store = kv_store
self.requires = requires
self.dtype = dtype
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,13 @@ def bind(self, sock: zmq.Socket, path: str, scheme: str = "ipc") -> str:
"""
if scheme == "ipc":
if self.socket_dir is None:
self._tmp_dir = tempfile.TemporaryDirectory(prefix="zmq_")
# Prefer /dev/shm for IPC sockets — overlayfs (common in
# containers for /tmp) does not support Unix sockets.
shm = Path("/dev/shm")
self._tmp_dir = tempfile.TemporaryDirectory(
prefix="zmq_",
dir=str(shm) if shm.is_dir() else None,
)
self.socket_dir = self._tmp_dir.name
else:
Path(self.socket_dir).mkdir(parents=True, exist_ok=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,11 @@ async def wait(self, timeout: float | None = None) -> list[int]:
len(identities),
self._count,
)
except TimeoutError:
# Don't close socket on timeout — caller may retry.
raise
except BaseException:
# Clean up socket on any failure (timeout, cancellation, etc.)
# Clean up socket on non-retryable failures (cancellation, etc.)
self.close()
raise

Expand Down
Loading
Loading