diff --git a/cuda_core/tests/test_host.py b/cuda_core/tests/test_host.py index d5eeaca9a3..de33f20cf2 100644 --- a/cuda_core/tests/test_host.py +++ b/cuda_core/tests/test_host.py @@ -53,3 +53,19 @@ def test_eq_hash(self): assert Host() != Host(numa_id=0) assert Host.numa_current() != Host() assert hash(Host(numa_id=1)) == hash(Host(numa_id=1)) + + def test_repr(self): + assert repr(Host()) == "Host()" + assert repr(Host(numa_id=2)) == "Host(numa_id=2)" + assert repr(Host.numa_current()) == "Host.numa_current()" + + def test_pickle_roundtrip_preserves_singleton(self): + # __reduce__ routes numa_current through _reconstruct_numa_current and + # the others through Host(numa_id); both rebuild the same singleton. + # copy.copy / copy.deepcopy share the same __reduce__ machinery. + import copy + import pickle + + for h in (Host(), Host(numa_id=4), Host.numa_current()): + assert pickle.loads(pickle.dumps(h)) is h # noqa: S301 + assert copy.copy(h) is h diff --git a/cuda_core/tests/test_launcher.py b/cuda_core/tests/test_launcher.py index 1080ecc099..15a2108e41 100644 --- a/cuda_core/tests/test_launcher.py +++ b/cuda_core/tests/test_launcher.py @@ -183,6 +183,76 @@ class _FakeDev: assert attr.value.cooperative == 1, f"Expected cooperative=1, got {attr.value.cooperative}" +def test_launch_config_cluster_accepts_hopper_cc(monkeypatch): + """LaunchConfig accepts ``cluster`` when the device reports compute + capability >= 9.0. Device is mocked so the cluster-cast branch runs on any + GPU (real cluster support otherwise requires Hopper+).""" + from cuda.core import _launch_config as _lc_mod + + class _FakeDev: + compute_capability = (9, 0) + + # looked_up confirms the mock took effect. + looked_up = [] + monkeypatch.setattr(_lc_mod, "Device", lambda: looked_up.append(1) or _FakeDev()) + + config = LaunchConfig(grid=(2, 3), cluster=(2, 2), block=32) + assert looked_up, "Device was not looked up via the module global; mock did not take effect" + assert config.cluster == (2, 2, 1) + assert config.grid == (2, 3, 1) + + +def test_launch_config_cluster_rejects_pre_hopper_cc(monkeypatch): + """LaunchConfig(cluster=...) raises on a device with compute capability < 9.0.""" + from cuda.core import _launch_config as _lc_mod + + class _FakeDev: + compute_capability = (8, 6) + + # looked_up confirms the mock took effect. + looked_up = [] + monkeypatch.setattr(_lc_mod, "Device", lambda: looked_up.append(1) or _FakeDev()) + + with pytest.raises(CUDAError, match="thread block clusters are not supported"): + LaunchConfig(grid=2, cluster=2, block=32) + assert looked_up, "Device was not looked up via the module global; mock did not take effect" + + +def test_to_native_launch_config_cluster_branch(): + """Covers the cluster branch of ``_to_native_launch_config`` (grid is + converted from cluster units to block units, plus the cluster-dimension + attribute) without requiring Hopper. + + The cc gate lives in ``LaunchConfig.__init__``; ``cluster`` itself is a + public attribute, so setting it on a cluster-free config yields the exact + object ``__init__`` would build on Hopper and lets the conversion run on + any GPU. + + Note: this exercises the standalone ``cpdef _to_native_launch_config`` + function (a duplicate of the ``LaunchConfig._to_native_launch_config`` + cdef method, slated for removal once all modules are cythonized), not the + cdef method that ``launch`` / ``Module`` actually call in production. + """ + from cuda.bindings import driver + from cuda.core._launch_config import _to_native_launch_config + + config = LaunchConfig(grid=(2, 3, 4), block=(5, 6, 7)) + config.cluster = (2, 2, 2) + native = _to_native_launch_config(config) + + # grid (in cluster units) * cluster -> block units + assert native.gridDimX == 4 + assert native.gridDimY == 6 + assert native.gridDimZ == 8 + assert native.blockDimX == 5 + assert native.blockDimY == 6 + assert native.blockDimZ == 7 + assert native.numAttrs == 1 + attr = native.attrs[0] + assert attr.id == driver.CUlaunchAttributeID.CU_LAUNCH_ATTRIBUTE_CLUSTER_DIMENSION + assert (attr.value.clusterDim.x, attr.value.clusterDim.y, attr.value.clusterDim.z) == (2, 2, 2) + + def test_launch_invalid_values(init_cuda): code = 'extern "C" __global__ void my_kernel() {}' program = Program(code, SourceCodeType.CXX) diff --git a/cuda_core/tests/test_program_cache.py b/cuda_core/tests/test_program_cache.py index 3923312a3e..01a39e0032 100644 --- a/cuda_core/tests/test_program_cache.py +++ b/cuda_core/tests/test_program_cache.py @@ -2000,6 +2000,184 @@ def thread_b(): assert cache._tracked_size_bytes == 0, f"tracker went negative: {cache._tracked_size_bytes}" +def test_filestream_delitem_missing_key_with_cap_raises_keyerror(tmp_path): + """With a size cap active, ``__delitem__`` of an absent key raises KeyError + from the stat-before-unlink miss branch (so the tracker stays correct).""" + from cuda.core.utils import FileStreamProgramCache + + with FileStreamProgramCache(tmp_path / "fc", max_size_bytes=1000) as cache, pytest.raises(KeyError): + del cache[b"absent"] + + +def test_filestream_clear_with_cap_resets_tracker(tmp_path): + """``clear()`` re-derives the size tracker from the post-clear disk state + when a size cap is active.""" + from cuda.core.utils import FileStreamProgramCache + + with FileStreamProgramCache(tmp_path / "fc", max_size_bytes=10_000) as cache: + cache[b"a"] = b"a" * 100 + cache[b"b"] = b"b" * 100 + assert len(cache) == 2 + assert cache._tracked_size_bytes == 200 + + cache.clear() + assert len(cache) == 0 + assert cache._tracked_size_bytes == 0 + + +def test_filestream_iter_entry_paths_skips_stray_top_level_file(tmp_path): + """A non-directory file sitting directly in ``entries/`` is ignored; only + the two-level digest shards hold real entries.""" + from cuda.core.utils import FileStreamProgramCache + + with FileStreamProgramCache(tmp_path / "fc") as cache: + cache[b"k"] = b"v" + stray = cache._entries / "not-a-shard" + stray.write_bytes(b"junk") + # The stray top-level file is skipped; only the real entry counts. + assert len(cache) == 1 + + +def test_filestream_iter_entry_paths_returns_when_entries_dir_missing(tmp_path): + """``_iter_entry_paths`` returns cleanly (len 0) if ``entries/`` vanishes.""" + import shutil + + from cuda.core.utils import FileStreamProgramCache + + with FileStreamProgramCache(tmp_path / "fc") as cache: + cache[b"k"] = b"v" + shutil.rmtree(cache._entries) + assert len(cache) == 0 + + +def test_filestream_sum_tmp_sizes_returns_zero_when_tmp_dir_missing(tmp_path): + """``_sum_tmp_sizes`` (via ``_iter_tmp_entries``) returns 0 if ``tmp/`` is gone.""" + import shutil + + from cuda.core.utils import FileStreamProgramCache + + with FileStreamProgramCache(tmp_path / "fc") as cache: + shutil.rmtree(cache._tmp) + assert cache._sum_tmp_sizes() == 0 + + +def test_filestream_enforce_size_cap_noop_without_cap(tmp_path): + """``_enforce_size_cap`` returns immediately when no size cap is configured.""" + from cuda.core.utils import FileStreamProgramCache + + with FileStreamProgramCache(tmp_path / "fc") as cache: # max_size_bytes=None + cache[b"k"] = b"v" + cache._enforce_size_cap() # no-op; must not raise or evict + assert len(cache) == 1 + + +def test_filestream_touch_atime_path_fallback_swallows_stat_failure(tmp_path, monkeypatch): + """In the path-based fallback (the Windows code path), a failing + ``path.stat()`` is swallowed: ``_touch_atime`` returns without raising + and without calling ``os.utime`` -- the entry just isn't re-stamped.""" + import os as _os + + from cuda.core.utils import FileStreamProgramCache, _program_cache + from cuda.core.utils._program_cache._file_stream import _touch_atime + + monkeypatch.setattr(_program_cache._file_stream, "_UTIME_SUPPORTS_FD", False) + with FileStreamProgramCache(tmp_path / "fc") as cache: + cache[b"k"] = b"v" + path = cache._path_for_key(b"k") + st_before = path.stat() + path.unlink() # now the fallback's re-stat raises FileNotFoundError (an OSError) + + utime_calls = [] + monkeypatch.setattr(_os, "utime", lambda *a, **k: utime_calls.append((a, k))) + + # Best-effort: the failing stat is swallowed -- no exception, no utime. + assert _touch_atime(path, st_before) is None + assert not utime_calls, "os.utime must not run when path.stat() fails" + + +def test_filestream_touch_atime_swallows_open_failure(tmp_path, monkeypatch): + """The best-effort atime bump swallows an ``os.open`` failure: the read + still returns the cached bytes and never reaches ``os.utime``.""" + import os as _os + + from cuda.core.utils import FileStreamProgramCache, _program_cache + + monkeypatch.setattr(_program_cache._file_stream, "_UTIME_SUPPORTS_FD", True) + with FileStreamProgramCache(tmp_path / "fc") as cache: + cache[b"k"] = b"v" + entry_path = cache._path_for_key(b"k") + + # Fail only this entry's atime-bump open; let other os.open calls pass + # through so a broken read can't masquerade as the swallowed failure. + real_open = _os.open + opened = [] + + def _failing_open(path, flags, *args, **kwargs): + if _os.fspath(path) == _os.fspath(entry_path) and flags == _os.O_RDONLY: + opened.append(path) + raise OSError("open refused") + return real_open(path, flags, *args, **kwargs) + + utime_calls = [] + monkeypatch.setattr(_os, "open", _failing_open) + monkeypatch.setattr(_os, "utime", lambda *a, **k: utime_calls.append((a, k))) + + assert cache[b"k"] == b"v" + assert opened, "the atime bump should have attempted os.open on the entry" + assert not utime_calls, "os.utime must not run after os.open fails" + + +def test_filestream_touch_atime_swallows_fstat_failure(tmp_path, monkeypatch): + """The best-effort atime bump swallows an ``os.fstat`` failure after the fd + was opened: the read still returns the cached bytes, closes the fd, and + never reaches ``os.utime``.""" + import os as _os + + from cuda.core.utils import FileStreamProgramCache, _program_cache + + monkeypatch.setattr(_program_cache._file_stream, "_UTIME_SUPPORTS_FD", True) + with FileStreamProgramCache(tmp_path / "fc") as cache: + cache[b"k"] = b"v" + entry_path = cache._path_for_key(b"k") + + # Record the fd the atime bump opens so we can prove it gets closed even + # though fstat fails -- a leaked fd would block deletes on Windows. + real_open = _os.open + opened_fds = [] + + def _recording_open(path, flags, *args, **kwargs): + fd = real_open(path, flags, *args, **kwargs) + if _os.fspath(path) == _os.fspath(entry_path) and flags == _os.O_RDONLY: + opened_fds.append(fd) + return fd + + closed_fds = [] + real_close = _os.close + + def _recording_close(fd): + closed_fds.append(fd) + return real_close(fd) + + # os.fstat runs only in the atime bump here; the wrapper forces and confirms the swallowed failure. + fstat_calls = [] + + def _failing_fstat(fd): + fstat_calls.append(fd) + raise OSError("fstat refused") + + utime_calls = [] + monkeypatch.setattr(_os, "open", _recording_open) + monkeypatch.setattr(_os, "close", _recording_close) + monkeypatch.setattr(_os, "fstat", _failing_fstat) + monkeypatch.setattr(_os, "utime", lambda *a, **k: utime_calls.append((a, k))) + + assert cache[b"k"] == b"v" + assert fstat_calls, "the atime bump should have attempted os.fstat" + assert opened_fds, "the atime bump should have opened the entry fd" + assert opened_fds[0] in closed_fds, "the opened fd must be closed even when fstat fails" + assert not utime_calls, "os.utime must not run after os.fstat fails" + + def test_make_program_cache_key_changes_with_key_schema_version(monkeypatch): """Bumping ``_KEY_SCHEMA_VERSION`` produces a different cache key for the same logical inputs. That's what makes a schema bump invalidate diff --git a/cuda_core/tests/test_strided_layout.py b/cuda_core/tests/test_strided_layout.py index 734837a886..0f09ae5035 100644 --- a/cuda_core/tests/test_strided_layout.py +++ b/cuda_core/tests/test_strided_layout.py @@ -965,3 +965,154 @@ def test_eq_reflexive_and_value_equality(): assert a == a assert a == b assert a != c + + +def test_dense_like_invalid_stride_order_raises(): + """dense_like rejects a stride_order that is not 'K'/'C'/'F' or a permutation.""" + layout = _StridedLayout.dense((5, 4), 4) + with pytest.raises(ValueError, match="must be 'K', 'C', 'F', or a permutation"): + _StridedLayout.dense_like(layout, "X") + + +def test_repr_includes_slice_offset_when_nonzero(): + """__repr__ renders the _slice_offset field only when the slice offset != 0.""" + dense = _StridedLayout.dense((5, 6), 4) + assert "_slice_offset" not in repr(dense) + sliced = dense[:, 1:5] + assert sliced.slice_offset != 0 + assert "_slice_offset" in repr(sliced) + + +def test_min_offset_max_offset_properties(): + """The min_offset / max_offset scalar properties match offset_bounds.""" + layout = _StridedLayout.dense((5, 4), 4) + assert layout.min_offset == 0 + assert layout.max_offset == 19 # volume 20 -> last element at offset 19 + assert (layout.min_offset, layout.max_offset) == layout.offset_bounds + + +def test_broadcast_to_fewer_dims_raises(): + """broadcast_to rejects a target shape with fewer dims than the source.""" + layout = _StridedLayout.dense((2, 3), 4) + with pytest.raises(ValueError, match="must be greater than or equal to"): + layout.broadcast_to((3,)) + + +def test_broadcast_to_incompatible_extent_raises(): + """broadcast_to rejects an extent that is neither 1 nor equal to the target.""" + layout = _StridedLayout.dense((2, 3), 4) + with pytest.raises(ValueError, match="cannot be broadcast together"): + layout.broadcast_to((2, 5)) + + +def test_unsqueezed_invalid_axis_raises(): + """unsqueezed rejects an axis outside the resulting rank.""" + layout = _StridedLayout.dense((2, 3), 4) + with pytest.raises(ValueError, match="Invalid axis"): + layout.unsqueezed(9) + + +def test_unsqueezed_repeated_axis_raises(): + """unsqueezed rejects the same new axis specified twice.""" + layout = _StridedLayout.dense((2, 3), 4) + with pytest.raises(ValueError, match="appears multiple times"): + layout.unsqueezed((0, 0)) + + +def test_repacked_pack_invalid_axis_raises(): + """repacked (packing) rejects an out-of-range axis.""" + layout = _StridedLayout.dense((5, 4), 4) + with pytest.raises(ValueError, match="Invalid axis"): + layout.repacked(8, axis=5) + + +def test_repacked_pack_unaligned_data_ptr_raises(): + """repacked (packing) requires data_ptr aligned to the packed itemsize.""" + layout = _StridedLayout.dense((5, 4), 4) + with pytest.raises(ValueError, match="must be aligned to the packed itemsize"): + layout.repacked(8, data_ptr=4) + + +def test_repacked_pack_axis_stride_not_one_raises(): + """repacked (packing) requires the packed axis stride to be 1.""" + layout = _StridedLayout((4, 5), (5, 1), 4) + with pytest.raises(ValueError, match="axis 0 stride must be 1"): + layout.repacked(8, axis=0) + + +def test_repacked_pack_extent_not_divisible_raises(): + """repacked (packing) requires the packed axis extent divisible by vec_size.""" + layout = _StridedLayout.dense((5, 3), 4) + with pytest.raises(ValueError, match="extent .* must be divisible by 2"): + layout.repacked(8, axis=1) + + +def test_repacked_pack_slice_offset_not_divisible_raises(): + """repacked (packing) requires the slice offset divisible by vec_size.""" + layout = _StridedLayout.dense((5, 6), 4)[:, 1:5] + assert layout.slice_offset == 1 + with pytest.raises(ValueError, match="slice offset .* must be divisible by 2"): + layout.repacked(8, axis=1) + + +def test_repacked_pack_other_stride_not_divisible_raises(): + """repacked (packing) requires non-packed strides divisible by vec_size.""" + layout = _StridedLayout((3, 4), (5, 1), 4) + with pytest.raises(ValueError, match="stride .* must be divisible by 2"): + layout.repacked(8, axis=1) + + +def test_repacked_unpack_invalid_axis_raises(): + """repacked (unpacking) rejects an out-of-range axis.""" + layout = _StridedLayout.dense((5, 4), 8) + with pytest.raises(ValueError, match="Invalid axis"): + layout.repacked(4, axis=9) + + +def test_repacked_unpack_nonpositive_itemsize_raises(): + """repacked (unpacking) requires a positive new itemsize.""" + layout = _StridedLayout.dense((5, 4), 8) + with pytest.raises(ValueError, match="new itemsize must be greater than zero"): + layout.repacked(0) + + +def test_repacked_unpack_zero_extent_raises(): + """repacked (unpacking) rejects a zero extent on the unpacked axis.""" + layout = _StridedLayout((5, 0), (0, 1), 8) + with pytest.raises(ValueError, match="extent must be non-zero"): + layout.repacked(4, axis=1) + + +def test_repacked_unpack_axis_stride_not_one_raises(): + """repacked (unpacking) requires the unpacked axis stride to be 1.""" + layout = _StridedLayout((4, 5), (5, 1), 8) + with pytest.raises(ValueError, match="axis 0 stride must be 1"): + layout.repacked(4, axis=0) + + +def test_max_compatible_itemsize_nonpositive_max_raises(): + """max_compatible_itemsize requires a positive max_itemsize.""" + layout = _StridedLayout.dense((5, 4), 4) + with pytest.raises(ValueError, match="max_itemsize must be greater than zero"): + layout.max_compatible_itemsize(max_itemsize=0) + + +def test_max_compatible_itemsize_invalid_axis_raises(): + """max_compatible_itemsize rejects an out-of-range axis.""" + layout = _StridedLayout.dense((5, 4), 4) + with pytest.raises(ValueError, match="Invalid axis"): + layout.max_compatible_itemsize(axis=9) + + +def test_max_compatible_itemsize_max_below_itemsize_raises(): + """max_compatible_itemsize rejects max_itemsize smaller than the itemsize.""" + layout = _StridedLayout.dense((5, 4), 8) + with pytest.raises(ValueError, match="cannot be less than itemsize"): + layout.max_compatible_itemsize(max_itemsize=4) + + +def test_max_compatible_itemsize_axis_stride_not_one_returns_itemsize(): + """When the axis stride is not 1, max_compatible_itemsize returns the + current itemsize unchanged (no larger pack is possible).""" + layout = _StridedLayout((4, 5), (5, 1), 8) + assert layout.max_compatible_itemsize(axis=0, max_itemsize=16) == 8 diff --git a/cuda_core/tests/test_utils.py b/cuda_core/tests/test_utils.py index 3d4059b696..80631c90c3 100644 --- a/cuda_core/tests/test_utils.py +++ b/cuda_core/tests/test_utils.py @@ -1060,25 +1060,25 @@ def test_dlpack_export_non_native_endian_rejected(): bad_view.__dlpack__() -@pytest.mark.parametrize( - "dtype", - [ - np.uint8, - np.uint16, - np.uint32, - np.uint64, - np.int8, - np.int16, - np.int32, - np.int64, - np.float16, - np.float32, - np.float64, - np.complex64, - np.complex128, - np.bool_, - ], +_NUMPY_NATIVE_DLPACK_DTYPES = ( + np.uint8, + np.uint16, + np.uint32, + np.uint64, + np.int8, + np.int16, + np.int32, + np.int64, + np.float16, + np.float32, + np.float64, + np.complex64, + np.complex128, + np.bool_, ) + + +@pytest.mark.parametrize("dtype", _NUMPY_NATIVE_DLPACK_DTYPES) def test_strided_memory_view_dtype_roundtrip_all(dtype): """Exercise dtype_dlpack_to_numpy for every NumPy-native DLPack dtype. @@ -1094,3 +1094,425 @@ def test_strided_memory_view_dtype_roundtrip_all(dtype): pytest.skip(f"NumPy does not export {np.dtype(dtype)} via DLPack: {e}") view = StridedMemoryView.from_dlpack(src, stream_ptr=-1) assert view.dtype == np.dtype(dtype) # .dtype triggers dtype_dlpack_to_numpy + + +def test_as_tensor_map_assembles_kwargs(monkeypatch): + """``as_tensor_map`` forwards the view + box_dim and only the non-None + tiled options to ``TensorMapDescriptor._from_tiled``. + + The real ``_from_tiled`` requires a device-accessible, 16-byte-aligned view + on TMA-capable hardware (sm90+), so we replace the (module-level) class the + method imports with a recorder and assert the assembled call instead. + """ + captured = {} + sentinel = object() + + class _RecordingTMD: + @classmethod + def _from_tiled(cls, view, box_dim=None, **kwargs): + captured["view"] = view + captured["box_dim"] = box_dim + captured["kwargs"] = kwargs + return sentinel + + # as_tensor_map does `from cuda.core._tensor_map import TensorMapDescriptor` + # on each call, so patching the module attribute swaps the bound name. + monkeypatch.setattr("cuda.core._tensor_map.TensorMapDescriptor", _RecordingTMD) + + src = np.zeros(6, dtype=np.float32) + view = StridedMemoryView.from_any_interface(src, stream_ptr=-1) + result = view.as_tensor_map( + (2, 3), + options="OPT", + element_strides=(1, 1), + data_type="DT", + interleave="IL", + swizzle="SW", + l2_promotion="L2", + oob_fill="OOB", + ) + assert result is sentinel + assert captured["view"] is view + assert captured["box_dim"] == (2, 3) + assert captured["kwargs"] == { + "options": "OPT", + "element_strides": (1, 1), + "data_type": "DT", + "interleave": "IL", + "swizzle": "SW", + "l2_promotion": "L2", + "oob_fill": "OOB", + } + + +def test_as_tensor_map_omits_none_kwargs(monkeypatch): + """Tiled options left as None are not forwarded to ``_from_tiled``.""" + captured = {} + + class _RecordingTMD: + @classmethod + def _from_tiled(cls, _view, _box_dim=None, **kwargs): + captured["kwargs"] = kwargs + return None + + monkeypatch.setattr("cuda.core._tensor_map.TensorMapDescriptor", _RecordingTMD) + view = StridedMemoryView.from_any_interface(np.zeros(6, dtype=np.float32), stream_ptr=-1) + view.as_tensor_map((6,)) + assert captured["kwargs"] == {} + + +def _assert_dlpack_export_roundtrip(src): + # Skip only if NumPy itself can't round-trip this dtype/shape; past the + # probe, a failure on our view is a regression, not an env limitation. + try: + np.from_dlpack(src) + except (BufferError, TypeError, RuntimeError) as e: + pytest.skip(f"NumPy does not support DLPack for {src.dtype} {src.shape}: {e}") + view = StridedMemoryView.from_any_interface(src, stream_ptr=-1) + out = np.from_dlpack(view) + assert out.dtype == src.dtype + assert out.shape == src.shape + assert np.array_equal(out, src) + + +@pytest.mark.parametrize("dtype", _NUMPY_NATIVE_DLPACK_DTYPES) +def test_dlpack_export_roundtrip_dtypes(dtype): + """Export every NumPy-native DLPack dtype through ``StridedMemoryView.__dlpack__``.""" + _assert_dlpack_export_roundtrip(np.zeros((2, 3), dtype=dtype)) + + +@pytest.mark.parametrize( + "shape", + [pytest.param((), id="scalar"), pytest.param((0, 3), id="empty")], +) +def test_dlpack_export_roundtrip_special_shapes(shape): + """Export scalar and zero-volume shapes through ``StridedMemoryView.__dlpack__``.""" + _assert_dlpack_export_roundtrip(np.zeros(shape, dtype=np.complex128)) + + +def test_dlpack_export_unversioned_capsule_and_deleter(): + """``__dlpack__()`` with no ``max_version`` yields an *unversioned* unused + DLPack capsule; dropping it unconsumed runs ``_smv_pycapsule_deleter`` on + the non-versioned branch (freeing the managed tensor).""" + src = np.arange(6, dtype=np.int32) + view = StridedMemoryView.from_any_interface(src, stream_ptr=-1) + capsule = view.__dlpack__() + assert _PyCapsule_IsValid(capsule, b"dltensor") == 1 + assert _PyCapsule_IsValid(capsule, b"dltensor_versioned") == 0 + del capsule # unconsumed -> deleter frees dlm_tensor + + +def test_dlpack_export_versioned_capsule_and_deleter(): + """``__dlpack__(max_version=(1, 0))`` yields a *versioned* unused capsule; + dropping it unconsumed runs the versioned ``_smv_pycapsule_deleter`` branch.""" + src = np.arange(6, dtype=np.int32) + view = StridedMemoryView.from_any_interface(src, stream_ptr=-1) + capsule = view.__dlpack__(max_version=(1, 0)) + assert _PyCapsule_IsValid(capsule, b"dltensor_versioned") == 1 + assert _PyCapsule_IsValid(capsule, b"dltensor") == 0 + del capsule # unconsumed -> versioned deleter frees dlm_tensor_ver + + +def test_from_dlpack_cpu_stream_none_ambiguous(): + """A CPU DLPack source with ``stream_ptr=None`` is rejected as ambiguous.""" + src = np.arange(4, dtype=np.float32) + with pytest.raises(BufferError, match="stream=None is ambiguous"): + StridedMemoryView.from_dlpack(src, stream_ptr=None) + + +def test_from_dlpack_unsupported_device_type(): + """``view_as_dlpack`` rejects a DLPack device that is neither CPU, CUDA, + CUDA-pinned, nor CUDA-managed before ever calling ``__dlpack__``.""" + + class _FakeUnsupportedDevice: + def __dlpack_device__(self): + return (7, 0) # e.g. kDLVulkan -- unsupported by cuda.core + + def __dlpack__(self, **kwargs): + raise AssertionError("__dlpack__ must not be reached") + + with pytest.raises(BufferError, match="device not supported"): + StridedMemoryView.from_dlpack(_FakeUnsupportedDevice(), stream_ptr=0) + + +class _DLPackNoMaxVersion: + """Wraps a NumPy array but rejects the ``max_version`` kwarg, forcing the + TypeError fallback in ``view_as_dlpack`` and an *unversioned* capsule import.""" + + def __init__(self, arr): + self._arr = arr + self.max_versions = [] # max_version seen on each __dlpack__ call, in order + + def __dlpack_device__(self): + return self._arr.__dlpack_device__() + + def __dlpack__(self, *, stream=None, max_version=None, **kwargs): + self.max_versions.append(max_version) + if max_version is not None: + raise TypeError("max_version is not supported") + return self._arr.__dlpack__(stream=stream) + + +def test_from_dlpack_typeerror_fallback_unversioned_import(): + """When ``__dlpack__(max_version=...)`` raises TypeError, view_as_dlpack + retries without it and imports the resulting unversioned capsule; the view + then owns that capsule and frees it on ``__dealloc__``.""" + src = np.arange(6, dtype=np.int32) + wrapper = _DLPackNoMaxVersion(src) + view = StridedMemoryView.from_dlpack(wrapper, stream_ptr=-1) + # Guard the TypeError fallback path: versioned attempt, then legacy retry. + assert len(wrapper.max_versions) == 2, f"expected versioned attempt + retry, got {wrapper.max_versions}" + assert isinstance(wrapper.max_versions[0], tuple) # versioned attempt was made + assert wrapper.max_versions[1] is None # fallback retried without max_version + assert view.ptr == src.ctypes.data + out = np.from_dlpack(view) + assert np.array_equal(out, src) + del view # exercise __dealloc__ on the imported (used) unversioned capsule + + +def test_strided_memory_view_proxy_cai_only_has_dlpack_false(): + """``_StridedMemoryViewProxy`` records ``has_dlpack=False`` for an object + that exposes only ``__cuda_array_interface__`` (check_has_dlpack CAI branch).""" + from cuda.core._memoryview import _StridedMemoryViewProxy + + obj = _make_cuda_array_interface_obj(shape=(2,), strides=None) + proxy = _StridedMemoryViewProxy(obj) + assert proxy.has_dlpack is False + assert proxy.obj is obj + + +def test_view_as_cai_device_pointer_and_stream_ordering(init_cuda): + """``view_as_cai`` on a real device pointer resolves the device ordinal via + ``cuPointerGetAttribute`` and, when the CAI ``stream`` differs from the + consumer stream, establishes stream ordering through an event. + + Uses a synthetic CAI object backed by a genuine device allocation, so the + cupy/numba-only device branch is exercised without those optional deps. + """ + dev = init_cuda + buffer = dev.memory_resource.allocate(64, stream=dev.default_stream) + producer = dev.create_stream() + consumer = dev.create_stream() + obj = _make_cuda_array_interface_obj( + shape=(8,), + strides=None, + typestr=" deleter frees the managed tensor + dev.default_stream.sync() + + +def test_strided_memory_view_repr_with_none_dtype(init_cuda): + """``__repr__`` of a view whose dtype is None renders the dtype via + ``get_simple_repr`` taking the builtins branch (NoneType).""" + dev = init_cuda + buffer = dev.memory_resource.allocate(16, stream=dev.default_stream) + view = StridedMemoryView.from_buffer(buffer, shape=(16,), itemsize=1, dtype=None) + assert view.dtype is None + r = repr(view) + assert r.startswith("StridedMemoryView(ptr=") + assert "dtype=NoneType" in r + + +# --------------------------------------------------------------------------- +# DLPack C exchange API (`__dlpack_c_exchange_api__`) +# +# Drive the C function pointers exposed by the capsule the way a native +# consumer would, exercising the StridedMemoryView exchange-API implementation. +# --------------------------------------------------------------------------- + +_PyCapsule_GetPointer = ctypes.pythonapi.PyCapsule_GetPointer +_PyCapsule_GetPointer.argtypes = (ctypes.py_object, ctypes.c_char_p) +_PyCapsule_GetPointer.restype = ctypes.c_void_p + + +class _DLPackVersion(ctypes.Structure): + _fields_ = [("major", ctypes.c_uint32), ("minor", ctypes.c_uint32)] + + +class _DLPackExchangeAPIHeader(ctypes.Structure): + _fields_ = [("version", _DLPackVersion), ("prev_api", ctypes.c_void_p)] + + +class _DLDevice(ctypes.Structure): + _fields_ = [("device_type", ctypes.c_int), ("device_id", ctypes.c_int32)] + + +class _DLDataType(ctypes.Structure): + _fields_ = [("code", ctypes.c_uint8), ("bits", ctypes.c_uint8), ("lanes", ctypes.c_uint16)] + + +class _DLTensor(ctypes.Structure): + _fields_ = [ + ("data", ctypes.c_void_p), + ("device", _DLDevice), + ("ndim", ctypes.c_int32), + ("dtype", _DLDataType), + ("shape", ctypes.POINTER(ctypes.c_int64)), + ("strides", ctypes.POINTER(ctypes.c_int64)), + ("byte_offset", ctypes.c_uint64), + ] + + +_FN_FROM_PY = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.POINTER(ctypes.c_void_p)) +_FN_TO_PY = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.POINTER(ctypes.c_void_p)) +_FN_DLTENSOR_FROM_PY = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p) +_FN_ALLOCATOR = ctypes.CFUNCTYPE( + ctypes.c_int, ctypes.c_void_p, ctypes.POINTER(ctypes.c_void_p), ctypes.c_void_p, ctypes.c_void_p +) +_FN_CURRENT_STREAM = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_int32, ctypes.POINTER(ctypes.c_void_p)) + + +class _DLPackExchangeAPI(ctypes.Structure): + _fields_ = [ + ("header", _DLPackExchangeAPIHeader), + ("managed_tensor_allocator", _FN_ALLOCATOR), + ("managed_tensor_from_py_object_no_sync", _FN_FROM_PY), + ("managed_tensor_to_py_object_no_sync", _FN_TO_PY), + ("dltensor_from_py_object_no_sync", _FN_DLTENSOR_FROM_PY), + ("current_work_stream", _FN_CURRENT_STREAM), + ] + + +def _get_exchange_api(): + capsule = StridedMemoryView.__dlpack_c_exchange_api__ + ptr = _PyCapsule_GetPointer(capsule, b"dlpack_exchange_api") + assert ptr + return ctypes.cast(ptr, ctypes.POINTER(_DLPackExchangeAPI)).contents + + +def test_dlpack_c_exchange_api_header_version(): + """The exchange-API header advertises a non-zero DLPack version.""" + api = _get_exchange_api() + assert (api.header.version.major, api.header.version.minor) >= (1, 0) + assert not api.header.prev_api + + +def test_dlpack_c_exchange_api_current_work_stream(): + """``current_work_stream`` reports no current stream (cuda.core has none).""" + api = _get_exchange_api() + out = ctypes.c_void_p(123) + rc = api.current_work_stream(int(DLDeviceType.kDLCPU), 0, ctypes.byref(out)) + assert rc == 0 + assert not out.value # set back to NULL + + +def test_dlpack_c_exchange_api_dltensor_from_py_object(): + """``dltensor_from_py_object_no_sync`` fills a borrowed DLTensor from a view.""" + api = _get_exchange_api() + src = np.arange(12, dtype=np.int32).reshape(3, 4) + view = StridedMemoryView.from_any_interface(src, stream_ptr=-1) + out = _DLTensor() + rc = api.dltensor_from_py_object_no_sync(id(view), ctypes.byref(out)) + assert rc == 0 + assert out.ndim == 2 + assert out.device.device_type == int(DLDeviceType.kDLCPU) + assert out.data == src.ctypes.data + assert [out.shape[i] for i in range(out.ndim)] == [3, 4] + + +def _exchange_api_cause(exc): + """Underlying exception raised by the noexcept C fn (surfaced by ctypes as + SystemError, with the real error chained as __cause__ or __context__).""" + return exc.value.__cause__ or exc.value.__context__ + + +def test_dlpack_c_exchange_api_dltensor_from_py_object_type_error(): + """A non-StridedMemoryView py_object is rejected (TypeError, rc=-1).""" + api = _get_exchange_api() + not_a_view = object() + out = _DLTensor() + with pytest.raises(SystemError) as exc: + api.dltensor_from_py_object_no_sync(id(not_a_view), ctypes.byref(out)) + assert isinstance(_exchange_api_cause(exc), TypeError) + + +def test_dlpack_c_exchange_api_managed_tensor_roundtrip(): + """``managed_tensor_from_py_object_no_sync`` produces a managed tensor that + ``managed_tensor_to_py_object_no_sync`` turns back into a StridedMemoryView. + + This exercises the versioned export fill and the capsule-import helper. + The reconstructed view intentionally keeps a reference (the C side holds one + via Py_INCREF), so the managed tensor is not freed here -- avoiding any + double-free across the two calls that share the same tensor. + """ + api = _get_exchange_api() + src = np.arange(6, dtype=np.float64).reshape(2, 3) + view = StridedMemoryView.from_any_interface(src, stream_ptr=-1) + + tensor = ctypes.c_void_p(0) + rc = api.managed_tensor_from_py_object_no_sync(id(view), ctypes.byref(tensor)) + assert rc == 0 + assert tensor.value # non-NULL DLManagedTensorVersioned* + + out_obj = ctypes.c_void_p(0) + rc = api.managed_tensor_to_py_object_no_sync(tensor, ctypes.byref(out_obj)) + assert rc == 0 + assert out_obj.value + imported = ctypes.cast(ctypes.c_void_p(out_obj.value), ctypes.py_object).value + assert isinstance(imported, StridedMemoryView) + assert imported.shape == (2, 3) + assert imported.ptr == src.ctypes.data + + +def test_dlpack_c_exchange_api_to_py_object_null_tensor(): + """``managed_tensor_to_py_object_no_sync`` rejects a NULL tensor (RuntimeError).""" + api = _get_exchange_api() + out_obj = ctypes.c_void_p(0) + with pytest.raises(SystemError) as exc: + api.managed_tensor_to_py_object_no_sync(None, ctypes.byref(out_obj)) + assert isinstance(_exchange_api_cause(exc), RuntimeError) + assert not out_obj.value # set to NULL before the error + + +def test_dlpack_c_exchange_api_managed_tensor_allocator_not_supported(): + """``managed_tensor_allocator`` is unsupported (NotImplementedError).""" + api = _get_exchange_api() + out = ctypes.c_void_p(123) + with pytest.raises(SystemError) as exc: + api.managed_tensor_allocator(None, ctypes.byref(out), None, None) + assert isinstance(_exchange_api_cause(exc), NotImplementedError) + assert not out.value # set to NULL before the error