Memory reserve or wait #688
We need bcca40734054d8b0a4da134fa15595b3025d69e0 for rapidsai#688
#688 needs jbaldwin/libcoro#423
Authors:
- Mads R. B. Kristensen (https://github.com/madsbk)
Approvers:
- Peter Andreas Entschev (https://github.com/pentschev)
- Lawrence Mitchell (https://github.com/wence-)
URL: #747
c447471 to 5e0831a
fe430de to 65124fe
wence-
left a comment
First pass
cpp/include/rapidsmpf/streaming/core/memory_reserve_or_wait.hpp
// Extract the selected request and push the reservation into its queue.
ResReq request = reservation_requests_.extract(it).value();
lock.unlock();
push_into_queue(request, std::move(res));
issue: Maybe? push_into_queue takes the request by reference. But that reference could be dead by the time the queue is accessed inside the task?
Note, request.queue is also just a reference. It is reserve_or_wait() that creates the queue and keeps it alive until it has been fulfilled or cancelled.
Let's change it to
auto push_into_queue = [this](coro::queue& res_queue, MemoryReservation res)...; then the intention is clearer.
Co-authored-by: Lawrence Mitchell <[email protected]>
Thanks @wence-, I have fixed the ordering: on timeout we now pick the smallest request and, on a tie, the oldest one.
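For readers following along, the ordering described above (smallest first, oldest on a tie) could be sketched with an ordered set. This is only an illustration: the `Request` struct, its `sequence` counter, and `pop_timeout_candidate` are hypothetical names, not the PR's actual types.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>
#include <tuple>

// Hypothetical stand-in for the PR's request type.
struct Request {
    std::size_t size;        // bytes requested
    std::uint64_t sequence;  // assumed submission counter: lower means older

    // Smallest size first; on a tie, the older (lower sequence) request first.
    friend bool operator<(Request const& a, Request const& b) {
        return std::tie(a.size, a.sequence) < std::tie(b.size, b.sequence);
    }
};

// Remove and return the request a timeout would select.
Request pop_timeout_candidate(std::set<Request>& pending) {
    assert(!pending.empty());
    // extract() hands back a node; value() is copied out before the node dies.
    return pending.extract(pending.begin()).value();
}
```

With this comparator, `begin()` is always the forced-progress candidate, so no scan of the pending set is needed on timeout.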
// Use libcoro's queue to track completion of this reservation request.
// The queue will have at most one item: the fulfilled memory reservation.
coro::queue<MemoryReservation> request_queue{};
coro::queue seems like a bigger hammer than what we really need here. How about coro::event?
Yeah, but we would need an event plus a MemoryReservation, and we would still have to handle shutdown. Isn't it better to keep it simple?
@madsbk My thinking was we could use a coro::event with an optional<MemoryReservation>.
We could set the coro::event when a reservation is ready. During shutdown, we can set the event with a nullopt.
I feel like it would be not-very-complicated (I guess) 😇 . Maybe we could do this later as an improvement?
My gut feeling is that this is an unnecessary complication, but we can consider it as a follow-up PR.
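As a rough picture of the event-plus-optional idea discussed above, here is a single-threaded sketch. It deliberately leaves out the actual coroutine suspension and any synchronization: the plain `is_set` flag only stands in for the event, and `Reservation` and `ReservationSlot` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>

// Hypothetical stand-in for MemoryReservation.
struct Reservation {
    std::size_t size;
};

// One-shot slot: set exactly once, either with a reservation or, on
// shutdown, with an empty optional.
struct ReservationSlot {
    bool is_set = false;               // plays the role of the event
    std::optional<Reservation> value;  // stays nullopt after shutdown

    void fulfil(Reservation res) {
        assert(!is_set);
        value = res;
        is_set = true;
    }

    void shutdown() {
        assert(!is_set);
        value.reset();
        is_set = true;
    }
};
```

A waiter would check `value.has_value()` after being woken to tell a fulfilled request from a shutdown.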
if (Clock::now() - last_reservation_success > timeout_) {
    // This is the only way out of the while-loop that doesn't shutdown
    // the periodic memory check.
    break;
}
I'm missing something here. Shouldn't timeout_ be evaluated for each request? periodic_memory_check is a long-running task per MemoryReserveOrWait instance. So, when the timeout expires, we pick the smallest request with the future_release_potential and try to reserve that. But this could actually be the last request that was added. If the reservation fails, we inform the request that it has failed, even though it could have been attempted again later.
What I'm trying to say is that I feel the Request should have a timestamp that tells us when it was created, and based on the current time, any expired requests should also be released.
The timeout is not specific to a single request. I have updated the docs:
/**
* @brief Attempts to reserve memory or waits until progress can be made.
*
* This coroutine submits a memory reservation request and then suspends until
* either sufficient memory becomes available or no reservation request (including
* other pending requests) makes progress within the configured timeout.
*
* The timeout does not apply specifically to this request. Instead, it is used as
* a global progress guarantee: if no pending reservation request can be satisfied
* within timeout, `MemoryReserveOrWait` forces progress by selecting the smallest
* pending request and attempting to reserve memory for it. The forced reservation
* attempt may result in an empty `MemoryReservation` if the selected request still
* cannot be satisfied.
*
* When multiple reservation requests are eligible, `MemoryReserveOrWait` uses
* @p future_release_potential as a heuristic to prefer requests that are expected
* to free memory sooner. Operations that do not free memory, for example reading
* data from disk into memory, should use a value of zero. Operations that are
* expected to reduce memory usage, for example a reduction such as a sum, should
* use a value corresponding to the amount of input data that will be released
* once the operation completes.
*
* @param size Number of bytes to reserve.
* @param future_release_potential Estimated number of bytes the requester may
* release in the future.
* @return A `MemoryReservation` representing the allocated memory, or an empty
* reservation if progress could not be made.
*
* @throws std::runtime_error If shutdown occurs before the request can be processed.
*/
coro::task<MemoryReservation> reserve_or_wait(
std::size_t size, std::size_t future_release_potential
);
Co-authored-by: Niranda Perera <[email protected]>
…into memory_reserve_or_wait
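To make the documented policy concrete, here is one possible reading of it as a standalone function. It is a sketch under assumptions: "eligible" is taken to mean "fits in the currently available memory", the preferred eligible request is assumed to be the one with the largest future_release_potential, and `Request` and `select_request` are hypothetical names rather than the PR's code.

```cpp
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <optional>
#include <vector>

// Hypothetical request record; `pending` is assumed to be in submission order.
struct Request {
    std::size_t size;
    std::size_t future_release_potential;
};

// Returns the index of the request to attempt next, if any.
std::optional<std::size_t> select_request(
    std::vector<Request> const& pending, std::size_t available, bool timed_out
) {
    std::optional<std::size_t> best;
    for (std::size_t i = 0; i < pending.size(); ++i) {
        if (pending[i].size > available) {
            continue;  // not eligible right now
        }
        // Assumed heuristic: prefer the request expected to free the most memory.
        if (!best
            || pending[i].future_release_potential
                   > pending[*best].future_release_potential) {
            best = i;
        }
    }
    if (best || !timed_out || pending.empty()) {
        return best;
    }
    // Forced progress: smallest request; min_element keeps the earliest on ties.
    auto it = std::min_element(
        pending.begin(), pending.end(),
        [](Request const& a, Request const& b) { return a.size < b.size; }
    );
    return static_cast<std::size_t>(std::distance(pending.begin(), it));
}
```

Note that the forced selection can still pick a request larger than the available memory, which matches the documented possibility of an empty reservation.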
    // the periodic memory check.
    break;
}
auto const max_size = memory_available();
Shouldn't we yield/continue if max_size is smaller than the smallest request size?
That happens on line 204, where we find the eligible requests.
while (true) {
    auto last_reservation_success = Clock::now();
    while (true) {
I have a bit of a concern with this double while-loop. Consider the scenario where memory is fully or almost fully reserved and none of the requests can be served.
Then, IINM, this coroutine will wake up from the yield and very quickly yield again in the next iteration, and continue to do so until the timeout expires. Then, on the outer loop, it keeps on draining the request queue. I feel like this is more or less a busy loop.
Why didn't we consider a callback approach, where we register a callback in BufferResource that is called for every release? Based on that callback, we could wake up the coroutines that are waiting in the request queue.
I feel like we might not need this periodic memory check task that way, and then we would not have to worry about busy loops, IMO.
Basically, "do nothing until someone releases memory".
The main limitation of a pure callback-based approach is that BufferResource does not have full visibility into memory usage. It only sees allocations made via BufferResource::allocate(), not allocations performed by libcudf, which can represent a large share of the memory pressure and are often why a reservation cannot be satisfied.
I agree this is a valid concern. When the system is effectively out of memory, we can end up in a wake up, immediately yield again pattern. That said, it may not be as bad as it sounds: yield gives all other coroutines a chance to run, and checking available memory is very fast. The periodic memory check is therefore a pragmatic compromise. It keeps the logic simple, avoids coupling to a specific allocator, and still works when memory is released outside BufferResource’s control.
Longer term, a unified reservation system could enable a callback-driven approach and remove the need for periodic polling. Given the current constraints, however, the periodic check is the best trade-off.
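The poll-and-yield trade-off discussed above can be pictured with a plain-thread analogue. This is not the coroutine code from the PR: `std::this_thread::yield()` merely stands in for yielding to the executor, and `poll_for_memory` and its parameters are hypothetical.

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>

using Clock = std::chrono::steady_clock;

// Polls `memory_available` until `needed` bytes are free or `timeout` has
// elapsed. Returns true if enough memory was observed in time.
bool poll_for_memory(
    std::size_t needed,
    std::function<std::size_t()> const& memory_available,
    Clock::duration timeout
) {
    auto const start = Clock::now();
    do {
        if (memory_available() >= needed) {
            return true;
        }
        // Let other work run before checking again. When memory is exhausted,
        // this is the "wake up, immediately yield again" pattern.
        std::this_thread::yield();
    } while (Clock::now() - start <= timeout);
    return false;  // the caller would now force progress
}
```

Because the check only reads an availability figure, it also notices memory released outside any particular allocator, which is the property the periodic check relies on.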
 *
 * @return The Options instance.
 */
[[nodiscard]] config::Options const& options() const noexcept;
Now options() always returns a copy, rather than a const reference. Is there a particular reason for this?
Simplification, config::Options is always backed by a shared pointer internally so the overhead is minimal
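To illustrate why returning by value can be cheap when a type is backed by a shared pointer, here is a generic sketch. The `Options` and `Context` classes below are hypothetical and do not reflect the real config::Options internals, which are not shown in this thread.

```cpp
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical options type: copies share one immutable-by-convention map.
class Options {
  public:
    explicit Options(std::map<std::string, std::string> values)
        : shared_(std::make_shared<std::map<std::string, std::string>>(
              std::move(values)
          )) {}

    std::string const& get(std::string const& key) const {
        return shared_->at(key);
    }

    // True if both objects point at the same underlying storage.
    bool shares_state_with(Options const& other) const {
        return shared_ == other.shared_;
    }

  private:
    std::shared_ptr<std::map<std::string, std::string>> shared_;
};

class Context {
  public:
    explicit Context(Options options) : options_(std::move(options)) {}

    // Returning by value copies only the shared pointer, not the map.
    Options options() const { return options_; }

  private:
    Options options_;
};
```

The copy costs one reference-count increment, and the returned object stays valid even if the owning `Context` is destroyed first.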
 * If no reservation request can be satisfied within @p timeout, the coroutine
 * forces progress by selecting the smallest pending request and attempting to
 * reserve memory for it. This attempt may result in an empty reservation if the
 * request still cannot be satisfied.
OK, so the idea will be that you still need to check your reservation gave you enough space.
yes, and you will have to decide if you want to overbook or maybe you have a low-memory mode
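On the caller side, the check described above might look roughly like this. It is a sketch only: `Reservation` with a plain `size` field and the `Plan` enum are hypothetical, and whether to overbook or fall back to a low-memory path is a per-operation decision.

```cpp
#include <cstddef>

// Hypothetical stand-in for MemoryReservation; an empty one has size 0.
struct Reservation {
    std::size_t size;
};

enum class Plan { Normal, LowMemory };

// Decide how to proceed once reserve_or_wait() has returned.
Plan choose_plan(Reservation const& res, std::size_t needed) {
    // A forced-progress attempt may hand back less than was asked for.
    return res.size >= needed ? Plan::Normal : Plan::LowMemory;
}
```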
if (!reservation_requests.empty()) {
    std::vector<Node> nodes;
    for (Request const& request : reservation_requests) {
        nodes.push_back(request.queue.shutdown_drain(ctx_->executor()));
Do you mean shutdown_drain, which waits for the consumer to pick it up, or shutdown, which does not?
Good catch, we want shutdown
I think this makes sense. With a query about shutdown.
/ok to test f1ad5db
/merge
/merge
Introduce a new class, MemoryReserveOrWait, that provides asynchronous waiting for memory reservation requests.