From 7fe8057391551b91c20049521188062ee1a8785c Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sat, 4 Oct 2025 04:28:59 +0200 Subject: [PATCH 01/17] TransformAndMapHuggingFaceDatasetJob --- datasets/huggingface.py | 102 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 100 insertions(+), 2 deletions(-) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index f7eaba05..1b7fd398 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -2,12 +2,18 @@ https://huggingface.co/docs/datasets/ """ -from typing import Optional, Any, Union -from sisyphus import Job, Task, gs +from __future__ import annotations +from typing import TYPE_CHECKING, Optional, Any, Union, Callable, Sequence, Dict +from sisyphus import Job, Task, Path, gs from sisyphus.delayed_ops import DelayedBase from i6_core.util import instanciate_delayed +if TYPE_CHECKING: + from datasets import Dataset, DatasetDict + + TransformFuncT = Union[Callable[[DatasetDict], DatasetDict], Callable[[Dataset], Dataset]] + class DownloadAndPrepareHuggingFaceDatasetJob(Job): """ @@ -112,3 +118,95 @@ def hash(cls, kwargs): "token": kwargs["token"], } return super().hash(d) + + +class TransformAndMapHuggingFaceDatasetJob(Job): + """ + Runs some functions (e.g. filtering, mapping, renaming columns, ...) on a HF dataset. + + We do a map at the end, which supports to directly save to disk (via cache_file_name(s)), + without an additional save_to_disk + """ + + def __init__( + self, + path: Union[str, Path], + name: Optional[str] = None, + *, + load_dataset_opts: Optional[Dict[str, Any]] = None, # e.g. "split", "revision", ... + non_hashed_load_dataset_opts: Optional[Dict[str, Any]] = None, # e.g. {"num_proc": 8} + transform: Union[None, TransformFuncT, Sequence[TransformFuncT]] = None, + map_func: Optional[Callable] = None, + map_opts: Union[ + None, Dict[str, Any], Callable[[Dataset], Dict[str, Any]], Callable[[DatasetDict], Dict[str, Any]] + ] = None, + non_hashed_map_opts: Optional[Dict[str, Any]] = None, + ): + super().__init__() + + self.path = path + self.name = name + self.load_dataset_opts = load_dataset_opts + self.non_hashed_load_dataset_opts = non_hashed_load_dataset_opts + self.transform = transform + self.map_func = map_func + self.map_opts = map_opts + self.non_hashed_map_opts = non_hashed_map_opts + + self.rqmt = {"cpu": 2, "mem": 2, "time": 1} + + self.out_dir = self.output_path("dataset", directory=True) + + @classmethod + def hash(cls, kwargs): + kwargs.pop("non_hashed_load_dataset_opts") + kwargs.pop("non_hashed_map_opts") + return super().hash(kwargs) + + def tasks(self): + yield Task("run", resume="run", rqmt=self.rqmt) + + def run(self): + import os + from datasets import load_dataset, Dataset, DatasetDict + + assert os.environ.get("HF_HOME"), ( + "HF_HOME env var not set," + " set this in your settings.py DEFAULT_ENVIRONMENT_SET" + " (if not CLEANUP_ENVIRONMENT, otherwise in your current env)," + " or via job.set_env" + ) + + ds = load_dataset( + instanciate_delayed(self.path), + name=self.name, + **(instanciate_delayed(self.load_dataset_opts) or {}), + **(instanciate_delayed(self.non_hashed_load_dataset_opts) or {}), + ) + assert isinstance(ds, (Dataset, DatasetDict)) + + if self.transform: + if callable(self.transform): + ds = self.transform(ds) + assert isinstance(ds, (Dataset, DatasetDict)), f"After {self.transform} got {type(ds)}" + else: + for func in self.transform: + ds = func(ds) + assert isinstance(ds, (Dataset, DatasetDict)), f"After {func} got {type(ds)}" + + out_d = 
self.out_dir.get_path() + os.makedirs(out_d, exist_ok=True) + map_opts = self.map_opts + if callable(map_opts): + map_opts = map_opts(ds) + ds.map( + self.map_func, + **(map_opts or {}), + **(self.non_hashed_map_opts or {}), + **({"cache_file_name": f"{out_d}/data.arrow"} if isinstance(ds, Dataset) else {}), + **( + {"cache_file_names": {k: f"{out_d}/data-{k}.arrow" for k in ds.keys()}} + if isinstance(ds, DatasetDict) + else {} + ), + ) From f7bee40d1e083d1c10250077a0ba875d6a21b608 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sat, 4 Oct 2025 12:04:52 +0200 Subject: [PATCH 02/17] better sharding --- datasets/huggingface.py | 58 +++++++++++++++++++++++++++++++++++------ 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index 1b7fd398..247fbec8 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -124,10 +124,14 @@ class TransformAndMapHuggingFaceDatasetJob(Job): """ Runs some functions (e.g. filtering, mapping, renaming columns, ...) on a HF dataset. - We do a map at the end, which supports to directly save to disk (via cache_file_name(s)), - without an additional save_to_disk + The map is handled with special logic, as this involves writing to disk. + We write to the work dir via cache_file_name(s). + Then we do a save_to_disk to the final output dir. + Then we clean up the work dir again. """ + __sis_version__ = 2 # TODO remove later... + def __init__( self, path: Union[str, Path], @@ -141,9 +145,14 @@ def __init__( None, Dict[str, Any], Callable[[Dataset], Dict[str, Any]], Callable[[DatasetDict], Dict[str, Any]] ] = None, non_hashed_map_opts: Optional[Dict[str, Any]] = None, + num_shards: Union[None, int, Dict[str, int]] = None, + max_shard_size: Union[None, str, int] = None, ): super().__init__() + if max_shard_size is not None and num_shards is not None: + raise ValueError(f"{self}: please specify either max_shard_size or num_shards, but not both.") + self.path = path self.name = name self.load_dataset_opts = load_dataset_opts @@ -152,8 +161,10 @@ def __init__( self.map_func = map_func self.map_opts = map_opts self.non_hashed_map_opts = non_hashed_map_opts + self.num_shards = num_shards + self.max_shard_size = max_shard_size - self.rqmt = {"cpu": 2, "mem": 2, "time": 1} + self.rqmt = {"cpu": 16, "mem": 16, "time": 12} self.out_dir = self.output_path("dataset", directory=True) @@ -168,7 +179,10 @@ def tasks(self): def run(self): import os + import shutil from datasets import load_dataset, Dataset, DatasetDict + from datasets.utils.py_utils import convert_file_size_to_int + from datasets import config assert os.environ.get("HF_HOME"), ( "HF_HOME env var not set," @@ -194,19 +208,47 @@ def run(self): ds = func(ds) assert isinstance(ds, (Dataset, DatasetDict)), f"After {func} got {type(ds)}" - out_d = self.out_dir.get_path() - os.makedirs(out_d, exist_ok=True) + work_out_d = "tmp-map-output" + if os.path.exists(work_out_d): + shutil.rmtree(work_out_d) + os.makedirs(work_out_d) map_opts = self.map_opts if callable(map_opts): map_opts = map_opts(ds) - ds.map( + map_extra_opts = {} + if self.non_hashed_map_opts and "num_proc" in self.non_hashed_load_dataset_opts: + num_proc = self.non_hashed_load_dataset_opts["num_proc"] + else: + num_proc = self.rqmt["cpu"] * 2 + map_extra_opts["num_proc"] = num_proc + ds = ds.map( self.map_func, **(map_opts or {}), **(self.non_hashed_map_opts or {}), - **({"cache_file_name": f"{out_d}/data.arrow"} if isinstance(ds, Dataset) else {}), + **({"cache_file_name": 
f"{work_out_d}/data.arrow"} if isinstance(ds, Dataset) else {}), **( - {"cache_file_names": {k: f"{out_d}/data-{k}.arrow" for k in ds.keys()}} + {"cache_file_names": {k: f"{work_out_d}/data-{k}.arrow" for k in ds.keys()}} if isinstance(ds, DatasetDict) else {} ), + **map_extra_opts, ) + + num_shards = self.num_shards + max_shard_size = self.max_shard_size or config.MAX_SHARD_SIZE + max_shard_size = convert_file_size_to_int(max_shard_size) + if num_shards is None: + # This code is adapted from Dataset.save_to_disk to determine the number of shards. + # We make this independent of num_proc (because num_proc is not hashed). + if isinstance(ds, DatasetDict): + # noinspection PyProtectedMember + num_shards = {k: int(ds_._estimate_nbytes() / max_shard_size) + 1 for k, ds_ in ds.items()} + elif isinstance(ds, Dataset): + # noinspection PyProtectedMember + num_shards = int(ds._estimate_nbytes() / max_shard_size) + 1 + else: + raise TypeError(f"Unexpected type: {type(ds)}") + + ds.save_to_disk(self.out_dir.get_path(), num_shards=num_shards, num_proc=num_proc) + del ds + shutil.rmtree(work_out_d) From 841b9e2ca92411e4467bb31d2f622871a8211956 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sat, 4 Oct 2025 12:25:12 +0200 Subject: [PATCH 03/17] small fix --- datasets/huggingface.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index 247fbec8..360894c3 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -216,8 +216,8 @@ def run(self): if callable(map_opts): map_opts = map_opts(ds) map_extra_opts = {} - if self.non_hashed_map_opts and "num_proc" in self.non_hashed_load_dataset_opts: - num_proc = self.non_hashed_load_dataset_opts["num_proc"] + if self.non_hashed_map_opts and "num_proc" in self.non_hashed_map_opts: + num_proc = self.non_hashed_map_opts["num_proc"] else: num_proc = self.rqmt["cpu"] * 2 map_extra_opts["num_proc"] = num_proc From 3d9b51f3a295437336eb578d426e596a2552aa70 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sun, 5 Oct 2025 16:48:24 +0200 Subject: [PATCH 04/17] ExtractTextFromHuggingFaceDatasetJob --- datasets/huggingface.py | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index 360894c3..a5aca79f 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -252,3 +252,43 @@ def run(self): ds.save_to_disk(self.out_dir.get_path(), num_shards=num_shards, num_proc=num_proc) del ds shutil.rmtree(work_out_d) + + +class ExtractTextFromHuggingFaceDatasetJob(Job): + """ + Extract a text column from a HF dataset and write it to a gzipped text file. 
+ """ + + def __init__( + self, + path: Union[str, Path], + name: Optional[str] = None, + *, + split: Optional[str] = "train", + column_name: str = "text", + ): + super().__init__() + self.path = path + self.name = name + self.split = split + self.column_name = column_name + + self.rqmt = {"cpu": 4, "mem": 8, "time": 2} + + self.out_text = self.output_path("text.txt.gz") + + def tasks(self): + yield Task("run", resume="run", rqmt=self.rqmt) + + def run(self): + import gzip + from datasets import load_dataset, Dataset + + ds = load_dataset(self.path, self.name, split=self.split) + assert isinstance(ds, Dataset), f"Expected a Dataset, got {type(ds)} {ds}" + assert self.column_name in ds.column_names, f"Column name {self.column_name} not in columns {ds.column_names}" + + with gzip.open(self.out_text.get_path(), "wt", encoding="utf-8") as f: + for item in ds: + f.write(item[self.column_name]) + f.write("\n") From 17507326c38c2d959896b8adef5e35d6d0e7ea41 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sun, 5 Oct 2025 16:50:03 +0200 Subject: [PATCH 05/17] more --- datasets/huggingface.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index a5aca79f..f4867ae0 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -273,7 +273,7 @@ def __init__( self.split = split self.column_name = column_name - self.rqmt = {"cpu": 4, "mem": 8, "time": 2} + self.rqmt = {"cpu": 4, "mem": 8, "time": 10} self.out_text = self.output_path("text.txt.gz") From d59b448b9ff6c4f0260e2cf2c49d7924a5dafe91 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sun, 5 Oct 2025 16:53:15 +0200 Subject: [PATCH 06/17] cleanup --- datasets/huggingface.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index f4867ae0..1e9763cc 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -130,8 +130,6 @@ class TransformAndMapHuggingFaceDatasetJob(Job): Then we clean up the work dir again. """ - __sis_version__ = 2 # TODO remove later... 
- def __init__( self, path: Union[str, Path], From 0a1482bde9f00c8e3c0cfcd93050e19fe6218ffb Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sun, 5 Oct 2025 22:00:57 +0200 Subject: [PATCH 07/17] more --- datasets/huggingface.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index 1e9763cc..8b1e25e5 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -280,13 +280,22 @@ def tasks(self): def run(self): import gzip + import time from datasets import load_dataset, Dataset ds = load_dataset(self.path, self.name, split=self.split) assert isinstance(ds, Dataset), f"Expected a Dataset, got {type(ds)} {ds}" assert self.column_name in ds.column_names, f"Column name {self.column_name} not in columns {ds.column_names}" + size = ds.num_rows + start_time = time.monotonic() with gzip.open(self.out_text.get_path(), "wt", encoding="utf-8") as f: - for item in ds: + for i, item in enumerate(ds): + if (i + 1) % 10000 == 0 or i + 1 == size: + elapsed = time.monotonic() - start_time + speed = (i + 1) / elapsed if elapsed > 0 else 0 + eta = (size - (i + 1)) / speed if speed > 0 else float("inf") + eta_str = time.strftime("%H:%M:%S", time.gmtime(eta)) if eta != float("inf") else "inf" + print(f"Line {i + 1}/{size}, {((i + 1) / size * 100):.1f}%, {speed:.1f} it/s, ETA {eta_str}") f.write(item[self.column_name]) f.write("\n") From 75425a2172f5bdad7c7762726f03296c83810f4d Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sun, 5 Oct 2025 22:06:45 +0200 Subject: [PATCH 08/17] better --- datasets/huggingface.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index 8b1e25e5..22ac4c27 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -291,7 +291,7 @@ def run(self): start_time = time.monotonic() with gzip.open(self.out_text.get_path(), "wt", encoding="utf-8") as f: for i, item in enumerate(ds): - if (i + 1) % 10000 == 0 or i + 1 == size: + if (i + 1) % 1000 == 0 or i + 1 == size: elapsed = time.monotonic() - start_time speed = (i + 1) / elapsed if elapsed > 0 else 0 eta = (size - (i + 1)) / speed if speed > 0 else float("inf") From db4f56078976cd43b43d8d896e4f52ed839db20a Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sun, 5 Oct 2025 22:13:05 +0200 Subject: [PATCH 09/17] better --- datasets/huggingface.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index 22ac4c27..78953941 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -279,6 +279,7 @@ def tasks(self): yield Task("run", resume="run", rqmt=self.rqmt) def run(self): + import sys import gzip import time from datasets import load_dataset, Dataset @@ -287,15 +288,21 @@ def run(self): assert isinstance(ds, Dataset), f"Expected a Dataset, got {type(ds)} {ds}" assert self.column_name in ds.column_names, f"Column name {self.column_name} not in columns {ds.column_names}" + def _hms(s): + m, s = divmod(s, 60) + h, m = divmod(m, 60) + return "%d:%02d:%02d" % (h, m, s) + size = ds.num_rows start_time = time.monotonic() with gzip.open(self.out_text.get_path(), "wt", encoding="utf-8") as f: for i, item in enumerate(ds): - if (i + 1) % 1000 == 0 or i + 1 == size: + if (i + 1) % 10000 == 0 or i + 1 == size: elapsed = time.monotonic() - start_time speed = (i + 1) / elapsed if elapsed > 0 else 0 eta = (size - (i + 1)) / speed if speed > 0 else float("inf") - eta_str = time.strftime("%H:%M:%S", 
time.gmtime(eta)) if eta != float("inf") else "inf" + eta_str = _hms(eta) if eta != float("inf") else "inf" print(f"Line {i + 1}/{size}, {((i + 1) / size * 100):.1f}%, {speed:.1f} it/s, ETA {eta_str}") + sys.stdout.flush() f.write(item[self.column_name]) f.write("\n") From 211cd493f16854cf76830d2e806ccddb3aa1be5c Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Tue, 7 Oct 2025 13:47:23 +0200 Subject: [PATCH 10/17] support loading existing dataset via load_from_disk --- datasets/huggingface.py | 40 +++++++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index 78953941..9127094e 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -189,13 +189,39 @@ def run(self): " or via job.set_env" ) - ds = load_dataset( - instanciate_delayed(self.path), - name=self.name, - **(instanciate_delayed(self.load_dataset_opts) or {}), - **(instanciate_delayed(self.non_hashed_load_dataset_opts) or {}), - ) - assert isinstance(ds, (Dataset, DatasetDict)) + dataset_path = instanciate_delayed(self.path) + path_ext = f"{dataset_path}/{self.name}" if self.name is not None else dataset_path + ds = None + if os.path.exists(path_ext): + if os.path.isdir(path_ext): + if os.path.exists(f"{path_ext}/{config.DATASET_INFO_FILENAME}") and os.path.exists( + f"{path_ext}/{config.DATASET_STATE_JSON_FILENAME}" + ): + ds = Dataset.load_from_disk( + path_ext, + **(instanciate_delayed(self.load_dataset_opts) or {}), + **(instanciate_delayed(self.non_hashed_load_dataset_opts) or {}), + ) + elif os.path.exists(f"{path_ext}/{config.DATASETDICT_JSON_FILENAME}"): + ds = DatasetDict.load_from_disk( + path_ext, + **(instanciate_delayed(self.load_dataset_opts) or {}), + **(instanciate_delayed(self.non_hashed_load_dataset_opts) or {}), + ) + elif path_ext.endswith(".arrow"): + ds = Dataset.from_file( + path_ext, + **(instanciate_delayed(self.load_dataset_opts) or {}), + **(instanciate_delayed(self.non_hashed_load_dataset_opts) or {}), + ) + if ds is None: + ds = load_dataset( + dataset_path, + **({"name": self.name} if self.name is not None else {}), + **(instanciate_delayed(self.load_dataset_opts) or {}), + **(instanciate_delayed(self.non_hashed_load_dataset_opts) or {}), + ) + assert isinstance(ds, (Dataset, DatasetDict)) if self.transform: if callable(self.transform): From 1e50b2a783c8ab34484a6ea37e4d3cec415d2771 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Tue, 7 Oct 2025 14:20:35 +0200 Subject: [PATCH 11/17] support loading existing dataset via load_from_disk fix --- datasets/huggingface.py | 37 +++++++++++++++---------------------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index 9127094e..9e82e45d 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -190,37 +190,30 @@ def run(self): ) dataset_path = instanciate_delayed(self.path) - path_ext = f"{dataset_path}/{self.name}" if self.name is not None else dataset_path + split = None + load_dataset_opts = (instanciate_delayed(self.load_dataset_opts) or {}).copy() + load_dataset_opts.update(instanciate_delayed(self.non_hashed_load_dataset_opts) or {}) + if self.name is not None: + load_dataset_opts["name"] = self.name + if "split" in load_dataset_opts: + split = load_dataset_opts["split"] + path_ext = f"{dataset_path}/{split}" if split is not None else dataset_path ds = None if os.path.exists(path_ext): if os.path.isdir(path_ext): if 
os.path.exists(f"{path_ext}/{config.DATASET_INFO_FILENAME}") and os.path.exists( f"{path_ext}/{config.DATASET_STATE_JSON_FILENAME}" ): - ds = Dataset.load_from_disk( - path_ext, - **(instanciate_delayed(self.load_dataset_opts) or {}), - **(instanciate_delayed(self.non_hashed_load_dataset_opts) or {}), - ) + load_dataset_opts.pop("split", None) + ds = Dataset.load_from_disk(path_ext, **load_dataset_opts) elif os.path.exists(f"{path_ext}/{config.DATASETDICT_JSON_FILENAME}"): - ds = DatasetDict.load_from_disk( - path_ext, - **(instanciate_delayed(self.load_dataset_opts) or {}), - **(instanciate_delayed(self.non_hashed_load_dataset_opts) or {}), - ) + load_dataset_opts.pop("split", None) + ds = DatasetDict.load_from_disk(path_ext, **load_dataset_opts) elif path_ext.endswith(".arrow"): - ds = Dataset.from_file( - path_ext, - **(instanciate_delayed(self.load_dataset_opts) or {}), - **(instanciate_delayed(self.non_hashed_load_dataset_opts) or {}), - ) + load_dataset_opts.pop("split", None) + ds = Dataset.from_file(path_ext, **load_dataset_opts) if ds is None: - ds = load_dataset( - dataset_path, - **({"name": self.name} if self.name is not None else {}), - **(instanciate_delayed(self.load_dataset_opts) or {}), - **(instanciate_delayed(self.non_hashed_load_dataset_opts) or {}), - ) + ds = load_dataset(dataset_path, **load_dataset_opts) assert isinstance(ds, (Dataset, DatasetDict)) if self.transform: From b846858e5ab60d917c3dc8c04984314e08105b71 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Tue, 7 Oct 2025 14:50:08 +0200 Subject: [PATCH 12/17] more --- datasets/huggingface.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index 9e82e45d..e5a946ac 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -182,13 +182,6 @@ def run(self): from datasets.utils.py_utils import convert_file_size_to_int from datasets import config - assert os.environ.get("HF_HOME"), ( - "HF_HOME env var not set," - " set this in your settings.py DEFAULT_ENVIRONMENT_SET" - " (if not CLEANUP_ENVIRONMENT, otherwise in your current env)," - " or via job.set_env" - ) - dataset_path = instanciate_delayed(self.path) split = None load_dataset_opts = (instanciate_delayed(self.load_dataset_opts) or {}).copy() @@ -212,7 +205,17 @@ def run(self): elif path_ext.endswith(".arrow"): load_dataset_opts.pop("split", None) ds = Dataset.from_file(path_ext, **load_dataset_opts) + if ds is None: + # Use load_dataset. + # That can potentially download the dataset, so make sure that HF_HOME is set. 
+ assert os.environ.get("HF_HOME"), ( + "HF_HOME env var not set," + " set this in your settings.py DEFAULT_ENVIRONMENT_SET" + " (if not CLEANUP_ENVIRONMENT, otherwise in your current env)," + " or via job.set_env" + ) + ds = load_dataset(dataset_path, **load_dataset_opts) assert isinstance(ds, (Dataset, DatasetDict)) From 927091a319c3ba2c721cf7d7ffc722439d0d0057 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Tue, 7 Oct 2025 15:15:30 +0200 Subject: [PATCH 13/17] better --- datasets/huggingface.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index e5a946ac..7df7846e 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -241,18 +241,19 @@ def run(self): else: num_proc = self.rqmt["cpu"] * 2 map_extra_opts["num_proc"] = num_proc - ds = ds.map( - self.map_func, - **(map_opts or {}), - **(self.non_hashed_map_opts or {}), - **({"cache_file_name": f"{work_out_d}/data.arrow"} if isinstance(ds, Dataset) else {}), - **( - {"cache_file_names": {k: f"{work_out_d}/data-{k}.arrow" for k in ds.keys()}} - if isinstance(ds, DatasetDict) - else {} - ), - **map_extra_opts, - ) + if self.map_func: + ds = ds.map( + self.map_func, + **(map_opts or {}), + **(self.non_hashed_map_opts or {}), + **({"cache_file_name": f"{work_out_d}/data.arrow"} if isinstance(ds, Dataset) else {}), + **( + {"cache_file_names": {k: f"{work_out_d}/data-{k}.arrow" for k in ds.keys()}} + if isinstance(ds, DatasetDict) + else {} + ), + **map_extra_opts, + ) num_shards = self.num_shards max_shard_size = self.max_shard_size or config.MAX_SHARD_SIZE From 0f2bde430c6122fa637ba07dc0bd4a8111428913 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Mon, 13 Oct 2025 17:01:37 +0200 Subject: [PATCH 14/17] max_shard_size fixed default --- datasets/huggingface.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index 7df7846e..8385ecdc 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -256,7 +256,7 @@ def run(self): ) num_shards = self.num_shards - max_shard_size = self.max_shard_size or config.MAX_SHARD_SIZE + max_shard_size = self.max_shard_size or "500MB" max_shard_size = convert_file_size_to_int(max_shard_size) if num_shards is None: # This code is adapted from Dataset.save_to_disk to determine the number of shards. From cafaf2f229b7f425b832c1a90ad695662226e528 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Mon, 13 Oct 2025 17:01:45 +0200 Subject: [PATCH 15/17] doc --- datasets/huggingface.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index 8385ecdc..74f0b95d 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -146,6 +146,32 @@ def __init__( num_shards: Union[None, int, Dict[str, int]] = None, max_shard_size: Union[None, str, int] = None, ): + """ + :param path: for :func:`datasets.load_dataset`, + :func:`datasets.Dataset.load_from_disk` or :func:`datasets.DatasetDict.load_from_disk`. + We automatically detect which one to use. + :param name: for :func:`datasets.load_dataset` + :param load_dataset_opts: other options for :func:`datasets.load_dataset` + or :func:`datasets.Dataset.load_from_disk` or :func:`datasets.DatasetDict.load_from_disk`. + E.g. "split", "revision", ... + :param non_hashed_load_dataset_opts: like ``load_dataset_opts``, but not hashed. + E.g. ``{"num_proc": 8}``. 
+ :param transform: function or list of functions to transform the dataset + ((Dataset) -> Dataset or (DatasetDict) -> DatasetDict). + E.g. filtering, renaming columns, ... + :param map_func: function to map the dataset examples, or batch of examples. + This is passed to :func:`datasets.Dataset.map` or :func:`datasets.DatasetDict.map`. + None (default) means identity. + :param map_opts: further options passed :func:`datasets.Dataset.map` or :func:`datasets.DatasetDict.map`, + or a function that returns such options (e.g. depending on the dataset size). + E.g. ``{"batched": True, "batch_size": 1000}``. + :param non_hashed_map_opts: like ``map_opts``, but not hashed. + :param num_shards: how many shards to write via :func:`datasets.Dataset.save_to_disk` + or :func:`datasets.DatasetDict.save_to_disk`. + If not given, will be auto-detected based on the dataset size and ``max_shard_size``. + :param max_shard_size: maximum size of each shard. + If not given, will use ``"500MB"``. + """ super().__init__() if max_shard_size is not None and num_shards is not None: From 5f88c5438e0244499d2c090d0975aacce0274fb2 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Mon, 13 Oct 2025 17:02:31 +0200 Subject: [PATCH 16/17] doc --- datasets/huggingface.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index 74f0b95d..19fd8710 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -314,6 +314,12 @@ def __init__( split: Optional[str] = "train", column_name: str = "text", ): + """ + :param path: for :func:`datasets.load_dataset` + :param name: for :func:`datasets.load_dataset` + :param split: for :func:`datasets.load_dataset` + :param column_name: name of the text column to extract + """ super().__init__() self.path = path self.name = name From 0c6ea2bbf58a8d08a159fbce9273c5bd2c09dd8b Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Mon, 20 Oct 2025 12:10:38 +0200 Subject: [PATCH 17/17] comment on tmp dir --- datasets/huggingface.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datasets/huggingface.py b/datasets/huggingface.py index 19fd8710..5cf428bd 100644 --- a/datasets/huggingface.py +++ b/datasets/huggingface.py @@ -254,6 +254,9 @@ def run(self): ds = func(ds) assert isinstance(ds, (Dataset, DatasetDict)), f"After {func} got {type(ds)}" + # We create this tmp dir inside the job work dir, + # because this might need a lot of space, e.g. several TB, e.g. 2TB for Loquacious, + # which is often more than what we have available on the local disk (/var/tmp or so). work_out_d = "tmp-map-output" if os.path.exists(work_out_d): shutil.rmtree(work_out_d)
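Usage note (not part of the patch series above): the following is a minimal sketch of how these two jobs might be wired up in a Sisyphus recipe. It assumes the jobs are importable as i6_core.datasets.huggingface; the dataset id, column name, transform function, map function, and cache path are hypothetical placeholders, not values from the patches.

# Minimal usage sketch (assumption: module path i6_core.datasets.huggingface;
# dataset id, column name, helper functions and paths below are hypothetical).
from i6_core.datasets.huggingface import (
    TransformAndMapHuggingFaceDatasetJob,
    ExtractTextFromHuggingFaceDatasetJob,
)


def select_text_column(ds):
    # transform: (Dataset -> Dataset) or (DatasetDict -> DatasetDict),
    # here e.g. dropping everything except a hypothetical "text" column
    return ds.select_columns(["text"])


def normalize_text(example):
    # map_func: applied per example (or per batch if map_opts sets batched=True)
    return {"text": example["text"].strip().lower()}


transformed = TransformAndMapHuggingFaceDatasetJob(
    path="some-org/some-corpus",  # hypothetical HF dataset id or local path
    load_dataset_opts={"split": "train"},
    non_hashed_load_dataset_opts={"num_proc": 8},  # excluded from the job hash
    transform=[select_text_column],
    map_func=normalize_text,
    max_shard_size="500MB",  # alternatively num_shards, but not both
)
# transformed.out_dir: dataset written via save_to_disk(), loadable with load_from_disk().
# If load_dataset() would need to download, HF_HOME must be set, e.g. via
# transformed.set_env("HF_HOME", "/path/to/hf_cache")  # hypothetical cache path

text = ExtractTextFromHuggingFaceDatasetJob(
    path="some-org/some-corpus",  # hypothetical HF dataset id
    split="train",
    column_name="text",
)
# text.out_text: gzipped text file, one example per line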