Skip to content

Commit 3a31c0d

Browse files
fineguy authored and The TensorFlow Datasets Authors committed
Move file-related helper functions from py_utils to file_utils.
PiperOrigin-RevId: 804841318
1 parent: 57cad96; commit: 3a31c0d

File tree

7 files changed: +132 / -129 lines changed

tensorflow_datasets/core/dataset_builder.py

Lines changed: 1 addition & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -778,17 +778,7 @@ def download_and_prepare(
778778
self.info.update_data_dir(self.data_dir)
779779

780780
# Clean up incomplete files from preempted workers.
781-
deleted_incomplete_files = []
782-
for f in self.data_path.glob(f"*{constants.INCOMPLETE_PREFIX}*"):
783-
if utils.is_incomplete_file(f):
784-
deleted_incomplete_files.append(os.fspath(f))
785-
f.unlink()
786-
if deleted_incomplete_files:
787-
logging.info(
788-
"Deleted %d incomplete files. A small selection: %s",
789-
len(deleted_incomplete_files),
790-
"\n".join(deleted_incomplete_files[:3]),
791-
)
781+
file_utils.clean_up_incomplete_files(self.data_path)
792782

793783
self._log_download_done()
794784

tensorflow_datasets/core/utils/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,11 @@
1717

1818
from tensorflow_datasets.core.units import Size
1919
from tensorflow_datasets.core.utils import docs
20+
from tensorflow_datasets.core.utils.file_utils import atomic_write
2021
from tensorflow_datasets.core.utils.file_utils import incomplete_dir
22+
from tensorflow_datasets.core.utils.file_utils import incomplete_file
23+
from tensorflow_datasets.core.utils.file_utils import incomplete_files
24+
from tensorflow_datasets.core.utils.file_utils import is_incomplete_file
2125
from tensorflow_datasets.core.utils.gcs_utils import gcs_path
2226
from tensorflow_datasets.core.utils.image_utils import apply_colormap
2327
from tensorflow_datasets.core.utils.image_utils import create_thumbnail
@@ -29,7 +33,6 @@
2933
from tensorflow_datasets.core.utils.image_utils import png_to_jpeg
3034
from tensorflow_datasets.core.utils.image_utils import THUMBNAIL_SIZE
3135
from tensorflow_datasets.core.utils.py_utils import add_sys_path
32-
from tensorflow_datasets.core.utils.py_utils import atomic_write
3336
from tensorflow_datasets.core.utils.py_utils import build_synchronize_decorator
3437
from tensorflow_datasets.core.utils.py_utils import classproperty
3538
from tensorflow_datasets.core.utils.py_utils import dedent
@@ -40,10 +43,7 @@
4043
from tensorflow_datasets.core.utils.py_utils import get_class_path
4144
from tensorflow_datasets.core.utils.py_utils import get_class_url
4245
from tensorflow_datasets.core.utils.py_utils import has_sufficient_disk_space
43-
from tensorflow_datasets.core.utils.py_utils import incomplete_file
44-
from tensorflow_datasets.core.utils.py_utils import incomplete_files
4546
from tensorflow_datasets.core.utils.py_utils import indent
46-
from tensorflow_datasets.core.utils.py_utils import is_incomplete_file
4747
from tensorflow_datasets.core.utils.py_utils import is_notebook
4848
from tensorflow_datasets.core.utils.py_utils import list_info_files
4949
from tensorflow_datasets.core.utils.py_utils import map_nested

tensorflow_datasets/core/utils/file_utils.py

Lines changed: 89 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
import re
2828
import string
2929
import time
30+
import uuid
3031

3132
from absl import logging
3233
from etils import epath
@@ -115,6 +116,94 @@ def _get_incomplete_dir(dir_name: str) -> str:
115116
return f'{dir_name.parent}/{constants.INCOMPLETE_PREFIX}{random_suffix}_{dir_name.name}/'
116117

117118

119+
def _tmp_file_prefix() -> str:
120+
return f'{constants.INCOMPLETE_PREFIX}{uuid.uuid4().hex}'
121+
122+
123+
def _tmp_file_name(
124+
path: epath.PathLike,
125+
subfolder: str | None = None,
126+
) -> epath.Path:
127+
"""Returns the temporary file name for the given path.
128+
129+
Args:
130+
path: The path to the file.
131+
subfolder: The subfolder to use. If None, then the parent of the path will
132+
be used.
133+
"""
134+
path = epath.Path(path)
135+
file_name = f'{_tmp_file_prefix()}.{path.name}'
136+
if subfolder:
137+
return path.parent / subfolder / file_name
138+
else:
139+
return path.parent / file_name
140+
141+
142+
@contextlib.contextmanager
143+
def atomic_write(path: epath.PathLike, mode: str):
144+
"""Writes to path atomically, by writing to temp file and renaming it."""
145+
tmp_path = _tmp_file_name(path)
146+
with tmp_path.open(mode=mode) as file_:
147+
yield file_
148+
tmp_path.replace(path)
149+
150+
151+
def is_incomplete_file(path: epath.Path) -> bool:
152+
"""Returns whether the given filename suggests that it's incomplete."""
153+
regex = rf'{re.escape(constants.INCOMPLETE_PREFIX)}[0-9a-fA-F]{{32}}\..+'
154+
return bool(re.search(rf'^{regex}$', path.name))
155+
156+
157+
@contextlib.contextmanager
158+
def incomplete_file(
159+
path: epath.Path,
160+
subfolder: str | None = None,
161+
) -> Iterator[epath.Path]:
162+
"""Writes to path atomically, by writing to temp file and renaming it."""
163+
tmp_path = _tmp_file_name(path, subfolder=subfolder)
164+
tmp_path.parent.mkdir(exist_ok=True)
165+
try:
166+
yield tmp_path
167+
tmp_path.replace(path)
168+
finally:
169+
# Eventually delete the tmp_path if exception was raised
170+
tmp_path.unlink(missing_ok=True)
171+
172+
173+
@contextlib.contextmanager
174+
def incomplete_files(
175+
path: epath.Path,
176+
) -> Iterator[epath.Path]:
177+
"""Writes to path atomically, by writing to temp file and renaming it."""
178+
tmp_file_prefix = _tmp_file_prefix()
179+
tmp_path = path.parent / f'{tmp_file_prefix}.{path.name}'
180+
try:
181+
yield tmp_path
182+
# Rename all tmp files to their final name.
183+
for tmp_file in path.parent.glob(f'{tmp_file_prefix}.*'):
184+
file_name = tmp_file.name.removeprefix(tmp_file_prefix + '.')
185+
tmp_file.replace(path.parent / file_name)
186+
finally:
187+
# Eventually delete the tmp_path if exception was raised
188+
for tmp_file in path.parent.glob(f'{tmp_file_prefix}.*'):
189+
tmp_file.unlink(missing_ok=True)
190+
191+
192+
def clean_up_incomplete_files(path: epath.Path) -> None:
193+
"""Deletes all incomplete files in the given path."""
194+
deleted_incomplete_files = []
195+
for f in path.glob(f'*{constants.INCOMPLETE_PREFIX}*'):
196+
if is_incomplete_file(f):
197+
deleted_incomplete_files.append(os.fspath(f))
198+
f.unlink()
199+
if deleted_incomplete_files:
200+
logging.info(
201+
'Deleted %d incomplete files. A small selection: %s',
202+
len(deleted_incomplete_files),
203+
'\n'.join(deleted_incomplete_files[:3]),
204+
)
205+
206+
118207
@contextlib.contextmanager
119208
def incomplete_dir(
120209
dirname: PathLike,

tensorflow_datasets/core/utils/file_utils_test.py

Lines changed: 36 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
# limitations under the License.
1515

1616
import os
17+
import pathlib
1718
import time
1819
from unittest import mock
1920

@@ -37,6 +38,41 @@ def test_default_data_dir():
3738
assert data_dir
3839

3940

41+
@pytest.mark.parametrize(
42+
['path', 'subfolder', 'expected'],
43+
[
44+
('/a/file.ext', None, '/a/foobar.file.ext'),
45+
('/a/file.ext', 'sub', '/a/sub/foobar.file.ext'),
46+
],
47+
)
48+
def test_tmp_file_name(path, subfolder, expected):
49+
with mock.patch.object(file_utils, '_tmp_file_prefix', return_value='foobar'):
50+
assert os.fspath(file_utils._tmp_file_name(path, subfolder)) == expected
51+
52+
53+
def test_incomplete_file(tmp_path: pathlib.Path):
54+
tmp_path = epath.Path(tmp_path)
55+
filepath = tmp_path / 'test.txt'
56+
with file_utils.incomplete_file(filepath) as tmp_filepath:
57+
tmp_filepath.write_text('content')
58+
assert not filepath.exists()
59+
assert filepath.read_text() == 'content'
60+
assert not tmp_filepath.exists() # Tmp file is deleted
61+
62+
63+
@pytest.mark.parametrize(
64+
['path', 'is_incomplete'],
65+
[
66+
('/a/incomplete.a8c53d7beff74b2eb31b9b86c7d046cf.bcd', True),
67+
('/a/incomplete-dataset.tfrecord-00000-of-00100', False),
68+
('/a/prefix.incomplete.a8c53d7beff74b2eb31b9b86c7d046cf', False),
69+
('/a/incomplete.a8c53d7beff74beb3.bcd', False),
70+
],
71+
)
72+
def test_is_incomplete_file(path: str, is_incomplete: bool):
73+
assert file_utils.is_incomplete_file(epath.Path(path)) == is_incomplete
74+
75+
4076
def _create_dataset_dir(
4177
data_dir: epath.Path | None = None, version: str = _VERSION
4278
) -> epath.Path:

tensorflow_datasets/core/utils/py_utils.py

Lines changed: 0 additions & 74 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,6 @@
3232
import threading
3333
import typing
3434
from typing import Any, Callable, NoReturn, Type, TypeVar
35-
import uuid
3635

3736
from absl import logging as absl_logging
3837
from etils import epath
@@ -304,79 +303,6 @@ def nullcontext(enter_result: T = None) -> Iterator[T]:
304303
yield enter_result
305304

306305

307-
def _tmp_file_prefix() -> str:
308-
return f'{constants.INCOMPLETE_PREFIX}{uuid.uuid4().hex}'
309-
310-
311-
def _tmp_file_name(
312-
path: epath.PathLike,
313-
subfolder: str | None = None,
314-
) -> epath.Path:
315-
"""Returns the temporary file name for the given path.
316-
317-
Args:
318-
path: The path to the file.
319-
subfolder: The subfolder to use. If None, then the parent of the path will
320-
be used.
321-
"""
322-
path = epath.Path(path)
323-
file_name = f'{_tmp_file_prefix()}.{path.name}'
324-
if subfolder:
325-
return path.parent / subfolder / file_name
326-
else:
327-
return path.parent / file_name
328-
329-
330-
@contextlib.contextmanager
331-
def incomplete_file(
332-
path: epath.Path,
333-
subfolder: str | None = None,
334-
) -> Iterator[epath.Path]:
335-
"""Writes to path atomically, by writing to temp file and renaming it."""
336-
tmp_path = _tmp_file_name(path, subfolder=subfolder)
337-
tmp_path.parent.mkdir(exist_ok=True)
338-
try:
339-
yield tmp_path
340-
tmp_path.replace(path)
341-
finally:
342-
# Eventually delete the tmp_path if exception was raised
343-
tmp_path.unlink(missing_ok=True)
344-
345-
346-
@contextlib.contextmanager
347-
def incomplete_files(
348-
path: epath.Path,
349-
) -> Iterator[epath.Path]:
350-
"""Writes to path atomically, by writing to temp file and renaming it."""
351-
tmp_file_prefix = _tmp_file_prefix()
352-
tmp_path = path.parent / f'{tmp_file_prefix}.{path.name}'
353-
try:
354-
yield tmp_path
355-
# Rename all tmp files to their final name.
356-
for tmp_file in path.parent.glob(f'{tmp_file_prefix}.*'):
357-
file_name = tmp_file.name.removeprefix(tmp_file_prefix + '.')
358-
tmp_file.replace(path.parent / file_name)
359-
finally:
360-
# Eventually delete the tmp_path if exception was raised
361-
for tmp_file in path.parent.glob(f'{tmp_file_prefix}.*'):
362-
tmp_file.unlink(missing_ok=True)
363-
364-
365-
def is_incomplete_file(path: epath.Path) -> bool:
366-
"""Returns whether the given filename suggests that it's incomplete."""
367-
regex = rf'{re.escape(constants.INCOMPLETE_PREFIX)}[0-9a-fA-F]{{32}}\..+'
368-
return bool(re.search(rf'^{regex}$', path.name))
369-
370-
371-
@contextlib.contextmanager
372-
def atomic_write(path: epath.PathLike, mode: str):
373-
"""Writes to path atomically, by writing to temp file and renaming it."""
374-
tmp_path = _tmp_file_name(path)
375-
with tmp_path.open(mode=mode) as file_:
376-
yield file_
377-
tmp_path.replace(path)
378-
379-
380306
def reraise(
381307
e: Exception,
382308
prefix: str | None = None,

tensorflow_datasets/core/utils/py_utils_test.py

Lines changed: 0 additions & 37 deletions
Original file line number | Diff line number | Diff line change
@@ -16,9 +16,7 @@
1616
import collections
1717
import os
1818
import pathlib
19-
from unittest import mock
2019

21-
from etils import epath
2220
import pytest
2321
import tensorflow as tf
2422
from tensorflow_datasets import testing
@@ -337,29 +335,6 @@ def test_flatten_with_path():
337335
)
338336

339337

340-
def test_incomplete_file(tmp_path: pathlib.Path):
341-
tmp_path = epath.Path(tmp_path)
342-
filepath = tmp_path / 'test.txt'
343-
with py_utils.incomplete_file(filepath) as tmp_filepath:
344-
tmp_filepath.write_text('content')
345-
assert not filepath.exists()
346-
assert filepath.read_text() == 'content'
347-
assert not tmp_filepath.exists() # Tmp file is deleted
348-
349-
350-
@pytest.mark.parametrize(
351-
['path', 'is_incomplete'],
352-
[
353-
('/a/incomplete.a8c53d7beff74b2eb31b9b86c7d046cf.bcd', True),
354-
('/a/incomplete-dataset.tfrecord-00000-of-00100', False),
355-
('/a/prefix.incomplete.a8c53d7beff74b2eb31b9b86c7d046cf', False),
356-
('/a/incomplete.a8c53d7beff74beb3.bcd', False),
357-
],
358-
)
359-
def test_is_incomplete_file(path: str, is_incomplete: bool):
360-
assert py_utils.is_incomplete_file(epath.Path(path)) == is_incomplete
361-
362-
363338
@pytest.mark.parametrize(
364339
['name', 'expected'],
365340
[
@@ -372,17 +347,5 @@ def test_make_valid_name(name: str, expected: str):
372347
assert py_utils.make_valid_name(name) == expected
373348

374349

375-
@pytest.mark.parametrize(
376-
['path', 'subfolder', 'expected'],
377-
[
378-
('/a/file.ext', None, '/a/foobar.file.ext'),
379-
('/a/file.ext', 'sub', '/a/sub/foobar.file.ext'),
380-
],
381-
)
382-
def test_tmp_file_name(path, subfolder, expected):
383-
with mock.patch.object(py_utils, '_tmp_file_prefix', return_value='foobar'):
384-
assert os.fspath(py_utils._tmp_file_name(path, subfolder)) == expected
385-
386-
387350
if __name__ == '__main__':
388351
tf.test.main()

tensorflow_datasets/scripts/cli/convert_format_utils.py

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,6 @@
4242
from tensorflow_datasets.core import splits as splits_lib
4343
from tensorflow_datasets.core.proto import dataset_info_pb2
4444
from tensorflow_datasets.core.utils import file_utils
45-
from tensorflow_datasets.core.utils import py_utils
4645
from tensorflow_datasets.core.utils import type_utils
4746

4847
# pylint: enable=g-import-not-at-top
@@ -131,7 +130,7 @@ def read_in() -> Iterator[type_utils.KeySerializedExample]:
131130
yield i, row.numpy()
132131

133132
try:
134-
with py_utils.incomplete_file(self.out_path) as tmp_file:
133+
with file_utils.incomplete_file(self.out_path) as tmp_file:
135134
self.config.out_file_adapter.write_examples(
136135
path=tmp_file, iterator=read_in()
137136
)
@@ -476,7 +475,7 @@ def _convert_dataset(
476475
def _remove_incomplete_files(path: epath.Path) -> None:
477476
num_incomplete_files = 0
478477
for incomplete_file in path.glob(f'*{constants.INCOMPLETE_PREFIX}*'):
479-
if py_utils.is_incomplete_file(incomplete_file):
478+
if file_utils.is_incomplete_file(incomplete_file):
480479
incomplete_file.unlink()
481480
num_incomplete_files += 1
482481
logging.info('Removed %d incomplete files.', num_incomplete_files)

0 commit comments

Comments
 (0)