Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ if(RAPIDSMPF_HAVE_STREAMING)
src/streaming/core/context.cpp
src/streaming/core/fanout.cpp
src/streaming/core/leaf_node.cpp
src/streaming/core/memory_reserve_or_wait.cpp
src/streaming/core/node.cpp
src/streaming/core/spillable_messages.cpp
src/streaming/cudf/partition.cpp
Expand Down
7 changes: 3 additions & 4 deletions cpp/benchmarks/streaming/ndsh/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@

#include <mpi.h>

#include <rapidsmpf/communicator/mpi.hpp>
#include <rapidsmpf/streaming/core/channel.hpp>
#include <rapidsmpf/streaming/core/context.hpp>
#include <rapidsmpf/streaming/core/node.hpp>
#include <rapidsmpf/streaming/cudf/table_chunk.hpp>

#include "rapidsmpf/communicator/mpi.hpp"
#include "rapidsmpf/streaming/core/channel.hpp"
#include "rapidsmpf/streaming/core/node.hpp"

namespace rapidsmpf::ndsh {
namespace detail {

Expand Down
3 changes: 1 addition & 2 deletions cpp/include/rapidsmpf/memory/content_description.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
#include <utility>

#include <rapidsmpf/memory/buffer.hpp>

#include "rapidsmpf/memory/memory_type.hpp"
#include <rapidsmpf/memory/memory_type.hpp>

namespace rapidsmpf {

Expand Down
9 changes: 1 addition & 8 deletions cpp/include/rapidsmpf/streaming/core/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Context {
*
* @return The Options instance.
*/
[[nodiscard]] config::Options get_options() const noexcept;
[[nodiscard]] config::Options options() const noexcept;

/**
* @brief Returns the communicator.
Expand Down Expand Up @@ -138,13 +138,6 @@ class Context {
std::size_t buffer_size
) const noexcept;

/**
* @brief Returns the options.
*
* @return The Options instance.
*/
[[nodiscard]] config::Options const& options() const noexcept;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now options() always returns a copy, rather than a const reference. Is there a particular reason for this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplification: config::Options is always backed by a shared pointer internally, so the overhead of returning a copy is minimal.


private:
config::Options options_;
std::shared_ptr<Communicator> comm_;
Expand Down
175 changes: 175 additions & 0 deletions cpp/include/rapidsmpf/streaming/core/memory_reserve_or_wait.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <set>
#include <tuple>

#include <rapidsmpf/config.hpp>
#include <rapidsmpf/memory/memory_reservation.hpp>
#include <rapidsmpf/streaming/core/context.hpp>
#include <rapidsmpf/streaming/core/node.hpp>
#include <rapidsmpf/utils.hpp>

#include <coro/task.hpp>

namespace rapidsmpf::streaming {


/**
 * @brief Asynchronous coordinator for memory reservation requests.
 *
 * `MemoryReserveOrWait` provides a coroutine-based mechanism for reserving
 * memory with backpressure. Callers submit reservation requests via
 * `reserve_or_wait()`, which suspends until enough memory is available or the
 * request times out.
 *
 * Instances are neither copyable nor movable: the periodic memory check
 * coroutine accesses members of the object it was spawned from, so the object
 * must stay at a fixed address until that coroutine has exited.
 */
class MemoryReserveOrWait {
  public:
    /**
     * @brief Constructs a `MemoryReserveOrWait` instance.
     *
     * @param mem_type The memory type for which reservations are requested.
     * @param ctx Streaming context.
     * @param timeout Optional timeout duration. The timeout bounds how long pending
     * requests may wait without making progress. If the timeout expires,
     * `reserve_or_wait()` returns even if no memory became available. If not
     * explicitly provided, the timeout is read from the option key
     * `"memory_reserve_timeout_ms"`, which defaults to 100 ms.
     */
    MemoryReserveOrWait(
        MemoryType mem_type,
        std::shared_ptr<Context> ctx,
        std::optional<Duration> timeout = std::nullopt
    );

    /**
     * @brief Destroys the instance.
     *
     * Like `shutdown()`, the destructor waits for the periodic memory check task
     * (if one was spawned) to exit, so the coroutine never observes a partially
     * destroyed object.
     */
    ~MemoryReserveOrWait() noexcept;

    // Copying and moving are disabled explicitly. They were already unavailable
    // implicitly (user-declared destructor, `std::mutex` and `std::atomic` members);
    // spelling it out documents that the object must not be relocated while the
    // periodic memory check coroutine may still reference its members.
    MemoryReserveOrWait(MemoryReserveOrWait const&) = delete;
    MemoryReserveOrWait& operator=(MemoryReserveOrWait const&) = delete;
    MemoryReserveOrWait(MemoryReserveOrWait&&) = delete;
    MemoryReserveOrWait& operator=(MemoryReserveOrWait&&) = delete;

    /**
     * @brief Shuts down all pending memory reservation requests.
     *
     * @return A coroutine that completes only after all pending requests have been
     * cancelled and the periodic memory check task has exited.
     */
    Node shutdown();

    /**
     * @brief Attempts to reserve memory or waits until the reservation can be
     * satisfied.
     *
     * This coroutine submits a memory reservation request and then suspends until
     * either sufficient memory becomes available or no progress is made within the
     * configured timeout.
     *
     * If the timeout expires before the request can be fulfilled, an empty
     * `MemoryReservation` is returned.
     *
     * @param size Number of bytes to reserve.
     * @param future_release_potential Estimated number of bytes the requester may
     * release in the future, used as a heuristic when selecting which eligible
     * request to satisfy first.
     * @return A `MemoryReservation` representing the allocated memory, or an empty
     * reservation if the timeout expires.
     *
     * @throws std::runtime_error If shutdown occurs before the request can be
     * processed.
     */
    coro::task<MemoryReservation> reserve_or_wait(
        std::size_t size, std::size_t future_release_potential
    );

    /**
     * @brief Returns the number of pending memory reservation requests.
     *
     * The value is a snapshot; it may change concurrently as requests are added or
     * fulfilled.
     *
     * @return The number of outstanding reservation requests.
     */
    [[nodiscard]] std::size_t size() const;

    /**
     * @brief Returns the number of iterations performed by
     * `periodic_memory_check()`.
     *
     * This counter is incremented once per loop iteration inside
     * `periodic_memory_check()`, and can be useful for diagnostics or testing.
     *
     * @return The total number of memory-check iterations executed so far.
     */
    [[nodiscard]] std::size_t periodic_memory_check_counter() const;

  private:
    /**
     * @brief Represents a single memory reservation request.
     *
     * A `ResReq` is inserted into a sorted container and processed by
     * `periodic_memory_check()`. Each request describes the amount of memory
     * needed, an estimate of how much memory may be released in the future, and
     * its submission order. A reference to the requester's queue is used to
     * deliver the resulting `MemoryReservation` once the request is fulfilled.
     *
     * The ordering of `ResReq` instances is defined by `operator<`, which sorts
     * lexicographically by `(size, future_release_potential, sequence_number)`.
     * The queue reference does not take part in the ordering.
     */
    struct ResReq {
        /// @brief The number of bytes requested.
        std::size_t size;

        /// @brief Estimated number of bytes expected to be released in the future.
        std::size_t future_release_potential;

        /// @brief Monotonically increasing identifier used to preserve submission
        /// order.
        std::uint64_t sequence_number;

        /// @brief Queue into which a reservation is pushed once the request is
        /// satisfied. Held by reference, so the queue must outlive the request.
        coro::queue<MemoryReservation>& queue;

        /// @brief Lexicographic ordering on
        /// `(size, future_release_potential, sequence_number)`.
        friend bool operator<(ResReq const& a, ResReq const& b) {
            return std::tie(a.size, a.future_release_potential, a.sequence_number)
                   < std::tie(b.size, b.future_release_potential, b.sequence_number);
        }
    };

    /**
     * @brief Periodically processes pending memory reservation requests.
     *
     * This coroutine drives the asynchronous mechanism of `MemoryReserveOrWait`.
     * It repeatedly:
     *   - Queries the currently available memory for the configured memory type.
     *   - Identifies all pending reservation requests whose `size` fits within the
     *     available memory.
     *   - Among those, selects the request with the largest
     *     `future_release_potential`.
     *   - Fulfills the request by creating a `MemoryReservation` and pushing it into
     *     the requester's queue.
     *
     * If no reservation request can be satisfied for longer than `timeout_`, the
     * coroutine forces progress by selecting the smallest pending request and
     * attempting a reservation for it. This may produce an empty reservation if the
     * request still cannot be satisfied.
     *
     * Shutdown and lifetime coordination
     * ----------------------------------
     * A periodic memory check task is spawned on demand when the first pending
     * request is enqueued, and it exits once all requests have been extracted.
     *
     * The task is spawned as a joinable coroutine. `shutdown()` and the destructor
     * await the joinable task (if present) to ensure `periodic_memory_check()` has
     * fully exited before object teardown. This avoids dangling references to
     * members accessed by the coroutine.
     *
     * @return A coroutine that completes only once all pending requests have been
     * extracted and all in-flight work has finished.
     */
    coro::task<void> periodic_memory_check();

    /// Mutex; `mutable` so that `const` accessors can lock it.
    /// NOTE(review): presumably guards the request set and sequence counter;
    /// confirm against the implementation file.
    mutable std::mutex mutex_;
    /// Source of `ResReq::sequence_number` values.
    /// NOTE(review): lacks the trailing underscore used by the other members;
    /// renaming requires a matching change in the implementation file.
    std::uint64_t sequence_counter{0};
    /// Memory type for which reservations are made.
    MemoryType const mem_type_;
    /// Streaming context supplied at construction.
    std::shared_ptr<Context> ctx_;
    /// Maximum time pending requests may wait without progress.
    Duration const timeout_;
    /// Pending requests, ordered by `ResReq::operator<`.
    std::set<ResReq> reservation_requests_;
    /// Number of loop iterations executed by `periodic_memory_check()`.
    std::atomic<std::uint64_t> periodic_memory_check_counter_{0};
    /// Handle to the periodic memory check task; empty when none has been spawned.
    std::optional<coro::task<void>> periodic_memory_check_task_;
};

} // namespace rapidsmpf::streaming
3 changes: 1 addition & 2 deletions cpp/src/allgather/allgather.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
#include <rapidsmpf/allgather/allgather.hpp>
#include <rapidsmpf/communicator/communicator.hpp>
#include <rapidsmpf/memory/buffer.hpp>
#include <rapidsmpf/nvtx.hpp>
#include <rapidsmpf/progress_thread.hpp>
#include <rapidsmpf/utils.hpp>

#include "rapidsmpf/nvtx.hpp"

namespace rapidsmpf::allgather {
namespace detail {

Expand Down
6 changes: 1 addition & 5 deletions cpp/src/streaming/core/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ Context::~Context() noexcept {
br_->spill_manager().remove_spill_function(spill_function_id_);
}

config::Options Context::get_options() const noexcept {
config::Options Context::options() const noexcept {
return options_;
}

Expand Down Expand Up @@ -156,10 +156,6 @@ std::shared_ptr<BoundedQueue> Context::create_bounded_queue(
return std::shared_ptr<BoundedQueue>(new BoundedQueue(buffer_size));
}

config::Options const& Context::options() const noexcept {
return options_;
}

// Returns the context's `spillable_messages_` member. The shared pointer is
// returned by value, so the caller receives its own shared reference to the
// same `SpillableMessages` object.
std::shared_ptr<SpillableMessages> Context::spillable_messages() const noexcept {
return spillable_messages_;
}
Expand Down
Loading