Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ if(RAPIDSMPF_HAVE_STREAMING)
src/streaming/core/context.cpp
src/streaming/core/fanout.cpp
src/streaming/core/leaf_node.cpp
src/streaming/core/memory_reserve_or_wait.cpp
src/streaming/core/node.cpp
src/streaming/core/spillable_messages.cpp
src/streaming/cudf/partition.cpp
Expand Down
9 changes: 4 additions & 5 deletions cpp/benchmarks/streaming/ndsh/utils.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/

Expand All @@ -11,13 +11,12 @@

#include <mpi.h>

#include <rapidsmpf/communicator/mpi.hpp>
#include <rapidsmpf/streaming/core/channel.hpp>
#include <rapidsmpf/streaming/core/context.hpp>
#include <rapidsmpf/streaming/core/node.hpp>
#include <rapidsmpf/streaming/cudf/table_chunk.hpp>

#include "rapidsmpf/communicator/mpi.hpp"
#include "rapidsmpf/streaming/core/channel.hpp"
#include "rapidsmpf/streaming/core/node.hpp"

namespace rapidsmpf::ndsh {
namespace detail {

Expand Down
5 changes: 2 additions & 3 deletions cpp/include/rapidsmpf/memory/content_description.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once
Expand All @@ -8,8 +8,7 @@
#include <utility>

#include <rapidsmpf/memory/buffer.hpp>

#include "rapidsmpf/memory/memory_type.hpp"
#include <rapidsmpf/memory/memory_type.hpp>

namespace rapidsmpf {

Expand Down
11 changes: 2 additions & 9 deletions cpp/include/rapidsmpf/streaming/core/context.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down Expand Up @@ -69,7 +69,7 @@ class Context {
*
* @return The Options instance.
*/
[[nodiscard]] config::Options get_options() const noexcept;
[[nodiscard]] config::Options options() const noexcept;

/**
* @brief Returns the communicator.
Expand Down Expand Up @@ -138,13 +138,6 @@ class Context {
std::size_t buffer_size
) const noexcept;

/**
* @brief Returns the options.
*
* @return The Options instance.
*/
[[nodiscard]] config::Options const& options() const noexcept;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now options() always returns a copy, rather than a const reference. Is there a particular reason for this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplification, config::Options is always backed by a shared pointer internally so the overhead is minimal


private:
config::Options options_;
std::shared_ptr<Communicator> comm_;
Expand Down
209 changes: 209 additions & 0 deletions cpp/include/rapidsmpf/streaming/core/memory_reserve_or_wait.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <optional>
#include <set>

#include <rapidsmpf/config.hpp>
#include <rapidsmpf/memory/memory_reservation.hpp>
#include <rapidsmpf/streaming/core/context.hpp>
#include <rapidsmpf/utils.hpp>

#include <coro/task.hpp>

namespace rapidsmpf::streaming {


/**
* @brief Asynchronous coordinator for memory reservation requests.
*
* `MemoryReserveOrWait` provides a coroutine-based mechanism for reserving
* memory with backpressure. Callers submit reservation requests via
* `reserve_or_wait()`, which suspends until enough memory is available or
* progress must be forced.
*/
class MemoryReserveOrWait {
public:
/**
* @brief Constructs a `MemoryReserveOrWait` instance.
*
* If no reservation request can be satisfied within @p timeout, the coroutine
* forces progress by selecting the smallest pending request and attempting to
* reserve memory for it. This attempt may result in an empty reservation if the
* request still cannot be satisfied.
Comment on lines +34 to +37
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, so the idea will be that you still need to check your reservation gave you enough space.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, and you will have to decide if you want to overbook or maybe you have a low-memory mode

*
* @param mem_type The memory type for which reservations are requested.
* @param ctx Streaming context.
* @param timeout Timeout duration. If not explicitly provided, the timeout is read
* from the option key `"memory_reserve_timeout_ms"`, which defaults to 100 ms.
*/
MemoryReserveOrWait(
MemoryType mem_type,
std::shared_ptr<Context> ctx,
std::optional<Duration> timeout = std::nullopt
);

~MemoryReserveOrWait() noexcept;

/**
* @brief Shuts down all pending memory reservation requests.
*
* @return A coroutine that completes only after all pending requests have been
* cancelled and the periodic memory check task has exited.
*/
Node shutdown();

/**
* @brief Attempts to reserve memory or waits until progress can be made.
*
* This coroutine submits a memory reservation request and then suspends until
* either sufficient memory becomes available or no reservation request (including
* other pending requests) makes progress within the configured timeout.
*
* The timeout does not apply specifically to this request. Instead, it is used as
* a global progress guarantee: if no pending reservation request can be satisfied
* within timeout, `MemoryReserveOrWait` forces progress by selecting the smallest
* pending request and attempting to reserve memory for it. The forced reservation
* attempt may result in an empty `MemoryReservation` if the selected request still
* cannot be satisfied.
*
* When multiple reservation requests are eligible, `MemoryReserveOrWait` uses
* @p future_release_potential as a heuristic to prefer requests that are expected
* to free memory sooner. Operations that do not free memory, for example reading
* data from disk into memory, should use a value of zero. Operations that are
* expected to reduce memory usage, for example a reduction such as a sum, should
* use a value corresponding to the amount of input data that will be released
* once the operation completes.
*
* @param size Number of bytes to reserve.
* @param future_release_potential Estimated number of bytes the requester may
* release in the future.
* @return A `MemoryReservation` representing the allocated memory, or an empty
* reservation if progress could not be made.
*
* @throws std::runtime_error If shutdown occurs before the request can be processed.
*/
coro::task<MemoryReservation> reserve_or_wait(
std::size_t size, std::size_t future_release_potential
);

/**
* @brief Variant of `reserve_or_wait()` that allows overbooking on timeout.
*
* This coroutine behaves identically to `reserve_or_wait()` with respect to
* request submission, waiting, and progress guarantees. The only difference is
* the behavior when the progress timeout expires.
*
* If no reservation request can be satisfied before the timeout, this method
* attempts to reserve the requested memory by allowing overbooking. This
* guarantees forward progress, but may exceed the configured memory limits.
*
* @param size Number of bytes to reserve.
* @param future_release_potential Estimated number of bytes the requester may
* release in the future.
* @return A pair consisting of:
* - A `MemoryReservation` representing the allocated memory.
* - The number of bytes by which the reservation overbooked the available
* memory. This value is zero if no overbooking occurred.
*
* @throws std::runtime_error If shutdown occurs before the request can be processed.
*
* @see reserve_or_wait()
*/
coro::task<std::pair<MemoryReservation, std::size_t>> reserve_or_wait_or_overbook(
std::size_t size, std::size_t future_release_potential
);

/**
* @brief Returns the number of pending memory reservation requests.
*
* It may change concurrently as requests are added or fulfilled.
*
* @return The number of outstanding reservation requests.
*/
[[nodiscard]] std::size_t size() const noexcept;

/**
* @brief Returns the number of iterations performed by `periodic_memory_check()`.
*
* This counter is incremented once per loop iteration inside
* `periodic_memory_check()`, and can be useful for diagnostics or testing.
*
* @return The total number of memory-check iterations executed so far.
*/
[[nodiscard]] std::size_t periodic_memory_check_counter() const noexcept;

private:
/**
* @brief Represents a single memory reservation request.
*
* A `Request` is inserted into a sorted container and processed by
* `periodic_memory_check()`.
*/
struct Request {
/// @brief The number of bytes requested.
std::size_t size;

/// @brief Estimated number of bytes expected to be released in the future.
std::size_t future_release_potential;

/// @brief Monotonically increasing identifier used to preserve submission order.
std::uint64_t sequence_number;

/// @brief Queue into which a reservation is pushed once the request is satisfied.
coro::queue<MemoryReservation>& queue;

/// @brief Ordering by `size` and `sequence_number` (ascending).
friend bool operator<(Request const& a, Request const& b) {
return std::tie(a.size, a.sequence_number)
< std::tie(b.size, b.sequence_number);
}
};

/**
* @brief Periodically processes pending memory reservation requests.
*
* This coroutine drives the asynchronous mechanism of `MemoryReserveOrWait`.
* It repeatedly:
* - Queries the currently available memory for the configured memory type.
* - Identifies all pending reservation requests whose `size` fits within the
* available memory.
* - Among those, selects the request with the largest `future_release_potential`.
* - Fulfills the request by creating a `MemoryReservation` and pushing it into
* the requester's queue.
*
* If no reservation request can be satisfied for longer than `timeout_`, the
* coroutine forces progress by selecting the smallest pending request and
* attempting a reservation for it. This may produce an empty reservation if the
* request still cannot be satisfied.
*
* Shutdown and lifetime coordination
* ----------------------------------
* A periodic memory check task is spawned on demand when the first pending
* request is enqueued, and it exits once all requests have been extracted.
*
* The task is spawned as a joinable coroutine. `shutdown()` and the destructor
* await the joinable task (if present) to ensure `periodic_memory_check()` has
* fully exited before object teardown. This avoids dangling references to
* members accessed by the coroutine.
*
* @return A coroutine that completes only once all pending requests have been
* extracted and all in-flight work has finished.
*/
coro::task<void> periodic_memory_check();

mutable std::mutex mutex_;
std::uint64_t sequence_counter{0};
MemoryType const mem_type_;
std::shared_ptr<Context> ctx_;
Duration const timeout_;
std::set<Request> reservation_requests_;
std::atomic<std::uint64_t> periodic_memory_check_counter_{0};
std::optional<coro::task<void>> periodic_memory_check_task_;
};

} // namespace rapidsmpf::streaming
5 changes: 2 additions & 3 deletions cpp/src/allgather/allgather.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/
#include <algorithm>
Expand All @@ -15,11 +15,10 @@
#include <rapidsmpf/allgather/allgather.hpp>
#include <rapidsmpf/communicator/communicator.hpp>
#include <rapidsmpf/memory/buffer.hpp>
#include <rapidsmpf/nvtx.hpp>
#include <rapidsmpf/progress_thread.hpp>
#include <rapidsmpf/utils.hpp>

#include "rapidsmpf/nvtx.hpp"

namespace rapidsmpf::allgather {
namespace detail {

Expand Down
8 changes: 2 additions & 6 deletions cpp/src/streaming/core/context.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down Expand Up @@ -118,7 +118,7 @@ Context::~Context() noexcept {
br_->spill_manager().remove_spill_function(spill_function_id_);
}

config::Options Context::get_options() const noexcept {
config::Options Context::options() const noexcept {
return options_;
}

Expand Down Expand Up @@ -156,10 +156,6 @@ std::shared_ptr<BoundedQueue> Context::create_bounded_queue(
return std::shared_ptr<BoundedQueue>(new BoundedQueue(buffer_size));
}

config::Options const& Context::options() const noexcept {
return options_;
}

std::shared_ptr<SpillableMessages> Context::spillable_messages() const noexcept {
return spillable_messages_;
}
Expand Down
Loading