Skip to content

Commit 527a239

Browse files
authored
fix: handle duplicate hashes in bulk_exists (#651)
1 parent 6301414 commit 527a239

File tree

2 files changed

+64
-18
lines changed

2 files changed

+64
-18
lines changed

src/dvc_data/index/index.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -240,13 +240,15 @@ def exists(self, entry: "DataIndexEntry", refresh: bool = False) -> bool:
240240
finally:
241241
self.index.commit()
242242

243-
def bulk_exists(
243+
def bulk_exists( # noqa: C901
244244
self,
245245
entries: list["DataIndexEntry"],
246246
refresh: bool = False,
247247
jobs: Optional[int] = None,
248248
callback: "Callback" = DEFAULT_CALLBACK,
249249
) -> dict["DataIndexEntry", bool]:
250+
from .build import build_entry
251+
250252
entries_with_hash = [e for e in entries if e.hash_info]
251253
entries_without_hash = [e for e in entries if not e.hash_info]
252254
results = dict.fromkeys(entries_without_hash, False)
@@ -267,39 +269,37 @@ def bulk_exists(
267269
results[entry] = exists
268270
return results
269271

270-
entry_map: dict[str, DataIndexEntry] = {
271-
self.get(entry)[1]: entry for entry in entries_with_hash
272-
}
272+
path_to_entries: dict[str, list[DataIndexEntry]] = defaultdict(list)
273+
for entry in entries_with_hash:
274+
_, path = self.get(entry)
275+
path_to_entries[path].append(entry)
273276

274277
try:
275278
self.fs.ls(self.odb.path) # check for fs access
276279
except FileNotFoundError:
277280
pass
278281

279282
info_results = self.fs.info(
280-
list(entry_map.keys()),
283+
list(path_to_entries),
281284
batch_size=jobs,
282285
return_exceptions=True,
283286
callback=callback,
284287
)
285288

286-
for (path, entry), info in zip(entry_map.items(), info_results):
289+
for (path, _entries), info in zip(path_to_entries.items(), info_results):
290+
if isinstance(info, Exception) and not isinstance(info, FileNotFoundError):
291+
raise info
292+
assert _entries
293+
entry = _entries[0]
287294
assert entry.hash_info # built from entries_with_hash
288295
value = cast("str", entry.hash_info.value)
289296
key = self.odb._oid_parts(value)
290-
291-
if isinstance(info, FileNotFoundError) or info is None:
297+
exists = info is not None and not isinstance(info, FileNotFoundError)
298+
if exists:
299+
self.index[key] = build_entry(path, self.fs, info=info)
300+
else:
292301
self.index.pop(key, None)
293-
results[entry] = False
294-
continue
295-
if isinstance(info, Exception):
296-
raise info
297-
298-
from .build import build_entry
299-
300-
built_entry = build_entry(path, self.fs, info=info)
301-
self.index[key] = built_entry
302-
results[entry] = True
302+
results.update(dict.fromkeys(_entries, exists))
303303

304304
if self.index is not None:
305305
self.index.commit()

tests/index/test_storage.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import pytest
12
from dvc_objects.fs.local import LocalFileSystem
23

34
from dvc_data.hashfile.hash_info import HashInfo
@@ -155,6 +156,51 @@ def test_multiple_entries(self, odb):
155156
result = storage.bulk_exists(entries)
156157
assert all(result[e] is True for e in entries)
157158

159+
@pytest.mark.parametrize("use_index", [True, False])
160+
@pytest.mark.parametrize("refresh", [True, False])
161+
def test_duplicate_hashes_exist(self, odb, use_index, refresh):
162+
"""Multiple entries with same hash should all return True if exists."""
163+
index = None
164+
if use_index:
165+
index = DataIndex()
166+
key = odb._oid_parts("d3b07384d113edec49eaa6238ad5ff00")
167+
index[key] = DataIndexEntry(key=key)
168+
169+
storage = ObjectStorage(key=(), odb=odb, index=index)
170+
entries = [
171+
DataIndexEntry(
172+
key=("foo",),
173+
hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"),
174+
),
175+
DataIndexEntry(
176+
key=("bar",),
177+
hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"),
178+
),
179+
]
180+
181+
result = storage.bulk_exists(entries, refresh=refresh)
182+
assert result == {entries[0]: True, entries[1]: True}
183+
184+
@pytest.mark.parametrize("use_index", [True, False])
185+
@pytest.mark.parametrize("refresh", [True, False])
186+
def test_duplicate_hashes_not_exist(self, odb, use_index, refresh):
187+
"""Multiple entries with same hash should all return False if not exists."""
188+
index = DataIndex() if use_index else None
189+
storage = ObjectStorage(key=(), odb=odb, index=index)
190+
entries = [
191+
DataIndexEntry(
192+
key=("foo",),
193+
hash_info=HashInfo("md5", "00000000000000000000000000000000"),
194+
),
195+
DataIndexEntry(
196+
key=("bar",),
197+
hash_info=HashInfo("md5", "00000000000000000000000000000000"),
198+
),
199+
]
200+
201+
result = storage.bulk_exists(entries, refresh=refresh)
202+
assert result == {entries[0]: False, entries[1]: False}
203+
158204

159205
class TestStorageMappingBulkExists:
160206
def test_bulk_cache_exists_empty(self, odb):

0 commit comments

Comments (0)