Skip to content

Commit 18bffc9

Browse files
authored
Merge pull request #655 from DagsHub/bug/parsing-errored-annotation
Bug: parsing annotations that failed to download crashes
2 parents 38dfb2b + 7c8fa55 commit 18bffc9

File tree

4 files changed

+138
-48
lines changed

4 files changed

+138
-48
lines changed

dagshub/data_engine/annotation/metadata.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,3 +337,24 @@ def to_ls_task(self) -> Optional[bytes]:
337337

338338
def __repr__(self):
339339
return "Label Studio annotations of unrecognized type"
340+
341+
342+
class ErrorMetadataAnnotations(MetadataAnnotations, metaclass=NotImplementedMeta):
343+
def __init__(
344+
self,
345+
datapoint: "Datapoint",
346+
field: str,
347+
error_message: str,
348+
):
349+
super().__init__(datapoint, field, None, None, None)
350+
self._error_message = error_message
351+
352+
@property
353+
def value(self) -> Optional[bytes]:
354+
raise ValueError(self._error_message)
355+
356+
def to_ls_task(self) -> Optional[bytes]:
357+
raise ValueError(self._error_message)
358+
359+
def __repr__(self):
360+
return f"Label Studio annotation download error: {self._error_message}"

dagshub/data_engine/model/datapoint.py

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
from dataclasses import dataclass
44
from os import PathLike
55
from pathlib import Path
6-
from typing import Optional, Union, List, Dict, Any, Callable, TYPE_CHECKING, Literal, Sequence
6+
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional, Sequence, Union
77

8-
from tenacity import Retrying, stop_after_attempt, wait_exponential, before_sleep_log, retry_if_exception_type
8+
from tenacity import Retrying, before_sleep_log, retry_if_exception_type, stop_after_attempt, wait_exponential
99

1010
from dagshub.common.download import download_files
1111
from dagshub.common.helpers import http_request
1212
from dagshub.data_engine.annotation import MetadataAnnotations
13-
from dagshub.data_engine.client.models import MetadataSelectFieldSchema, DatapointHistoryResult
13+
from dagshub.data_engine.client.models import DatapointHistoryResult, MetadataSelectFieldSchema
1414
from dagshub.data_engine.dtypes import MetadataFieldType
1515

1616
if TYPE_CHECKING:
@@ -25,6 +25,23 @@
2525
logger = logging.getLogger(__name__)
2626

2727

