Skip to content

Commit 57f0e3c

Browse files
authored
Create check_dask_cwd function (#484)
* Create `check_dask_cwd` function Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add Praateek's suggestions Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> --------- Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
1 parent 2a39616 commit 57f0e3c

File tree

1 file changed

+22
-0
lines changed

1 file changed

+22
-0
lines changed

nemo_curator/utils/distributed_utils.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import ast
1717
import os
1818
import shutil
19+
import subprocess
1920

2021
import dask
2122

@@ -565,6 +566,9 @@ def read_data(
565566
"""
566567
if isinstance(input_files, str):
567568
input_files = [input_files]
569+
570+
check_dask_cwd(input_files)
571+
568572
if file_type == "pickle":
569573
df = read_pandas_pickle(
570574
input_files[0], add_filename=add_filename, columns=columns, **kwargs
@@ -1013,6 +1017,24 @@ def get_current_client():
10131017
return None
10141018

10151019

1020+
def check_dask_cwd(file_list: List[str]):
    """Verify that relative paths resolve identically on the client and on every Dask worker.

    Relative paths are resolved against the working directory of whichever
    process opens them. If the workers' working directories differ from each
    other, or from the client's, the same relative path may point at different
    files. This check fails fast in that situation.

    Args:
        file_list: File paths that are about to be read. The check is skipped
            entirely when every path is absolute.

    Raises:
        RuntimeError: If at least two workers report different working
            directories, or if the workers' common working directory differs
            from the client's.
    """
    # Absolute paths are unambiguous, so no cluster round-trip is needed.
    if all(os.path.isabs(file_path) for file_path in file_list):
        return

    client = get_current_client()
    # Without a distributed client there are no remote workers to compare against.
    if client is None:
        return

    worker_cwds = set(client.run(os.getcwd).values())
    # No workers connected yet: nothing to compare.
    if not worker_cwds:
        return

    if len(worker_cwds) > 1:
        raise RuntimeError(
            "Mismatch between at least 2 Dask workers' working directories. "
            "Use absolute file paths to ensure the correct files are read as intended."
        )

    (worker_cwd,) = worker_cwds
    # Use os.getcwd() on the client as well, so both sides are measured the same
    # way. A shell `pwd` can report the logical (symlinked) path, which would
    # produce false mismatches, and it is not portable.
    if worker_cwd != os.getcwd():
        raise RuntimeError(
            "Mismatch between Dask client and worker working directories. "
            "Use absolute file paths to ensure the correct files are read as intended."
        )
1036+
1037+
10161038
def performance_report_if(
10171039
path: Optional[str] = None, report_name: str = "dask-profile.html"
10181040
):

0 commit comments

Comments
 (0)