Commit cebf111

Bump rapids to 25.10 (#1204)
* Bump rapids to 25.10 and remove cugraph in favor of pylibcugraph
* Update shuffler based on rapidsmpf api changes
* Remove cugraph comms init in favor of pylibcugraph comms
* Update KMeans API to the latest version
* Update KMeans to pass in random state and use kmeans|| for single GPU consistency check
* Upgrade packages in lockfile
* Update lockfile
* Exclude rapidsmpf shuffler class from testing since it's tested indirectly via shuffle stage

Signed-off-by: Ayush Dattagupta <[email protected]>
1 parent 1ae47d5 commit cebf111

File tree

6 files changed: +2486 additions, -3219 deletions


nemo_curator/stages/deduplication/fuzzy/connected_components.py

Lines changed: 1 addition & 1 deletion
@@ -16,10 +16,10 @@
 from typing import TYPE_CHECKING, Any
 
 import cudf
-from cugraph.dask.comms.comms_wrapper import init_subcomms as c_init_subcomms
 from loguru import logger
 from pylibcugraph import GraphProperties, MGGraph, ResourceHandle
 from pylibcugraph import weakly_connected_components as pylibcugraph_wcc
+from pylibcugraph.comms.comms_wrapper import init_subcomms as c_init_subcomms
 
 from nemo_curator.backends.experimental.utils import RayStageSpecKeys
 from nemo_curator.stages.base import ProcessingStage
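
The import move above replaces the removed cugraph.dask comms wrapper with the pylibcugraph equivalent. A minimal sketch of how the sub-communicator setup might be invoked, assuming the pylibcugraph wrapper keeps the same (handle, row_comm_size) signature as the old cugraph one; `init_wcc_subcomms`, `raft_handle`, and `row_comm_size` are illustrative names, not code from this commit:

```python
from pylibcugraph.comms.comms_wrapper import init_subcomms as c_init_subcomms


def init_wcc_subcomms(raft_handle, row_comm_size: int) -> None:
    """Split the global RAFT communicator into the 2D sub-communicators that
    multi-GPU weakly_connected_components expects."""
    c_init_subcomms(raft_handle, row_comm_size)
```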

nemo_curator/stages/deduplication/semantic/kmeans.py

Lines changed: 4 additions & 4 deletions
@@ -189,8 +189,8 @@ def process_batch(self, tasks: list[FileGroupTask]) -> list[_EmptyTask]:
         logger.debug(f"Read time: {(t1 - t0):.2f} seconds")
         # Fit the model cooperatively across actors, then predict on local data
         concatenated_embeddings = cp.concatenate(embeddings_arrays, axis=0)
-        self.kmeans.fit(concatenated_embeddings, sample_weight=None)
-        labels = self.kmeans.predict(concatenated_embeddings).astype(cp.int32)
+        self.kmeans._fit(concatenated_embeddings, sample_weight=None, convert_dtype=False, multigpu=True)
+        labels = self.kmeans.predict(concatenated_embeddings, convert_dtype=False).astype(cp.int32)
 
         t2 = time.perf_counter()
         logger.info(f"KMeans fit+predict time: {(t2 - t1):.2f} seconds")
@@ -233,7 +233,7 @@ def process_batch(self, tasks: list[FileGroupTask]) -> list[_EmptyTask]:
         return results
 
     def setup(self, _: WorkerMetadata | None = None) -> None:
-        from cuml.cluster.kmeans_mg import KMeansMG as cumlKMeans
+        from cuml.cluster.kmeans import KMeans as cumlKMeans
 
         if not hasattr(self, "_raft_handle"):
             msg = "RAFT handle not found. Make sure the stage is initialized with RAFT"
@@ -246,11 +246,11 @@ def setup(self, _: WorkerMetadata | None = None) -> None:
             n_clusters=self.n_clusters,
             max_iter=self.max_iter,
             tol=self.tol,
+            random_state=self.random_state,
             verbose=self.verbose,
             n_init=self.n_init,
             oversampling_factor=self.oversampling_factor,
             max_samples_per_batch=self.max_samples_per_batch,
-            convert_dtype=False,
         )
 
     @staticmethod
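
Taken together, the hunks above move the stage from the removed `cuml.cluster.kmeans_mg.KMeansMG` to the public `KMeans` class while fitting through its private multi-GPU entry point. A hedged sketch of the resulting fit/predict path; `fit_and_label`, the RAFT `handle` argument, and the literal hyperparameter values are illustrative, and only the `_fit(..., multigpu=True)` and `predict(..., convert_dtype=False)` calls come from the diff:

```python
import cupy as cp
from cuml.cluster.kmeans import KMeans as cumlKMeans


def fit_and_label(raft_handle, embeddings: cp.ndarray, n_clusters: int, random_state: int) -> cp.ndarray:
    """Fit KMeans cooperatively across actors, then label the local shard."""
    kmeans = cumlKMeans(
        handle=raft_handle,         # RAFT handle created during stage setup
        n_clusters=n_clusters,
        max_iter=300,
        tol=1e-4,
        random_state=random_state,  # passed through for single-GPU consistency checks
        output_type="cupy",
    )
    # Private multi-GPU entry point this commit switches to (replaces KMeansMG.fit).
    kmeans._fit(embeddings, sample_weight=None, convert_dtype=False, multigpu=True)
    return kmeans.predict(embeddings, convert_dtype=False).astype(cp.int32)
```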

nemo_curator/stages/deduplication/shuffle_utils/rapidsmpf_shuffler.py

Lines changed: 21 additions & 10 deletions
@@ -20,7 +20,12 @@
 import rmm.mr
 from rapidsmpf.buffer.buffer import MemoryType
 from rapidsmpf.buffer.resource import BufferResource, LimitAvailableMemory
-from rapidsmpf.shuffler import partition_and_pack, unpack_and_concat
+from rapidsmpf.integrations.cudf.partition import (
+    partition_and_pack,
+    unpack_and_concat,
+    unspill_partitions,
+)
+from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor
 from rapidsmpf.statistics import Statistics
 from rapidsmpf.utils.cudf import cudf_to_pylibcudf_table, pylibcudf_to_cudf_dataframe
 from rapidsmpf.utils.ray_utils import BaseShufflingActor
@@ -34,7 +39,8 @@
     from rapidsmpf.shuffler import Shuffler
 
 
-class BulkRapidsMPFShuffler(BaseShufflingActor):
+# Exempt this class from coverage as it's indirectly tested by the ShuffleStage, which coverage tools don't pick up.
+class BulkRapidsMPFShuffler(BaseShufflingActor):  # pragma: no cover
     """
     Class that performs a bulk shuffle operation.
     This class is compatible with Ray Actors communicating with each other using UCXX communication.
@@ -120,7 +126,7 @@ def setup_worker(self, root_address_bytes: bytes) -> None:
         super().setup_worker(root_address_bytes)
 
         # Initialize the RMM memory resource
-        mr = rmm.mr.StatisticsResourceAdaptor(
+        mr = RmmResourceAdaptor(
             rmm.mr.PoolMemoryResource(
                 rmm.mr.CudaMemoryResource(),
                 initial_pool_size=self.rmm_pool_size,
@@ -134,14 +140,14 @@ def setup_worker(self, root_address_bytes: bytes) -> None:
             if self.spill_memory_limit is None
             else {MemoryType.DEVICE: LimitAvailableMemory(mr, limit=self.spill_memory_limit)}
         )
-        br = BufferResource(mr, memory_available)
+        self.br = BufferResource(device_mr=mr, memory_available=memory_available)
         # Create a statistics object
-        self.stats = Statistics(self.enable_statistics)
+        self.stats = Statistics(enable=self.enable_statistics, mr=mr)
         # Create a shuffler
         self.shuffler: Shuffler = self.create_shuffler(
             0,
             total_num_partitions=self.total_nparts,
-            buffer_resource=br,
+            buffer_resource=self.br,
             statistics=self.stats,
         )
 
@@ -216,11 +222,11 @@ def insert_chunk(self, table: plc.Table | cudf.DataFrame, column_names: list[str
             table = cudf_to_pylibcudf_table(table)
         columns_to_hash = tuple(column_names.index(val) for val in self.shuffle_on)
         packed_inputs = partition_and_pack(
-            table,
+            table=table,
+            br=self.br,
            columns_to_hash=columns_to_hash,
             num_partitions=self.total_nparts,
             stream=DEFAULT_STREAM,
-            device_mr=rmm.mr.get_current_device_resource(),
         )
         self.shuffler.insert_chunks(packed_inputs)
 
@@ -269,9 +275,14 @@ def extract(self) -> Iterator[tuple[int, plc.Table]]:
             partition_id = self.shuffler.wait_any()
             packed_chunks = self.shuffler.extract(partition_id)
             partition = unpack_and_concat(
-                packed_chunks,
+                unspill_partitions(
+                    packed_chunks,
+                    br=self.br,
+                    allow_overbooking=True,
+                    statistics=self.stats,
+                ),
+                br=self.br,
                 stream=DEFAULT_STREAM,
-                device_mr=rmm.mr.get_current_device_resource(),
             )
             yield partition_id, partition
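
The rapidsmpf 25.10 API moves `partition_and_pack`/`unpack_and_concat` under `rapidsmpf.integrations.cudf.partition`, routes allocations through a `BufferResource`, and requires spilled chunks to be unspilled before concatenation. A hedged sketch of that round trip as free functions rather than actor methods; the helper names and the `DEFAULT_STREAM` import path are assumptions for illustration, while the keyword arguments mirror the diff:

```python
from rapidsmpf.integrations.cudf.partition import (
    partition_and_pack,
    unpack_and_concat,
    unspill_partitions,
)
from rmm.pylibrmm.stream import DEFAULT_STREAM  # assumed import path for the default stream


def insert_chunk(shuffler, table, column_names, shuffle_on, total_nparts, br):
    """Hash-partition a pylibcudf table and hand the packed chunks to the shuffler."""
    columns_to_hash = tuple(column_names.index(col) for col in shuffle_on)
    packed_inputs = partition_and_pack(
        table=table,
        br=br,                       # the BufferResource now owns device allocations
        columns_to_hash=columns_to_hash,
        num_partitions=total_nparts,
        stream=DEFAULT_STREAM,
    )
    shuffler.insert_chunks(packed_inputs)


def extract_partition(shuffler, partition_id, br, stats):
    """Unspill any host-resident chunks, then concatenate them into a single table."""
    packed_chunks = shuffler.extract(partition_id)
    return unpack_and_concat(
        unspill_partitions(packed_chunks, br=br, allow_overbooking=True, statistics=stats),
        br=br,
        stream=DEFAULT_STREAM,
    )
```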

pyproject.toml

Lines changed: 6 additions & 7 deletions
@@ -71,13 +71,12 @@ cuda12 = ["gpustat", "nvidia-ml-py"]
 
 # Installs CPU + GPU text curation modules
 deduplication_cuda12 = [
-    "cudf-cu12==25.6.*",
-    "cugraph-cu12==25.6.*",
-    "cuml-cu12==25.6.*",
-    "nx-cugraph-cu12==25.6.*",
-    "pylibraft-cu12==25.6.*",
-    "raft-dask-cu12==25.6.*",
-    "rapidsmpf-cu12==25.6.*",
+    "cudf-cu12==25.10.*",
+    "cuml-cu12==25.10.*",
+    "pylibcugraph-cu12==25.10.*",
+    "pylibraft-cu12==25.10.*",
+    "raft-dask-cu12==25.10.*",
+    "rapidsmpf-cu12==25.10.*",
 ]
 
 audio_cpu = [

tests/stages/deduplication/semantic/test_kmeans.py

Lines changed: 4 additions & 4 deletions
@@ -114,7 +114,7 @@ def run_single_gpu_baseline(
 ) -> np.ndarray:
     single_gpu_kmeans = cuml.KMeans(
         n_clusters=n_clusters,
-        init="k-means++",
+        init="k-means||",
         max_iter=300,
         tol=1e-4,
         random_state=RANDOM_STATE,
@@ -377,7 +377,7 @@ def test_process_batch_multiple_groups(self, tmp_path: Path):  # noqa: PLR0915
 
         # Only mock the essential parts that can't run without RAFT setup
         mock_kmeans = Mock()
-        mock_kmeans.fit = Mock()
+        mock_kmeans._fit = Mock()
         mock_kmeans.predict = Mock(return_value=cp.zeros(40, dtype=cp.int32))
         mock_kmeans.cluster_centers_ = cp.random.random((2, 32), dtype=cp.float32)
         stage.kmeans = mock_kmeans
@@ -439,11 +439,11 @@ def spy_write(*args, **kwargs) -> None:
         assert call_kwargs["assign_id"] is False
 
         # Verify KMeans operations
-        mock_kmeans.fit.assert_called_once()
+        mock_kmeans._fit.assert_called_once()
         mock_kmeans.predict.assert_called_once()
 
         # Check the concatenated embeddings shape
-        fit_call_args = mock_kmeans.fit.call_args[0]
+        fit_call_args = mock_kmeans._fit.call_args[0]
         embeddings_passed_to_fit = fit_call_args[0]
         assert embeddings_passed_to_fit.shape == (40, 32), "Should concatenate embeddings from all groups"
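
For context, the baseline now initializes with "k-means||" so that, given the shared random state, the single-GPU reference run is comparable to the multi-GPU path. A sketch of the updated baseline, simplified to its essentials, with `RANDOM_STATE` stubbed as a placeholder for the test module's constant:

```python
import cuml
import cupy as cp

RANDOM_STATE = 42  # placeholder for the test module's constant


def run_single_gpu_baseline(embeddings: cp.ndarray, n_clusters: int) -> cp.ndarray:
    """Single-GPU reference clustering used to sanity-check the multi-GPU labels."""
    single_gpu_kmeans = cuml.KMeans(
        n_clusters=n_clusters,
        init="k-means||",        # matches the scalable initialization on the multi-GPU path
        max_iter=300,
        tol=1e-4,
        random_state=RANDOM_STATE,
    )
    return single_gpu_kmeans.fit_predict(embeddings).astype(cp.int32)
```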