28+
@dataclass(frozen=True)
29+
class BlobHashMetadata:
30+
hash: str
31+
32+
def __str__(self) -> str:
33+
return self.hash
34+
35+
def __repr__(self) -> str:
36+
return f"BlobHashMetadata(hash={self.hash!r})"
37+
38+
39+
class BlobDownloadError(Exception):
40+
def __init__(self, message):
41+
super().__init__(message)
42+
self.message = message
43+
44+
2845
@dataclass
2946
class Datapoint:
3047
datapoint_id: int
@@ -128,6 +145,7 @@ def from_gql_edge(edge: Dict, datasource: "Datasource", fields: List[MetadataSel
128145

129146
float_fields = {f.name for f in fields if f.valueType == MetadataFieldType.FLOAT}
130147
date_fields = {f.name for f in fields if f.valueType == MetadataFieldType.DATETIME}
148+
blob_fields = {f.name for f in fields if f.valueType == MetadataFieldType.BLOB}
131149

132150
for meta_dict in edge["node"]["metadata"]:
133151
key = meta_dict["key"]
@@ -138,6 +156,8 @@ def from_gql_edge(edge: Dict, datasource: "Datasource", fields: List[MetadataSel
138156
if key in date_fields:
139157
timezone = meta_dict.get("timeZone")
140158
value = _datetime_from_timestamp(value / 1000, timezone or "+00:00")
159+
elif key in blob_fields and isinstance(value, str):
160+
value = BlobHashMetadata(value)
141161
res.metadata[key] = value
142162
return res
143163

@@ -164,7 +184,7 @@ def get_blob(self, column: str, cache_on_disk=True, store_value=False) -> bytes:
164184
if type(current_value) is bytes:
165185
# Bytes - it's already there!
166186
return current_value
167-
if isinstance(current_value, Path):
187+
elif isinstance(current_value, Path):
168188
# Path - assume the path exists and is already downloaded,
169189
# because it's unlikely that the user has set it themselves
170190
with current_value.open("rb") as f:
@@ -173,25 +193,28 @@ def get_blob(self, column: str, cache_on_disk=True, store_value=False) -> bytes:
173193
self.metadata[column] = content
174194
return content
175195

176-
elif type(current_value) is str:
177-
# String - This is probably the hash of the blob, get that from dagshub
178-
blob_url = self.blob_url(current_value)
179-
blob_location = self.blob_cache_location / current_value
196+
elif isinstance(current_value, BlobHashMetadata):
197+
# Blob hash metadata - download blob from DagsHub
198+
blob_url = self.blob_url(current_value.hash)
199+
blob_location = self.blob_cache_location / current_value.hash
180200

181201
# Make sure that the cache location exists
182202
if cache_on_disk:
183203
self.blob_cache_location.mkdir(parents=True, exist_ok=True)
184204

185205
content = _get_blob(blob_url, blob_location, self.datasource.source.repoApi.auth, cache_on_disk, True)
186-
if type(content) is str:
187-
raise RuntimeError(f"Error while downloading blob: {content}")
188206

189207
if store_value:
190208
self.metadata[column] = content
191209
elif cache_on_disk:
192210
self.metadata[column] = blob_location
193211

194212
return content
213+
elif isinstance(current_value, MetadataAnnotations):
214+
ls_task = current_value.to_ls_task()
215+
if ls_task is None:
216+
return b""
217+
return ls_task
195218
else:
196219
raise ValueError(f"Can't extract blob metadata from value {current_value} of type {type(current_value)}")
197220

@@ -274,10 +297,17 @@ def _get_blob(
274297
"""
275298
Args:
276299
url: url to download the blob from
277-
cache_path: where the cache for the blob is (laods from it if exists, stores there if it doesn't)
300+
cache_path: where the cache for the blob is (loads from it if exists, stores there if it doesn't)
278301
auth: auth to use for getting the blob
279302
cache_on_disk: whether to store the downloaded blob on disk. If False we also turn off the cache checking
280303
return_blob: if True returns the blob of the downloaded data, if False returns the path to the file with it
304+
path_format: if return_blob is False, controls path representation. "path" returns Path, "str" returns str
305+
306+
Returns:
307+
bytes, Path, or str path on success.
308+
309+
Raises:
310+
BlobDownloadError on download failure.
281311
"""
282312
if url is None:
283313
return None
@@ -313,7 +343,7 @@ def get():
313343
with attempt:
314344
content = get()
315345
except Exception as e:
316-
return f"Error while downloading binary blob: {e}"
346+
raise BlobDownloadError(str(e)) from e
317347

318348
if cache_on_disk:
319349
with cache_path.open("wb") as f:

dagshub/data_engine/model/query_result.py

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,21 @@
3030
from dagshub.common.rich_util import get_rich_progress
3131
from dagshub.common.util import lazy_load, multi_urljoin
3232
from dagshub.data_engine.annotation import MetadataAnnotations
33-
from dagshub.data_engine.annotation.metadata import UnsupportedMetadataAnnotations
33+
from dagshub.data_engine.annotation.metadata import ErrorMetadataAnnotations, UnsupportedMetadataAnnotations
3434
from dagshub.data_engine.annotation.voxel_conversion import (
3535
add_ls_annotations,
3636
add_voxel_annotations,
3737
)
3838
from dagshub.data_engine.client.loaders.base import DagsHubDataset
3939
from dagshub.data_engine.client.models import DatasourceType, MetadataSelectFieldSchema
4040
from dagshub.data_engine.dtypes import MetadataFieldType
41-
from dagshub.data_engine.model.datapoint import Datapoint, _generated_fields, _get_blob
41+
from dagshub.data_engine.model.datapoint import (
42+
BlobDownloadError,
43+
BlobHashMetadata,
44+
Datapoint,
45+
_generated_fields,
46+
_get_blob,
47+
)
4248
from dagshub.data_engine.model.schema_util import dacite_config
4349
from dagshub.data_engine.voxel_plugin_server.utils import set_voxel_envvars
4450

@@ -390,10 +396,9 @@ def get_blob_fields(
390396
for dp in self.entries:
391397
for fld in fields:
392398
field_value = dp.metadata.get(fld)
393-
# If field_value is a blob or a path, then ignore, means it's already been downloaded
394-
if not isinstance(field_value, str):
399+
if not isinstance(field_value, BlobHashMetadata):
395400
continue
396-
download_task = (dp, fld, dp.blob_url(field_value), dp.blob_cache_location / field_value)
401+
download_task = (dp, fld, dp.blob_url(field_value.hash), dp.blob_cache_location / field_value.hash)
397402
to_download.append(download_task)
398403

399404
progress = get_rich_progress(rich.progress.MofNCompleteColumn())
@@ -403,8 +408,6 @@ def get_blob_fields(
403408

404409
def _get_blob_fn(dp: Datapoint, field: str, url: str, blob_path: Path):
405410
blob_or_path = _get_blob(url, blob_path, auth, cache_on_disk, load_into_memory, path_format)
406-
if isinstance(blob_or_path, str) and path_format != "str":
407-
logger.warning(f"Error while downloading blob for field {field} in datapoint {dp.path}:{blob_or_path}")
408411
dp.metadata[field] = blob_or_path
409412

410413
with progress:
@@ -416,7 +419,7 @@ def _get_blob_fn(dp: Datapoint, field: str, url: str, blob_path: Path):
416419
logger.warning(f"Got exception {type(exc)} while downloading blob: {exc}")
417420
progress.update(task, advance=1)
418421

419-
self._convert_annotation_fields(*fields, load_into_memory=load_into_memory)
422+
self._convert_annotation_fields(*fields)
420423

421424
# Convert any downloaded document fields
422425
document_fields = [f for f in fields if f in self.document_fields]
@@ -425,18 +428,17 @@ def _get_blob_fn(dp: Datapoint, field: str, url: str, blob_path: Path):
425428
if document_fields:
426429
for dp in self:
427430
for fld in document_fields:
428-
if fld in dp.metadata:
429-
# Defensive check to not mangle annotation fields by accident
430-
if isinstance(dp.metadata[fld], MetadataAnnotations):
431-
continue
432-
# Force load the content into memory, even if load_into_memory was set to False
433-
if not load_into_memory or isinstance(dp.metadata[fld], Path):
434-
dp.metadata[fld] = Path(dp.metadata[fld]).read_bytes()
435-
dp.metadata[fld] = dp.metadata[fld].decode("utf-8")
431+
if fld not in dp.metadata:
432+
continue
433+
try:
434+
content = dp.get_blob(fld)
435+
dp.metadata[fld] = content.decode("utf-8")
436+
except BlobDownloadError as e:
437+
logger.warning(f"Failed to download document field '{fld}' for datapoint '{dp.path}': {e}")
436438

437439
return self
438440

439-
def _convert_annotation_fields(self, *fields, load_into_memory):
441+
def _convert_annotation_fields(self, *fields):
440442
# Convert any downloaded annotation column
441443
annotation_fields = [f for f in fields if f in self.annotation_fields]
442444
if not annotation_fields:
@@ -448,7 +450,7 @@ def _convert_annotation_fields(self, *fields, load_into_memory):
448450
for dp in self:
449451
for fld in annotation_fields:
450452
metadata_value = dp.metadata.get(fld)
451-
# No value - create ampty annotation container
453+
# No value - create empty annotation container
452454
if metadata_value is None:
453455
dp.metadata[fld] = MetadataAnnotations(datapoint=dp, field=fld)
454456
continue
@@ -457,16 +459,17 @@ def _convert_annotation_fields(self, *fields, load_into_memory):
457459
continue
458460
# Parse annotation from the content of the field
459461
else:
460-
# Force load the content into memory, even if load_into_memory was set to False
461-
if not load_into_memory or isinstance(dp.metadata[fld], Path):
462-
metadata_value = Path(metadata_value).read_bytes()
463462
try:
463+
annotation_content = dp.get_blob(fld)
464464
dp.metadata[fld] = MetadataAnnotations.from_ls_task(
465-
datapoint=dp, field=fld, ls_task=metadata_value
465+
datapoint=dp, field=fld, ls_task=annotation_content
466466
)
467+
except BlobDownloadError as e:
468+
dp.metadata[fld] = ErrorMetadataAnnotations(datapoint=dp, field=fld, error_message=e.message)
469+
bad_annotations[fld].append(dp.path)
467470
except ValidationError:
468471
dp.metadata[fld] = UnsupportedMetadataAnnotations(
469-
datapoint=dp, field=fld, original_value=metadata_value
472+
datapoint=dp, field=fld, original_value=annotation_content
470473
)
471474
bad_annotations[fld].append(dp.path)
472475

tests/data_engine/annotation_import/test_annotation_parsing.py

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@
99
from pytest import MonkeyPatch
1010

1111
from dagshub.data_engine.annotation import MetadataAnnotations
12-
from dagshub.data_engine.annotation.metadata import UnsupportedMetadataAnnotations
12+
from dagshub.data_engine.annotation.metadata import ErrorMetadataAnnotations, UnsupportedMetadataAnnotations
1313
from dagshub.data_engine.dtypes import MetadataFieldType, ReservedTags
14-
from dagshub.data_engine.model import query_result
14+
from dagshub.data_engine.model import datapoint, query_result
15+
from dagshub.data_engine.model.datapoint import BlobDownloadError, BlobHashMetadata
1516
from dagshub.data_engine.model.datasource import Datasource
1617
from dagshub.data_engine.model.query_result import QueryResult
1718
from tests.data_engine.util import add_metadata_field
@@ -61,12 +62,15 @@ def mock_get_blob(*args, **kwargs) -> Union[bytes, PathLike]:
6162
load_into_memory = args[4]
6263
blob_path = _res_folder / f"{blob_hash}.json"
6364

64-
if not blob_path.exists():
65-
raise FileNotFoundError(f"Mock blob file not found: {blob_path}")
66-
if load_into_memory:
67-
return blob_path.read_bytes()
68-
else:
69-
return blob_path
65+
try:
66+
if not blob_path.exists():
67+
raise FileNotFoundError(f"Blob with hash {blob_hash} not found in res folder")
68+
if load_into_memory:
69+
return blob_path.read_bytes()
70+
else:
71+
return blob_path
72+
except Exception as e:
73+
raise BlobDownloadError(str(e)) from e
7074

7175

7276
def _ds_with_annotation(ds: "Datasource", monkeypatch: MonkeyPatch, annotation_hash: str):
@@ -82,6 +86,7 @@ def _ds_with_annotation(ds: "Datasource", monkeypatch: MonkeyPatch, annotation_h
8286
)
8387

8488
monkeypatch.setattr(query_result, "_get_blob", mock_get_blob)
89+
monkeypatch.setattr(datapoint, "_get_blob", mock_get_blob)
8590

8691
return ds
8792

@@ -91,11 +96,6 @@ def ds_with_document_annotation(ds, monkeypatch):
9196
yield _ds_with_annotation(ds, monkeypatch, "annotation1")
9297

9398

94-
@pytest.fixture
95-
def ds_with_unsupported_annotation(ds, monkeypatch):
96-
yield _ds_with_annotation(ds, monkeypatch, "audio_annotation")
97-
98-
9999
def test_annotation_with_document_are_parsed_as_annotation(ds_with_document_annotation):
100100
qr = ds_with_document_annotation.all()
101101
_test_annotation(qr)
@@ -115,6 +115,11 @@ def _test_annotation(qr: QueryResult):
115115
assert isinstance(annotation.annotations[0], IRSegmentationImageAnnotation)
116116

117117

118+
@pytest.fixture
119+
def ds_with_unsupported_annotation(ds, monkeypatch):
120+
yield _ds_with_annotation(ds, monkeypatch, "audio_annotation")
121+
122+
118123
def test_handling_unsupported_annotation(ds_with_unsupported_annotation):
119124
qr = ds_with_unsupported_annotation.all()
120125

@@ -132,3 +137,34 @@ def test_handling_unsupported_annotation(ds_with_unsupported_annotation):
132137
expected_content = (_res_folder / "audio_annotation.json").read_bytes()
133138
assert annotation.value == expected_content
134139
assert annotation.to_ls_task() == expected_content
140+
141+
142+
@pytest.fixture
143+
def ds_with_nonexistent_annotation(ds, monkeypatch):
144+
yield _ds_with_annotation(ds, monkeypatch, "nonexistent_annotation")
145+
146+
147+
def test_nonexistent_annotation(ds_with_nonexistent_annotation):
148+
qr = ds_with_nonexistent_annotation.all(load_documents=False, load_annotations=False)
149+
qr.get_annotations()
150+
151+
annotation: MetadataAnnotations = qr[0].metadata[_annotation_field_name]
152+
153+
assert isinstance(annotation, ErrorMetadataAnnotations)
154+
# Error annotation is still a subclass of regular annotation
155+
# This is crucial for logic that checks if annotation metadata was parsed already,
156+
# so if this starts failing, that logic will need to be changed too
157+
assert isinstance(annotation, MetadataAnnotations)
158+
159+
with pytest.raises(NotImplementedError):
160+
annotation.add_image_bbox("cat", 0, 0, 10, 10, 1920, 1080)
161+
162+
with pytest.raises(ValueError, match="Blob with hash nonexistent_annotation not found in res folder"):
163+
_ = annotation.value
164+
with pytest.raises(ValueError, match="Blob with hash nonexistent_annotation not found in res folder"):
165+
annotation.to_ls_task()
166+
167+
168+
def test_blob_metadata_is_wrapped_from_backend(ds_with_document_annotation):
169+
qr = ds_with_document_annotation.all(load_documents=False, load_annotations=False)
170+
assert isinstance(qr[0].metadata[_annotation_field_name], BlobHashMetadata)

0 commit comments

Comments
 (0)