Skip to content

Commit defed36

Browse files
authored
Add optional owner slot to TableChunks (rapidsai#536)
When a `TableChunk` is constructed from a `cudf::table_view`, we are on the hook to keep the backing data alive. When we move `TableChunk`s into `Message`s and out again from Python we cannot do this in a sane way externally, so we must stash the owning object in the `TableChunk` itself. The `PyObject` is stored in a `unique_ptr` with a custom deleter that, when called, acquires the GIL and decrefs the object. This way, even if we consume the `TableChunk` in a C++ node (rather than a Python node) its backing storage will be deallocated. Authors: - Lawrence Mitchell (https://github.com/wence-) Approvers: - Mads R. B. Kristensen (https://github.com/madsbk) - Gil Forsyth (https://github.com/gforsyth) URL: rapidsai#536
1 parent 0ff039a commit defed36

File tree

12 files changed

+191
-56
lines changed

12 files changed

+191
-56
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#pragma once
7+
8+
#include <memory>
9+
10+
namespace rapidsmpf::streaming {
11+
12+
/**
13+
* @brief Utility class to store an arbitrary type-erased object while another object is
14+
* alive.
15+
*
16+
* When sending messages through `Channel`s from Python, we typically need to keep various
17+
* Python objects alive since the matching C++ objects only hold views.
18+
*
19+
* For example, when constructing a `TableChunk` from a pylibcudf `Table`, the
20+
* `TableChunk` has a non-owning `cudf::table_view` of the `Table` and someone must be
21+
* responsible for keeping the `Table` alive for the lifetime of the `TableChunk`. If we
22+
* want to allow creation of such objects in Python with the ability to sink them on the
23+
* C++ side we cannot rely on the Python side of things keeping the `Table` alive (the
24+
* reference disappears!). Similarly when we send a message through a `Channel` the sender
25+
* will, once pushed into the channel, drop the reference to the message payload and so,
26+
* again, we need some way of keeping the payload alive.
27+
*
28+
* To square this circle, such C++ objects have an `OwningWrapper` slot that stores a
29+
* type-erased pointer with, as far as we are concerned, unique ownership semantics. When
30+
* this object is destroyed, the custom deleter runs and can do whatever deallocation is
31+
* necessary.
32+
*
33+
* @warning Behaviour is undefined if the unique ownership semantic is not respected. The
34+
* deleter may be called from any thread at any time, the implementer of the deleter is
35+
* responsible for correct synchronisation with (for example) the Python GIL. Furthermore,
36+
* the deleter may not throw: if an error occurs, the only safe thing to do is
37+
* `std::terminate`.
38+
*
39+
* @warning When using this `OwningWrapper` inside a C++ object, make sure it is
40+
* constructed first and destructed last.
41+
*/
42+
class OwningWrapper {
43+
public:
44+
/// @brief Callback used to delete the owned object.
45+
using deleter_type = void (*)(void*);
46+
47+
OwningWrapper() = default;
48+
49+
/**
50+
* @brief Take ownership and responsibility for the destruction of an object.
51+
*
52+
* @param obj Type-erased object to own.
53+
* @param deleter Function called to destruct the object.
54+
*/
55+
explicit OwningWrapper(void* obj, deleter_type deleter)
56+
: obj_{owning_type(obj, deleter)} {}
57+
58+
private:
59+
using owning_type = std::unique_ptr<void, deleter_type>;
60+
owning_type obj_{nullptr, [](void*) {}};
61+
};
62+
} // namespace rapidsmpf::streaming

cpp/include/rapidsmpf/streaming/cudf/table_chunk.hpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <rapidsmpf/streaming/core/channel.hpp>
2020
#include <rapidsmpf/streaming/core/context.hpp>
2121
#include <rapidsmpf/streaming/core/node.hpp>
22+
#include <rapidsmpf/streaming/cudf/owning_wrapper.hpp>
2223

2324
namespace rapidsmpf::streaming {
2425

@@ -58,12 +59,17 @@ class TableChunk {
5859
* @param table_view Device-resident table view.
5960
* @param device_alloc_size The number of bytes in device memory.
6061
* @param stream The CUDA stream on which the table was created.
62+
* @param owner Optional object owning the memory backing the @p table_view. If it
63+
* exists this object will be destructed last when the @p TableChunk is destroyed.
64+
* This is typically used when constructing a @p TableChunk from python and we need to
65+
* keep the owning python object alive.
6166
*/
6267
TableChunk(
6368
std::uint64_t sequence_number,
6469
cudf::table_view table_view,
6570
std::size_t device_alloc_size,
66-
rmm::cuda_stream_view stream
71+
rmm::cuda_stream_view stream,
72+
OwningWrapper&& owner = {}
6773
);
6874

6975
/**
@@ -183,6 +189,9 @@ class TableChunk {
183189
[[nodiscard]] TableChunk spill_to_host(BufferResource* br);
184190

185191
private:
192+
///< @brief Optional owning object if the TableChunk was constructed from a
193+
///< table_view.
194+
OwningWrapper owner_{};
186195
std::uint64_t sequence_number_;
187196

188197
// At most, one of the following unique pointers is non-null. If all of them are null,

cpp/src/shuffler/shuffler.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
*/
55

66
#include <algorithm>
7-
#include <concepts>
87
#include <functional>
98
#include <memory>
109
#include <numeric>

cpp/src/streaming/cudf/shuffler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ coro::task<std::vector<PackedData>> ShufflerAsync::extract_async(shuffler::PartI
122122
auto all_extracted = all_extracted_unsafe();
123123

124124
auto chunks = shuffler_.extract(pid);
125-
lock.unlock(); // no longer need the lock
125+
lock.unlock();
126126

127127
// if all partitions have been extracted, notify all waiting tasks.
128128
if (all_extracted) {

cpp/src/streaming/cudf/table_chunk.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,13 @@ TableChunk::TableChunk(
2525
std::uint64_t sequence_number,
2626
cudf::table_view table_view,
2727
std::size_t device_alloc_size,
28-
rmm::cuda_stream_view stream
28+
rmm::cuda_stream_view stream,
29+
OwningWrapper&& owner
2930
)
30-
: sequence_number_{sequence_number}, table_view_{table_view}, stream_{stream} {
31+
: owner_{std::move(owner)},
32+
sequence_number_{sequence_number},
33+
table_view_{table_view},
34+
stream_{stream} {
3135
data_alloc_size_[static_cast<std::size_t>(MemoryType::DEVICE)] = device_alloc_size;
3236
make_available_cost_ = 0;
3337
}

cpp/tests/streaming/test_shuffler.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6+
#include <algorithm>
7+
68
#include <gmock/gmock.h>
79
#include <gtest/gtest.h>
810

cpp/tests/streaming/test_table_chunk.cpp

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
#include <rmm/cuda_stream_view.hpp>
1818
#include <rmm/mr/device/per_device_resource.hpp>
1919

20+
#include <rapidsmpf/streaming/core/channel.hpp>
21+
#include <rapidsmpf/streaming/cudf/owning_wrapper.hpp>
2022
#include <rapidsmpf/streaming/cudf/table_chunk.hpp>
2123

2224
#include "../utils.hpp"
@@ -43,6 +45,51 @@ TEST_F(StreamingTableChunk, FromTable) {
4345
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(chunk.table_view(), expect);
4446
}
4547

48+
TEST_F(StreamingTableChunk, TableChunkOwner) {
49+
constexpr unsigned int num_rows = 100;
50+
constexpr std::int64_t seed = 1337;
51+
constexpr std::uint64_t seq = 42;
52+
53+
cudf::table expect = random_table_with_index(seed, num_rows, 0, 10);
54+
// Static because the deleter function is a void(*)(void*) which precludes the use of
55+
// a lambda with captures.
56+
static std::size_t num_deletions{0};
57+
auto deleter = [](void* p) {
58+
num_deletions++;
59+
delete static_cast<int*>(p);
60+
};
61+
auto make_chunk = [&]() {
62+
return TableChunk{
63+
seq, expect, expect.alloc_size(), stream, OwningWrapper(new int, deleter)
64+
};
65+
};
66+
auto check_chunk = [&](TableChunk const& chunk) {
67+
EXPECT_EQ(chunk.sequence_number(), seq);
68+
EXPECT_EQ(chunk.stream().value(), stream.value());
69+
EXPECT_TRUE(chunk.is_available());
70+
EXPECT_EQ(chunk.make_available_cost(), 0);
71+
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(chunk.table_view(), expect);
72+
};
73+
{
74+
auto chunk = make_chunk();
75+
check_chunk(chunk);
76+
EXPECT_EQ(num_deletions, 0);
77+
}
78+
EXPECT_EQ(num_deletions, 1);
79+
{
80+
auto msg = Message(std::make_unique<TableChunk>(make_chunk()));
81+
EXPECT_EQ(num_deletions, 1);
82+
}
83+
EXPECT_EQ(num_deletions, 2);
84+
{
85+
auto msg = Message(std::make_unique<TableChunk>(make_chunk()));
86+
auto chunk = msg.release<TableChunk>();
87+
check_chunk(chunk);
88+
EXPECT_EQ(num_deletions, 2);
89+
}
90+
EXPECT_EQ(num_deletions, 3);
91+
}
92+
4693
TEST_F(StreamingTableChunk, FromTableView) {
4794
constexpr unsigned int num_rows = 100;
4895
constexpr std::int64_t seed = 1337;

dependencies.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ dependencies:
281281
- gdb
282282
- openmpi >=5.0 # See <https://github.com/rapidsai/rapidsmpf/issues/17>
283283
- valgrind
284+
- gdb
284285
test_python:
285286
common:
286287
- output_types: conda

python/rapidsmpf/rapidsmpf/streaming/cudf/partition_chunk.pxd

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,21 @@ cdef extern from "<rapidsmpf/streaming/cudf/partition.hpp>" nogil:
2121

2222
cdef class PartitionMapChunk:
2323
cdef unique_ptr[cpp_PartitionMapChunk] _handle
24-
cdef object _owner
2524

2625
@staticmethod
2726
cdef PartitionMapChunk from_handle(
28-
unique_ptr[cpp_PartitionMapChunk] handle, object owner
27+
unique_ptr[cpp_PartitionMapChunk] handle
2928
)
3029
cdef const cpp_PartitionMapChunk* handle_ptr(self)
3130
cdef unique_ptr[cpp_PartitionMapChunk] release_handle(self)
3231

3332

3433
cdef class PartitionVectorChunk:
3534
cdef unique_ptr[cpp_PartitionVectorChunk] _handle
36-
cdef object _owner
3735

3836
@staticmethod
3937
cdef PartitionVectorChunk from_handle(
40-
unique_ptr[cpp_PartitionVectorChunk] handle, object owner
38+
unique_ptr[cpp_PartitionVectorChunk] handle
4139
)
4240
cdef const cpp_PartitionVectorChunk* handle_ptr(self)
4341
cdef unique_ptr[cpp_PartitionVectorChunk] release_handle(self)

python/rapidsmpf/rapidsmpf/streaming/cudf/partition_chunk.pyx

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ cdef class PartitionMapChunk:
1818

1919
@staticmethod
2020
cdef PartitionMapChunk from_handle(
21-
unique_ptr[cpp_PartitionMapChunk] handle, object owner
21+
unique_ptr[cpp_PartitionMapChunk] handle
2222
):
2323
"""
2424
Construct a PartitionMapChunk from an existing C++ handle.
@@ -27,17 +27,14 @@ cdef class PartitionMapChunk:
2727
----------
2828
handle
2929
A unique pointer to a C++ PartitionMapChunk.
30-
owner
31-
An optional Python object to keep alive for as long as this
32-
PartitionMapChunk exists (e.g., to maintain resource lifetime).
3330
3431
Returns
3532
-------
3633
A new PartitionMapChunk wrapping the given handle.
3734
"""
35+
3836
cdef PartitionMapChunk ret = PartitionMapChunk.__new__(PartitionMapChunk)
3937
ret._handle = move(handle)
40-
ret._owner = owner
4138
return ret
4239

4340
@staticmethod
@@ -58,8 +55,7 @@ cdef class PartitionMapChunk:
5855
return PartitionMapChunk.from_handle(
5956
make_unique[cpp_PartitionMapChunk](
6057
message._handle.release[cpp_PartitionMapChunk]()
61-
),
62-
owner = None,
58+
)
6359
)
6460

6561
def into_message(self, Message message not None):
@@ -84,6 +80,8 @@ cdef class PartitionMapChunk:
8480
--------
8581
The PartitionMapChunk is released and must not be used after this call.
8682
"""
83+
if not message.empty():
84+
raise ValueError("cannot move into a non-empty message")
8785
message._handle = cpp_Message(self.release_handle())
8886

8987
cdef const cpp_PartitionMapChunk* handle_ptr(self):
@@ -145,7 +143,7 @@ cdef class PartitionVectorChunk:
145143

146144
@staticmethod
147145
cdef PartitionVectorChunk from_handle(
148-
unique_ptr[cpp_PartitionVectorChunk] handle, object owner
146+
unique_ptr[cpp_PartitionVectorChunk] handle
149147
):
150148
"""
151149
Construct a PartitionVectorChunk from an existing C++ handle.
@@ -154,9 +152,6 @@ cdef class PartitionVectorChunk:
154152
----------
155153
handle
156154
A unique pointer to a C++ PartitionVectorChunk.
157-
owner
158-
An optional Python object to keep alive for as long as this
159-
PartitionVectorChunk exists (e.g., to maintain resource lifetime).
160155
161156
Returns
162157
-------
@@ -166,7 +161,6 @@ cdef class PartitionVectorChunk:
166161
PartitionVectorChunk
167162
)
168163
ret._handle = move(handle)
169-
ret._owner = owner
170164
return ret
171165

172166
@staticmethod
@@ -187,8 +181,7 @@ cdef class PartitionVectorChunk:
187181
return PartitionVectorChunk.from_handle(
188182
make_unique[cpp_PartitionVectorChunk](
189183
message._handle.release[cpp_PartitionVectorChunk]()
190-
),
191-
owner = None,
184+
)
192185
)
193186

194187
def into_message(self, Message message not None):
@@ -213,6 +206,8 @@ cdef class PartitionVectorChunk:
213206
--------
214207
The PartitionVectorChunk is released and must not be used after this call.
215208
"""
209+
if not message.empty():
210+
raise ValueError("cannot move into a non-empty message")
216211
message._handle = cpp_Message(self.release_handle())
217212

218213
cdef const cpp_PartitionVectorChunk* handle_ptr(self):

0 commit comments

Comments
 (0)