Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 81 additions & 1 deletion cpp/include/rapidsmpf/integrations/cudf/partition.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once
Expand Down Expand Up @@ -209,4 +209,84 @@ std::vector<PackedData> unspill_partitions(
std::shared_ptr<Statistics> statistics = Statistics::disabled()
);

/// @brief The amount of extra memory to reserve for packing.
constexpr size_t packing_wiggle_room_per_column = 1024; ///< 1 KiB per column

/**
* @brief The total amount of extra memory to reserve for packing.
*
* @param table The table to pack.
* @return The total amount of extra memory to reserve for packing.
*/
inline size_t total_packing_wiggle_room(cudf::table_view const& table) {
    // NOTE(review): `num_columns()` counts only top-level columns, so this is
    // likely not enough wiggle room if the table has many nested (child)
    // columns — confirm whether nested columns need to be counted here.
    return packing_wiggle_room_per_column * static_cast<size_t>(table.num_columns());
}

/**
* @brief Pack a table using the @p bounce_buf device buffer via `cudf::chunked_pack`.
*
* All device operations will be performed on @p bounce_buf 's stream.
* `cudf::chunked_pack` requires the buffer to be at least 1 MiB in size.
*
* @param table The table to pack.
* @param bounce_buf A device bounce buffer to use for packing.
* @param data_res Memory reservation for the data buffer. If the final packed buffer size
* is within the wiggle room, this @p data_res will be padded to the packed buffer size.
*
* @return A `PackedData` containing the packed table.
*
* @throws std::runtime_error If the memory allocation fails.
* @throws std::invalid_argument If the bounce buffer is not in device memory.
*/
PackedData chunked_pack(
cudf::table_view const& table, Buffer& bounce_buf, MemoryReservation& data_res
);

/// @brief The minimum buffer size for `cudf::chunked_pack`.
constexpr size_t cudf_chunked_pack_min_buffer_size = size_t(1) << 20; ///< 1 MiB

