@@ -118,40 +118,59 @@ def __call__(self, embeddings_dataset: DocumentDataset) -> DocumentDataset: # n
         with performance_report_if_with_ts_suffix(self.profile_dir, "clustering-model"):
             if not self.keep_all_columns:
                 embeddings_df = embeddings_df[[self.id_col, self.embedding_column]]
+                # We persist here to avoid a re-read of the embeddings_df
+                # We only persist when not keeping all columns, since the text column can be large and cause OOM
+                embeddings_df = embeddings_df.persist()
+            else:
+                self.logger.warning(
+                    "Since all columns are being kept, we will not persist the embeddings_df, which will result in a slowdown"
+                )

             if self.clustering_input_partition_size is not None:
                 embeddings_df = embeddings_df.repartition(partition_size=self.clustering_input_partition_size)

-            try:
-                embeddings_df = embeddings_df.to_backend("pandas").persist()
-                embeddings_length = embeddings_df.shape[0].compute()
-
-                if embeddings_length < self.n_clusters:
-                    msg = (
-                        "Number of clusters is greater than the number of documents in your dataset: "
-                        f"dataset length is {embeddings_length} while n_clusters is set to {self.n_clusters}. "
-                        f"Please reduce n_clusters to be less than or equal to {embeddings_length}."
-                    )
-                    raise ValueError(msg)
-            except IndexError as e:
-                msg = (
-                    f'Original error message: "{e}". '
-                    "This could be due to empty partitions in your DocumentDataset. "
-                    "Please check your dataset for empty partitions and remove them if necessary."
-                )
-                raise IndexError(msg) from e
+            # Optimize now to ensure consistent partition counts between embeddings_df and kmeans predictions
+            # Without this, the partition counts would mismatch and cause assignment errors
+            embeddings_df = embeddings_df.optimize()

-            embeddings_df = embeddings_df.to_backend("cudf")
             # Normalize embeddings before clustering
             embeddings_df = embeddings_df.map_partitions(
                 normalize_embeddings_col_in_df,
                 embedding_col=self.embedding_column,
                 meta=embeddings_df._meta.copy(),  # noqa: SLF001
             )
+
             cupy_normalized_darr = embeddings_df.map_partitions(
                 get_array_from_df, self.embedding_column, meta=cp.ndarray([1, 1])
             )
-            cupy_normalized_darr.compute_chunk_sizes()
+            # We ideally would persist here in case embeddings_df is not persisted
+            # However, because of https://github.com/rapidsai/cudf/issues/18750 we run into an issue
+            # cupy_normalized_darr = cupy_normalized_darr.persist()  # noqa: ERA001
+            try:
+                cupy_normalized_darr.compute_chunk_sizes()
+            except Exception:  # noqa: BLE001
+                try:
+                    import dask
+
+                    # For cudf 25.02 / 25.04 compute_chunk_sizes fails with a task fusion error
+                    # This is a workaround to disable task fusion
+                    with dask.config.set({"optimization.fuse.active": False}):
+                        cupy_normalized_darr.compute_chunk_sizes()
+                except Exception as inner_e:
+                    msg = (
+                        "Unable to compute chunk sizes for the embeddings array. "
+                        "Please raise an issue at https://github.com/NVIDIA/NeMo-Curator/issues"
+                    )
+                    raise RuntimeError(msg) from inner_e
+
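+            # NOTE: len() on a Dask array requires known chunk sizes, so the
+            # compute_chunk_sizes() call above must succeed before this check.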
+            # TODO: Remove once https://github.com/rapidsai/cuml/issues/6643 is fixed
+            if len(cupy_normalized_darr) < self.n_clusters:
+                msg = (
+                    f"Number of clusters is greater than the number of documents in your dataset: "
+                    f"dataset length is {len(cupy_normalized_darr)} while n_clusters is set to {self.n_clusters}. "
+                    f"Please reduce n_clusters to be less than or equal to {len(cupy_normalized_darr)}."
+                )
+                raise ValueError(msg)

             # Perform KMeans clustering (KMeans.fit)
             t0 = time.time()
@@ -192,7 +211,7 @@ def __call__(self, embeddings_dataset: DocumentDataset) -> DocumentDataset: # n
             # Deleting kmeans triggers a future cancelled error in dask
             # See issue: https://github.com/NVIDIA/NeMo-Curator/issues/624
             # del kmeans
-            del centroids, cupy_normalized_darr
+            del centroids

             # Save embeddings by nearest center to a file
             clustering_output_dir = os.path.join(self.clustering_output_dir, "embs_by_nearest_center")
@@ -206,6 +225,7 @@ def __call__(self, embeddings_dataset: DocumentDataset) -> DocumentDataset: # n
                 partition_on="nearest_cent",
                 write_index=False,
             )
+
             self.logger.info(
                 f"Time taken for assigning distance to each embedding: {time.time() - t0}s"
                 f" and output written at {clustering_output_dir}"