Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cuda_core/tests/test_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,19 @@ def test_eq_hash(self):
assert Host() != Host(numa_id=0)
assert Host.numa_current() != Host()
assert hash(Host(numa_id=1)) == hash(Host(numa_id=1))

def test_repr(self):
    """``repr`` of a ``Host`` mirrors the expression that produced it."""
    cases = (
        (Host(), "Host()"),
        (Host(numa_id=2), "Host(numa_id=2)"),
        (Host.numa_current(), "Host.numa_current()"),
    )
    for host, expected in cases:
        assert repr(host) == expected

def test_pickle_roundtrip_preserves_singleton(self):
    """Pickling or copying a ``Host`` hands back the very same object.

    Covers the default host, an explicit NUMA node, and the "current NUMA
    node" host, through ``pickle``, ``copy.copy`` and ``copy.deepcopy``.
    """
    # __reduce__ routes numa_current through _reconstruct_numa_current and
    # the others through Host(numa_id); both rebuild the same singleton.
    # copy.copy / copy.deepcopy share the same __reduce__ machinery, so all
    # three entry points are asserted here.
    import copy
    import pickle

    for h in (Host(), Host(numa_id=4), Host.numa_current()):
        assert pickle.loads(pickle.dumps(h)) is h  # noqa: S301
        assert copy.copy(h) is h
        assert copy.deepcopy(h) is h
70 changes: 70 additions & 0 deletions cuda_core/tests/test_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,76 @@ class _FakeDev:
assert attr.value.cooperative == 1, f"Expected cooperative=1, got {attr.value.cooperative}"


def test_launch_config_cluster_accepts_hopper_cc(monkeypatch):
    """``LaunchConfig`` takes ``cluster`` when the device reports compute
    capability (9, 0).

    ``Device`` is swapped out on the ``_launch_config`` module so the
    cluster-cast branch runs on any GPU (real cluster support otherwise
    requires Hopper+).
    """
    from cuda.core import _launch_config as _lc_mod

    class _FakeDev:
        compute_capability = (9, 0)

    # Every lookup is recorded so the test can confirm the mock took effect.
    lookups = []

    def _make_fake_device():
        lookups.append(1)
        return _FakeDev()

    monkeypatch.setattr(_lc_mod, "Device", _make_fake_device)

    config = LaunchConfig(grid=(2, 3), cluster=(2, 2), block=32)

    assert lookups, "Device was not looked up via the module global; mock did not take effect"
    assert config.cluster == (2, 2, 1)
    assert config.grid == (2, 3, 1)


def test_launch_config_cluster_rejects_pre_hopper_cc(monkeypatch):
    """``LaunchConfig(cluster=...)`` raises ``CUDAError`` when the (mocked)
    device reports compute capability (8, 6)."""
    from cuda.core import _launch_config as _lc_mod

    class _FakeDev:
        compute_capability = (8, 6)

    # Every lookup is recorded so the test can confirm the mock took effect.
    lookups = []

    def _make_fake_device():
        lookups.append(1)
        return _FakeDev()

    monkeypatch.setattr(_lc_mod, "Device", _make_fake_device)

    expected_message = "thread block clusters are not supported"
    with pytest.raises(CUDAError, match=expected_message):
        LaunchConfig(grid=2, cluster=2, block=32)

    assert lookups, "Device was not looked up via the module global; mock did not take effect"


def test_to_native_launch_config_cluster_branch():
    """Exercise the cluster branch of ``_to_native_launch_config`` without
    needing Hopper hardware.

    That branch converts the grid from cluster units to block units and
    attaches the cluster-dimension launch attribute.

    The compute-capability gate lives in ``LaunchConfig.__init__``, while
    ``cluster`` itself is a public attribute. Assigning it on a config built
    without a cluster therefore yields the same object ``__init__`` would
    build on Hopper, and the conversion can run on any GPU.

    Note: the function under test is the standalone ``cpdef
    _to_native_launch_config`` (a duplicate of the
    ``LaunchConfig._to_native_launch_config`` cdef method, slated for removal
    once all modules are cythonized), not the cdef method that ``launch`` /
    ``Module`` actually call in production.
    """
    from cuda.bindings import driver
    from cuda.core._launch_config import _to_native_launch_config

    config = LaunchConfig(grid=(2, 3, 4), block=(5, 6, 7))
    config.cluster = (2, 2, 2)
    native = _to_native_launch_config(config)

    # grid (in cluster units) * cluster -> block units
    assert (native.gridDimX, native.gridDimY, native.gridDimZ) == (4, 6, 8)
    assert (native.blockDimX, native.blockDimY, native.blockDimZ) == (5, 6, 7)

    # Exactly one launch attribute is expected: the cluster dimension.
    assert native.numAttrs == 1
    attr = native.attrs[0]
    assert attr.id == driver.CUlaunchAttributeID.CU_LAUNCH_ATTRIBUTE_CLUSTER_DIMENSION
    cluster_dim = attr.value.clusterDim
    assert (cluster_dim.x, cluster_dim.y, cluster_dim.z) == (2, 2, 2)