/**
* @brief Pack a table to host memory using `cudf::pack` or `cudf::chunked_pack`.
*
* Based on benchmarks (rapidsai/rapidsmpf#745), the order of packing performance is as
* follows:
* - `cudf::pack` -> DEVICE
* - `cudf::chunked_pack` -> DEVICE
* - `cudf::pack` -> PINNED_HOST
* - `cudf::chunked_pack` -> PINNED_HOST
*
* This utility uses the following strategy:
* - data reservation must be big enough to pack the table.
* - if the data reservation is from device accessible memory, use cudf::pack, as it
* requires O(estimated_table_size) memory, which is already reserved up front.
* - if the data reservation is from host memory, for each memory type in @p
* bounce_buf_types, do the following:
* - try to reserve estimated_table_size for the memory type.
* - if the reservation is successful without overbooking, use cudf::pack, and move the
* packed data device buffer to the data reservation.
* - else if the leftover memory `>= cudf_chunked_pack_min_buffer_size`, allocate a
* device accessible bounce buffer, and use chunked_pack to pack to the data
* reservation.
* - else loop again with the next memory type.
* - if all memory types are tried and no success, fail.
*
* @param table The table to pack.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param data_res Memory reservation for the host data buffer.
* @param bounce_buf_types The memory types to use for the bounce buffer. Default is
* `DEVICE_ACCESSIBLE_MEMORY_TYPES`.
*
* @return A `PackedData` containing the packed table.
*
* @throws std::invalid_argument If the memory reservation is not big enough to pack the
* table.
* @throws std::runtime_error If all attempts to pack the table fail.
*/
std::unique_ptr<PackedData> pack(
cudf::table_view const& table,
rmm::cuda_stream_view stream,
MemoryReservation& data_res,
std::span<MemoryType const> bounce_buf_types = DEVICE_ACCESSIBLE_MEMORY_TYPES
);

} // namespace rapidsmpf
56 changes: 46 additions & 10 deletions cpp/include/rapidsmpf/memory/buffer_resource.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down Expand Up @@ -107,13 +107,30 @@ class BufferResource {
*
* @return Reference to the RMM resource used for pinned host allocations.
*/
[[nodiscard]] rmm::host_async_resource_ref pinned_mr() {
RAPIDSMPF_EXPECTS(
pinned_mr_, "no pinned memory resource is available", std::invalid_argument
);
return *pinned_mr_;
[[nodiscard]] rmm::host_device_async_resource_ref pinned_mr() {
// Delegates to the checked accessor, which throws std::invalid_argument
// when no pinned memory resource is available.
return get_checked_pinned_mr();
}

/**
* @brief Get the RMM device memory resource for a given memory type.
*
* @param mem_type The memory type.
* @return Reference to the RMM resource used for device allocations.
* @throws std::invalid_argument if the memory type is not device accessible.
*/
[[nodiscard]] rmm::device_async_resource_ref get_device_mr(
MemoryType const& mem_type
);

/**
* @brief Get the RMM host memory resource for a given memory type.
*
* @param mem_type The memory type.
* @return Reference to the RMM resource used for host allocations.
* @throws std::invalid_argument if the memory type is not host accessible.
*/
[[nodiscard]] rmm::host_async_resource_ref get_host_mr(MemoryType const& mem_type);

/**
* @brief Retrieves the memory availability function for a given memory type.
*
Expand Down Expand Up @@ -211,7 +228,9 @@ class BufferResource {
return std::move(res);
}
}
RAPIDSMPF_FAIL("failed to reserve memory", std::runtime_error);
RAPIDSMPF_FAIL(
"failed to reserve memory " + std::to_string(size), std::runtime_error
);
}

/**
Expand Down Expand Up @@ -271,10 +290,10 @@ class BufferResource {
);

/**
* @brief Move device buffer data into a Buffer.
* @brief Move device/pinned-host buffer data into a Buffer.
*
* This operation is cheap; no copy is performed. The resulting Buffer resides in
* device memory.
* device/pinned-host memory.
*
* If @p stream differs from the device buffer's current stream:
* - @p stream is synchronized with the device buffer's current stream, and
Expand All @@ -283,10 +302,15 @@ class BufferResource {
* @param data Unique pointer to the device buffer.
* @param stream CUDA stream associated with the new Buffer. Use or synchronize with
* this stream when operating on the Buffer.
* @param mem_type The memory type of the device buffer. Defaults to
* `MemoryType::DEVICE`.
*
* @return Unique pointer to the resulting Buffer.
*/
std::unique_ptr<Buffer> move(
std::unique_ptr<rmm::device_buffer> data, rmm::cuda_stream_view stream
std::unique_ptr<rmm::device_buffer> data,
rmm::cuda_stream_view stream,
MemoryType mem_type = MemoryType::DEVICE
);

/**
Expand Down Expand Up @@ -364,6 +388,18 @@ class BufferResource {
std::shared_ptr<Statistics> statistics();

private:
/**
* @brief Get the RMM pinned host memory resource.
*
* @return Reference to the RMM resource used for pinned host allocations.
* @throws std::invalid_argument If no pinned memory resource is available.
*/
[[nodiscard]] PinnedMemoryResource& get_checked_pinned_mr() {
RAPIDSMPF_EXPECTS(
pinned_mr_, "no pinned memory resource is available", std::invalid_argument
);
return *pinned_mr_;
}

std::mutex mutex_;
rmm::device_async_resource_ref device_mr_;
std::shared_ptr<PinnedMemoryResource> pinned_mr_;
Expand Down
38 changes: 37 additions & 1 deletion cpp/include/rapidsmpf/memory/memory_type.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once
Expand All @@ -9,6 +9,8 @@
#include <ranges>
#include <span>

#include <rapidsmpf/utils.hpp>

namespace rapidsmpf {

/// @brief Enum representing the type of memory sorted in decreasing order of preference.
Expand Down Expand Up @@ -40,6 +42,20 @@ constexpr std::array<MemoryType, 2> SPILL_TARGET_MEMORY_TYPES{
{MemoryType::PINNED_HOST, MemoryType::HOST}
};

/**
* @brief Memory types that are device accessible in the order of preference.
*/
constexpr std::array<MemoryType, 2> DEVICE_ACCESSIBLE_MEMORY_TYPES{
{MemoryType::DEVICE, MemoryType::PINNED_HOST}
};

/**
* @brief Memory types that are host accessible in the order of preference.
*/
constexpr std::array<MemoryType, 2> HOST_ACCESSIBLE_MEMORY_TYPES{
{MemoryType::PINNED_HOST, MemoryType::HOST}
};

/**
* @brief Get the memory types with preference lower than or equal to @p mem_type.
*
Expand All @@ -65,6 +81,26 @@ static_assert(std::ranges::equal(
leq_memory_types(static_cast<MemoryType>(-1)), std::ranges::empty_view<MemoryType>{}
));

/**
 * @brief Check if the memory type is host accessible.
 *
 * @param mem_type The memory type.
 * @return True if the memory type is host accessible, false otherwise.
 */
constexpr bool is_host_accessible(MemoryType mem_type) noexcept {
    // A small enum is cheaper passed by value than by const reference.
    return contains(HOST_ACCESSIBLE_MEMORY_TYPES, mem_type);
}

/**
 * @brief Check if the memory type is device accessible.
 *
 * @param mem_type The memory type.
 * @return True if the memory type is device accessible, false otherwise.
 */
constexpr bool is_device_accessible(MemoryType mem_type) noexcept {
    // A small enum is cheaper passed by value than by const reference.
    return contains(DEVICE_ACCESSIBLE_MEMORY_TYPES, mem_type);
}

/**
* @brief Get the name of a MemoryType.
*
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/cuda_event.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down Expand Up @@ -45,15 +45,15 @@ void CudaEvent::record(rmm::cuda_stream_view stream) {
RAPIDSMPF_CUDA_TRY(cudaEventRecord(event_, stream));
}

[[nodiscard]] bool CudaEvent::CudaEvent::is_ready() const {
[[nodiscard]] bool CudaEvent::is_ready() const {
auto result = cudaEventQuery(event_);
if (result != cudaSuccess && result != cudaErrorNotReady) {
RAPIDSMPF_CUDA_TRY(result);
}
return result == cudaSuccess;
}

void CudaEvent::host_wait() const {
// Block the calling host thread until the recorded event has completed.
RAPIDSMPF_CUDA_TRY(cudaEventSynchronize(event_));
}
}

Expand Down
Loading
Loading