diff --git a/src/kili/adapters/kili_api_gateway/asset/formatters.py b/src/kili/adapters/kili_api_gateway/asset/formatters.py index 075b585c3..6d1266312 100644 --- a/src/kili/adapters/kili_api_gateway/asset/formatters.py +++ b/src/kili/adapters/kili_api_gateway/asset/formatters.py @@ -1,20 +1,121 @@ """Formatters for assets retrieved from Kili API.""" +import asyncio import json +import requests + from kili.adapters.http_client import HttpClient -from kili.core.helpers import get_response_json, is_url, log_raise_for_status +from kili.adapters.kili_api_gateway.helpers.http_json import load_json_from_link +from kili.core.helpers import is_url from kili.domain.types import ListOrTuple +# Batch size for parallel JSON response downloads (same as export service) +JSON_RESPONSE_BATCH_SIZE = 10 + + +async def _download_json_response(url: str, http_client: HttpClient) -> dict: + """Download and parse JSON response from a URL using asyncio. + + Args: + url: URL to download the JSON response from + http_client: HttpClient instance with SSL verification already configured -def load_json_from_link(link: str, http_client: HttpClient) -> dict: - """Load json from link.""" - if link == "" or not is_url(link): + Returns: + Parsed JSON response as a dictionary + """ + try: + # Run synchronous requests call in a thread + response = await asyncio.to_thread(http_client.get, url, timeout=30) + response.raise_for_status() + return response.json() + except (requests.RequestException, json.JSONDecodeError): + # Return empty dict on error to ensure consistent response format return {} - response = http_client.get(link, timeout=30) - log_raise_for_status(response) - return get_response_json(response) + +async def _download_json_responses_async( + url_to_label_mapping: list[tuple[str, dict]], http_client: HttpClient +) -> None: + """Download JSON responses in parallel using asyncio. + + Args: + url_to_label_mapping: List of tuples (url, label_dict) to download + http_client: HttpClient instance with SSL verification already configured + """ + # Process in batches to limit concurrent connections + for i in range(0, len(url_to_label_mapping), JSON_RESPONSE_BATCH_SIZE): + batch = url_to_label_mapping[i : i + JSON_RESPONSE_BATCH_SIZE] + + # Download all URLs in the batch in parallel using asyncio.gather + download_tasks = [_download_json_response(url, http_client) for url, _ in batch] + json_responses = await asyncio.gather(*download_tasks) + + # Assign the downloaded responses back to their labels and remove the URL + for (_, label), json_response in zip(batch, json_responses, strict=False): + label["jsonResponse"] = json_response + if "jsonResponseUrl" in label: + del label["jsonResponseUrl"] + + +def download_json_responses_parallel( + url_to_label_mapping: list[tuple[str, dict]], http_client: HttpClient +) -> None: + """Download JSON responses in parallel and assign to labels. + + Args: + url_to_label_mapping: List of tuples (url, label_dict) to download + http_client: HttpClient instance with SSL verification already configured + """ + if not url_to_label_mapping: + return + + # Check if we're already in an event loop (e.g., Jupyter notebook, async web framework) + try: + asyncio.get_running_loop() + except RuntimeError: + # No running loop, safe to use asyncio.run() for parallel downloads + asyncio.run(_download_json_responses_async(url_to_label_mapping, http_client)) + else: + # Already in a loop - fall back to sequential downloads to avoid RuntimeError + # This happens in Jupyter notebooks, FastAPI, and other async contexts + for url, label in url_to_label_mapping: + try: + response = http_client.get(url, timeout=30) + response.raise_for_status() + label["jsonResponse"] = response.json() + except (requests.RequestException, json.JSONDecodeError): + label["jsonResponse"] = {} + + if "jsonResponseUrl" in label: + del label["jsonResponseUrl"] + + +def _parse_label_json_response(label: dict) -> None: + """Parse jsonResponse string to dict for a single label. + + Args: + label: Label dict to update in place + """ + json_response_value = label.get("jsonResponse", "{}") + try: + label["jsonResponse"] = json.loads(json_response_value) + except json.JSONDecodeError: + label["jsonResponse"] = {} + + +def _process_label_json_response(label: dict, url_to_label_mapping: list[tuple[str, dict]]) -> None: + """Process a single label's jsonResponse, either scheduling URL download or parsing. + + Args: + label: Label dict to process + url_to_label_mapping: List to append URL mapping if download needed + """ + json_response_url = label.get("jsonResponseUrl") + if json_response_url and is_url(json_response_url): + url_to_label_mapping.append((json_response_url, label)) + else: + _parse_label_json_response(label) def load_asset_json_fields(asset: dict, fields: ListOrTuple[str], http_client: HttpClient) -> dict: @@ -28,18 +129,17 @@ def load_asset_json_fields(asset: dict, fields: ListOrTuple[str], http_client: H if "ocrMetadata" in fields and asset.get("ocrMetadata") is not None: asset["ocrMetadata"] = load_json_from_link(asset.get("ocrMetadata", ""), http_client) + # Collect all URLs to download in parallel (similar to export service) + url_to_label_mapping = [] + if "labels.jsonResponse" in fields: - asset_labels = asset.get("labels", []) - for label in asset_labels: - try: - label["jsonResponse"] = json.loads(label["jsonResponse"]) - except json.JSONDecodeError: - label["jsonResponse"] = {} + for label in asset.get("labels", []): + _process_label_json_response(label, url_to_label_mapping) if "latestLabel.jsonResponse" in fields and asset.get("latestLabel") is not None: - try: - asset["latestLabel"]["jsonResponse"] = json.loads(asset["latestLabel"]["jsonResponse"]) - except json.JSONDecodeError: - asset["latestLabel"]["jsonResponse"] = {} + _process_label_json_response(asset["latestLabel"], url_to_label_mapping) + + if url_to_label_mapping: + download_json_responses_parallel(url_to_label_mapping, http_client) return asset diff --git a/src/kili/adapters/kili_api_gateway/asset/operations_mixin.py b/src/kili/adapters/kili_api_gateway/asset/operations_mixin.py index 9ebb89f19..3adc05315 100644 --- a/src/kili/adapters/kili_api_gateway/asset/operations_mixin.py +++ b/src/kili/adapters/kili_api_gateway/asset/operations_mixin.py @@ -46,6 +46,9 @@ def list_assets( options: QueryOptions, ) -> Generator[dict, None, None]: """List assets with given options.""" + has_labels_url = "labels.jsonResponseUrl" in fields + has_latest_label_url = "latestLabel.jsonResponseUrl" in fields + if "labels.jsonResponse" in fields or "latestLabel.jsonResponse" in fields: # Check if we can get the jsonResponse of if we need to rebuild it. project_info = get_project( @@ -58,7 +61,10 @@ def list_assets( "LLM_STATIC", "GEOSPATIAL", }: - yield from self.list_assets_split(filters, fields, options, project_info) + fetch_annotations = not (has_labels_url or has_latest_label_url) + yield from self.list_assets_split( + filters, fields, options, project_info, fetch_annotations + ) return fragment = fragment_builder(fields) @@ -79,7 +85,12 @@ def list_assets( yield from assets_gen def list_assets_split( - self, filters: AssetFilters, fields: ListOrTuple[str], options: QueryOptions, project_info + self, + filters: AssetFilters, + fields: ListOrTuple[str], + options: QueryOptions, + project_info, + fetch_annotations: bool, ) -> Generator[dict, None, None]: """List assets with given options.""" nb_annotations = self.count_assets_annotations(filters) @@ -91,22 +102,23 @@ def list_assets_split( options = QueryOptions(options.disable_tqdm, options.first, options.skip, batch_size) - inner_annotation_fragment = get_annotation_fragment() - annotation_fragment = f""" - annotations {{ - {inner_annotation_fragment} - }} - """ - # Ensure 'content', 'resolution', and 'jsonContent' are present in fields - required_fields = {"content", "jsonContent", "resolution.width", "resolution.height"} - fields = list(fields) - for field in required_fields: - if field not in fields: - fields.append(field) - - fragment = fragment_builder( - fields, {"labels": annotation_fragment, "latestLabel": annotation_fragment} - ) + static_fragments = {} + if fetch_annotations: + inner_annotation_fragment = get_annotation_fragment() + annotation_fragment = f""" + annotations {{ + {inner_annotation_fragment} + }} + """ + static_fragments = {"labels": annotation_fragment, "latestLabel": annotation_fragment} + + required_fields = {"content", "jsonContent", "resolution.width", "resolution.height"} + fields = list(fields) + for field in required_fields: + if field not in fields: + fields.append(field) + + fragment = fragment_builder(fields, static_fragments if static_fragments else None) query = get_assets_query(fragment) where = asset_where_mapper(filters) assets_gen = PaginatedGraphQLQuery(self.graphql_client).execute_query_from_paginated_call( @@ -115,25 +127,29 @@ def list_assets_split( assets_gen = ( load_asset_json_fields(asset, fields, self.http_client) for asset in assets_gen ) - converter = AnnotationsToJsonResponseConverter( - json_interface=project_info["jsonInterface"], - project_input_type=project_info["inputType"], - ) - is_requesting_annotations = any("annotations." in element for element in fields) - for asset in assets_gen: - if "latestLabel.jsonResponse" in fields and asset.get("latestLabel"): - converter.patch_label_json_response( - asset, asset["latestLabel"], asset["latestLabel"]["annotations"] - ) - if not is_requesting_annotations: - asset["latestLabel"].pop("annotations") - if "labels.jsonResponse" in fields: - for label in asset.get("labels", []): - converter.patch_label_json_response(asset, label, label["annotations"]) + if fetch_annotations: + converter = AnnotationsToJsonResponseConverter( + json_interface=project_info["jsonInterface"], + project_input_type=project_info["inputType"], + ) + is_requesting_annotations = any("annotations." in element for element in fields) + for asset in assets_gen: + if "latestLabel.jsonResponse" in fields and asset.get("latestLabel"): + converter.patch_label_json_response( + asset, asset["latestLabel"], asset["latestLabel"]["annotations"] + ) if not is_requesting_annotations: - label.pop("annotations") - yield asset + asset["latestLabel"].pop("annotations") + + if "labels.jsonResponse" in fields: + for label in asset.get("labels", []): + converter.patch_label_json_response(asset, label, label["annotations"]) + if not is_requesting_annotations: + label.pop("annotations") + yield asset + else: + yield from assets_gen def count_assets(self, filters: AssetFilters) -> int: """Send a GraphQL request calling countIssues resolver.""" diff --git a/src/kili/adapters/kili_api_gateway/helpers/http_json.py b/src/kili/adapters/kili_api_gateway/helpers/http_json.py new file mode 100644 index 000000000..96839141f --- /dev/null +++ b/src/kili/adapters/kili_api_gateway/helpers/http_json.py @@ -0,0 +1,22 @@ +"""HTTP JSON helpers for downloading JSON data from URLs.""" + +from kili.adapters.http_client import HttpClient +from kili.core.helpers import is_url + + +def load_json_from_link(link: str, http_client: HttpClient) -> dict: + """Load json from link. + + Args: + link: URL to download JSON from + http_client: HttpClient instance with SSL verification already configured + + Returns: + Parsed JSON response as a dictionary, or empty dict if link is invalid + """ + if link == "" or not is_url(link): + return {} + + response = http_client.get(link, timeout=30) + response.raise_for_status() + return response.json() diff --git a/src/kili/adapters/kili_api_gateway/label/formatters.py b/src/kili/adapters/kili_api_gateway/label/formatters.py index 1c01a96ea..7ad333f67 100644 --- a/src/kili/adapters/kili_api_gateway/label/formatters.py +++ b/src/kili/adapters/kili_api_gateway/label/formatters.py @@ -2,15 +2,24 @@ import json +from kili.adapters.http_client import HttpClient +from kili.adapters.kili_api_gateway.helpers.http_json import load_json_from_link +from kili.core.helpers import is_url from kili.domain.types import ListOrTuple -def load_label_json_fields(label: dict, fields: ListOrTuple[str]) -> dict: +def load_label_json_fields(label: dict, fields: ListOrTuple[str], http_client: HttpClient) -> dict: """Load json fields of a label.""" if "jsonResponse" in fields: - try: - label["jsonResponse"] = json.loads(label.get("jsonResponse", "{}")) - except json.JSONDecodeError: - label["jsonResponse"] = {} + json_response_url = label.get("jsonResponseUrl") + if json_response_url and is_url(json_response_url): + label["jsonResponse"] = load_json_from_link(json_response_url, http_client) + del label["jsonResponseUrl"] + else: + json_response_value = label.get("jsonResponse", "{}") + try: + label["jsonResponse"] = json.loads(json_response_value) + except json.JSONDecodeError: + label["jsonResponse"] = {} return label diff --git a/src/kili/adapters/kili_api_gateway/label/operations_mixin.py b/src/kili/adapters/kili_api_gateway/label/operations_mixin.py index 1dd2abb57..5836b5395 100644 --- a/src/kili/adapters/kili_api_gateway/label/operations_mixin.py +++ b/src/kili/adapters/kili_api_gateway/label/operations_mixin.py @@ -52,6 +52,8 @@ def list_labels( options: QueryOptions, ) -> Generator[dict, None, None]: """List labels.""" + has_json_response_url = "jsonResponseUrl" in fields + if "jsonResponse" in fields: if "labelOf" not in fields: fields = [*list(fields), "assetId"] @@ -65,7 +67,10 @@ def list_labels( "LLM_INSTR_FOLLOWING", "LLM_STATIC", }: - yield from self.list_labels_split(filters, fields, options, project_info) + fetch_annotations = not has_json_response_url + yield from self.list_labels_split( + filters, fields, options, project_info, fetch_annotations + ) return fragment = fragment_builder(fields) @@ -74,11 +79,18 @@ def list_labels( labels_gen = PaginatedGraphQLQuery(self.graphql_client).execute_query_from_paginated_call( query, where, options, "Retrieving labels", GQL_COUNT_LABELS ) - labels_gen = (load_label_json_fields(label, fields) for label in labels_gen) + labels_gen = ( + load_label_json_fields(label, fields, self.http_client) for label in labels_gen + ) yield from labels_gen def list_labels_split( - self, filters: LabelFilters, fields: ListOrTuple[str], options: QueryOptions, project_info + self, + filters: LabelFilters, + fields: ListOrTuple[str], + options: QueryOptions, + project_info, + fetch_annotations: bool, ) -> Generator[dict, None, None]: """List labels.""" if project_info["inputType"] == "VIDEO": @@ -87,21 +99,28 @@ def list_labels_split( ) fragment = fragment_builder(fields) - inner_annotation_fragment = get_annotation_fragment() - full_fragment = f""" - {fragment} - annotations {{ - {inner_annotation_fragment} - }} - """ + + if fetch_annotations: + inner_annotation_fragment = get_annotation_fragment() + full_fragment = f""" + {fragment} + annotations {{ + {inner_annotation_fragment} + }} + """ + else: + full_fragment = fragment + query = get_labels_query(full_fragment) where = label_where_mapper(filters) labels_gen = PaginatedGraphQLQuery(self.graphql_client).execute_query_from_paginated_call( query, where, options, "Retrieving labels", GQL_COUNT_LABELS ) - labels_gen = (load_label_json_fields(label, fields) for label in labels_gen) + labels_gen = ( + load_label_json_fields(label, fields, self.http_client) for label in labels_gen + ) - if "jsonResponse" in fields: + if "jsonResponse" in fields and fetch_annotations: converter = AnnotationsToJsonResponseConverter( json_interface=project_info["jsonInterface"], project_input_type=project_info["inputType"], diff --git a/src/kili/llm/services/export/__init__.py b/src/kili/llm/services/export/__init__.py index fac449535..7284efe03 100644 --- a/src/kili/llm/services/export/__init__.py +++ b/src/kili/llm/services/export/__init__.py @@ -36,6 +36,7 @@ "isLatestLabelForUser", "isSentBackToQueue", "jsonResponse", # This is needed to keep annotations + "jsonResponseUrl", "labelType", "modelName", ] @@ -57,6 +58,7 @@ "externalId", "jsonMetadata", "labels.jsonResponse", + "labels.jsonResponseUrl", "labels.author.id", "labels.author.email", "labels.author.firstname", diff --git a/src/kili/services/copy_project/__init__.py b/src/kili/services/copy_project/__init__.py index aebd279aa..18a309f99 100644 --- a/src/kili/services/copy_project/__init__.py +++ b/src/kili/services/copy_project/__init__.py @@ -202,6 +202,7 @@ def _copy_labels_legacy(self, from_project_id: str, new_project_id: str) -> None fields=[ "author.email", "jsonResponse", + "jsonResponseUrl", "secondsToLabel", "isLatestLabelForUser", "labelOf.externalId", diff --git a/src/kili/services/export/tools.py b/src/kili/services/export/tools.py index d2aa1b9c7..32f0b9250 100644 --- a/src/kili/services/export/tools.py +++ b/src/kili/services/export/tools.py @@ -35,6 +35,7 @@ DEFAULT_FIELDS = [ *COMMON_FIELDS, "labels.jsonResponse", + "labels.jsonResponseUrl", "labels.author.id", "labels.author.email", "labels.author.firstname", @@ -48,6 +49,7 @@ LATEST_LABEL_FIELDS = [ *COMMON_FIELDS, "latestLabel.jsonResponse", + "latestLabel.jsonResponseUrl", "latestLabel.author.id", "latestLabel.author.email", "latestLabel.author.firstname", diff --git a/tests/integration/adapters/kili_api_gateway/test_label.py b/tests/integration/adapters/kili_api_gateway/test_label.py index b664c7a84..b77788cca 100644 --- a/tests/integration/adapters/kili_api_gateway/test_label.py +++ b/tests/integration/adapters/kili_api_gateway/test_label.py @@ -244,6 +244,7 @@ def mocked_graphql_execute(query, variables, **kwargs): { "id": "fake_label_id", "jsonResponse": "{}", + "jsonResponseUrl": None, "annotations": test_case_1.annotations, "assetId": "fake_asset_id", } diff --git a/tests/unit/adapters/kili_api_gateway/asset/__init__.py b/tests/unit/adapters/kili_api_gateway/asset/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/adapters/kili_api_gateway/asset/test_formatters.py b/tests/unit/adapters/kili_api_gateway/asset/test_formatters.py new file mode 100644 index 000000000..c976c912d --- /dev/null +++ b/tests/unit/adapters/kili_api_gateway/asset/test_formatters.py @@ -0,0 +1,397 @@ +"""Tests for asset formatters, specifically jsonResponseUrl optimization (LAB-4123). + +This test file verifies that download_json_responses_parallel correctly: +- Downloads JSON from URLs in parallel +- Handles batching correctly +- Handles errors gracefully +- Improves performance over sequential processing +""" + +from unittest.mock import AsyncMock, Mock, patch + +from kili.adapters.kili_api_gateway.asset.formatters import ( + JSON_RESPONSE_BATCH_SIZE, + download_json_responses_parallel, + load_asset_json_fields, +) +from kili.adapters.kili_api_gateway.helpers.http_json import load_json_from_link + + +class TestLoadJsonFromLink: + """Test loading JSON from a URL.""" + + def test_load_json_from_valid_url(self): + """Test loading JSON from a valid URL.""" + http_client = Mock() + response = Mock() + response.json.return_value = {"key": "value"} + http_client.get.return_value = response + + result = load_json_from_link("https://example.com/data.json", http_client) + + assert result == {"key": "value"} + http_client.get.assert_called_once_with("https://example.com/data.json", timeout=30) + + def test_load_json_from_empty_string(self): + """Test that empty string returns empty dict.""" + http_client = Mock() + + result = load_json_from_link("", http_client) + + assert result == {} + http_client.get.assert_not_called() + + def test_load_json_from_non_url(self): + """Test that non-URL string returns empty dict.""" + http_client = Mock() + + result = load_json_from_link("not-a-url", http_client) + + assert result == {} + http_client.get.assert_not_called() + + +class TestDownloadJsonResponsesParallel: + """Test parallel downloading of JSON responses.""" + + def test_download_single_json_response(self): + """Test downloading a single JSON response.""" + http_client = Mock() + label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} + url_to_label_mapping = [("https://example.com/label1.json", label)] + + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + new_callable=AsyncMock, + ) as mock_download: + mock_download.return_value = {"annotation": "data"} + + download_json_responses_parallel(url_to_label_mapping, http_client) + + # Verify the JSON was downloaded and assigned + assert label["jsonResponse"] == {"annotation": "data"} + # Verify jsonResponseUrl was removed + assert "jsonResponseUrl" not in label + mock_download.assert_called_once() + + def test_download_multiple_json_responses(self): + """Test downloading multiple JSON responses in parallel.""" + http_client = Mock() + label1 = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} + label2 = {"id": "label2", "jsonResponseUrl": "https://example.com/label2.json"} + label3 = {"id": "label3", "jsonResponseUrl": "https://example.com/label3.json"} + + url_to_label_mapping = [ + ("https://example.com/label1.json", label1), + ("https://example.com/label2.json", label2), + ("https://example.com/label3.json", label3), + ] + + # Create mock side effect for different URLs + async def mock_download(url, client): + if "label1" in url: + return {"data": "label1"} + elif "label2" in url: + return {"data": "label2"} + elif "label3" in url: + return {"data": "label3"} + return {} + + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + side_effect=mock_download, + ): + download_json_responses_parallel(url_to_label_mapping, http_client) + + # Verify all labels got their JSON + assert label1["jsonResponse"] == {"data": "label1"} + assert label2["jsonResponse"] == {"data": "label2"} + assert label3["jsonResponse"] == {"data": "label3"} + + # Verify all URLs were removed + assert "jsonResponseUrl" not in label1 + assert "jsonResponseUrl" not in label2 + assert "jsonResponseUrl" not in label3 + + def test_download_handles_request_exception(self): + """Test that request exceptions are handled gracefully.""" + http_client = Mock() + label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} + url_to_label_mapping = [("https://example.com/label1.json", label)] + + # Mock download to return empty dict on error + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + new_callable=AsyncMock, + ) as mock_download: + mock_download.return_value = {} # Returns empty dict on error + + # Should not raise exception + download_json_responses_parallel(url_to_label_mapping, http_client) + + # Should set empty dict on error + assert label["jsonResponse"] == {} + assert "jsonResponseUrl" not in label + + def test_download_handles_json_decode_error(self): + """Test that JSON decode errors are handled gracefully.""" + http_client = Mock() + label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} + url_to_label_mapping = [("https://example.com/label1.json", label)] + + # Mock download to return empty dict (simulating error) + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + new_callable=AsyncMock, + ) as mock_download: + mock_download.return_value = {} # Returns empty dict on error + + # Should not raise exception + download_json_responses_parallel(url_to_label_mapping, http_client) + + # Should set empty dict on error + assert label["jsonResponse"] == {} + assert "jsonResponseUrl" not in label + + def test_download_handles_timeout_error(self): + """Test that timeout errors are handled gracefully.""" + http_client = Mock() + label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} + url_to_label_mapping = [("https://example.com/label1.json", label)] + + # Mock download to return empty dict (simulating error) + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + new_callable=AsyncMock, + ) as mock_download: + mock_download.return_value = {} # Returns empty dict on error + + # Should not raise exception + download_json_responses_parallel(url_to_label_mapping, http_client) + + # Should set empty dict on error + assert label["jsonResponse"] == {} + assert "jsonResponseUrl" not in label + + def test_download_processes_in_batches(self): + """Test that downloads are processed in batches of JSON_RESPONSE_BATCH_SIZE.""" + http_client = Mock() + # Create more labels than batch size + num_labels = JSON_RESPONSE_BATCH_SIZE + 5 + labels = [ + {"id": f"label{i}", "jsonResponseUrl": f"https://example.com/label{i}.json"} + for i in range(num_labels) + ] + url_to_label_mapping = [ + (f"https://example.com/label{i}.json", label) for i, label in enumerate(labels) + ] + + # Mock the async download function + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + new_callable=AsyncMock, + ) as mock_download: + mock_download.return_value = {"data": "test"} + + # Call the function + download_json_responses_parallel(url_to_label_mapping, http_client) + + # Verify all labels got their JSON response + for label in labels: + assert label["jsonResponse"] == {"data": "test"} + assert "jsonResponseUrl" not in label + + # Verify download was called for all URLs + assert mock_download.call_count == num_labels + + def test_download_mixed_success_and_failure(self): + """Test downloading with some successes and some failures.""" + http_client = Mock() + label_success = {"id": "success", "jsonResponseUrl": "https://example.com/success.json"} + label_fail = {"id": "fail", "jsonResponseUrl": "https://example.com/fail.json"} + + url_to_label_mapping = [ + ("https://example.com/success.json", label_success), + ("https://example.com/fail.json", label_fail), + ] + + # Create mock side effect for different URLs + async def mock_download(url, client): + if "success" in url: + return {"status": "ok"} + return {} # Failure returns empty dict + + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + side_effect=mock_download, + ): + download_json_responses_parallel(url_to_label_mapping, http_client) + + # Success should have data + assert label_success["jsonResponse"] == {"status": "ok"} + assert "jsonResponseUrl" not in label_success + + # Failure should have empty dict + assert label_fail["jsonResponse"] == {} + assert "jsonResponseUrl" not in label_fail + + def test_download_in_existing_event_loop(self): + """Test that downloads work when called from within an existing event loop.""" + import asyncio + + http_client = Mock() + label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} + url_to_label_mapping = [("https://example.com/label1.json", label)] + + # Mock the http_client.get response + mock_response = Mock() + mock_response.json.return_value = {"data": "test"} + http_client.get.return_value = mock_response + + # Run inside an existing event loop (simulating Jupyter/FastAPI) + async def run_in_loop(): + download_json_responses_parallel(url_to_label_mapping, http_client) + + asyncio.run(run_in_loop()) + + # Verify the JSON was downloaded (sequentially in this case) + assert label["jsonResponse"] == {"data": "test"} + assert "jsonResponseUrl" not in label + http_client.get.assert_called_once_with("https://example.com/label1.json", timeout=30) + + +class TestLoadAssetJsonFields: + """Test load_asset_json_fields integration with download_json_responses_parallel.""" + + def test_load_asset_with_latest_label_json_response_url(self): + """Test loading asset with latestLabel.jsonResponseUrl.""" + http_client = Mock() + + asset = { + "id": "asset1", + "latestLabel": { + "id": "label1", + "jsonResponse": '{"old": "data"}', # This should be replaced + "jsonResponseUrl": "https://example.com/label1.json", + }, + } + + fields = ["id", "latestLabel.jsonResponse", "latestLabel.jsonResponseUrl"] + + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + new_callable=AsyncMock, + ) as mock_download: + mock_download.return_value = {"annotation": "data"} + + result = load_asset_json_fields(asset, fields, http_client) + + # Verify URL was used instead of parsing string + assert result["latestLabel"]["jsonResponse"] == {"annotation": "data"} + assert "jsonResponseUrl" not in result["latestLabel"] + mock_download.assert_called_once() + + def test_load_asset_with_labels_json_response_url(self): + """Test loading asset with labels.jsonResponseUrl.""" + http_client = Mock() + + asset = { + "id": "asset1", + "labels": [ + { + "id": "label1", + "jsonResponse": '{"old": "data1"}', + "jsonResponseUrl": "https://example.com/label1.json", + }, + { + "id": "label2", + "jsonResponse": '{"old": "data2"}', + "jsonResponseUrl": "https://example.com/label2.json", + }, + ], + } + + fields = ["id", "labels.jsonResponse", "labels.jsonResponseUrl"] + + # Create mock side effect for different URLs + async def mock_download(url, client): + if "label1" in url: + return {"data": "label1"} + elif "label2" in url: + return {"data": "label2"} + return {} + + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + side_effect=mock_download, + ): + result = load_asset_json_fields(asset, fields, http_client) + + # Verify URLs were used for both labels + assert result["labels"][0]["jsonResponse"] == {"data": "label1"} + assert result["labels"][1]["jsonResponse"] == {"data": "label2"} + assert "jsonResponseUrl" not in result["labels"][0] + assert "jsonResponseUrl" not in result["labels"][1] + + def test_load_asset_without_json_response_url_falls_back_to_parsing(self): + """Test that assets without jsonResponseUrl fall back to string parsing.""" + http_client = Mock() + + asset = { + "id": "asset1", + "latestLabel": { + "id": "label1", + "jsonResponse": '{"annotation": "data"}', + # No jsonResponseUrl field + }, + } + + fields = ["id", "latestLabel.jsonResponse"] + + result = load_asset_json_fields(asset, fields, http_client) + + # Verify string was parsed (not URL downloaded) + assert result["latestLabel"]["jsonResponse"] == {"annotation": "data"} + http_client.get.assert_not_called() + + def test_load_asset_with_both_labels_and_latest_label(self): + """Test loading asset with both labels and latestLabel having URLs.""" + http_client = Mock() + + asset = { + "id": "asset1", + "labels": [ + {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"}, + {"id": "label2", "jsonResponseUrl": "https://example.com/label2.json"}, + ], + "latestLabel": {"id": "latest", "jsonResponseUrl": "https://example.com/latest.json"}, + } + + fields = [ + "id", + "labels.jsonResponse", + "labels.jsonResponseUrl", + "latestLabel.jsonResponse", + "latestLabel.jsonResponseUrl", + ] + + # Create mock side effect for different URLs + async def mock_download(url, client): + if "label1" in url: + return {"data": "label1"} + elif "label2" in url: + return {"data": "label2"} + elif "latest" in url: + return {"data": "latest"} + return {} + + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + side_effect=mock_download, + ): + result = load_asset_json_fields(asset, fields, http_client) + + # Verify all were downloaded + assert result["labels"][0]["jsonResponse"] == {"data": "label1"} + assert result["labels"][1]["jsonResponse"] == {"data": "label2"} + assert result["latestLabel"]["jsonResponse"] == {"data": "latest"} diff --git a/tests/unit/services/export/test_export.py b/tests/unit/services/export/test_export.py index 6dc1793f5..d5f8c3960 100644 --- a/tests/unit/services/export/test_export.py +++ b/tests/unit/services/export/test_export.py @@ -858,6 +858,7 @@ def test_export_with_asset_filter_kwargs(mocker): "resolution.height", "resolution.width", "latestLabel.jsonResponse", + "latestLabel.jsonResponseUrl", "latestLabel.author.id", "latestLabel.author.email", "latestLabel.author.firstname",