def test_launch_invalid_values(init_cuda):
code = 'extern "C" __global__ void my_kernel() {}'
program = Program(code, SourceCodeType.CXX)
Expand Down
178 changes: 178 additions & 0 deletions cuda_core/tests/test_program_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2000,6 +2000,184 @@ def thread_b():
assert cache._tracked_size_bytes == 0, f"tracker went negative: {cache._tracked_size_bytes}"


def test_filestream_delitem_missing_key_with_cap_raises_keyerror(tmp_path):
    """Deleting an absent key from a size-capped cache raises ``KeyError``.

    The error comes from the stat-before-unlink miss branch of
    ``__delitem__``, which keeps the size tracker correct.
    """
    from cuda.core.utils import FileStreamProgramCache

    capped_cache = FileStreamProgramCache(tmp_path / "fc", max_size_bytes=1000)
    with capped_cache as cache:
        with pytest.raises(KeyError):
            del cache[b"absent"]


def test_filestream_clear_with_cap_resets_tracker(tmp_path):
    """With a size cap active, ``clear()`` re-derives the size tracker from
    the post-clear disk state, so it drops back to zero."""
    from cuda.core.utils import FileStreamProgramCache

    with FileStreamProgramCache(tmp_path / "fc", max_size_bytes=10_000) as cache:
        # Two 100-byte payloads, each filled with its own key byte.
        for key in (b"a", b"b"):
            cache[key] = key * 100
        assert len(cache) == 2
        assert cache._tracked_size_bytes == 200

        cache.clear()

        assert len(cache) == 0
        assert cache._tracked_size_bytes == 0


def test_filestream_iter_entry_paths_skips_stray_top_level_file(tmp_path):
    """A plain file placed directly under ``entries/`` is not counted; real
    entries live only in the two-level digest shards."""
    from cuda.core.utils import FileStreamProgramCache

    with FileStreamProgramCache(tmp_path / "fc") as cache:
        cache[b"k"] = b"v"
        (cache._entries / "not-a-shard").write_bytes(b"junk")

        # Only the genuine entry is counted; the stray file is skipped.
        entry_count = len(cache)
        assert entry_count == 1


def test_filestream_iter_entry_paths_returns_when_entries_dir_missing(tmp_path):
    """If ``entries/`` disappears, ``_iter_entry_paths`` returns cleanly and
    the cache reports a length of 0."""
    import shutil

    from cuda.core.utils import FileStreamProgramCache

    with FileStreamProgramCache(tmp_path / "fc") as cache:
        cache[b"k"] = b"v"
        # Remove the whole entries directory out from under the cache.
        entries_dir = cache._entries
        shutil.rmtree(entries_dir)

        assert len(cache) == 0


def test_filestream_sum_tmp_sizes_returns_zero_when_tmp_dir_missing(tmp_path):
    """With ``tmp/`` removed, ``_sum_tmp_sizes`` (via ``_iter_tmp_entries``)
    returns 0."""
    import shutil

    from cuda.core.utils import FileStreamProgramCache

    with FileStreamProgramCache(tmp_path / "fc") as cache:
        tmp_dir = cache._tmp
        shutil.rmtree(tmp_dir)

        total = cache._sum_tmp_sizes()
        assert total == 0


def test_filestream_enforce_size_cap_noop_without_cap(tmp_path):
    """Without a configured size cap, ``_enforce_size_cap`` returns
    immediately: nothing is raised and nothing is evicted."""
    from cuda.core.utils import FileStreamProgramCache

    # No max_size_bytes is passed, so the cache is uncapped.
    uncapped_cache = FileStreamProgramCache(tmp_path / "fc")
    with uncapped_cache as cache:
        cache[b"k"] = b"v"

        cache._enforce_size_cap()  # no-op; must not raise or evict

        assert len(cache) == 1


