Skip to content

Commit 32d7840

Browse files
author
The TensorFlow Datasets Authors
committed
Add `incomplete_files` context manager for the case where multiple files sharing the same incomplete prefix are written
PiperOrigin-RevId: 653384142
1 parent cb012e6 commit 32d7840

File tree

3 files changed

+26
-5
lines changed

3 files changed

+26
-5
lines changed

tensorflow_datasets/core/utils/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from tensorflow_datasets.core.utils.py_utils import get_class_url
4343
from tensorflow_datasets.core.utils.py_utils import has_sufficient_disk_space
4444
from tensorflow_datasets.core.utils.py_utils import incomplete_file
45+
from tensorflow_datasets.core.utils.py_utils import incomplete_files
4546
from tensorflow_datasets.core.utils.py_utils import indent
4647
from tensorflow_datasets.core.utils.py_utils import is_incomplete_file
4748
from tensorflow_datasets.core.utils.py_utils import is_notebook

tensorflow_datasets/core/utils/py_utils.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -310,12 +310,13 @@ def nullcontext(enter_result: T = None) -> Iterator[T]:
310310
yield enter_result
311311

312312

313+
def _tmp_file_prefix() -> str:
  """Returns a fresh prefix used to mark files that are still being written.

  The prefix is `constants.INCOMPLETE_PREFIX` followed by the hex form of a
  newly generated random UUID, so each call yields a different value.
  """
  unique_id = uuid.uuid4().hex
  return f'{constants.INCOMPLETE_PREFIX}{unique_id}'
315+
316+
313317
def _tmp_file_name(path: epath.PathLike) -> epath.Path:
  """Returns a temporary sibling path for `path`.

  The result lives in the same directory as `path` and is named
  `<tmp prefix>.<original file name>`, where the prefix comes from
  `_tmp_file_prefix()`.
  """
  final_path = epath.Path(path)
  tmp_name = f'{_tmp_file_prefix()}.{final_path.name}'
  return final_path.parent / tmp_name
319320

320321

321322
@contextlib.contextmanager
@@ -332,6 +333,25 @@ def incomplete_file(
332333
tmp_path.unlink(missing_ok=True)
333334

334335

336+
@contextlib.contextmanager
def incomplete_files(
    path: epath.PathLike,
) -> Iterator[epath.Path]:
  """Writes one or more files atomically under a shared temporary prefix.

  Yields a temporary path in the same directory as `path`, named
  `<tmp_prefix>.<path.name>`. While the context is open, the caller may write
  that file and any number of sibling files whose names start with the same
  `<tmp_prefix>.`.

  On normal exit, every file in `path.parent` matching `<tmp_prefix>.*` is
  renamed to its name with the `<tmp_prefix>.` part stripped. In all cases
  (including when an exception is raised inside the context or during the
  renames), any file still matching `<tmp_prefix>.*` is deleted afterwards.

  Args:
    path: Final destination of the main file. Path-like values are converted
      with `epath.Path`.

  Yields:
    The temporary path to write the main file to.
  """
  # Normalize so that plain strings are accepted, as in `_tmp_file_name`.
  path = epath.Path(path)
  parent = path.parent
  tmp_file_prefix = _tmp_file_prefix()
  tmp_marker = f'{tmp_file_prefix}.'
  tmp_pattern = f'{tmp_marker}*'
  tmp_path = parent / f'{tmp_marker}{path.name}'
  try:
    yield tmp_path
    # Rename all tmp files to their final name. The matches are collected
    # into a list first so that the directory is not modified while it is
    # still being listed.
    for tmp_file in list(parent.glob(tmp_pattern)):
      file_name = tmp_file.name.removeprefix(tmp_marker)
      tmp_file.replace(parent / file_name)
  finally:
    # Delete any tmp file that was not renamed (e.g. because an exception was
    # raised). After a successful rename pass nothing matches anymore.
    for tmp_file in list(parent.glob(tmp_pattern)):
      tmp_file.unlink(missing_ok=True)
353+
354+
335355
def is_incomplete_file(path: epath.Path) -> bool:
336356
"""Returns whether the given filename suggests that it's incomplete."""
337357
return bool(

tensorflow_datasets/core/writer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ def _write_final_shard(
483483
shard_path = self._filename_template.sharded_filepath(
484484
shard_index=shard_id, num_shards=len(non_empty_shard_ids)
485485
)
486-
with utils.incomplete_file(epath.Path(shard_path)) as tmp_path:
486+
with utils.incomplete_files(epath.Path(shard_path)) as tmp_path:
487487
logging.info(
488488
"Writing %d examples to %s.", len(example_by_key), os.fspath(tmp_path)
489489
)

0 commit comments

Comments
 (0)