diff --git a/tensorflow_datasets/core/dataset_builder.py b/tensorflow_datasets/core/dataset_builder.py
index c4d0befd769..31da50869a9 100644
--- a/tensorflow_datasets/core/dataset_builder.py
+++ b/tensorflow_datasets/core/dataset_builder.py
@@ -778,17 +778,7 @@ def download_and_prepare(
       self.info.update_data_dir(self.data_dir)

       # Clean up incomplete files from preempted workers.
-      deleted_incomplete_files = []
-      for f in self.data_path.glob(f"*{constants.INCOMPLETE_PREFIX}*"):
-        if utils.is_incomplete_file(f):
-          deleted_incomplete_files.append(os.fspath(f))
-          f.unlink()
-      if deleted_incomplete_files:
-        logging.info(
-            "Deleted %d incomplete files. A small selection: %s",
-            len(deleted_incomplete_files),
-            "\n".join(deleted_incomplete_files[:3]),
-        )
+      file_utils.clean_up_incomplete_files(self.data_path)

     self._log_download_done()
diff --git a/tensorflow_datasets/core/utils/__init__.py b/tensorflow_datasets/core/utils/__init__.py
index 0539048ed29..248041360f7 100644
--- a/tensorflow_datasets/core/utils/__init__.py
+++ b/tensorflow_datasets/core/utils/__init__.py
@@ -17,7 +17,11 @@
 from tensorflow_datasets.core.units import Size
 from tensorflow_datasets.core.utils import docs
+from tensorflow_datasets.core.utils.file_utils import atomic_write
 from tensorflow_datasets.core.utils.file_utils import incomplete_dir
+from tensorflow_datasets.core.utils.file_utils import incomplete_file
+from tensorflow_datasets.core.utils.file_utils import incomplete_files
+from tensorflow_datasets.core.utils.file_utils import is_incomplete_file
 from tensorflow_datasets.core.utils.gcs_utils import gcs_path
 from tensorflow_datasets.core.utils.image_utils import apply_colormap
 from tensorflow_datasets.core.utils.image_utils import create_thumbnail
@@ -29,7 +33,6 @@
 from tensorflow_datasets.core.utils.image_utils import png_to_jpeg
 from tensorflow_datasets.core.utils.image_utils import THUMBNAIL_SIZE
 from tensorflow_datasets.core.utils.py_utils import add_sys_path
-from tensorflow_datasets.core.utils.py_utils import atomic_write
 from tensorflow_datasets.core.utils.py_utils import build_synchronize_decorator
 from tensorflow_datasets.core.utils.py_utils import classproperty
 from tensorflow_datasets.core.utils.py_utils import dedent
@@ -40,10 +43,7 @@
 from tensorflow_datasets.core.utils.py_utils import get_class_path
 from tensorflow_datasets.core.utils.py_utils import get_class_url
 from tensorflow_datasets.core.utils.py_utils import has_sufficient_disk_space
-from tensorflow_datasets.core.utils.py_utils import incomplete_file
-from tensorflow_datasets.core.utils.py_utils import incomplete_files
 from tensorflow_datasets.core.utils.py_utils import indent
-from tensorflow_datasets.core.utils.py_utils import is_incomplete_file
 from tensorflow_datasets.core.utils.py_utils import is_notebook
 from tensorflow_datasets.core.utils.py_utils import list_info_files
 from tensorflow_datasets.core.utils.py_utils import map_nested
diff --git a/tensorflow_datasets/core/utils/file_utils.py b/tensorflow_datasets/core/utils/file_utils.py
index cb57d5186e5..80c3c6762c7 100644
--- a/tensorflow_datasets/core/utils/file_utils.py
+++ b/tensorflow_datasets/core/utils/file_utils.py
@@ -27,6 +27,7 @@
 import re
 import string
 import time
+import uuid

 from absl import logging
 from etils import epath
@@ -115,6 +116,94 @@ def _get_incomplete_dir(dir_name: str) -> str:
   return f'{dir_name.parent}/{constants.INCOMPLETE_PREFIX}{random_suffix}_{dir_name.name}/'


+def _tmp_file_prefix() -> str:
+  return f'{constants.INCOMPLETE_PREFIX}{uuid.uuid4().hex}'
+
+
+def _tmp_file_name(
+    path: epath.PathLike,
+    subfolder: str | None = None,
+) -> epath.Path:
+  """Returns the temporary file name for the given path.
+
+  Args:
+    path: The path to the file.
+    subfolder: The subfolder to use. If None, then the parent of the path will
+      be used.
+  """
+  path = epath.Path(path)
+  file_name = f'{_tmp_file_prefix()}.{path.name}'
+  if subfolder:
+    return path.parent / subfolder / file_name
+  else:
+    return path.parent / file_name
+
+
+@contextlib.contextmanager
+def atomic_write(path: epath.PathLike, mode: str):
+  """Writes to path atomically, by writing to temp file and renaming it."""
+  tmp_path = _tmp_file_name(path)
+  with tmp_path.open(mode=mode) as file_:
+    yield file_
+  tmp_path.replace(path)
+
+
+def is_incomplete_file(path: epath.Path) -> bool:
+  """Returns whether the given filename suggests that it's incomplete."""
+  regex = rf'{re.escape(constants.INCOMPLETE_PREFIX)}[0-9a-fA-F]{{32}}\..+'
+  return bool(re.search(rf'^{regex}$', path.name))
+
+
+@contextlib.contextmanager
+def incomplete_file(
+    path: epath.Path,
+    subfolder: str | None = None,
+) -> Iterator[epath.Path]:
+  """Writes to path atomically, by writing to temp file and renaming it."""
+  tmp_path = _tmp_file_name(path, subfolder=subfolder)
+  tmp_path.parent.mkdir(exist_ok=True)
+  try:
+    yield tmp_path
+    tmp_path.replace(path)
+  finally:
+    # Eventually delete the tmp_path if exception was raised
+    tmp_path.unlink(missing_ok=True)
+
+
+@contextlib.contextmanager
+def incomplete_files(
+    path: epath.Path,
+) -> Iterator[epath.Path]:
+  """Writes to path atomically, by writing to temp file and renaming it."""
+  tmp_file_prefix = _tmp_file_prefix()
+  tmp_path = path.parent / f'{tmp_file_prefix}.{path.name}'
+  try:
+    yield tmp_path
+    # Rename all tmp files to their final name.
+    for tmp_file in path.parent.glob(f'{tmp_file_prefix}.*'):
+      file_name = tmp_file.name.removeprefix(tmp_file_prefix + '.')
+      tmp_file.replace(path.parent / file_name)
+  finally:
+    # Eventually delete the tmp_path if exception was raised
+    for tmp_file in path.parent.glob(f'{tmp_file_prefix}.*'):
+      tmp_file.unlink(missing_ok=True)
+
+
+def clean_up_incomplete_files(path: epath.Path) -> None:
+  """Deletes all incomplete files in the given path."""
+  deleted_incomplete_files = []
+  for f in path.glob(f'*{constants.INCOMPLETE_PREFIX}*'):
+    if is_incomplete_file(f):
+      deleted_incomplete_files.append(os.fspath(f))
+      f.unlink()
+  if deleted_incomplete_files:
+    logging.info(
+        'Deleted %d incomplete files. A small selection: %s',
+        len(deleted_incomplete_files),
+        '\n'.join(deleted_incomplete_files[:3]),
+    )
+
+
 @contextlib.contextmanager
 def incomplete_dir(
     dirname: PathLike,
diff --git a/tensorflow_datasets/core/utils/file_utils_test.py b/tensorflow_datasets/core/utils/file_utils_test.py
index fe5e9c43b7e..85fd3ec2247 100644
--- a/tensorflow_datasets/core/utils/file_utils_test.py
+++ b/tensorflow_datasets/core/utils/file_utils_test.py
@@ -14,6 +14,7 @@
 # limitations under the License.

 import os
+import pathlib
 import time

 from unittest import mock
@@ -37,6 +38,41 @@ def test_default_data_dir():
   assert data_dir


+@pytest.mark.parametrize(
+    ['path', 'subfolder', 'expected'],
+    [
+        ('/a/file.ext', None, '/a/foobar.file.ext'),
+        ('/a/file.ext', 'sub', '/a/sub/foobar.file.ext'),
+    ],
+)
+def test_tmp_file_name(path, subfolder, expected):
+  with mock.patch.object(file_utils, '_tmp_file_prefix', return_value='foobar'):
+    assert os.fspath(file_utils._tmp_file_name(path, subfolder)) == expected
+
+
+def test_incomplete_file(tmp_path: pathlib.Path):
+  tmp_path = epath.Path(tmp_path)
+  filepath = tmp_path / 'test.txt'
+  with file_utils.incomplete_file(filepath) as tmp_filepath:
+    tmp_filepath.write_text('content')
+    assert not filepath.exists()
+  assert filepath.read_text() == 'content'
+  assert not tmp_filepath.exists()  # Tmp file is deleted
+
+
+@pytest.mark.parametrize(
+    ['path', 'is_incomplete'],
+    [
+        ('/a/incomplete.a8c53d7beff74b2eb31b9b86c7d046cf.bcd', True),
+        ('/a/incomplete-dataset.tfrecord-00000-of-00100', False),
+        ('/a/prefix.incomplete.a8c53d7beff74b2eb31b9b86c7d046cf', False),
+        ('/a/incomplete.a8c53d7beff74beb3.bcd', False),
+    ],
+)
+def test_is_incomplete_file(path: str, is_incomplete: bool):
+  assert file_utils.is_incomplete_file(epath.Path(path)) == is_incomplete
+
+
 def _create_dataset_dir(
     data_dir: epath.Path | None = None, version: str = _VERSION
 ) -> epath.Path:
diff --git a/tensorflow_datasets/core/utils/py_utils.py b/tensorflow_datasets/core/utils/py_utils.py
index c50ada43b21..18cb081e217 100644
--- a/tensorflow_datasets/core/utils/py_utils.py
+++ b/tensorflow_datasets/core/utils/py_utils.py
@@ -32,7 +32,6 @@
 import threading
 import typing
 from typing import Any, Callable, NoReturn, Type, TypeVar
-import uuid

 from absl import logging as absl_logging
 from etils import epath
@@ -304,79 +303,6 @@ def nullcontext(enter_result: T = None) -> Iterator[T]:
   yield enter_result


-def _tmp_file_prefix() -> str:
-  return f'{constants.INCOMPLETE_PREFIX}{uuid.uuid4().hex}'
-
-
-def _tmp_file_name(
-    path: epath.PathLike,
-    subfolder: str | None = None,
-) -> epath.Path:
-  """Returns the temporary file name for the given path.
-
-  Args:
-    path: The path to the file.
-    subfolder: The subfolder to use. If None, then the parent of the path will
-      be used.
-  """
-  path = epath.Path(path)
-  file_name = f'{_tmp_file_prefix()}.{path.name}'
-  if subfolder:
-    return path.parent / subfolder / file_name
-  else:
-    return path.parent / file_name
-
-
-@contextlib.contextmanager
-def incomplete_file(
-    path: epath.Path,
-    subfolder: str | None = None,
-) -> Iterator[epath.Path]:
-  """Writes to path atomically, by writing to temp file and renaming it."""
-  tmp_path = _tmp_file_name(path, subfolder=subfolder)
-  tmp_path.parent.mkdir(exist_ok=True)
-  try:
-    yield tmp_path
-    tmp_path.replace(path)
-  finally:
-    # Eventually delete the tmp_path if exception was raised
-    tmp_path.unlink(missing_ok=True)
-
-
-@contextlib.contextmanager
-def incomplete_files(
-    path: epath.Path,
-) -> Iterator[epath.Path]:
-  """Writes to path atomically, by writing to temp file and renaming it."""
-  tmp_file_prefix = _tmp_file_prefix()
-  tmp_path = path.parent / f'{tmp_file_prefix}.{path.name}'
-  try:
-    yield tmp_path
-    # Rename all tmp files to their final name.
-    for tmp_file in path.parent.glob(f'{tmp_file_prefix}.*'):
-      file_name = tmp_file.name.removeprefix(tmp_file_prefix + '.')
-      tmp_file.replace(path.parent / file_name)
-  finally:
-    # Eventually delete the tmp_path if exception was raised
-    for tmp_file in path.parent.glob(f'{tmp_file_prefix}.*'):
-      tmp_file.unlink(missing_ok=True)
-
-
-def is_incomplete_file(path: epath.Path) -> bool:
-  """Returns whether the given filename suggests that it's incomplete."""
-  regex = rf'{re.escape(constants.INCOMPLETE_PREFIX)}[0-9a-fA-F]{{32}}\..+'
-  return bool(re.search(rf'^{regex}$', path.name))
-
-
-@contextlib.contextmanager
-def atomic_write(path: epath.PathLike, mode: str):
-  """Writes to path atomically, by writing to temp file and renaming it."""
-  tmp_path = _tmp_file_name(path)
-  with tmp_path.open(mode=mode) as file_:
-    yield file_
-  tmp_path.replace(path)
-
-
 def reraise(
     e: Exception,
     prefix: str | None = None,
diff --git a/tensorflow_datasets/core/utils/py_utils_test.py b/tensorflow_datasets/core/utils/py_utils_test.py
index 548bb4a43d2..01681e1d897 100644
--- a/tensorflow_datasets/core/utils/py_utils_test.py
+++ b/tensorflow_datasets/core/utils/py_utils_test.py
@@ -16,9 +16,7 @@
 import collections
 import os
 import pathlib
-from unittest import mock

-from etils import epath
 import pytest
 import tensorflow as tf
 from tensorflow_datasets import testing
@@ -337,29 +335,6 @@ def test_flatten_with_path():
   )


-def test_incomplete_file(tmp_path: pathlib.Path):
-  tmp_path = epath.Path(tmp_path)
-  filepath = tmp_path / 'test.txt'
-  with py_utils.incomplete_file(filepath) as tmp_filepath:
-    tmp_filepath.write_text('content')
-    assert not filepath.exists()
-  assert filepath.read_text() == 'content'
-  assert not tmp_filepath.exists()  # Tmp file is deleted
-
-
-@pytest.mark.parametrize(
-    ['path', 'is_incomplete'],
-    [
-        ('/a/incomplete.a8c53d7beff74b2eb31b9b86c7d046cf.bcd', True),
-        ('/a/incomplete-dataset.tfrecord-00000-of-00100', False),
-        ('/a/prefix.incomplete.a8c53d7beff74b2eb31b9b86c7d046cf', False),
-        ('/a/incomplete.a8c53d7beff74beb3.bcd', False),
-    ],
-)
-def test_is_incomplete_file(path: str, is_incomplete: bool):
-  assert py_utils.is_incomplete_file(epath.Path(path)) == is_incomplete
-
-
 @pytest.mark.parametrize(
     ['name', 'expected'],
     [
@@ -372,17 +347,5 @@ def test_make_valid_name(name: str, expected: str):
   assert py_utils.make_valid_name(name) == expected


-@pytest.mark.parametrize(
-    ['path', 'subfolder', 'expected'],
-    [
-        ('/a/file.ext', None, '/a/foobar.file.ext'),
-        ('/a/file.ext', 'sub', '/a/sub/foobar.file.ext'),
-    ],
-)
-def test_tmp_file_name(path, subfolder, expected):
-  with mock.patch.object(py_utils, '_tmp_file_prefix', return_value='foobar'):
-    assert os.fspath(py_utils._tmp_file_name(path, subfolder)) == expected
-
-
 if __name__ == '__main__':
   tf.test.main()
diff --git a/tensorflow_datasets/scripts/cli/convert_format_utils.py b/tensorflow_datasets/scripts/cli/convert_format_utils.py
index f04fab631b4..030cae0d8db 100644
--- a/tensorflow_datasets/scripts/cli/convert_format_utils.py
+++ b/tensorflow_datasets/scripts/cli/convert_format_utils.py
@@ -42,7 +42,6 @@
   from tensorflow_datasets.core import splits as splits_lib
   from tensorflow_datasets.core.proto import dataset_info_pb2
   from tensorflow_datasets.core.utils import file_utils
-  from tensorflow_datasets.core.utils import py_utils
   from tensorflow_datasets.core.utils import type_utils
 # pylint: enable=g-import-not-at-top

@@ -131,7 +130,7 @@ def read_in() -> Iterator[type_utils.KeySerializedExample]:
         yield i, row.numpy()

     try:
-      with py_utils.incomplete_file(self.out_path) as tmp_file:
+      with file_utils.incomplete_file(self.out_path) as tmp_file:
         self.config.out_file_adapter.write_examples(
             path=tmp_file, iterator=read_in()
         )
@@ -476,7 +475,7 @@ def _convert_dataset(
 def _remove_incomplete_files(path: epath.Path) -> None:
   num_incomplete_files = 0
   for incomplete_file in path.glob(f'*{constants.INCOMPLETE_PREFIX}*'):
-    if py_utils.is_incomplete_file(incomplete_file):
+    if file_utils.is_incomplete_file(incomplete_file):
       incomplete_file.unlink()
       num_incomplete_files += 1
   logging.info('Removed %d incomplete files.', num_incomplete_files)
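
Usage sketch (not part of the patch): a minimal illustration, based only on the signatures added to file_utils.py above, of how the relocated helpers are called. The dataset paths and payloads below are made up for the example.

from etils import epath
from tensorflow_datasets.core.utils import file_utils

out_path = epath.Path('/tmp/some_dataset/train.tfrecord-00000-of-00001')  # hypothetical path

# incomplete_file yields a uniquely named temp path next to out_path; the temp
# file is renamed to out_path only if the block exits without raising, and is
# deleted otherwise.
with file_utils.incomplete_file(out_path) as tmp_path:
  tmp_path.write_text('serialized examples')  # placeholder payload

# atomic_write yields a file object instead of a path, with the same
# write-to-temp-then-rename behaviour.
with file_utils.atomic_write('/tmp/some_dataset/dataset_info.json', mode='w') as f:
  f.write('{}')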