Skip to content

Commit 8745303

Browse files
rjzamoraayushdg
andauthored
use forward- and backward-compatible logic for importing dask_expr (#477)
Signed-off-by: rjzamora <rzamora217@gmail.com> Co-authored-by: Ayush Dattagupta <ayushdg95@gmail.com>
1 parent 371adf3 commit 8745303

File tree

2 files changed

+10
-5
lines changed

2 files changed

+10
-5
lines changed

nemo_curator/_compat.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ def query_planning_enabled():
6464
global _DASK_QUERY_PLANNING_ENABLED
6565

6666
if _DASK_QUERY_PLANNING_ENABLED is None:
67-
if _dask_version > parse_version("2024.6.0"):
67+
if _dask_version > parse_version("2024.12.1"):
68+
_DASK_QUERY_PLANNING_ENABLED = True
69+
elif _dask_version > parse_version("2024.6.0"):
6870
import dask.dataframe as dd
6971

7072
_DASK_QUERY_PLANNING_ENABLED = dd.DASK_EXPR_ENABLED

nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,15 @@ def rearange_by_column_direct(
6565
)
6666

6767
elif query_planning_enabled():
68-
from dask_expr._collection import new_collection
69-
from dask_expr._shuffle import RearrangeByColumn
68+
try:
69+
from dask.dataframe import dask_expr
70+
except ImportError:
71+
# TODO: Remove when pinned to dask>2024.12.1
72+
import dask_expr
7073

7174
# Use the internal dask-expr API
72-
return new_collection(
73-
RearrangeByColumn(
75+
return dask_expr.new_collection(
76+
dask_expr._shuffle.RearrangeByColumn(
7477
frame=df.expr,
7578
partitioning_index=col,
7679
npartitions_out=npartitions,

0 commit comments

Comments
 (0)