Commit b6fbf92

Support to cleanup disk cache when enabled (#816)
* Support to cleanup disk cache when enabled
* lint errors
* remove broad exception
1 parent ec31c47 commit b6fbf92

File tree

3 files changed: +92 −3 lines changed

3 files changed

+92
-3
lines changed

petastorm/local_disk_cache.py

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
 
 
 class LocalDiskCache(CacheBase):
-    def __init__(self, path, size_limit_bytes, expected_row_size_bytes, shards=6, cleanup=False, **settings):
+    def __init__(self, path, size_limit_bytes, expected_row_size_bytes, shards=6, cleanup=True, **settings):
         """LocalDiskCache is an adapter to a diskcache implementation.
 
         LocalDiskCache can be used by a petastorm Reader class to temporarily keep parts of the dataset on a local
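This hunk only flips the default of the cleanup flag from False to True; the cleanup() method itself is not shown in this commit. As a rough illustration, a minimal sketch of how a diskcache-backed cache could implement such a cleanup follows; the class name, the _path/_cache attributes, and the shutil.rmtree removal strategy are all illustrative assumptions, not taken from petastorm:

    import shutil

    import diskcache as dc


    class LocalDiskCacheSketch:
        """Hypothetical, simplified stand-in for petastorm's LocalDiskCache."""

        def __init__(self, path, size_limit_bytes, shards=6, cleanup=True):
            self._path = path
            self._cleanup = cleanup  # with the new default, callers opt out rather than in
            self._cache = dc.FanoutCache(path, shards=shards, size_limit=size_limit_bytes)

        def cleanup(self):
            # Close the underlying diskcache shards, then remove the cache directory tree.
            if self._cleanup:
                self._cache.close()
                shutil.rmtree(self._path, ignore_errors=True)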

petastorm/reader.py

Lines changed: 11 additions & 2 deletions
@@ -448,7 +448,7 @@ def __init__(self, pyarrow_filesystem, dataset_path, schema_fields=None,
             raise NotImplementedError('Using timestamp_overlap=False is not implemented with'
                                       ' shuffle_options.shuffle_row_drop_partitions > 1')
 
-        cache = cache or NullCache()
+        self.cache = cache or NullCache()
 
         self._workers_pool = reader_pool or ThreadPool(10, shuffle_rows=shuffle_rows, seed=seed)
 
@@ -491,7 +491,7 @@ def __init__(self, pyarrow_filesystem, dataset_path, schema_fields=None,
 
         # 5. Start workers pool
         self._workers_pool.start(worker_class, (pyarrow_filesystem, dataset_path, storage_schema,
-                                                self.ngram, row_groups, cache, transform_spec,
+                                                self.ngram, row_groups, self.cache, transform_spec,
                                                 self.schema, filters, shuffle_rows, seed,
                                                 convert_early_to_numpy),
                                  ventilator=self.ventilator)
@@ -690,6 +690,14 @@ def join(self):
         """Joins all worker threads/processes. Will block until all worker workers have been fully terminated."""
         self._workers_pool.join()
 
+    def cleanup_cache(self):
+        if isinstance(self.cache, LocalDiskCache):
+            try:
+                self.cache.cleanup()
+            except (OSError, IOError, AttributeError) as e:
+                print(f"Error cleaning cache: {e}")
+        print("Cache cleanup complete.")
+
     @property
     def diagnostics(self):
         return self._workers_pool.diagnostics
@@ -719,3 +727,4 @@ def __enter__(self):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.stop()
         self.join()
+        self.cleanup_cache()
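Taken together, these hunks mean a reader used as a context manager now tears down its local disk cache automatically on exit. A usage sketch, with a hypothetical dataset URL and cache directory (the cache_* parameters mirror the tests below):

    from petastorm import make_reader

    with make_reader('file:///tmp/my_dataset',               # hypothetical dataset URL
                     cache_type='local-disk',
                     cache_location='/tmp/petastorm_cache',  # hypothetical cache directory
                     cache_size_limit=1000000,
                     cache_row_size_estimate=100) as reader:
        for row in reader:
            pass  # consume rows; row groups are cached on local disk

    # On exit the reader runs stop(), join(), then cleanup_cache(), which
    # removes the cache directory because the cache is a LocalDiskCache.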

petastorm/tests/test_reader.py

Lines changed: 80 additions & 0 deletions
@@ -94,3 +94,83 @@ def test_deprecated_shard_seed(synthetic_dataset, reader_factory):
     match_str = 'shard_seed was deprecated and will be removed in future versions.'
     with pytest.warns(UserWarning, match=match_str):
         reader_factory(synthetic_dataset.url, shard_seed=123)
+
+
+def test_cleanup_cache_with_local_disk_cache(synthetic_dataset, tmpdir):
+    """Test that cleanup_cache properly removes local disk cache directory"""
+    import os
+    cache_location = tmpdir.strpath
+
+    with make_reader(synthetic_dataset.url,
+                     cache_type='local-disk',
+                     cache_location=cache_location,
+                     cache_size_limit=1000000,
+                     cache_row_size_estimate=100) as reader:
+        # Read some data to populate cache
+        next(reader)
+        # Cache directory should exist
+        assert os.path.exists(cache_location)
+
+    # After context manager exit, cache should be cleaned up
+    assert not os.path.exists(cache_location)
+
+
+def test_cleanup_cache_with_null_cache(synthetic_dataset, capsys):
+    """Test that cleanup_cache works properly with null cache"""
+    with make_reader(synthetic_dataset.url, cache_type='null') as reader:
+        next(reader)
+        # Manually call cleanup_cache to test it
+        reader.cleanup_cache()
+
+    # Check that cleanup message was printed
+    captured = capsys.readouterr()
+    assert "Cache cleanup complete." in captured.out
+
+
+def test_cleanup_cache_manual_call(synthetic_dataset, tmpdir):
+    """Test manually calling cleanup_cache method"""
+    import os
+    cache_location = tmpdir.strpath
+
+    reader = make_reader(synthetic_dataset.url,
+                         cache_type='local-disk',
+                         cache_location=cache_location,
+                         cache_size_limit=1000000,
+                         cache_row_size_estimate=100)
+
+    try:
+        next(reader)
+        assert os.path.exists(cache_location)
+
+        # Manually call cleanup_cache
+        reader.cleanup_cache()
+        assert not os.path.exists(cache_location)
+    finally:
+        reader.stop()
+        reader.join()
+
+
+def test_cleanup_cache_exception_handling(synthetic_dataset, tmpdir, capsys, monkeypatch):
+    """Test that cleanup_cache handles exceptions gracefully"""
+    cache_location = tmpdir.strpath
+
+    with make_reader(synthetic_dataset.url,
+                     cache_type='local-disk',
+                     cache_location=cache_location,
+                     cache_size_limit=1000000,
+                     cache_row_size_estimate=100) as reader:
+        next(reader)
+
+        # Mock the cleanup method to raise an exception
+        def mock_cleanup():
+            raise OSError("Simulated cleanup error")
+
+        monkeypatch.setattr(reader.cache, 'cleanup', mock_cleanup)
+
+        # Call cleanup_cache - should handle exception gracefully
+        reader.cleanup_cache()
+
+    # Check that error message was printed
+    captured = capsys.readouterr()
+    assert "Error cleaning cache: Simulated cleanup error" in captured.out
+    assert "Cache cleanup complete." in captured.out
