
Commit d939029

Python bindings for the streaming shuffler (rapidsai#490)

Also adding bindings to `PartitionVectorChunk`.

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Kyle Edwards (https://github.com/KyleFromNVIDIA)
  - Peter Andreas Entschev (https://github.com/pentschev)
  - Lawrence Mitchell (https://github.com/wence-)

URL: rapidsai#490

1 parent 93b41c0, commit d939029

File tree

16 files changed: +423 -16 lines


cpp/benchmarks/streaming/data_generator.hpp

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ namespace rapidsmpf::streaming::node {
  * distributed in the range [`min_val`, `max_val`]. Each generated table is wrapped
  * in a `TableChunk` and sent to the provided output channel in streaming fashion.
  *
- * @param ctx The context to use.
+ * @param ctx The node context to use.
  * @param stream The CUDA stream on which to create the random tables. TODO: use a pool
  * of CUDA streams.
  * @param ch_out Output channel to which generated `TableChunk` objects are sent.

cpp/include/rapidsmpf/streaming/core/leaf_node.hpp

Lines changed: 2 additions & 2 deletions
@@ -18,7 +18,7 @@ namespace rapidsmpf::streaming::node {
  * Sends each message of the input vector into the channel in order,
  * marking the end of the stream once done.
  *
- * @param ctx The context to use.
+ * @param ctx The node context to use.
  * @param ch_out Output channel to which messages will be sent.
  * @param messages Input vector containing the messages to send.
  * @return Streaming node representing the asynchronous operation.
@@ -37,7 +37,7 @@ Node push_to_channel(
  * Receives messages from the channel until it is closed and appends them
  * to the provided output vector.
  *
- * @param ctx The context to use.
+ * @param ctx The node context to use.
  * @param ch_in Input channel providing messages.
  * @param out_messages Output vector to store the received messages.
  * @return Streaming node representing the asynchronous operation.

cpp/include/rapidsmpf/streaming/cudf/partition.hpp

Lines changed: 2 additions & 2 deletions
@@ -83,7 +83,7 @@ namespace node {
  * `num_partitions` based on a hash of the selected columns, packs the resulting
  * partitions, and sends them to an output channel.
  *
- * @param ctx The context to use.
+ * @param ctx The node context to use.
  * @param ch_in Input channel providing `TableChunk`s to partition.
  * @param ch_out Output channel to which `PartitionMapChunk`s are sent.
  * @param columns_to_hash Indices of input columns to hash.
@@ -119,7 +119,7 @@ Node partition_and_pack(
  * them, and sends the resulting tables to the output channel. Empty partitions are
  * ignored.
  *
- * @param ctx The context to use.
+ * @param ctx The node context to use.
  * @param ch_in Input channel providing packed partitions as PartitionMapChunk or
  * PartitionVectorChunk.
  * @param ch_out Output channel to which unpacked and concatenated tables are sent.

cpp/include/rapidsmpf/streaming/cudf/shuffler.hpp

Lines changed: 1 addition & 2 deletions
@@ -23,8 +23,7 @@ namespace rapidsmpf::streaming::node {
  * It consumes partitioned input data from the input channel and produces output chunks
  * grouped by `partition_owner`.
  *
- * @param ctx The streaming context providing communication, memory, stream, and execution
- * resources.
+ * @param ctx The context to use.
  * @param stream The CUDA stream on which to perform the shuffling. If chunks from the
  * input channel aren't created on `stream`, the streams are all synchronized.
  * @param ch_in Input channel providing PartitionMapChunk to be shuffled.

python/rapidsmpf/rapidsmpf/streaming/core/context.pyx

Lines changed: 1 addition & 1 deletion
@@ -64,7 +64,7 @@ cdef class Context:
         return self._options

     def comm(self):
-        return self._options
+        return self._comm

     def br(self):
         return self._br
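The one-line change above fixes a copy-paste bug where `Context.comm()` returned `self._options` instead of `self._comm`. A minimal sketch of the accessor pattern and a regression check (hypothetical stand-in values, not the rapidsmpf API):

```python
class Context:
    """Sketch of a context exposing its resources via plain accessor methods."""

    def __init__(self, options, comm, br):
        self._options = options  # configuration options
        self._comm = comm        # communicator
        self._br = br            # buffer resource

    def options(self):
        return self._options

    def comm(self):
        # The bug fixed in this commit: this line used to return self._options.
        return self._comm

    def br(self):
        return self._br


ctx = Context(options="opts", comm="communicator", br="buffer-resource")
# Each accessor must return its own attribute, not a copy-pasted neighbor.
assert ctx.comm() == "communicator"
assert ctx.options() == "opts"
assert ctx.br() == "buffer-resource"
```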

python/rapidsmpf/rapidsmpf/streaming/cudf/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 # SPDX-License-Identifier: Apache-2.0
 # =================================================================================

-set(cython_modules partition.pyx partition_chunk.pyx table_chunk.pyx)
+set(cython_modules partition.pyx partition_chunk.pyx shuffler.pyx table_chunk.pyx)

 rapids_cython_create_modules(
   CXX

python/rapidsmpf/rapidsmpf/streaming/cudf/partition.pyi

Lines changed: 7 additions & 2 deletions
@@ -6,7 +6,10 @@ from collections.abc import Iterable
 from rapidsmpf.streaming.core.channel import Channel
 from rapidsmpf.streaming.core.context import Context
 from rapidsmpf.streaming.core.node import CppNode
-from rapidsmpf.streaming.cudf.partition_chunk import PartitionMapChunk
+from rapidsmpf.streaming.cudf.partition_chunk import (
+    PartitionMapChunk,
+    PartitionVectorChunk,
+)
 from rapidsmpf.streaming.cudf.table_chunk import TableChunk

 def partition_and_pack(
@@ -18,6 +21,8 @@ def partition_and_pack(
 ) -> CppNode: ...
 def unpack_and_concat(
     ctx: Context,
-    ch_in: Channel[PartitionMapChunk],
+    ch_in: Channel[PartitionMapChunk]
+    | Channel[PartitionVectorChunk]
+    | Channel[PartitionMapChunk | PartitionVectorChunk],
     ch_out: Channel[TableChunk],
 ) -> CppNode: ...

python/rapidsmpf/rapidsmpf/streaming/cudf/partition_chunk.pxd

Lines changed: 19 additions & 0 deletions
@@ -4,6 +4,7 @@
 from libc.stdint cimport uint32_t, uint64_t
 from libcpp.memory cimport unique_ptr
 from libcpp.unordered_map cimport unordered_map
+from libcpp.vector cimport vector
 from rmm.librmm.cuda_stream_view cimport cuda_stream_view
 from rmm.pylibrmm.stream cimport Stream

@@ -16,6 +17,11 @@ cdef extern from "<rapidsmpf/streaming/cudf/partition.hpp>" nogil:
         unordered_map[uint32_t, cpp_PackedData] data
         cuda_stream_view stream

+    cdef cppclass cpp_PartitionVectorChunk "rapidsmpf::streaming::PartitionVectorChunk":
+        uint64_t sequence_number
+        vector[cpp_PackedData] data
+        cuda_stream_view stream
+

 cdef class PartitionMapChunk:
     cdef unique_ptr[cpp_PartitionMapChunk] _handle
@@ -28,3 +34,16 @@ cdef class PartitionMapChunk:
     )
     cdef const cpp_PartitionMapChunk* handle_ptr(self)
     cdef unique_ptr[cpp_PartitionMapChunk] release_handle(self)
+
+
+cdef class PartitionVectorChunk:
+    cdef unique_ptr[cpp_PartitionVectorChunk] _handle
+    cdef Stream _stream
+    cdef object _owner
+
+    @staticmethod
+    cdef PartitionVectorChunk from_handle(
+        unique_ptr[cpp_PartitionVectorChunk] handle, Stream stream, object owner
+    )
+    cdef const cpp_PartitionVectorChunk* handle_ptr(self)
+    cdef unique_ptr[cpp_PartitionVectorChunk] release_handle(self)
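The declarations above distinguish two layouts for packed partitions: `PartitionMapChunk` holds an `unordered_map<uint32_t, PackedData>` keyed by partition ID, while the new `PartitionVectorChunk` holds a dense `vector<PackedData>`. A pure-Python sketch of the two shapes and a conversion between them (all names here are hypothetical stand-ins, not the rapidsmpf API):

```python
from dataclasses import dataclass, field

# Hypothetical stand-in for the packed, serialized bytes of one partition.
PackedData = bytes


@dataclass
class MapChunk:
    """Sparse layout: partitions keyed by destination partition ID."""
    sequence_number: int
    data: dict[int, PackedData] = field(default_factory=dict)


@dataclass
class VectorChunk:
    """Dense layout: partitions stored by position, gaps explicit."""
    sequence_number: int
    data: list[PackedData] = field(default_factory=list)


def to_vector(chunk: MapChunk, num_partitions: int) -> VectorChunk:
    """Flatten a sparse map chunk into a dense vector, filling gaps with empty data."""
    return VectorChunk(
        sequence_number=chunk.sequence_number,
        data=[chunk.data.get(i, b"") for i in range(num_partitions)],
    )


m = MapChunk(sequence_number=7, data={1: b"x"})
v = to_vector(m, num_partitions=3)
assert v.data == [b"", b"x", b""]
```

The dense form is a natural fit for shuffler output, where every local partition slot exists even when empty.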

python/rapidsmpf/rapidsmpf/streaming/cudf/partition_chunk.pyi

Lines changed: 13 additions & 0 deletions
@@ -17,7 +17,20 @@ class PartitionMapChunk:
     @property
     def stream(self) -> Stream: ...

+class PartitionVectorChunk:
+    @classmethod
+    def from_message(cls, message: Message[Self]) -> Self: ...
+    def into_message(self, message: Message[Self]) -> None: ...
+    @property
+    def sequence_number(self) -> int: ...
+    @property
+    def stream(self) -> Stream: ...
+
 if TYPE_CHECKING:
     # Check that PartitionMapChunk implements Payload.
     t1: PartitionMapChunk
     t2: Payload = t1
+
+    # Check that PartitionVectorChunk implements Payload.
+    t3: PartitionVectorChunk
+    t4: Payload = t3
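The `t3: PartitionVectorChunk; t4: Payload = t3` lines above are a static conformance check: assigning an instance to a `Payload`-typed variable under `TYPE_CHECKING` makes the type checker verify the class structurally satisfies the protocol, at zero runtime cost. A runnable sketch of the same idea using a `runtime_checkable` protocol (the `Payload` shown here is a hypothetical minimal version, not the real rapidsmpf one):

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Payload(Protocol):
    """Hypothetical minimal payload protocol: must be movable into a message."""

    def into_message(self, message: object) -> None: ...


class GoodChunk:
    def into_message(self, message: object) -> None:
        pass  # would hand ownership of its data to the message


class BadChunk:
    pass  # missing into_message, so it is not a Payload


# Stubs check this statically via `t: Payload = obj`; runtime_checkable
# protocols allow the same structural check with isinstance().
assert isinstance(GoodChunk(), Payload)
assert not isinstance(BadChunk(), Payload)
```

Note that `runtime_checkable` only verifies method *presence*, not signatures; the static assignment in the `.pyi` is the stronger check.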

python/rapidsmpf/rapidsmpf/streaming/cudf/partition_chunk.pyx

Lines changed: 153 additions & 2 deletions
@@ -32,8 +32,8 @@ cdef class PartitionMapChunk:
             The CUDA stream on which this chunk was created. If `None`,
             the stream is obtained from the handle.
         owner
-            Python object that owns the underlying buffers and must
-            be kept alive for the lifetime of this PartitionMapChunk.
+            An optional Python object to keep alive for as long as this
+            PartitionMapChunk exists (e.g., to maintain resource lifetime).

         Returns
         -------
@@ -156,3 +156,154 @@ cdef class PartitionMapChunk:
             The CUDA stream.
         """
         return self._stream
+
+
+cdef class PartitionVectorChunk:
+    def __init__(self):
+        raise ValueError("use the `from_*` factory functions")
+
+    def __dealloc__(self):
+        with nogil:
+            self._handle.reset()
+
+    @staticmethod
+    cdef PartitionVectorChunk from_handle(
+        unique_ptr[cpp_PartitionVectorChunk] handle, Stream stream, object owner
+    ):
+        """
+        Construct a PartitionVectorChunk from an existing C++ handle.
+
+        Parameters
+        ----------
+        handle
+            A unique pointer to a C++ PartitionVectorChunk.
+        stream
+            The CUDA stream on which this chunk was created. If `None`,
+            the stream is obtained from the handle.
+        owner
+            An optional Python object to keep alive for as long as this
+            PartitionVectorChunk exists (e.g., to maintain resource lifetime).
+
+        Returns
+        -------
+        A new PartitionVectorChunk wrapping the given handle.
+        """
+        if stream is None:
+            stream = Stream._from_cudaStream_t(
+                deref(handle).stream.value()
+            )
+        cdef PartitionVectorChunk ret = PartitionVectorChunk.__new__(
+            PartitionVectorChunk
+        )
+        ret._handle = move(handle)
+        ret._stream = stream
+        ret._owner = owner
+        return ret
+
+    @staticmethod
+    def from_message(Message message not None):
+        """
+        Construct a PartitionVectorChunk by consuming a Message.
+
+        Parameters
+        ----------
+        message
+            Message containing a PartitionVectorChunk. The message is released
+            and is empty after this call.
+
+        Returns
+        -------
+        A new PartitionVectorChunk extracted from the given message.
+        """
+        return PartitionVectorChunk.from_handle(
+            make_unique[cpp_PartitionVectorChunk](
+                message._handle.release[cpp_PartitionVectorChunk]()
+            ),
+            stream=None,
+            owner=None,
+        )
+
+    def into_message(self, Message message not None):
+        """
+        Move this PartitionVectorChunk into a Message.
+
+        This method is not typically called directly. Instead, it is invoked by
+        `Message.__init__()` when creating a new Message with this PartitionVectorChunk
+        as its payload.
+
+        Parameters
+        ----------
+        message
+            Message object that will take ownership of this PartitionVectorChunk.
+
+        Raises
+        ------
+        ValueError
+            If the provided message is not empty.
+
+        Warnings
+        --------
+        The PartitionVectorChunk is released and must not be used after this call.
+        """
+        message._handle = cpp_Message(self.release_handle())
+
+    cdef const cpp_PartitionVectorChunk* handle_ptr(self):
+        """
+        Return a pointer to the underlying C++ PartitionVectorChunk.
+
+        Returns
+        -------
+        Raw pointer to the underlying C++ object.
+
+        Raises
+        ------
+        ValueError
+            If the PartitionVectorChunk is uninitialized.
+        """
+        if not self._handle:
+            raise ValueError("is uninitialized, has it been released?")
+        return self._handle.get()
+
+    cdef unique_ptr[cpp_PartitionVectorChunk] release_handle(self):
+        """
+        Release ownership of the underlying C++ PartitionVectorChunk.
+
+        After this call, the current object is in a moved-from state and
+        must not be accessed.
+
+        Returns
+        -------
+        Unique pointer to the underlying C++ object.
+
+        Raises
+        ------
+        ValueError
+            If the PartitionVectorChunk is uninitialized.
+        """
+        if not self._handle:
+            raise ValueError("is uninitialized, has it been released?")
+        return move(self._handle)
+
+    @property
+    def sequence_number(self):
+        """
+        Return the sequence number of this chunk.
+
+        Returns
+        -------
+        The sequence number.
+        """
+        return deref(self.handle_ptr()).sequence_number
+
+    @property
+    def stream(self):
+        """
+        Return the CUDA stream on which this chunk was created.
+
+        Returns
+        -------
+        Stream
+            The CUDA stream.
+        """
+        return self._stream
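The `handle_ptr()` / `release_handle()` pair in the Cython class above implements move semantics around a C++ `unique_ptr`: borrowing checks the handle is live, while releasing transfers ownership and leaves the object in a moved-from state that fails loudly on reuse. A pure-Python sketch of this ownership discipline (`Handle` and `Chunk` are hypothetical stand-ins, not the rapidsmpf types):

```python
class Handle:
    """Stand-in for the C++ unique_ptr payload owned by a chunk."""

    def __init__(self, payload):
        self.payload = payload


class Chunk:
    def __init__(self, handle):
        self._handle = handle

    def handle_ptr(self):
        # Mirrors the Cython handle_ptr(): borrow without transferring ownership.
        if self._handle is None:
            raise ValueError("is uninitialized, has it been released?")
        return self._handle

    def release_handle(self):
        # Mirrors release_handle(): move ownership out, leave self moved-from.
        if self._handle is None:
            raise ValueError("is uninitialized, has it been released?")
        handle, self._handle = self._handle, None
        return handle


chunk = Chunk(Handle(b"packed-bytes"))
moved = chunk.release_handle()  # ownership transferred out
assert moved.payload == b"packed-bytes"
try:
    chunk.handle_ptr()  # any further use must fail loudly
except ValueError:
    pass
```

This is the same contract `into_message()` relies on: after moving the chunk into a message, the chunk object must not be used again.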