def test_filestream_touch_atime_path_fallback_swallows_stat_failure(tmp_path, monkeypatch):
    """In the path-based fallback (the Windows code path), ``_touch_atime``
    swallows a failing ``path.stat()``.

    It returns without raising and without calling ``os.utime``; the entry
    simply is not re-stamped.
    """
    import os as _os

    from cuda.core.utils import FileStreamProgramCache, _program_cache
    from cuda.core.utils._program_cache._file_stream import _touch_atime

    # Force the path-based branch regardless of the host platform.
    monkeypatch.setattr(_program_cache._file_stream, "_UTIME_SUPPORTS_FD", False)

    with FileStreamProgramCache(tmp_path / "fc") as cache:
        cache[b"k"] = b"v"
        entry_path = cache._path_for_key(b"k")
        stat_snapshot = entry_path.stat()
        # With the file gone, the fallback's re-stat raises FileNotFoundError
        # (an OSError).
        entry_path.unlink()

        utime_calls = []

        def _record_utime(*a, **k):
            utime_calls.append((a, k))

        monkeypatch.setattr(_os, "utime", _record_utime)

        # Best-effort: the failing stat is swallowed -- no exception, no utime.
        result = _touch_atime(entry_path, stat_snapshot)
        assert result is None
        assert not utime_calls, "os.utime must not run when path.stat() fails"


def test_filestream_touch_atime_swallows_open_failure(tmp_path, monkeypatch):
    """An ``os.open`` failure inside the best-effort atime bump is swallowed.

    The read still returns the cached bytes, and ``os.utime`` is never
    reached.
    """
    import os as _os

    from cuda.core.utils import FileStreamProgramCache, _program_cache

    monkeypatch.setattr(_program_cache._file_stream, "_UTIME_SUPPORTS_FD", True)

    with FileStreamProgramCache(tmp_path / "fc") as cache:
        cache[b"k"] = b"v"
        entry_path = cache._path_for_key(b"k")

        # Only this entry's read-only open (the atime bump) is refused. Every
        # other os.open call passes through, so a broken read cannot
        # masquerade as the swallowed failure.
        real_open = _os.open
        opened = []

        def _failing_open(path, flags, *args, **kwargs):
            targets_entry = _os.fspath(path) == _os.fspath(entry_path)
            if not (targets_entry and flags == _os.O_RDONLY):
                return real_open(path, flags, *args, **kwargs)
            opened.append(path)
            raise OSError("open refused")

        utime_calls = []

        def _record_utime(*a, **k):
            utime_calls.append((a, k))

        monkeypatch.setattr(_os, "open", _failing_open)
        monkeypatch.setattr(_os, "utime", _record_utime)

        value = cache[b"k"]
        assert value == b"v"
        assert opened, "the atime bump should have attempted os.open on the entry"
        assert not utime_calls, "os.utime must not run after os.open fails"


def test_filestream_touch_atime_swallows_fstat_failure(tmp_path, monkeypatch):
    """An ``os.fstat`` failure after the fd was opened is swallowed by the
    best-effort atime bump.

    The read still returns the cached bytes, the fd is closed, and
    ``os.utime`` is never reached.
    """
    import os as _os

    from cuda.core.utils import FileStreamProgramCache, _program_cache

    monkeypatch.setattr(_program_cache._file_stream, "_UTIME_SUPPORTS_FD", True)

    with FileStreamProgramCache(tmp_path / "fc") as cache:
        cache[b"k"] = b"v"
        entry_path = cache._path_for_key(b"k")

        real_open = _os.open
        real_close = _os.close
        opened_fds = []
        closed_fds = []
        fstat_calls = []
        utime_calls = []

        # Remember the fd the atime bump opens so the test can prove it gets
        # closed even though fstat fails -- a leaked fd would block deletes
        # on Windows.
        def _recording_open(path, flags, *args, **kwargs):
            fd = real_open(path, flags, *args, **kwargs)
            targets_entry = _os.fspath(path) == _os.fspath(entry_path)
            if targets_entry and flags == _os.O_RDONLY:
                opened_fds.append(fd)
            return fd

        def _recording_close(fd):
            closed_fds.append(fd)
            return real_close(fd)

        # os.fstat runs only in the atime bump here; the wrapper forces and
        # confirms the swallowed failure.
        def _failing_fstat(fd):
            fstat_calls.append(fd)
            raise OSError("fstat refused")

        def _record_utime(*a, **k):
            utime_calls.append((a, k))

        monkeypatch.setattr(_os, "open", _recording_open)
        monkeypatch.setattr(_os, "close", _recording_close)
        monkeypatch.setattr(_os, "fstat", _failing_fstat)
        monkeypatch.setattr(_os, "utime", _record_utime)

        value = cache[b"k"]
        assert value == b"v"
        assert fstat_calls, "the atime bump should have attempted os.fstat"
        assert opened_fds, "the atime bump should have opened the entry fd"
        assert opened_fds[0] in closed_fds, "the opened fd must be closed even when fstat fails"
        assert not utime_calls, "os.utime must not run after os.fstat fails"


def test_make_program_cache_key_changes_with_key_schema_version(monkeypatch):
"""Bumping ``_KEY_SCHEMA_VERSION`` produces a different cache key for
the same logical inputs. That's what makes a schema bump invalidate
Expand Down
Loading
Loading