-
Notifications
You must be signed in to change notification settings - Fork 28
Make unbounded fanout messages spillable #711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make unbounded fanout messages spillable #711
Conversation
Signed-off-by: niranda perera <niranda.perera@gmail.com>
|
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually. Contributors can view more details about this message here. |
…ded-fanout-state-spillable
```c++
/**
* @brief Create a deep copy of a message without removing it.
*
* This method duplicates the message identified by `mid` while leaving the
* original message intact inside the container. The returned message is a
* full deep copy of the payload. If the message is currently being spilled
* by another thread, this call waits until spilling completes.
*
* @param mid Message identifier.
* @param reservation Memory reservation used for allocating buffers during
* the deep copy. The reservation also determines the memory type of the
* returned message.
*
* @return A deep copy of the referenced `Message`.
*
* @throws std::out_of_range If the message has already been extracted.
* @throws std::runtime_error If required memory cannot be allocated using
* the provided reservation.
*/
[[nodiscard]] Message copy(MessageId mid, MemoryReservation& reservation);
```
Based on #711
Authors:
- Mads R. B. Kristensen (https://github.com/madsbk)
Approvers:
- Peter Andreas Entschev (https://github.com/pentschev)
URL: #713
…ded-fanout-state-spillable Signed-off-by: niranda perera <niranda.perera@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
…ded-fanout-state-spillable
…ded-fanout-state-spillable Signed-off-by: niranda perera <niranda.perera@gmail.com>
…ded-fanout-state-spillable Signed-off-by: niranda perera <niranda.perera@gmail.com>
ab0a1e5 to
feb04a8
Compare
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Co-authored-by: Mads R. B. Kristensen <madsbk@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
This reverts commit 5606f6a. Signed-off-by: niranda perera <niranda.perera@gmail.com>
…ded-fanout-state-spillable
…randaperera/rapidsmpf into Make-unbounded-fanout-state-spillable
Signed-off-by: niranda perera <niranda.perera@gmail.com>
| constexpr std::span<MemoryType const> leq_memory_types(MemoryType mem_type) noexcept { | ||
| return std::views::drop_while(MEMORY_TYPES, [&](MemoryType const& mt) { | ||
| return mt != mem_type; | ||
| }); | ||
| } | ||
|
|
||
| static_assert(std::ranges::equal(leq_memory_types(MemoryType::DEVICE), MEMORY_TYPES)); | ||
| static_assert(std::ranges::equal( | ||
| leq_memory_types(MemoryType::HOST), std::ranges::single_view{MemoryType::HOST} | ||
| )); | ||
| // unknown memory type should return an empty view | ||
| static_assert(std::ranges::equal( | ||
| leq_memory_types(static_cast<MemoryType>(-1)), std::ranges::empty_view<MemoryType>{} | ||
| )); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not immediately just have:
template<typename Pred>
constexpr std::span<MemoryType const> filter_types(MemoryType mem_type, Pred pred) noexcept {
return std::views::filter(MEMORY_TYPES, pred);
}
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would return a Range isnt it? But yes, sure.
Co-authored-by: Mads R. B. Kristensen <madsbk@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
…ded-fanout-state-spillable
madsbk
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good
cpp/tests/streaming/test_fanout.cpp
Outdated
| } | ||
|
|
||
| nodes.push_back(node::fanout(ctx, in, out_chs, policy)); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| nodes.push_back( | ||
| many_input_sink(ctx, out_chs, ConsumePolicy::CHANNEL_ORDER, outs) | ||
| ); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Signed-off-by: niranda perera <niranda.perera@gmail.com>
…ded-fanout-state-spillable
Signed-off-by: niranda perera <niranda.perera@gmail.com>
|
/merge |
Closes #675