Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions tensorflow_datasets/core/dataset_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -778,17 +778,7 @@ def download_and_prepare(
self.info.update_data_dir(self.data_dir)

# Clean up incomplete files from preempted workers.
deleted_incomplete_files = []
for f in self.data_path.glob(f"*{constants.INCOMPLETE_PREFIX}*"):
if utils.is_incomplete_file(f):
deleted_incomplete_files.append(os.fspath(f))
f.unlink()
if deleted_incomplete_files:
logging.info(
"Deleted %d incomplete files. A small selection: %s",
len(deleted_incomplete_files),
"\n".join(deleted_incomplete_files[:3]),
)
file_utils.clean_up_incomplete_files(self.data_path)

self._log_download_done()

Expand Down
8 changes: 4 additions & 4 deletions tensorflow_datasets/core/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

from tensorflow_datasets.core.units import Size
from tensorflow_datasets.core.utils import docs
from tensorflow_datasets.core.utils.file_utils import atomic_write
from tensorflow_datasets.core.utils.file_utils import incomplete_dir
from tensorflow_datasets.core.utils.file_utils import incomplete_file
from tensorflow_datasets.core.utils.file_utils import incomplete_files
from tensorflow_datasets.core.utils.file_utils import is_incomplete_file
from tensorflow_datasets.core.utils.gcs_utils import gcs_path
from tensorflow_datasets.core.utils.image_utils import apply_colormap
from tensorflow_datasets.core.utils.image_utils import create_thumbnail
Expand All @@ -29,7 +33,6 @@
from tensorflow_datasets.core.utils.image_utils import png_to_jpeg
from tensorflow_datasets.core.utils.image_utils import THUMBNAIL_SIZE
from tensorflow_datasets.core.utils.py_utils import add_sys_path
from tensorflow_datasets.core.utils.py_utils import atomic_write
from tensorflow_datasets.core.utils.py_utils import build_synchronize_decorator
from tensorflow_datasets.core.utils.py_utils import classproperty
from tensorflow_datasets.core.utils.py_utils import dedent
Expand All @@ -40,10 +43,7 @@
from tensorflow_datasets.core.utils.py_utils import get_class_path
from tensorflow_datasets.core.utils.py_utils import get_class_url
from tensorflow_datasets.core.utils.py_utils import has_sufficient_disk_space
from tensorflow_datasets.core.utils.py_utils import incomplete_file
from tensorflow_datasets.core.utils.py_utils import incomplete_files
from tensorflow_datasets.core.utils.py_utils import indent
from tensorflow_datasets.core.utils.py_utils import is_incomplete_file
from tensorflow_datasets.core.utils.py_utils import is_notebook
from tensorflow_datasets.core.utils.py_utils import list_info_files
from tensorflow_datasets.core.utils.py_utils import map_nested
Expand Down
89 changes: 89 additions & 0 deletions tensorflow_datasets/core/utils/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import re
import string
import time
import uuid

from absl import logging
from etils import epath
Expand Down Expand Up @@ -115,6 +116,94 @@ def _get_incomplete_dir(dir_name: str) -> str:
return f'{dir_name.parent}/{constants.INCOMPLETE_PREFIX}{random_suffix}_{dir_name.name}/'


def _tmp_file_prefix() -> str:
  """Returns a unique prefix marking a file as incomplete (in-progress)."""
  unique_suffix = uuid.uuid4().hex
  return constants.INCOMPLETE_PREFIX + unique_suffix


def _tmp_file_name(
    path: epath.PathLike,
    subfolder: str | None = None,
) -> epath.Path:
  """Returns the temporary file name for the given path.

  Args:
    path: The path to the file.
    subfolder: The subfolder to use. If None, then the parent of the path will
      be used.
  """
  path = epath.Path(path)
  tmp_name = f'{_tmp_file_prefix()}.{path.name}'
  # Place the tmp file next to the target, or inside the requested subfolder.
  target_dir = path.parent / subfolder if subfolder else path.parent
  return target_dir / tmp_name


@contextlib.contextmanager
def atomic_write(path: epath.PathLike, mode: str):
  """Writes to path atomically, by writing to temp file and renaming it.

  Readers either see the previous content of `path` or the fully written new
  content, never a partial write.

  Args:
    path: Destination path to write to.
    mode: Mode to open the temporary file with (e.g. 'w', 'wb').

  Yields:
    The opened temporary file object to write to.
  """
  tmp_path = _tmp_file_name(path)
  try:
    with tmp_path.open(mode=mode) as file_:
      yield file_
    tmp_path.replace(path)
  finally:
    # If the caller's `with` body raised, `replace` never ran and the tmp file
    # would otherwise be leaked. Delete it, mirroring `incomplete_file`.
    tmp_path.unlink(missing_ok=True)


def is_incomplete_file(path: epath.Path) -> bool:
  """Returns whether the given filename suggests that it's incomplete."""
  # Incomplete files look like `<INCOMPLETE_PREFIX><32-hex-uuid>.<name>`.
  hex_uuid = r'[0-9a-fA-F]{32}'
  pattern = rf'^{re.escape(constants.INCOMPLETE_PREFIX)}{hex_uuid}\..+$'
  return re.search(pattern, path.name) is not None


@contextlib.contextmanager
def incomplete_file(
    path: epath.Path,
    subfolder: str | None = None,
) -> Iterator[epath.Path]:
  """Writes to path atomically, by writing to temp file and renaming it."""
  staging_path = _tmp_file_name(path, subfolder=subfolder)
  staging_path.parent.mkdir(exist_ok=True)
  try:
    yield staging_path
    # Success: atomically promote the tmp file to its final name.
    staging_path.replace(path)
  finally:
    # If an exception was raised before the rename, drop the tmp file.
    staging_path.unlink(missing_ok=True)


@contextlib.contextmanager
def incomplete_files(
    path: epath.Path,
) -> Iterator[epath.Path]:
  """Writes to path atomically, by writing to temp file and renaming it."""
  prefix = _tmp_file_prefix()
  parent = path.parent
  try:
    yield parent / f'{prefix}.{path.name}'
    # Success: promote every tmp file produced under this prefix to its
    # final name (the caller may have written multiple shards).
    for produced in parent.glob(f'{prefix}.*'):
      final_name = produced.name.removeprefix(f'{prefix}.')
      produced.replace(parent / final_name)
  finally:
    # If an exception interrupted the writes, drop any leftover tmp files.
    for leftover in parent.glob(f'{prefix}.*'):
      leftover.unlink(missing_ok=True)


def clean_up_incomplete_files(path: epath.Path) -> None:
  """Deletes all incomplete files in the given path."""
  deleted = []
  for candidate in path.glob(f'*{constants.INCOMPLETE_PREFIX}*'):
    # The glob is broad; only delete names that match the full
    # `<prefix><32-hex-uuid>.<name>` incomplete-file pattern.
    if is_incomplete_file(candidate):
      deleted.append(os.fspath(candidate))
      candidate.unlink()
  if deleted:
    logging.info(
        'Deleted %d incomplete files. A small selection: %s',
        len(deleted),
        '\n'.join(deleted[:3]),
    )


@contextlib.contextmanager
def incomplete_dir(
dirname: PathLike,
Expand Down
36 changes: 36 additions & 0 deletions tensorflow_datasets/core/utils/file_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import os
import pathlib
import time
from unittest import mock

Expand All @@ -37,6 +38,41 @@ def test_default_data_dir():
assert data_dir


@pytest.mark.parametrize(
    ['path', 'subfolder', 'expected'],
    [
        ('/a/file.ext', None, '/a/foobar.file.ext'),
        ('/a/file.ext', 'sub', '/a/sub/foobar.file.ext'),
    ],
)
def test_tmp_file_name(path, subfolder, expected):
  """The tmp name carries the prefix and honors an optional subfolder."""
  with mock.patch.object(file_utils, '_tmp_file_prefix', return_value='foobar'):
    actual = file_utils._tmp_file_name(path, subfolder)
  assert os.fspath(actual) == expected


def test_incomplete_file(tmp_path: pathlib.Path):
  """Content appears at the final path only on exit; tmp file is removed."""
  final_path = epath.Path(tmp_path) / 'test.txt'
  with file_utils.incomplete_file(final_path) as staging_path:
    staging_path.write_text('content')
    # The final file must not exist until the context manager exits.
    assert not final_path.exists()
  assert final_path.read_text() == 'content'
  assert not staging_path.exists()  # Tmp file is deleted


@pytest.mark.parametrize(
    ['path', 'is_incomplete'],
    [
        ('/a/incomplete.a8c53d7beff74b2eb31b9b86c7d046cf.bcd', True),
        ('/a/incomplete-dataset.tfrecord-00000-of-00100', False),
        ('/a/prefix.incomplete.a8c53d7beff74b2eb31b9b86c7d046cf', False),
        ('/a/incomplete.a8c53d7beff74beb3.bcd', False),
    ],
)
def test_is_incomplete_file(path: str, is_incomplete: bool):
  """Only names matching `incomplete.<32-hex-uuid>.<name>` are incomplete."""
  actual = file_utils.is_incomplete_file(epath.Path(path))
  assert actual == is_incomplete


def _create_dataset_dir(
data_dir: epath.Path | None = None, version: str = _VERSION
) -> epath.Path:
Expand Down
74 changes: 0 additions & 74 deletions tensorflow_datasets/core/utils/py_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import threading
import typing
from typing import Any, Callable, NoReturn, Type, TypeVar
import uuid

from absl import logging as absl_logging
from etils import epath
Expand Down Expand Up @@ -304,79 +303,6 @@ def nullcontext(enter_result: T = None) -> Iterator[T]:
yield enter_result


def _tmp_file_prefix() -> str:
return f'{constants.INCOMPLETE_PREFIX}{uuid.uuid4().hex}'


def _tmp_file_name(
path: epath.PathLike,
subfolder: str | None = None,
) -> epath.Path:
"""Returns the temporary file name for the given path.

Args:
path: The path to the file.
subfolder: The subfolder to use. If None, then the parent of the path will
be used.
"""
path = epath.Path(path)
file_name = f'{_tmp_file_prefix()}.{path.name}'
if subfolder:
return path.parent / subfolder / file_name
else:
return path.parent / file_name


@contextlib.contextmanager
def incomplete_file(
path: epath.Path,
subfolder: str | None = None,
) -> Iterator[epath.Path]:
"""Writes to path atomically, by writing to temp file and renaming it."""
tmp_path = _tmp_file_name(path, subfolder=subfolder)
tmp_path.parent.mkdir(exist_ok=True)
try:
yield tmp_path
tmp_path.replace(path)
finally:
# Eventually delete the tmp_path if exception was raised
tmp_path.unlink(missing_ok=True)


@contextlib.contextmanager
def incomplete_files(
path: epath.Path,
) -> Iterator[epath.Path]:
"""Writes to path atomically, by writing to temp file and renaming it."""
tmp_file_prefix = _tmp_file_prefix()
tmp_path = path.parent / f'{tmp_file_prefix}.{path.name}'
try:
yield tmp_path
# Rename all tmp files to their final name.
for tmp_file in path.parent.glob(f'{tmp_file_prefix}.*'):
file_name = tmp_file.name.removeprefix(tmp_file_prefix + '.')
tmp_file.replace(path.parent / file_name)
finally:
# Eventually delete the tmp_path if exception was raised
for tmp_file in path.parent.glob(f'{tmp_file_prefix}.*'):
tmp_file.unlink(missing_ok=True)


def is_incomplete_file(path: epath.Path) -> bool:
"""Returns whether the given filename suggests that it's incomplete."""
regex = rf'{re.escape(constants.INCOMPLETE_PREFIX)}[0-9a-fA-F]{{32}}\..+'
return bool(re.search(rf'^{regex}$', path.name))


@contextlib.contextmanager
def atomic_write(path: epath.PathLike, mode: str):
"""Writes to path atomically, by writing to temp file and renaming it."""
tmp_path = _tmp_file_name(path)
with tmp_path.open(mode=mode) as file_:
yield file_
tmp_path.replace(path)


def reraise(
e: Exception,
prefix: str | None = None,
Expand Down
37 changes: 0 additions & 37 deletions tensorflow_datasets/core/utils/py_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
import collections
import os
import pathlib
from unittest import mock

from etils import epath
import pytest
import tensorflow as tf
from tensorflow_datasets import testing
Expand Down Expand Up @@ -337,29 +335,6 @@ def test_flatten_with_path():
)


def test_incomplete_file(tmp_path: pathlib.Path):
tmp_path = epath.Path(tmp_path)
filepath = tmp_path / 'test.txt'
with py_utils.incomplete_file(filepath) as tmp_filepath:
tmp_filepath.write_text('content')
assert not filepath.exists()
assert filepath.read_text() == 'content'
assert not tmp_filepath.exists() # Tmp file is deleted


@pytest.mark.parametrize(
['path', 'is_incomplete'],
[
('/a/incomplete.a8c53d7beff74b2eb31b9b86c7d046cf.bcd', True),
('/a/incomplete-dataset.tfrecord-00000-of-00100', False),
('/a/prefix.incomplete.a8c53d7beff74b2eb31b9b86c7d046cf', False),
('/a/incomplete.a8c53d7beff74beb3.bcd', False),
],
)
def test_is_incomplete_file(path: str, is_incomplete: bool):
assert py_utils.is_incomplete_file(epath.Path(path)) == is_incomplete


@pytest.mark.parametrize(
['name', 'expected'],
[
Expand All @@ -372,17 +347,5 @@ def test_make_valid_name(name: str, expected: str):
assert py_utils.make_valid_name(name) == expected


@pytest.mark.parametrize(
['path', 'subfolder', 'expected'],
[
('/a/file.ext', None, '/a/foobar.file.ext'),
('/a/file.ext', 'sub', '/a/sub/foobar.file.ext'),
],
)
def test_tmp_file_name(path, subfolder, expected):
with mock.patch.object(py_utils, '_tmp_file_prefix', return_value='foobar'):
assert os.fspath(py_utils._tmp_file_name(path, subfolder)) == expected


if __name__ == '__main__':
tf.test.main()
5 changes: 2 additions & 3 deletions tensorflow_datasets/scripts/cli/convert_format_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
from tensorflow_datasets.core import splits as splits_lib
from tensorflow_datasets.core.proto import dataset_info_pb2
from tensorflow_datasets.core.utils import file_utils
from tensorflow_datasets.core.utils import py_utils
from tensorflow_datasets.core.utils import type_utils

# pylint: enable=g-import-not-at-top
Expand Down Expand Up @@ -131,7 +130,7 @@ def read_in() -> Iterator[type_utils.KeySerializedExample]:
yield i, row.numpy()

try:
with py_utils.incomplete_file(self.out_path) as tmp_file:
with file_utils.incomplete_file(self.out_path) as tmp_file:
self.config.out_file_adapter.write_examples(
path=tmp_file, iterator=read_in()
)
Expand Down Expand Up @@ -476,7 +475,7 @@ def _convert_dataset(
def _remove_incomplete_files(path: epath.Path) -> None:
num_incomplete_files = 0
for incomplete_file in path.glob(f'*{constants.INCOMPLETE_PREFIX}*'):
if py_utils.is_incomplete_file(incomplete_file):
if file_utils.is_incomplete_file(incomplete_file):
incomplete_file.unlink()
num_incomplete_files += 1
logging.info('Removed %d incomplete files.', num_incomplete_files)
Expand Down