Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 79 additions & 6 deletions cpp/include/rapidsmpf/buffer/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,42 @@
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>

#include <rapidsmpf/buffer/pinned_memory_resource.hpp>
#include <rapidsmpf/cuda_event.hpp>
#include <rapidsmpf/error.hpp>
#include <rapidsmpf/utils.hpp>

namespace rapidsmpf {

/// @brief Enum representing the type of memory.
enum class MemoryType : int {
enum class MemoryType : uint8_t {
DEVICE = 0, ///< Device memory
HOST = 1 ///< Host memory
PINNED_HOST = 1, ///< Pinned host memory
HOST = 2 ///< Host memory
};

/// @brief The lowest memory type that can be spilled to.
/// @note Declared `inline` so that all translation units including this header share
/// a single definition instead of each getting its own internal-linkage copy.
inline constexpr MemoryType LowestSpillType = MemoryType::HOST;

/// @brief Array of all the different memory types.
/// @note Ensure that this array is always sorted in decreasing order of preference.
constexpr std::array<MemoryType, 2> MEMORY_TYPES{{MemoryType::DEVICE, MemoryType::HOST}};
// `inline` gives the array a single definition across all translation units that
// include this header (a plain namespace-scope `constexpr` has internal linkage).
inline constexpr std::array<MemoryType, 3> MEMORY_TYPES{
    {MemoryType::DEVICE, MemoryType::PINNED_HOST, MemoryType::HOST}
};

/**
 * @brief Converts a memory type to a string.
 *
 * The mapping is spelled out per enumerator rather than indexing a table with the
 * enumerator's underlying value, so it stays correct if the numeric values of
 * `MemoryType` are ever changed, and it never reads out of bounds.
 *
 * @param mem_type The memory type to convert.
 * @return The string view of the memory type: "Device", "PinnedHost" or "Host".
 * If @p mem_type does not hold one of the declared enumerators (e.g. it was
 * produced by casting an arbitrary integer), "Unknown" is returned.
 */
constexpr std::string_view memory_type_to_string(MemoryType mem_type) {
    switch (mem_type) {
    case MemoryType::DEVICE:
        return "Device";
    case MemoryType::PINNED_HOST:
        return "PinnedHost";
    case MemoryType::HOST:
        return "Host";
    }
    // No `default:` label above on purpose: compilers can then warn when a new
    // enumerator is added without a matching case.
    return "Unknown";
}

/**
* @brief Buffer representing device or host memory.
Expand All @@ -60,10 +78,13 @@ class Buffer {
/// @brief Storage type for the host buffer.
using HostStorageT = std::unique_ptr<std::vector<uint8_t>>;

/// @brief Storage type for the pinned host buffer.
using PinnedHostStorageT = std::unique_ptr<PinnedHostBuffer>;

/**
* @brief Storage type in Buffer, which could be either host or device memory.
* @brief Storage type in Buffer, which could be device, pinned host, or host memory.
*/
using StorageT = std::variant<DeviceStorageT, HostStorageT>;
using StorageT = std::variant<DeviceStorageT, PinnedHostStorageT, HostStorageT>;

/**
* @brief Access the underlying host memory buffer (const).
Expand All @@ -85,6 +106,16 @@ class Buffer {
*/
[[nodiscard]] DeviceStorageT const& device() const;

/**
* @brief Access the underlying pinned host memory buffer (const).
*
* @return A const reference to the unique pointer managing the pinned host memory.
*
* @throws std::logic_error if the buffer does not manage pinned host memory.
* @throws std::logic_error If the buffer is locked.
*/
[[nodiscard]] PinnedHostStorageT const& pinned_host() const;

/**
* @brief Access the underlying memory buffer (host or device memory).
*
Expand Down Expand Up @@ -201,7 +232,10 @@ class Buffer {
return std::visit(
overloaded{
[](HostStorageT const&) -> MemoryType { return MemoryType::HOST; },
[](DeviceStorageT const&) -> MemoryType { return MemoryType::DEVICE; }
[](DeviceStorageT const&) -> MemoryType { return MemoryType::DEVICE; },
[](PinnedHostStorageT const&) -> MemoryType {
return MemoryType::PINNED_HOST;
}
},
storage_
);
Expand Down Expand Up @@ -297,6 +331,25 @@ class Buffer {
*/
Buffer(std::unique_ptr<rmm::device_buffer> device_buffer);

/**
* @brief Construct a stream-ordered Buffer from pinned host memory.
*
* Adopts @p pinned_host_buffer as the Buffer's storage and inherits its CUDA stream.
* At construction, the Buffer records an initial "latest write" on that stream,
* so `is_latest_write_done()` will become `true` once all work enqueued on the
* adopted stream up to this point has completed.
*
* @note No synchronization is performed by the constructor. Any producer that
* initialized or modified @p pinned_host_buffer must have enqueued that work on the
* same stream (or established ordering with it) for correctness.
*
* @param pinned_host_buffer Unique pointer to a pinned host buffer. Must be non-null.
*
* @throws std::invalid_argument If @p pinned_host_buffer is null.
* @throws std::logic_error If the buffer is locked.
*/
Buffer(std::unique_ptr<PinnedHostBuffer> pinned_host_buffer);

/**
* @brief Throws if the buffer is currently locked by `exclusive_data_access()`.
*
Expand Down Expand Up @@ -324,6 +377,16 @@ class Buffer {
*/
[[nodiscard]] DeviceStorageT& device();

/**
* @brief Access the underlying pinned host memory buffer.
*
* @return A reference to the unique pointer managing the pinned host memory.
*
* @throws std::logic_error if the buffer does not manage pinned host memory.
* @throws std::logic_error If the buffer is locked.
*/
[[nodiscard]] PinnedHostStorageT& pinned_host();

/**
* @brief Release the underlying device memory buffer.
*
Expand All @@ -344,6 +407,16 @@ class Buffer {
*/
[[nodiscard]] HostStorageT release_host();

/**
* @brief Release the underlying pinned host memory buffer.
*
* @return The underlying pinned host memory buffer.
*
* @throws std::logic_error if the buffer does not manage pinned host memory.
* @throws std::logic_error If the buffer is locked.
*/
[[nodiscard]] PinnedHostStorageT release_pinned_host();

public:
std::size_t const size; ///< The size of the buffer in bytes.

Expand Down
20 changes: 19 additions & 1 deletion cpp/include/rapidsmpf/buffer/pinned_memory_resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ class PinnedMemoryResource; // forward declaration
*
* @sa https://github.com/rapidsai/rmm/issues/1931
*/
struct PinnedPoolProperties {};
struct PinnedPoolProperties {
    /// Upper bound on the pool size, in bytes. Defaults to 0.
    size_t max_pool_size{0};
    /// Size of the pool at creation, in bytes. Defaults to 0.
    size_t initial_pool_size{0};
};

/**
* @brief A pinned host memory pool for stream-ordered allocations/deallocations. This
Expand Down Expand Up @@ -112,6 +115,13 @@ class PinnedMemoryPool {
return properties_;
}

/**
* @brief Gets the native handle of the pinned memory pool.
*
* @return The native handle of the pinned memory pool.
*/
[[nodiscard]] cudaMemPool_t native_handle() const noexcept;

private:
PinnedPoolProperties properties_; ///< Configuration properties for this pool.

Expand Down Expand Up @@ -141,6 +151,13 @@ class PinnedMemoryResource {
const PinnedMemoryResource&, cuda::mr::host_accessible
) noexcept {}

/**
* @brief Friend function to get the device_accessible property.
*
* This overload is selected by the `cuda::mr::device_accessible` tag argument. The
* body is intentionally empty: the overload only has to exist, it returns nothing.
*/
friend constexpr void get_property(
const PinnedMemoryResource&, cuda::mr::device_accessible
) noexcept {}

/**
* @brief Constructs a new pinned memory resource.
*
Expand Down Expand Up @@ -235,6 +252,7 @@ class PinnedMemoryResource {
};

static_assert(cuda::mr::resource_with<PinnedMemoryResource, cuda::mr::host_accessible>);
static_assert(cuda::mr::resource_with<PinnedMemoryResource, cuda::mr::device_accessible>);

/**
* @brief A buffer that manages stream-ordered pinned host memory. Only available for CUDA
Expand Down
100 changes: 99 additions & 1 deletion cpp/include/rapidsmpf/buffer/resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <rmm/cuda_stream_pool.hpp>

#include <rapidsmpf/buffer/buffer.hpp>
#include <rapidsmpf/buffer/pinned_memory_resource.hpp>
#include <rapidsmpf/buffer/spill_manager.hpp>
#include <rapidsmpf/error.hpp>
#include <rapidsmpf/rmm_resource_adaptor.hpp>
Expand Down Expand Up @@ -143,6 +144,37 @@ class BufferResource {
*/
using MemoryAvailable = std::function<std::int64_t()>;


/**
* @brief Constructs a buffer resource when pinned host memory resource is available.
*
* @param device_mr Reference to the RMM device memory resource used for device
* allocations.
* @param pinned_host_mr Reference to the pinned host memory resource used for pinned
* host allocations.
* @param memory_available Optional memory availability functions mapping memory types
* to available memory checkers. Memory types without availability functions are
* assumed to have unlimited memory.
* @param periodic_spill_check Enable periodic spill checks. A dedicated thread
* continuously checks and performs spilling based on the memory availability
* functions. The value of `periodic_spill_check` is used as the pause between checks.
* If `std::nullopt`, no periodic spill check is performed.
* @param stream_pool Pool of CUDA streams. Used throughout RapidsMPF for operations
* that do not take an explicit CUDA stream.
* @param statistics The statistics instance to use (disabled by default).
*
* @throws std::runtime_error If pinned host memory resource is not available.
*/
BufferResource(
rmm::device_async_resource_ref device_mr,
std::shared_ptr<PinnedMemoryResource> pinned_host_mr,
std::unordered_map<MemoryType, MemoryAvailable> memory_available = {},
std::optional<Duration> periodic_spill_check = std::chrono::milliseconds{1},
std::shared_ptr<rmm::cuda_stream_pool> stream_pool = std::make_shared<
rmm::cuda_stream_pool>(16, rmm::cuda_stream::flags::non_blocking),
std::shared_ptr<Statistics> statistics = Statistics::disabled()
);

/**
* @brief Constructs a buffer resource.
*
Expand Down Expand Up @@ -179,6 +211,15 @@ class BufferResource {
return device_mr_;
}

/**
* @brief Get the pinned host memory resource.
*
* @return Reference to the memory resource used for pinned host allocations.
*
* @throws std::runtime_error If pinned host memory resource is not available.
*/
[[nodiscard]] rmm::host_device_async_resource_ref pinned_host_mr() const;

/**
* @brief Retrieves the memory availability function for a given memory type.
*
Expand Down Expand Up @@ -212,6 +253,15 @@ class BufferResource {
return memory_reserved_[static_cast<std::size_t>(mem_type)];
}

/**
 * @brief Tell whether this buffer resource can serve pinned host memory.
 *
 * @return True when pinned memory resources are supported and a pinned host memory
 * resource has been set on this instance; false otherwise.
 */
[[nodiscard]] inline bool is_pinned_memory_available() const noexcept {
    // Check support first, exactly as the short-circuiting `&&` form would.
    if (!is_pinned_memory_resources_supported()) {
        return false;
    }
    return pinned_host_mr_ != nullptr;
}

/**
* @brief Reserve an amount of the specified memory type.
*
Expand All @@ -230,6 +280,9 @@ class BufferResource {
* @return A pair containing the reservation and the amount of overbooking. On success
* the size of the reservation always equals `size` and on failure the size always
* equals zero (a zero-sized reservation never fails).
*
* @throws std::invalid_argument if the memory type is `PINNED_HOST` and pinned host
* memory resource is not available or not supported.
*/
std::pair<MemoryReservation, std::size_t> reserve(
MemoryType mem_type, size_t size, bool allow_overbooking
Expand Down Expand Up @@ -260,6 +313,10 @@ class BufferResource {
* the order they appear in `MEMORY_TYPES`.
* @return A memory reservation.
* @throws std::runtime_error if no memory reservation was made.
*
* @throws std::invalid_argument if the @p mem_type is `PINNED_HOST` and pinned host
* memory resource is not available.
*
*/
[[nodiscard]] MemoryReservation reserve_or_fail(
size_t size, std::optional<MemoryType> mem_type = std::nullopt
Expand Down Expand Up @@ -287,8 +344,9 @@ class BufferResource {
* @param reservation The reservation to use for memory allocations.
* @return A unique pointer to the allocated Buffer.
*
* @throws std::invalid_argument if the memory type does not match the reservation.
* @throws std::overflow_error if `size` exceeds the size of the reservation.
* @throws std::invalid_argument if @p reservation is PINNED_HOST and pinned host
* memory resource is not available.
*/
std::unique_ptr<Buffer> allocate(
std::size_t size, rmm::cuda_stream_view stream, MemoryReservation& reservation
Expand Down Expand Up @@ -327,6 +385,25 @@ class BufferResource {
std::unique_ptr<rmm::device_buffer> data, rmm::cuda_stream_view stream
);

/**
* @brief Move pinned host buffer data into a Buffer.
*
* This operation is cheap; no copy is performed. The resulting Buffer resides in
* pinned host memory.
*
* If @p stream differs from the pinned host buffer's current stream:
* - @p stream is synchronized with the pinned host buffer's current stream, and
* - the pinned host buffer's current stream is updated to @p stream.
*
* @param data Unique pointer to the pinned host buffer.
* @param stream CUDA stream associated with the new Buffer. Use or synchronize with
* this stream when operating on the Buffer.
* @return Unique pointer to the resulting Buffer.
*/
std::unique_ptr<Buffer> move(
std::unique_ptr<PinnedHostBuffer> data, rmm::cuda_stream_view stream
);

/**
* @brief Move a Buffer to the memory type specified by the reservation.
*
Expand Down Expand Up @@ -377,6 +454,24 @@ class BufferResource {
std::unique_ptr<Buffer> buffer, MemoryReservation& reservation
);

/**
* @brief Move a Buffer into a pinned host buffer.
*
* If the Buffer already resides in pinned host memory, a cheap move is performed.
* Otherwise, the Buffer is copied to pinned host memory using its own CUDA stream.
*
* @param buffer Buffer to move.
* @param reservation Memory reservation used if a copy is required.
* @return Unique pointer to the resulting pinned host buffer.
*
* @throws std::invalid_argument If the reservation's memory type isn't pinned host
* memory.
* @throws std::overflow_error If the allocation size exceeds the reservation.
*/
std::unique_ptr<PinnedHostBuffer> move_to_pinned_host_buffer(
std::unique_ptr<Buffer> buffer, MemoryReservation& reservation
);

/**
* @brief Returns the CUDA stream pool used by this buffer resource.
*
Expand Down Expand Up @@ -404,6 +499,9 @@ class BufferResource {
private:
std::mutex mutex_;
rmm::device_async_resource_ref device_mr_;

std::shared_ptr<PinnedMemoryResource> pinned_host_mr_{};

std::unordered_map<MemoryType, MemoryAvailable> memory_available_;
// Zero initialized reserved counters.
std::array<std::size_t, MEMORY_TYPES.size()> memory_reserved_ = {};
Expand Down
Loading