Skip to content

Commit d737906

Browse files
authored
fix: handle filesystems that don't implement ls in bulk_exists (#653)
1 parent e144550 commit d737906

File tree

2 files changed

+45
-10
lines changed

2 files changed

+45
-10
lines changed

src/dvc_data/index/index.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
from abc import ABC, abstractmethod
55
from collections import defaultdict
66
from collections.abc import Iterator, MutableMapping
7-
from typing import TYPE_CHECKING, Any, Callable, Optional, cast
7+
from itertools import repeat
8+
from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast
89

910
import attrs
1011
from fsspec import Callback
@@ -17,6 +18,8 @@
1718
from dvc_data.hashfile.tree import Tree
1819

1920
if TYPE_CHECKING:
21+
from collections.abc import Iterable
22+
2023
from dvc_objects.fs.base import FileSystem
2124

2225
from dvc_data.hashfile.db import HashFileDB
@@ -240,7 +243,7 @@ def exists(self, entry: "DataIndexEntry", refresh: bool = False) -> bool:
240243
finally:
241244
self.index.commit()
242245

243-
def bulk_exists( # noqa: C901
246+
def bulk_exists( # noqa: C901, PLR0912
244247
self,
245248
entries: list["DataIndexEntry"],
246249
refresh: bool = False,
@@ -274,17 +277,24 @@ def bulk_exists( # noqa: C901
274277
_, path = self.get(entry)
275278
path_to_entries[path].append(entry)
276279

280+
info_results: Union[
281+
Iterable[Union[Exception, Optional[dict[str, Any]]]], None
282+
] = None
277283
try:
278284
self.fs.ls(self.odb.path) # check for fs access
279-
except FileNotFoundError:
285+
except FileNotFoundError as exc:
286+
info_results = repeat(exc, len(path_to_entries))
287+
callback.relative_update(len(entries_with_hash))
288+
except NotImplementedError:
289+
# some filesystems don't implement ls; skip the access probe and rely on the per-path info() lookup below
280290
pass
281-
282-
info_results = self.fs.info(
283-
list(path_to_entries),
284-
batch_size=jobs,
285-
return_exceptions=True,
286-
callback=callback,
287-
)
291+
if info_results is None:
292+
info_results = self.fs.info(
293+
list(path_to_entries),
294+
batch_size=jobs,
295+
return_exceptions=True,
296+
callback=callback,
297+
)
288298

289299
for (path, _entries), info in zip(path_to_entries.items(), info_results):
290300
if isinstance(info, Exception) and not isinstance(info, FileNotFoundError):
@@ -302,6 +312,7 @@ def bulk_exists( # noqa: C901
302312
results.update(dict.fromkeys(_entries, exists))
303313

304314
if self.index is not None:
315+
logger.debug("Committing index results")
305316
self.index.commit()
306317

307318
return results

tests/index/test_storage.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pytest
22
from dvc_objects.fs.local import LocalFileSystem
33

4+
from dvc_data.hashfile.db import HashFileDB
45
from dvc_data.hashfile.hash_info import HashInfo
56
from dvc_data.hashfile.meta import Meta
67
from dvc_data.index import (
@@ -201,6 +202,29 @@ def test_duplicate_hashes_not_exist(self, odb, use_index, refresh):
201202
result = storage.bulk_exists(entries, refresh=refresh)
202203
assert result == {entries[0]: False, entries[1]: False}
203204

205+
def test_bulk_check_with_ls_not_implemented(self, tmp_path_factory):
    """Check bulk_exists on a filesystem whose ``ls`` is not implemented.

    The object database lives in a freshly created temporary directory,
    and the test expects every queried entry to be reported as missing
    rather than ``bulk_exists`` raising ``NotImplementedError``.
    """

    class NonTraversableFileSystem(LocalFileSystem):
        # Local filesystem stand-in that refuses every directory listing.
        def ls(self, *args, **kwargs):
            raise NotImplementedError

    # Entry name -> md5 digest; dict order fixes the order of ``entries``.
    digests = {
        "foo": "d3b07384d113edec49eaa6238ad5ff00",
        "bar": "c157a79031e1c40f85931829bc5fc552",
    }
    entries = [
        DataIndexEntry(key=(name,), hash_info=HashInfo("md5", digest))
        for name, digest in digests.items()
    ]

    odb_dir = tmp_path_factory.mktemp("odb")
    storage = ObjectStorage(
        key=(),
        odb=HashFileDB(fs=NonTraversableFileSystem(), path=odb_dir),
        index=DataIndex(),
    )

    outcome = storage.bulk_exists(entries, refresh=True)
    assert outcome == {entries[0]: False, entries[1]: False}
227+
204228

205229
class TestStorageMappingBulkExists:
206230
def test_bulk_cache_exists_empty(self, odb):

0 commit comments

Comments
 (0)