From a5f7a3ec084cf26b35fde3ef8fae838a08bb8949 Mon Sep 17 00:00:00 2001 From: "@fanny.gaudin" Date: Wed, 7 Jan 2026 17:16:03 +0100 Subject: [PATCH 1/3] feat(LAB-4123): faster exports with the SDK --- .../kili_api_gateway/asset/formatters.py | 91 ++++- .../asset/operations_mixin.py | 86 +++-- .../kili_api_gateway/label/formatters.py | 28 +- .../label/operations_mixin.py | 43 ++- src/kili/llm/services/export/__init__.py | 2 + src/kili/services/copy_project/__init__.py | 1 + src/kili/services/export/tools.py | 2 + .../adapters/kili_api_gateway/test_label.py | 1 + .../kili_api_gateway/asset/__init__.py | 0 .../kili_api_gateway/asset/test_formatters.py | 365 ++++++++++++++++++ tests/unit/services/export/test_export.py | 1 + 11 files changed, 558 insertions(+), 62 deletions(-) create mode 100644 tests/unit/adapters/kili_api_gateway/asset/__init__.py create mode 100644 tests/unit/adapters/kili_api_gateway/asset/test_formatters.py diff --git a/src/kili/adapters/kili_api_gateway/asset/formatters.py b/src/kili/adapters/kili_api_gateway/asset/formatters.py index 075b585c3..09940ba4d 100644 --- a/src/kili/adapters/kili_api_gateway/asset/formatters.py +++ b/src/kili/adapters/kili_api_gateway/asset/formatters.py @@ -1,11 +1,17 @@ """Formatters for assets retrieved from Kili API.""" import json +from concurrent.futures import ThreadPoolExecutor, as_completed + +import requests from kili.adapters.http_client import HttpClient from kili.core.helpers import get_response_json, is_url, log_raise_for_status from kili.domain.types import ListOrTuple +# Batch size for parallel JSON response downloads (same as export service) +JSON_RESPONSE_BATCH_SIZE = 10 + def load_json_from_link(link: str, http_client: HttpClient) -> dict: """Load json from link.""" @@ -17,6 +23,72 @@ def load_json_from_link(link: str, http_client: HttpClient) -> dict: return get_response_json(response) +def download_json_responses_parallel( + url_to_label_mapping: list[tuple[str, dict]], http_client: HttpClient +) -> None: + """Download JSON responses in parallel and assign to labels. + + Args: + url_to_label_mapping: List of tuples (url, label_dict) to download + http_client: HTTP client to use for downloads + """ + if not url_to_label_mapping: + return + + # Process in batches to limit concurrent connections + for i in range(0, len(url_to_label_mapping), JSON_RESPONSE_BATCH_SIZE): + batch = url_to_label_mapping[i : i + JSON_RESPONSE_BATCH_SIZE] + + # Download all URLs in the batch in parallel + with ThreadPoolExecutor(max_workers=JSON_RESPONSE_BATCH_SIZE) as executor: + # Submit all download tasks + future_to_label = { + executor.submit(load_json_from_link, url, http_client): label + for url, label in batch + } + + # Collect results as they complete + for future in as_completed(future_to_label): + label = future_to_label[future] + try: + json_response = future.result() + label["jsonResponse"] = json_response + if "jsonResponseUrl" in label: + del label["jsonResponseUrl"] + except (requests.RequestException, json.JSONDecodeError, TimeoutError): + # Set empty dict to ensure consistent response format + label["jsonResponse"] = {} + if "jsonResponseUrl" in label: + del label["jsonResponseUrl"] + + +def _parse_label_json_response(label: dict) -> None: + """Parse jsonResponse string to dict for a single label. + + Args: + label: Label dict to update in place + """ + json_response_value = label.get("jsonResponse", "{}") + try: + label["jsonResponse"] = json.loads(json_response_value) + except json.JSONDecodeError: + label["jsonResponse"] = {} + + +def _process_label_json_response(label: dict, url_to_label_mapping: list[tuple[str, dict]]) -> None: + """Process a single label's jsonResponse, either scheduling URL download or parsing. + + Args: + label: Label dict to process + url_to_label_mapping: List to append URL mapping if download needed + """ + json_response_url = label.get("jsonResponseUrl") + if json_response_url and is_url(json_response_url): + url_to_label_mapping.append((json_response_url, label)) + else: + _parse_label_json_response(label) + + def load_asset_json_fields(asset: dict, fields: ListOrTuple[str], http_client: HttpClient) -> dict: """Load json fields of an asset.""" if "jsonMetadata" in fields: @@ -28,18 +100,17 @@ def load_asset_json_fields(asset: dict, fields: ListOrTuple[str], http_client: H if "ocrMetadata" in fields and asset.get("ocrMetadata") is not None: asset["ocrMetadata"] = load_json_from_link(asset.get("ocrMetadata", ""), http_client) + # Collect all URLs to download in parallel (similar to export service) + url_to_label_mapping = [] + if "labels.jsonResponse" in fields: - asset_labels = asset.get("labels", []) - for label in asset_labels: - try: - label["jsonResponse"] = json.loads(label["jsonResponse"]) - except json.JSONDecodeError: - label["jsonResponse"] = {} + for label in asset.get("labels", []): + _process_label_json_response(label, url_to_label_mapping) if "latestLabel.jsonResponse" in fields and asset.get("latestLabel") is not None: - try: - asset["latestLabel"]["jsonResponse"] = json.loads(asset["latestLabel"]["jsonResponse"]) - except json.JSONDecodeError: - asset["latestLabel"]["jsonResponse"] = {} + _process_label_json_response(asset["latestLabel"], url_to_label_mapping) + + if url_to_label_mapping: + download_json_responses_parallel(url_to_label_mapping, http_client) return asset diff --git a/src/kili/adapters/kili_api_gateway/asset/operations_mixin.py b/src/kili/adapters/kili_api_gateway/asset/operations_mixin.py index 9ebb89f19..3adc05315 100644 --- a/src/kili/adapters/kili_api_gateway/asset/operations_mixin.py +++ b/src/kili/adapters/kili_api_gateway/asset/operations_mixin.py @@ -46,6 +46,9 @@ def list_assets( options: QueryOptions, ) -> Generator[dict, None, None]: """List assets with given options.""" + has_labels_url = "labels.jsonResponseUrl" in fields + has_latest_label_url = "latestLabel.jsonResponseUrl" in fields + if "labels.jsonResponse" in fields or "latestLabel.jsonResponse" in fields: # Check if we can get the jsonResponse of if we need to rebuild it. project_info = get_project( @@ -58,7 +61,10 @@ def list_assets( "LLM_STATIC", "GEOSPATIAL", }: - yield from self.list_assets_split(filters, fields, options, project_info) + fetch_annotations = not (has_labels_url or has_latest_label_url) + yield from self.list_assets_split( + filters, fields, options, project_info, fetch_annotations + ) return fragment = fragment_builder(fields) @@ -79,7 +85,12 @@ def list_assets( yield from assets_gen def list_assets_split( - self, filters: AssetFilters, fields: ListOrTuple[str], options: QueryOptions, project_info + self, + filters: AssetFilters, + fields: ListOrTuple[str], + options: QueryOptions, + project_info, + fetch_annotations: bool, ) -> Generator[dict, None, None]: """List assets with given options.""" nb_annotations = self.count_assets_annotations(filters) @@ -91,22 +102,23 @@ def list_assets_split( options = QueryOptions(options.disable_tqdm, options.first, options.skip, batch_size) - inner_annotation_fragment = get_annotation_fragment() - annotation_fragment = f""" - annotations {{ - {inner_annotation_fragment} - }} - """ - # Ensure 'content', 'resolution', and 'jsonContent' are present in fields - required_fields = {"content", "jsonContent", "resolution.width", "resolution.height"} - fields = list(fields) - for field in required_fields: - if field not in fields: - fields.append(field) - - fragment = fragment_builder( - fields, {"labels": annotation_fragment, "latestLabel": annotation_fragment} - ) + static_fragments = {} + if fetch_annotations: + inner_annotation_fragment = get_annotation_fragment() + annotation_fragment = f""" + annotations {{ + {inner_annotation_fragment} + }} + """ + static_fragments = {"labels": annotation_fragment, "latestLabel": annotation_fragment} + + required_fields = {"content", "jsonContent", "resolution.width", "resolution.height"} + fields = list(fields) + for field in required_fields: + if field not in fields: + fields.append(field) + + fragment = fragment_builder(fields, static_fragments if static_fragments else None) query = get_assets_query(fragment) where = asset_where_mapper(filters) assets_gen = PaginatedGraphQLQuery(self.graphql_client).execute_query_from_paginated_call( @@ -115,25 +127,29 @@ def list_assets_split( assets_gen = ( load_asset_json_fields(asset, fields, self.http_client) for asset in assets_gen ) - converter = AnnotationsToJsonResponseConverter( - json_interface=project_info["jsonInterface"], - project_input_type=project_info["inputType"], - ) - is_requesting_annotations = any("annotations." in element for element in fields) - for asset in assets_gen: - if "latestLabel.jsonResponse" in fields and asset.get("latestLabel"): - converter.patch_label_json_response( - asset, asset["latestLabel"], asset["latestLabel"]["annotations"] - ) - if not is_requesting_annotations: - asset["latestLabel"].pop("annotations") - if "labels.jsonResponse" in fields: - for label in asset.get("labels", []): - converter.patch_label_json_response(asset, label, label["annotations"]) + if fetch_annotations: + converter = AnnotationsToJsonResponseConverter( + json_interface=project_info["jsonInterface"], + project_input_type=project_info["inputType"], + ) + is_requesting_annotations = any("annotations." in element for element in fields) + for asset in assets_gen: + if "latestLabel.jsonResponse" in fields and asset.get("latestLabel"): + converter.patch_label_json_response( + asset, asset["latestLabel"], asset["latestLabel"]["annotations"] + ) if not is_requesting_annotations: - label.pop("annotations") - yield asset + asset["latestLabel"].pop("annotations") + + if "labels.jsonResponse" in fields: + for label in asset.get("labels", []): + converter.patch_label_json_response(asset, label, label["annotations"]) + if not is_requesting_annotations: + label.pop("annotations") + yield asset + else: + yield from assets_gen def count_assets(self, filters: AssetFilters) -> int: """Send a GraphQL request calling countIssues resolver.""" diff --git a/src/kili/adapters/kili_api_gateway/label/formatters.py b/src/kili/adapters/kili_api_gateway/label/formatters.py index 1c01a96ea..fcc7f0e63 100644 --- a/src/kili/adapters/kili_api_gateway/label/formatters.py +++ b/src/kili/adapters/kili_api_gateway/label/formatters.py @@ -2,15 +2,33 @@ import json +from kili.adapters.http_client import HttpClient +from kili.core.helpers import is_url from kili.domain.types import ListOrTuple -def load_label_json_fields(label: dict, fields: ListOrTuple[str]) -> dict: +def load_json_from_link(link: str, http_client: HttpClient) -> dict: + """Load json from link.""" + if link == "" or not is_url(link): + return {} + + response = http_client.get(link, timeout=30) + response.raise_for_status() + return response.json() + + +def load_label_json_fields(label: dict, fields: ListOrTuple[str], http_client: HttpClient) -> dict: """Load json fields of a label.""" if "jsonResponse" in fields: - try: - label["jsonResponse"] = json.loads(label.get("jsonResponse", "{}")) - except json.JSONDecodeError: - label["jsonResponse"] = {} + json_response_url = label.get("jsonResponseUrl") + if json_response_url and is_url(json_response_url): + label["jsonResponse"] = load_json_from_link(json_response_url, http_client) + del label["jsonResponseUrl"] + else: + json_response_value = label.get("jsonResponse", "{}") + try: + label["jsonResponse"] = json.loads(json_response_value) + except json.JSONDecodeError: + label["jsonResponse"] = {} return label diff --git a/src/kili/adapters/kili_api_gateway/label/operations_mixin.py b/src/kili/adapters/kili_api_gateway/label/operations_mixin.py index 1dd2abb57..5836b5395 100644 --- a/src/kili/adapters/kili_api_gateway/label/operations_mixin.py +++ b/src/kili/adapters/kili_api_gateway/label/operations_mixin.py @@ -52,6 +52,8 @@ def list_labels( options: QueryOptions, ) -> Generator[dict, None, None]: """List labels.""" + has_json_response_url = "jsonResponseUrl" in fields + if "jsonResponse" in fields: if "labelOf" not in fields: fields = [*list(fields), "assetId"] @@ -65,7 +67,10 @@ def list_labels( "LLM_INSTR_FOLLOWING", "LLM_STATIC", }: - yield from self.list_labels_split(filters, fields, options, project_info) + fetch_annotations = not has_json_response_url + yield from self.list_labels_split( + filters, fields, options, project_info, fetch_annotations + ) return fragment = fragment_builder(fields) @@ -74,11 +79,18 @@ def list_labels( labels_gen = PaginatedGraphQLQuery(self.graphql_client).execute_query_from_paginated_call( query, where, options, "Retrieving labels", GQL_COUNT_LABELS ) - labels_gen = (load_label_json_fields(label, fields) for label in labels_gen) + labels_gen = ( + load_label_json_fields(label, fields, self.http_client) for label in labels_gen + ) yield from labels_gen def list_labels_split( - self, filters: LabelFilters, fields: ListOrTuple[str], options: QueryOptions, project_info + self, + filters: LabelFilters, + fields: ListOrTuple[str], + options: QueryOptions, + project_info, + fetch_annotations: bool, ) -> Generator[dict, None, None]: """List labels.""" if project_info["inputType"] == "VIDEO": @@ -87,21 +99,28 @@ def list_labels_split( ) fragment = fragment_builder(fields) - inner_annotation_fragment = get_annotation_fragment() - full_fragment = f""" - {fragment} - annotations {{ - {inner_annotation_fragment} - }} - """ + + if fetch_annotations: + inner_annotation_fragment = get_annotation_fragment() + full_fragment = f""" + {fragment} + annotations {{ + {inner_annotation_fragment} + }} + """ + else: + full_fragment = fragment + query = get_labels_query(full_fragment) where = label_where_mapper(filters) labels_gen = PaginatedGraphQLQuery(self.graphql_client).execute_query_from_paginated_call( query, where, options, "Retrieving labels", GQL_COUNT_LABELS ) - labels_gen = (load_label_json_fields(label, fields) for label in labels_gen) + labels_gen = ( + load_label_json_fields(label, fields, self.http_client) for label in labels_gen + ) - if "jsonResponse" in fields: + if "jsonResponse" in fields and fetch_annotations: converter = AnnotationsToJsonResponseConverter( json_interface=project_info["jsonInterface"], project_input_type=project_info["inputType"], diff --git a/src/kili/llm/services/export/__init__.py b/src/kili/llm/services/export/__init__.py index fac449535..7284efe03 100644 --- a/src/kili/llm/services/export/__init__.py +++ b/src/kili/llm/services/export/__init__.py @@ -36,6 +36,7 @@ "isLatestLabelForUser", "isSentBackToQueue", "jsonResponse", # This is needed to keep annotations + "jsonResponseUrl", "labelType", "modelName", ] @@ -57,6 +58,7 @@ "externalId", "jsonMetadata", "labels.jsonResponse", + "labels.jsonResponseUrl", "labels.author.id", "labels.author.email", "labels.author.firstname", diff --git a/src/kili/services/copy_project/__init__.py b/src/kili/services/copy_project/__init__.py index aebd279aa..18a309f99 100644 --- a/src/kili/services/copy_project/__init__.py +++ b/src/kili/services/copy_project/__init__.py @@ -202,6 +202,7 @@ def _copy_labels_legacy(self, from_project_id: str, new_project_id: str) -> None fields=[ "author.email", "jsonResponse", + "jsonResponseUrl", "secondsToLabel", "isLatestLabelForUser", "labelOf.externalId", diff --git a/src/kili/services/export/tools.py b/src/kili/services/export/tools.py index d2aa1b9c7..32f0b9250 100644 --- a/src/kili/services/export/tools.py +++ b/src/kili/services/export/tools.py @@ -35,6 +35,7 @@ DEFAULT_FIELDS = [ *COMMON_FIELDS, "labels.jsonResponse", + "labels.jsonResponseUrl", "labels.author.id", "labels.author.email", "labels.author.firstname", @@ -48,6 +49,7 @@ LATEST_LABEL_FIELDS = [ *COMMON_FIELDS, "latestLabel.jsonResponse", + "latestLabel.jsonResponseUrl", "latestLabel.author.id", "latestLabel.author.email", "latestLabel.author.firstname", diff --git a/tests/integration/adapters/kili_api_gateway/test_label.py b/tests/integration/adapters/kili_api_gateway/test_label.py index b664c7a84..b77788cca 100644 --- a/tests/integration/adapters/kili_api_gateway/test_label.py +++ b/tests/integration/adapters/kili_api_gateway/test_label.py @@ -244,6 +244,7 @@ def mocked_graphql_execute(query, variables, **kwargs): { "id": "fake_label_id", "jsonResponse": "{}", + "jsonResponseUrl": None, "annotations": test_case_1.annotations, "assetId": "fake_asset_id", } diff --git a/tests/unit/adapters/kili_api_gateway/asset/__init__.py b/tests/unit/adapters/kili_api_gateway/asset/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/adapters/kili_api_gateway/asset/test_formatters.py b/tests/unit/adapters/kili_api_gateway/asset/test_formatters.py new file mode 100644 index 000000000..f72da7951 --- /dev/null +++ b/tests/unit/adapters/kili_api_gateway/asset/test_formatters.py @@ -0,0 +1,365 @@ +"""Tests for asset formatters, specifically jsonResponseUrl optimization (LAB-4123). + +This test file verifies that download_json_responses_parallel correctly: +- Downloads JSON from URLs in parallel +- Handles batching correctly +- Handles errors gracefully +- Improves performance over sequential processing +""" + +import json +from unittest.mock import Mock + +import requests + +from kili.adapters.kili_api_gateway.asset.formatters import ( + JSON_RESPONSE_BATCH_SIZE, + download_json_responses_parallel, + load_asset_json_fields, + load_json_from_link, +) + + +class TestLoadJsonFromLink: + """Test loading JSON from a URL.""" + + def test_load_json_from_valid_url(self): + """Test loading JSON from a valid URL.""" + http_client = Mock() + response = Mock() + response.json.return_value = {"key": "value"} + http_client.get.return_value = response + + result = load_json_from_link("https://example.com/data.json", http_client) + + assert result == {"key": "value"} + http_client.get.assert_called_once_with("https://example.com/data.json", timeout=30) + + def test_load_json_from_empty_string(self): + """Test that empty string returns empty dict.""" + http_client = Mock() + + result = load_json_from_link("", http_client) + + assert result == {} + http_client.get.assert_not_called() + + def test_load_json_from_non_url(self): + """Test that non-URL string returns empty dict.""" + http_client = Mock() + + result = load_json_from_link("not-a-url", http_client) + + assert result == {} + http_client.get.assert_not_called() + + +class TestDownloadJsonResponsesParallel: + """Test parallel downloading of JSON responses.""" + + def test_download_empty_list(self): + """Test that empty list is handled correctly.""" + http_client = Mock() + + download_json_responses_parallel([], http_client) + + # Should return without errors and without calling http_client + http_client.get.assert_not_called() + + def test_download_single_json_response(self): + """Test downloading a single JSON response.""" + http_client = Mock() + response = Mock() + response.json.return_value = {"annotation": "data"} + http_client.get.return_value = response + + label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} + url_to_label_mapping = [("https://example.com/label1.json", label)] + + download_json_responses_parallel(url_to_label_mapping, http_client) + + # Verify the JSON was downloaded and assigned + assert label["jsonResponse"] == {"annotation": "data"} + # Verify jsonResponseUrl was removed + assert "jsonResponseUrl" not in label + http_client.get.assert_called_once() + + def test_download_multiple_json_responses(self): + """Test downloading multiple JSON responses in parallel.""" + http_client = Mock() + + # Create mock responses for different labels + def mock_get(url, timeout): + response = Mock() + if "label1" in url: + response.json.return_value = {"data": "label1"} + elif "label2" in url: + response.json.return_value = {"data": "label2"} + elif "label3" in url: + response.json.return_value = {"data": "label3"} + return response + + http_client.get.side_effect = mock_get + + label1 = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} + label2 = {"id": "label2", "jsonResponseUrl": "https://example.com/label2.json"} + label3 = {"id": "label3", "jsonResponseUrl": "https://example.com/label3.json"} + + url_to_label_mapping = [ + ("https://example.com/label1.json", label1), + ("https://example.com/label2.json", label2), + ("https://example.com/label3.json", label3), + ] + + download_json_responses_parallel(url_to_label_mapping, http_client) + + # Verify all labels got their JSON + assert label1["jsonResponse"] == {"data": "label1"} + assert label2["jsonResponse"] == {"data": "label2"} + assert label3["jsonResponse"] == {"data": "label3"} + + # Verify all URLs were removed + assert "jsonResponseUrl" not in label1 + assert "jsonResponseUrl" not in label2 + assert "jsonResponseUrl" not in label3 + + # Verify all URLs were called + assert http_client.get.call_count == 3 + + def test_download_handles_request_exception(self): + """Test that request exceptions are handled gracefully.""" + http_client = Mock() + http_client.get.side_effect = requests.RequestException("Network error") + + label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} + url_to_label_mapping = [("https://example.com/label1.json", label)] + + # Should not raise exception + download_json_responses_parallel(url_to_label_mapping, http_client) + + # Should set empty dict on error + assert label["jsonResponse"] == {} + assert "jsonResponseUrl" not in label + + def test_download_handles_json_decode_error(self): + """Test that JSON decode errors are handled gracefully.""" + http_client = Mock() + response = Mock() + response.json.side_effect = json.JSONDecodeError("Invalid JSON", "", 0) + http_client.get.return_value = response + + label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} + url_to_label_mapping = [("https://example.com/label1.json", label)] + + # Should not raise exception + download_json_responses_parallel(url_to_label_mapping, http_client) + + # Should set empty dict on error + assert label["jsonResponse"] == {} + assert "jsonResponseUrl" not in label + + def test_download_handles_timeout_error(self): + """Test that timeout errors are handled gracefully.""" + http_client = Mock() + http_client.get.side_effect = TimeoutError("Request timed out") + + label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} + url_to_label_mapping = [("https://example.com/label1.json", label)] + + # Should not raise exception + download_json_responses_parallel(url_to_label_mapping, http_client) + + # Should set empty dict on error + assert label["jsonResponse"] == {} + assert "jsonResponseUrl" not in label + + def test_download_processes_in_batches(self): + """Test that downloads are processed in batches of JSON_RESPONSE_BATCH_SIZE.""" + http_client = Mock() + response = Mock() + response.json.return_value = {"data": "test"} + http_client.get.return_value = response + + # Create more labels than batch size + num_labels = JSON_RESPONSE_BATCH_SIZE + 5 + labels = [ + {"id": f"label{i}", "jsonResponseUrl": f"https://example.com/label{i}.json"} + for i in range(num_labels) + ] + url_to_label_mapping = [ + (f"https://example.com/label{i}.json", label) for i, label in enumerate(labels) + ] + + # Simply call the function and verify all labels got processed + download_json_responses_parallel(url_to_label_mapping, http_client) + + # Verify all labels got their JSON response + for label in labels: + assert label["jsonResponse"] == {"data": "test"} + assert "jsonResponseUrl" not in label + + # Verify all URLs were called (in batches, but all should be called) + assert http_client.get.call_count == num_labels + + def test_download_mixed_success_and_failure(self): + """Test downloading with some successes and some failures.""" + http_client = Mock() + + def mock_get(url, timeout): + if "success" in url: + response = Mock() + response.json.return_value = {"status": "ok"} + return response + raise requests.RequestException("Failed") + + http_client.get.side_effect = mock_get + + label_success = {"id": "success", "jsonResponseUrl": "https://example.com/success.json"} + label_fail = {"id": "fail", "jsonResponseUrl": "https://example.com/fail.json"} + + url_to_label_mapping = [ + ("https://example.com/success.json", label_success), + ("https://example.com/fail.json", label_fail), + ] + + download_json_responses_parallel(url_to_label_mapping, http_client) + + # Success should have data + assert label_success["jsonResponse"] == {"status": "ok"} + assert "jsonResponseUrl" not in label_success + + # Failure should have empty dict + assert label_fail["jsonResponse"] == {} + assert "jsonResponseUrl" not in label_fail + + +class TestLoadAssetJsonFields: + """Test load_asset_json_fields integration with download_json_responses_parallel.""" + + def test_load_asset_with_latest_label_json_response_url(self): + """Test loading asset with latestLabel.jsonResponseUrl.""" + http_client = Mock() + response = Mock() + response.json.return_value = {"annotation": "data"} + http_client.get.return_value = response + + asset = { + "id": "asset1", + "latestLabel": { + "id": "label1", + "jsonResponse": '{"old": "data"}', # This should be replaced + "jsonResponseUrl": "https://example.com/label1.json", + }, + } + + fields = ["id", "latestLabel.jsonResponse", "latestLabel.jsonResponseUrl"] + + result = load_asset_json_fields(asset, fields, http_client) + + # Verify URL was used instead of parsing string + assert result["latestLabel"]["jsonResponse"] == {"annotation": "data"} + assert "jsonResponseUrl" not in result["latestLabel"] + http_client.get.assert_called_once() + + def test_load_asset_with_labels_json_response_url(self): + """Test loading asset with labels.jsonResponseUrl.""" + http_client = Mock() + + def mock_get(url, timeout): + response = Mock() + if "label1" in url: + response.json.return_value = {"data": "label1"} + elif "label2" in url: + response.json.return_value = {"data": "label2"} + return response + + http_client.get.side_effect = mock_get + + asset = { + "id": "asset1", + "labels": [ + { + "id": "label1", + "jsonResponse": '{"old": "data1"}', + "jsonResponseUrl": "https://example.com/label1.json", + }, + { + "id": "label2", + "jsonResponse": '{"old": "data2"}', + "jsonResponseUrl": "https://example.com/label2.json", + }, + ], + } + + fields = ["id", "labels.jsonResponse", "labels.jsonResponseUrl"] + + result = load_asset_json_fields(asset, fields, http_client) + + # Verify URLs were used for both labels + assert result["labels"][0]["jsonResponse"] == {"data": "label1"} + assert result["labels"][1]["jsonResponse"] == {"data": "label2"} + assert "jsonResponseUrl" not in result["labels"][0] + assert "jsonResponseUrl" not in result["labels"][1] + assert http_client.get.call_count == 2 + + def test_load_asset_without_json_response_url_falls_back_to_parsing(self): + """Test that assets without jsonResponseUrl fall back to string parsing.""" + http_client = Mock() + + asset = { + "id": "asset1", + "latestLabel": { + "id": "label1", + "jsonResponse": '{"annotation": "data"}', + # No jsonResponseUrl field + }, + } + + fields = ["id", "latestLabel.jsonResponse"] + + result = load_asset_json_fields(asset, fields, http_client) + + # Verify string was parsed (not URL downloaded) + assert result["latestLabel"]["jsonResponse"] == {"annotation": "data"} + http_client.get.assert_not_called() + + def test_load_asset_with_both_labels_and_latest_label(self): + """Test loading asset with both labels and latestLabel having URLs.""" + http_client = Mock() + + def mock_get(url, timeout): + response = Mock() + if "label1" in url: + response.json.return_value = {"data": "label1"} + elif "label2" in url: + response.json.return_value = {"data": "label2"} + elif "latest" in url: + response.json.return_value = {"data": "latest"} + return response + + http_client.get.side_effect = mock_get + + asset = { + "id": "asset1", + "labels": [ + {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"}, + {"id": "label2", "jsonResponseUrl": "https://example.com/label2.json"}, + ], + "latestLabel": {"id": "latest", "jsonResponseUrl": "https://example.com/latest.json"}, + } + + fields = [ + "id", + "labels.jsonResponse", + "labels.jsonResponseUrl", + "latestLabel.jsonResponse", + "latestLabel.jsonResponseUrl", + ] + + result = load_asset_json_fields(asset, fields, http_client) + + # Verify all were downloaded + assert result["labels"][0]["jsonResponse"] == {"data": "label1"} + assert result["labels"][1]["jsonResponse"] == {"data": "label2"} + assert result["latestLabel"]["jsonResponse"] == {"data": "latest"} + assert http_client.get.call_count == 3 diff --git a/tests/unit/services/export/test_export.py b/tests/unit/services/export/test_export.py index 6dc1793f5..d5f8c3960 100644 --- a/tests/unit/services/export/test_export.py +++ b/tests/unit/services/export/test_export.py @@ -858,6 +858,7 @@ def test_export_with_asset_filter_kwargs(mocker): "resolution.height", "resolution.width", "latestLabel.jsonResponse", + "latestLabel.jsonResponseUrl", "latestLabel.author.id", "latestLabel.author.email", "latestLabel.author.firstname", From 6bfe48bee4b43180ec394534fcd1797a2c288966 Mon Sep 17 00:00:00 2001 From: "@fanny.gaudin" Date: Tue, 13 Jan 2026 15:34:31 +0100 Subject: [PATCH 2/3] feat(LAB-4123): update ThreadPoolExecutor to asyncio --- pyproject.toml | 3 +- .../kili_api_gateway/asset/formatters.py | 90 +++--- .../kili_api_gateway/asset/test_formatters.py | 303 +++++++++--------- 3 files changed, 208 insertions(+), 188 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d78bd5782..4a2477552 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,8 @@ dependencies = [ "filelock >= 3.0.0, < 4.0.0", "pip-system-certs >= 4.0.0, < 5.0.0; platform_system=='Windows'", "pyrate-limiter >= 3, < 4", - "kili-formats == 1.1.0" + "kili-formats == 1.1.0", + "httpx >= 0.27.0, < 1.0.0" ] urls = { homepage = "https://github.com/kili-technology/kili-python-sdk" } diff --git a/src/kili/adapters/kili_api_gateway/asset/formatters.py b/src/kili/adapters/kili_api_gateway/asset/formatters.py index 09940ba4d..0841e0dc3 100644 --- a/src/kili/adapters/kili_api_gateway/asset/formatters.py +++ b/src/kili/adapters/kili_api_gateway/asset/formatters.py @@ -1,12 +1,13 @@ """Formatters for assets retrieved from Kili API.""" +import asyncio import json -from concurrent.futures import ThreadPoolExecutor, as_completed +import os -import requests +import httpx from kili.adapters.http_client import HttpClient -from kili.core.helpers import get_response_json, is_url, log_raise_for_status +from kili.core.helpers import is_url from kili.domain.types import ListOrTuple # Batch size for parallel JSON response downloads (same as export service) @@ -14,52 +15,69 @@ def load_json_from_link(link: str, http_client: HttpClient) -> dict: - """Load json from link.""" + """Load json from link (synchronous fallback for non-batch operations).""" if link == "" or not is_url(link): return {} response = http_client.get(link, timeout=30) - log_raise_for_status(response) - return get_response_json(response) + response.raise_for_status() + return response.json() -def download_json_responses_parallel( - url_to_label_mapping: list[tuple[str, dict]], http_client: HttpClient -) -> None: - """Download JSON responses in parallel and assign to labels. +async def _download_json_response(url: str) -> dict: + """Download and parse JSON response from a URL using asyncio. Args: - url_to_label_mapping: List of tuples (url, label_dict) to download - http_client: HTTP client to use for downloads + url: URL to download the JSON response from + + Returns: + Parsed JSON response as a dictionary """ - if not url_to_label_mapping: - return + try: + verify_env = os.getenv("KILI_VERIFY") + verify = verify_env.lower() in ("true", "1", "yes") if verify_env is not None else True + + async with httpx.AsyncClient(verify=verify) as client: + response = await client.get(url, timeout=30.0) + response.raise_for_status() + return response.json() + except (httpx.HTTPError, json.JSONDecodeError): + # Return empty dict on error to ensure consistent response format + return {} + + +async def _download_json_responses_async(url_to_label_mapping: list[tuple[str, dict]]) -> None: + """Download JSON responses in parallel using asyncio. + Args: + url_to_label_mapping: List of tuples (url, label_dict) to download + """ # Process in batches to limit concurrent connections for i in range(0, len(url_to_label_mapping), JSON_RESPONSE_BATCH_SIZE): batch = url_to_label_mapping[i : i + JSON_RESPONSE_BATCH_SIZE] - # Download all URLs in the batch in parallel - with ThreadPoolExecutor(max_workers=JSON_RESPONSE_BATCH_SIZE) as executor: - # Submit all download tasks - future_to_label = { - executor.submit(load_json_from_link, url, http_client): label - for url, label in batch - } - - # Collect results as they complete - for future in as_completed(future_to_label): - label = future_to_label[future] - try: - json_response = future.result() - label["jsonResponse"] = json_response - if "jsonResponseUrl" in label: - del label["jsonResponseUrl"] - except (requests.RequestException, json.JSONDecodeError, TimeoutError): - # Set empty dict to ensure consistent response format - label["jsonResponse"] = {} - if "jsonResponseUrl" in label: - del label["jsonResponseUrl"] + # Download all URLs in the batch in parallel using asyncio.gather + download_tasks = [_download_json_response(url) for url, _ in batch] + json_responses = await asyncio.gather(*download_tasks) + + # Assign the downloaded responses back to their labels and remove the URL + for (_, label), json_response in zip(batch, json_responses, strict=False): + label["jsonResponse"] = json_response + if "jsonResponseUrl" in label: + del label["jsonResponseUrl"] + + +def download_json_responses_parallel(url_to_label_mapping: list[tuple[str, dict]]) -> None: + """Download JSON responses in parallel and assign to labels. + + Args: + url_to_label_mapping: List of tuples (url, label_dict) to download + """ + if not url_to_label_mapping: + return + + # Run async downloads in a synchronous context + asyncio.run(_download_json_responses_async(url_to_label_mapping)) def _parse_label_json_response(label: dict) -> None: @@ -111,6 +129,6 @@ def load_asset_json_fields(asset: dict, fields: ListOrTuple[str], http_client: H _process_label_json_response(asset["latestLabel"], url_to_label_mapping) if url_to_label_mapping: - download_json_responses_parallel(url_to_label_mapping, http_client) + download_json_responses_parallel(url_to_label_mapping) return asset diff --git a/tests/unit/adapters/kili_api_gateway/asset/test_formatters.py b/tests/unit/adapters/kili_api_gateway/asset/test_formatters.py index f72da7951..16de4b5b4 100644 --- a/tests/unit/adapters/kili_api_gateway/asset/test_formatters.py +++ b/tests/unit/adapters/kili_api_gateway/asset/test_formatters.py @@ -7,10 +7,7 @@ - Improves performance over sequential processing """ -import json -from unittest.mock import Mock - -import requests +from unittest.mock import AsyncMock, Mock, patch from kili.adapters.kili_api_gateway.asset.formatters import ( JSON_RESPONSE_BATCH_SIZE, @@ -57,50 +54,27 @@ def test_load_json_from_non_url(self): class TestDownloadJsonResponsesParallel: """Test parallel downloading of JSON responses.""" - def test_download_empty_list(self): - """Test that empty list is handled correctly.""" - http_client = Mock() - - download_json_responses_parallel([], http_client) - - # Should return without errors and without calling http_client - http_client.get.assert_not_called() - def test_download_single_json_response(self): """Test downloading a single JSON response.""" - http_client = Mock() - response = Mock() - response.json.return_value = {"annotation": "data"} - http_client.get.return_value = response - label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} url_to_label_mapping = [("https://example.com/label1.json", label)] - download_json_responses_parallel(url_to_label_mapping, http_client) + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + new_callable=AsyncMock, + ) as mock_download: + mock_download.return_value = {"annotation": "data"} + + download_json_responses_parallel(url_to_label_mapping) - # Verify the JSON was downloaded and assigned - assert label["jsonResponse"] == {"annotation": "data"} - # Verify jsonResponseUrl was removed - assert "jsonResponseUrl" not in label - http_client.get.assert_called_once() + # Verify the JSON was downloaded and assigned + assert label["jsonResponse"] == {"annotation": "data"} + # Verify jsonResponseUrl was removed + assert "jsonResponseUrl" not in label + mock_download.assert_called_once() def test_download_multiple_json_responses(self): """Test downloading multiple JSON responses in parallel.""" - http_client = Mock() - - # Create mock responses for different labels - def mock_get(url, timeout): - response = Mock() - if "label1" in url: - response.json.return_value = {"data": "label1"} - elif "label2" in url: - response.json.return_value = {"data": "label2"} - elif "label3" in url: - response.json.return_value = {"data": "label3"} - return response - - http_client.get.side_effect = mock_get - label1 = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} label2 = {"id": "label2", "jsonResponseUrl": "https://example.com/label2.json"} label3 = {"id": "label3", "jsonResponseUrl": "https://example.com/label3.json"} @@ -111,75 +85,91 @@ def mock_get(url, timeout): ("https://example.com/label3.json", label3), ] - download_json_responses_parallel(url_to_label_mapping, http_client) + # Create mock side effect for different URLs + async def mock_download(url): + if "label1" in url: + return {"data": "label1"} + elif "label2" in url: + return {"data": "label2"} + elif "label3" in url: + return {"data": "label3"} + return {} - # Verify all labels got their JSON - assert label1["jsonResponse"] == {"data": "label1"} - assert label2["jsonResponse"] == {"data": "label2"} - assert label3["jsonResponse"] == {"data": "label3"} + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + side_effect=mock_download, + ): + download_json_responses_parallel(url_to_label_mapping) - # Verify all URLs were removed - assert "jsonResponseUrl" not in label1 - assert "jsonResponseUrl" not in label2 - assert "jsonResponseUrl" not in label3 + # Verify all labels got their JSON + assert label1["jsonResponse"] == {"data": "label1"} + assert label2["jsonResponse"] == {"data": "label2"} + assert label3["jsonResponse"] == {"data": "label3"} - # Verify all URLs were called - assert http_client.get.call_count == 3 + # Verify all URLs were removed + assert "jsonResponseUrl" not in label1 + assert "jsonResponseUrl" not in label2 + assert "jsonResponseUrl" not in label3 def test_download_handles_request_exception(self): """Test that request exceptions are handled gracefully.""" - http_client = Mock() - http_client.get.side_effect = requests.RequestException("Network error") - label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} url_to_label_mapping = [("https://example.com/label1.json", label)] - # Should not raise exception - download_json_responses_parallel(url_to_label_mapping, http_client) + # Mock download to raise httpx.HTTPError + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + new_callable=AsyncMock, + ) as mock_download: + mock_download.return_value = {} # Returns empty dict on error + + # Should not raise exception + download_json_responses_parallel(url_to_label_mapping) - # Should set empty dict on error - assert label["jsonResponse"] == {} - assert "jsonResponseUrl" not in label + # Should set empty dict on error + assert label["jsonResponse"] == {} + assert "jsonResponseUrl" not in label def test_download_handles_json_decode_error(self): """Test that JSON decode errors are handled gracefully.""" - http_client = Mock() - response = Mock() - response.json.side_effect = json.JSONDecodeError("Invalid JSON", "", 0) - http_client.get.return_value = response - label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} url_to_label_mapping = [("https://example.com/label1.json", label)] - # Should not raise exception - download_json_responses_parallel(url_to_label_mapping, http_client) + # Mock download to return empty dict (simulating error) + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + new_callable=AsyncMock, + ) as mock_download: + mock_download.return_value = {} # Returns empty dict on error - # Should set empty dict on error - assert label["jsonResponse"] == {} - assert "jsonResponseUrl" not in label + # Should not raise exception + download_json_responses_parallel(url_to_label_mapping) + + # Should set empty dict on error + assert label["jsonResponse"] == {} + assert "jsonResponseUrl" not in label def test_download_handles_timeout_error(self): """Test that timeout errors are handled gracefully.""" - http_client = Mock() - http_client.get.side_effect = TimeoutError("Request timed out") - label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} url_to_label_mapping = [("https://example.com/label1.json", label)] - # Should not raise exception - download_json_responses_parallel(url_to_label_mapping, http_client) + # Mock download to return empty dict (simulating error) + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + new_callable=AsyncMock, + ) as mock_download: + mock_download.return_value = {} # Returns empty dict on error - # Should set empty dict on error - assert label["jsonResponse"] == {} - assert "jsonResponseUrl" not in label + # Should not raise exception + download_json_responses_parallel(url_to_label_mapping) + + # Should set empty dict on error + assert label["jsonResponse"] == {} + assert "jsonResponseUrl" not in label def test_download_processes_in_batches(self): """Test that downloads are processed in batches of JSON_RESPONSE_BATCH_SIZE.""" - http_client = Mock() - response = Mock() - response.json.return_value = {"data": "test"} - http_client.get.return_value = response - # Create more labels than batch size num_labels = JSON_RESPONSE_BATCH_SIZE + 5 labels = [ @@ -190,30 +180,26 @@ def test_download_processes_in_batches(self): (f"https://example.com/label{i}.json", label) for i, label in enumerate(labels) ] - # Simply call the function and verify all labels got processed - download_json_responses_parallel(url_to_label_mapping, http_client) + # Mock the async download function + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + new_callable=AsyncMock, + ) as mock_download: + mock_download.return_value = {"data": "test"} - # Verify all labels got their JSON response - for label in labels: - assert label["jsonResponse"] == {"data": "test"} - assert "jsonResponseUrl" not in label + # Call the function + download_json_responses_parallel(url_to_label_mapping) - # Verify all URLs were called (in batches, but all should be called) - assert http_client.get.call_count == num_labels + # Verify all labels got their JSON response + for label in labels: + assert label["jsonResponse"] == {"data": "test"} + assert "jsonResponseUrl" not in label + + # Verify download was called for all URLs + assert mock_download.call_count == num_labels def test_download_mixed_success_and_failure(self): """Test downloading with some successes and some failures.""" - http_client = Mock() - - def mock_get(url, timeout): - if "success" in url: - response = Mock() - response.json.return_value = {"status": "ok"} - return response - raise requests.RequestException("Failed") - - http_client.get.side_effect = mock_get - label_success = {"id": "success", "jsonResponseUrl": "https://example.com/success.json"} label_fail = {"id": "fail", "jsonResponseUrl": "https://example.com/fail.json"} @@ -222,15 +208,25 @@ def mock_get(url, timeout): ("https://example.com/fail.json", label_fail), ] - download_json_responses_parallel(url_to_label_mapping, http_client) + # Create mock side effect for different URLs + async def mock_download(url): + if "success" in url: + return {"status": "ok"} + return {} # Failure returns empty dict - # Success should have data - assert label_success["jsonResponse"] == {"status": "ok"} - assert "jsonResponseUrl" not in label_success + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + side_effect=mock_download, + ): + download_json_responses_parallel(url_to_label_mapping) - # Failure should have empty dict - assert label_fail["jsonResponse"] == {} - assert "jsonResponseUrl" not in label_fail + # Success should have data + assert label_success["jsonResponse"] == {"status": "ok"} + assert "jsonResponseUrl" not in label_success + + # Failure should have empty dict + assert label_fail["jsonResponse"] == {} + assert "jsonResponseUrl" not in label_fail class TestLoadAssetJsonFields: @@ -239,9 +235,6 @@ class TestLoadAssetJsonFields: def test_load_asset_with_latest_label_json_response_url(self): """Test loading asset with latestLabel.jsonResponseUrl.""" http_client = Mock() - response = Mock() - response.json.return_value = {"annotation": "data"} - http_client.get.return_value = response asset = { "id": "asset1", @@ -254,27 +247,23 @@ def test_load_asset_with_latest_label_json_response_url(self): fields = ["id", "latestLabel.jsonResponse", "latestLabel.jsonResponseUrl"] - result = load_asset_json_fields(asset, fields, http_client) + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + new_callable=AsyncMock, + ) as mock_download: + mock_download.return_value = {"annotation": "data"} - # Verify URL was used instead of parsing string - assert result["latestLabel"]["jsonResponse"] == {"annotation": "data"} - assert "jsonResponseUrl" not in result["latestLabel"] - http_client.get.assert_called_once() + result = load_asset_json_fields(asset, fields, http_client) + + # Verify URL was used instead of parsing string + assert result["latestLabel"]["jsonResponse"] == {"annotation": "data"} + assert "jsonResponseUrl" not in result["latestLabel"] + mock_download.assert_called_once() def test_load_asset_with_labels_json_response_url(self): """Test loading asset with labels.jsonResponseUrl.""" http_client = Mock() - def mock_get(url, timeout): - response = Mock() - if "label1" in url: - response.json.return_value = {"data": "label1"} - elif "label2" in url: - response.json.return_value = {"data": "label2"} - return response - - http_client.get.side_effect = mock_get - asset = { "id": "asset1", "labels": [ @@ -293,14 +282,25 @@ def mock_get(url, timeout): fields = ["id", "labels.jsonResponse", "labels.jsonResponseUrl"] - result = load_asset_json_fields(asset, fields, http_client) + # Create mock side effect for different URLs + async def mock_download(url): + if "label1" in url: + return {"data": "label1"} + elif "label2" in url: + return {"data": "label2"} + return {} - # Verify URLs were used for both labels - assert result["labels"][0]["jsonResponse"] == {"data": "label1"} - assert result["labels"][1]["jsonResponse"] == {"data": "label2"} - assert "jsonResponseUrl" not in result["labels"][0] - assert "jsonResponseUrl" not in result["labels"][1] - assert http_client.get.call_count == 2 + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + side_effect=mock_download, + ): + result = load_asset_json_fields(asset, fields, http_client) + + # Verify URLs were used for both labels + assert result["labels"][0]["jsonResponse"] == {"data": "label1"} + assert result["labels"][1]["jsonResponse"] == {"data": "label2"} + assert "jsonResponseUrl" not in result["labels"][0] + assert "jsonResponseUrl" not in result["labels"][1] def test_load_asset_without_json_response_url_falls_back_to_parsing(self): """Test that assets without jsonResponseUrl fall back to string parsing.""" @@ -327,18 +327,6 @@ def test_load_asset_with_both_labels_and_latest_label(self): """Test loading asset with both labels and latestLabel having URLs.""" http_client = Mock() - def mock_get(url, timeout): - response = Mock() - if "label1" in url: - response.json.return_value = {"data": "label1"} - elif "label2" in url: - response.json.return_value = {"data": "label2"} - elif "latest" in url: - response.json.return_value = {"data": "latest"} - return response - - http_client.get.side_effect = mock_get - asset = { "id": "asset1", "labels": [ @@ -356,10 +344,23 @@ def mock_get(url, timeout): "latestLabel.jsonResponseUrl", ] - result = load_asset_json_fields(asset, fields, http_client) - - # Verify all were downloaded - assert result["labels"][0]["jsonResponse"] == {"data": "label1"} - assert result["labels"][1]["jsonResponse"] == {"data": "label2"} - assert result["latestLabel"]["jsonResponse"] == {"data": "latest"} - assert http_client.get.call_count == 3 + # Create mock side effect for different URLs + async def mock_download(url): + if "label1" in url: + return {"data": "label1"} + elif "label2" in url: + return {"data": "label2"} + elif "latest" in url: + return {"data": "latest"} + return {} + + with patch( + "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", + side_effect=mock_download, + ): + result = load_asset_json_fields(asset, fields, http_client) + + # Verify all were downloaded + assert result["labels"][0]["jsonResponse"] == {"data": "label1"} + assert result["labels"][1]["jsonResponse"] == {"data": "label2"} + assert result["latestLabel"]["jsonResponse"] == {"data": "latest"} From e0e6ae2379a5c70cd263ed98ccdf83e9d61ae1a6 Mon Sep 17 00:00:00 2001 From: "@fanny.gaudin" Date: Wed, 14 Jan 2026 11:06:13 +0100 Subject: [PATCH 3/3] feat(LAB-4123): use http_client instead of httpx --- pyproject.toml | 3 +- .../kili_api_gateway/asset/formatters.py | 65 +++++++++++-------- .../kili_api_gateway/helpers/http_json.py | 22 +++++++ .../kili_api_gateway/label/formatters.py | 11 +--- .../kili_api_gateway/asset/test_formatters.py | 57 ++++++++++++---- 5 files changed, 106 insertions(+), 52 deletions(-) create mode 100644 src/kili/adapters/kili_api_gateway/helpers/http_json.py diff --git a/pyproject.toml b/pyproject.toml index 4a2477552..d78bd5782 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,8 +44,7 @@ dependencies = [ "filelock >= 3.0.0, < 4.0.0", "pip-system-certs >= 4.0.0, < 5.0.0; platform_system=='Windows'", "pyrate-limiter >= 3, < 4", - "kili-formats == 1.1.0", - "httpx >= 0.27.0, < 1.0.0" + "kili-formats == 1.1.0" ] urls = { homepage = "https://github.com/kili-technology/kili-python-sdk" } diff --git a/src/kili/adapters/kili_api_gateway/asset/formatters.py b/src/kili/adapters/kili_api_gateway/asset/formatters.py index 0841e0dc3..6d1266312 100644 --- a/src/kili/adapters/kili_api_gateway/asset/formatters.py +++ b/src/kili/adapters/kili_api_gateway/asset/formatters.py @@ -2,11 +2,11 @@ import asyncio import json -import os -import httpx +import requests from kili.adapters.http_client import HttpClient +from kili.adapters.kili_api_gateway.helpers.http_json import load_json_from_link from kili.core.helpers import is_url from kili.domain.types import ListOrTuple @@ -14,50 +14,41 @@ JSON_RESPONSE_BATCH_SIZE = 10 -def load_json_from_link(link: str, http_client: HttpClient) -> dict: - """Load json from link (synchronous fallback for non-batch operations).""" - if link == "" or not is_url(link): - return {} - - response = http_client.get(link, timeout=30) - response.raise_for_status() - return response.json() - - -async def _download_json_response(url: str) -> dict: +async def _download_json_response(url: str, http_client: HttpClient) -> dict: """Download and parse JSON response from a URL using asyncio. Args: url: URL to download the JSON response from + http_client: HttpClient instance with SSL verification already configured Returns: Parsed JSON response as a dictionary """ try: - verify_env = os.getenv("KILI_VERIFY") - verify = verify_env.lower() in ("true", "1", "yes") if verify_env is not None else True - - async with httpx.AsyncClient(verify=verify) as client: - response = await client.get(url, timeout=30.0) - response.raise_for_status() - return response.json() - except (httpx.HTTPError, json.JSONDecodeError): + # Run synchronous requests call in a thread + response = await asyncio.to_thread(http_client.get, url, timeout=30) + response.raise_for_status() + return response.json() + except (requests.RequestException, json.JSONDecodeError): # Return empty dict on error to ensure consistent response format return {} -async def _download_json_responses_async(url_to_label_mapping: list[tuple[str, dict]]) -> None: +async def _download_json_responses_async( + url_to_label_mapping: list[tuple[str, dict]], http_client: HttpClient +) -> None: """Download JSON responses in parallel using asyncio. Args: url_to_label_mapping: List of tuples (url, label_dict) to download + http_client: HttpClient instance with SSL verification already configured """ # Process in batches to limit concurrent connections for i in range(0, len(url_to_label_mapping), JSON_RESPONSE_BATCH_SIZE): batch = url_to_label_mapping[i : i + JSON_RESPONSE_BATCH_SIZE] # Download all URLs in the batch in parallel using asyncio.gather - download_tasks = [_download_json_response(url) for url, _ in batch] + download_tasks = [_download_json_response(url, http_client) for url, _ in batch] json_responses = await asyncio.gather(*download_tasks) # Assign the downloaded responses back to their labels and remove the URL @@ -67,17 +58,37 @@ async def _download_json_responses_async(url_to_label_mapping: list[tuple[str, d del label["jsonResponseUrl"] -def download_json_responses_parallel(url_to_label_mapping: list[tuple[str, dict]]) -> None: +def download_json_responses_parallel( + url_to_label_mapping: list[tuple[str, dict]], http_client: HttpClient +) -> None: """Download JSON responses in parallel and assign to labels. Args: url_to_label_mapping: List of tuples (url, label_dict) to download + http_client: HttpClient instance with SSL verification already configured """ if not url_to_label_mapping: return - # Run async downloads in a synchronous context - asyncio.run(_download_json_responses_async(url_to_label_mapping)) + # Check if we're already in an event loop (e.g., Jupyter notebook, async web framework) + try: + asyncio.get_running_loop() + except RuntimeError: + # No running loop, safe to use asyncio.run() for parallel downloads + asyncio.run(_download_json_responses_async(url_to_label_mapping, http_client)) + else: + # Already in a loop - fall back to sequential downloads to avoid RuntimeError + # This happens in Jupyter notebooks, FastAPI, and other async contexts + for url, label in url_to_label_mapping: + try: + response = http_client.get(url, timeout=30) + response.raise_for_status() + label["jsonResponse"] = response.json() + except (requests.RequestException, json.JSONDecodeError): + label["jsonResponse"] = {} + + if "jsonResponseUrl" in label: + del label["jsonResponseUrl"] def _parse_label_json_response(label: dict) -> None: @@ -129,6 +140,6 @@ def load_asset_json_fields(asset: dict, fields: ListOrTuple[str], http_client: H _process_label_json_response(asset["latestLabel"], url_to_label_mapping) if url_to_label_mapping: - download_json_responses_parallel(url_to_label_mapping) + download_json_responses_parallel(url_to_label_mapping, http_client) return asset diff --git a/src/kili/adapters/kili_api_gateway/helpers/http_json.py b/src/kili/adapters/kili_api_gateway/helpers/http_json.py new file mode 100644 index 000000000..96839141f --- /dev/null +++ b/src/kili/adapters/kili_api_gateway/helpers/http_json.py @@ -0,0 +1,22 @@ +"""HTTP JSON helpers for downloading JSON data from URLs.""" + +from kili.adapters.http_client import HttpClient +from kili.core.helpers import is_url + + +def load_json_from_link(link: str, http_client: HttpClient) -> dict: + """Load json from link. + + Args: + link: URL to download JSON from + http_client: HttpClient instance with SSL verification already configured + + Returns: + Parsed JSON response as a dictionary, or empty dict if link is invalid + """ + if link == "" or not is_url(link): + return {} + + response = http_client.get(link, timeout=30) + response.raise_for_status() + return response.json() diff --git a/src/kili/adapters/kili_api_gateway/label/formatters.py b/src/kili/adapters/kili_api_gateway/label/formatters.py index fcc7f0e63..7ad333f67 100644 --- a/src/kili/adapters/kili_api_gateway/label/formatters.py +++ b/src/kili/adapters/kili_api_gateway/label/formatters.py @@ -3,20 +3,11 @@ import json from kili.adapters.http_client import HttpClient +from kili.adapters.kili_api_gateway.helpers.http_json import load_json_from_link from kili.core.helpers import is_url from kili.domain.types import ListOrTuple -def load_json_from_link(link: str, http_client: HttpClient) -> dict: - """Load json from link.""" - if link == "" or not is_url(link): - return {} - - response = http_client.get(link, timeout=30) - response.raise_for_status() - return response.json() - - def load_label_json_fields(label: dict, fields: ListOrTuple[str], http_client: HttpClient) -> dict: """Load json fields of a label.""" if "jsonResponse" in fields: diff --git a/tests/unit/adapters/kili_api_gateway/asset/test_formatters.py b/tests/unit/adapters/kili_api_gateway/asset/test_formatters.py index 16de4b5b4..c976c912d 100644 --- a/tests/unit/adapters/kili_api_gateway/asset/test_formatters.py +++ b/tests/unit/adapters/kili_api_gateway/asset/test_formatters.py @@ -13,8 +13,8 @@ JSON_RESPONSE_BATCH_SIZE, download_json_responses_parallel, load_asset_json_fields, - load_json_from_link, ) +from kili.adapters.kili_api_gateway.helpers.http_json import load_json_from_link class TestLoadJsonFromLink: @@ -56,6 +56,7 @@ class TestDownloadJsonResponsesParallel: def test_download_single_json_response(self): """Test downloading a single JSON response.""" + http_client = Mock() label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} url_to_label_mapping = [("https://example.com/label1.json", label)] @@ -65,7 +66,7 @@ def test_download_single_json_response(self): ) as mock_download: mock_download.return_value = {"annotation": "data"} - download_json_responses_parallel(url_to_label_mapping) + download_json_responses_parallel(url_to_label_mapping, http_client) # Verify the JSON was downloaded and assigned assert label["jsonResponse"] == {"annotation": "data"} @@ -75,6 +76,7 @@ def test_download_single_json_response(self): def test_download_multiple_json_responses(self): """Test downloading multiple JSON responses in parallel.""" + http_client = Mock() label1 = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} label2 = {"id": "label2", "jsonResponseUrl": "https://example.com/label2.json"} label3 = {"id": "label3", "jsonResponseUrl": "https://example.com/label3.json"} @@ -86,7 +88,7 @@ def test_download_multiple_json_responses(self): ] # Create mock side effect for different URLs - async def mock_download(url): + async def mock_download(url, client): if "label1" in url: return {"data": "label1"} elif "label2" in url: @@ -99,7 +101,7 @@ async def mock_download(url): "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", side_effect=mock_download, ): - download_json_responses_parallel(url_to_label_mapping) + download_json_responses_parallel(url_to_label_mapping, http_client) # Verify all labels got their JSON assert label1["jsonResponse"] == {"data": "label1"} @@ -113,10 +115,11 @@ async def mock_download(url): def test_download_handles_request_exception(self): """Test that request exceptions are handled gracefully.""" + http_client = Mock() label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} url_to_label_mapping = [("https://example.com/label1.json", label)] - # Mock download to raise httpx.HTTPError + # Mock download to return empty dict on error with patch( "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", new_callable=AsyncMock, @@ -124,7 +127,7 @@ def test_download_handles_request_exception(self): mock_download.return_value = {} # Returns empty dict on error # Should not raise exception - download_json_responses_parallel(url_to_label_mapping) + download_json_responses_parallel(url_to_label_mapping, http_client) # Should set empty dict on error assert label["jsonResponse"] == {} @@ -132,6 +135,7 @@ def test_download_handles_request_exception(self): def test_download_handles_json_decode_error(self): """Test that JSON decode errors are handled gracefully.""" + http_client = Mock() label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} url_to_label_mapping = [("https://example.com/label1.json", label)] @@ -143,7 +147,7 @@ def test_download_handles_json_decode_error(self): mock_download.return_value = {} # Returns empty dict on error # Should not raise exception - download_json_responses_parallel(url_to_label_mapping) + download_json_responses_parallel(url_to_label_mapping, http_client) # Should set empty dict on error assert label["jsonResponse"] == {} @@ -151,6 +155,7 @@ def test_download_handles_json_decode_error(self): def test_download_handles_timeout_error(self): """Test that timeout errors are handled gracefully.""" + http_client = Mock() label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} url_to_label_mapping = [("https://example.com/label1.json", label)] @@ -162,7 +167,7 @@ def test_download_handles_timeout_error(self): mock_download.return_value = {} # Returns empty dict on error # Should not raise exception - download_json_responses_parallel(url_to_label_mapping) + download_json_responses_parallel(url_to_label_mapping, http_client) # Should set empty dict on error assert label["jsonResponse"] == {} @@ -170,6 +175,7 @@ def test_download_handles_timeout_error(self): def test_download_processes_in_batches(self): """Test that downloads are processed in batches of JSON_RESPONSE_BATCH_SIZE.""" + http_client = Mock() # Create more labels than batch size num_labels = JSON_RESPONSE_BATCH_SIZE + 5 labels = [ @@ -188,7 +194,7 @@ def test_download_processes_in_batches(self): mock_download.return_value = {"data": "test"} # Call the function - download_json_responses_parallel(url_to_label_mapping) + download_json_responses_parallel(url_to_label_mapping, http_client) # Verify all labels got their JSON response for label in labels: @@ -200,6 +206,7 @@ def test_download_processes_in_batches(self): def test_download_mixed_success_and_failure(self): """Test downloading with some successes and some failures.""" + http_client = Mock() label_success = {"id": "success", "jsonResponseUrl": "https://example.com/success.json"} label_fail = {"id": "fail", "jsonResponseUrl": "https://example.com/fail.json"} @@ -209,7 +216,7 @@ def test_download_mixed_success_and_failure(self): ] # Create mock side effect for different URLs - async def mock_download(url): + async def mock_download(url, client): if "success" in url: return {"status": "ok"} return {} # Failure returns empty dict @@ -218,7 +225,7 @@ async def mock_download(url): "kili.adapters.kili_api_gateway.asset.formatters._download_json_response", side_effect=mock_download, ): - download_json_responses_parallel(url_to_label_mapping) + download_json_responses_parallel(url_to_label_mapping, http_client) # Success should have data assert label_success["jsonResponse"] == {"status": "ok"} @@ -228,6 +235,30 @@ async def mock_download(url): assert label_fail["jsonResponse"] == {} assert "jsonResponseUrl" not in label_fail + def test_download_in_existing_event_loop(self): + """Test that downloads work when called from within an existing event loop.""" + import asyncio + + http_client = Mock() + label = {"id": "label1", "jsonResponseUrl": "https://example.com/label1.json"} + url_to_label_mapping = [("https://example.com/label1.json", label)] + + # Mock the http_client.get response + mock_response = Mock() + mock_response.json.return_value = {"data": "test"} + http_client.get.return_value = mock_response + + # Run inside an existing event loop (simulating Jupyter/FastAPI) + async def run_in_loop(): + download_json_responses_parallel(url_to_label_mapping, http_client) + + asyncio.run(run_in_loop()) + + # Verify the JSON was downloaded (sequentially in this case) + assert label["jsonResponse"] == {"data": "test"} + assert "jsonResponseUrl" not in label + http_client.get.assert_called_once_with("https://example.com/label1.json", timeout=30) + class TestLoadAssetJsonFields: """Test load_asset_json_fields integration with download_json_responses_parallel.""" @@ -283,7 +314,7 @@ def test_load_asset_with_labels_json_response_url(self): fields = ["id", "labels.jsonResponse", "labels.jsonResponseUrl"] # Create mock side effect for different URLs - async def mock_download(url): + async def mock_download(url, client): if "label1" in url: return {"data": "label1"} elif "label2" in url: @@ -345,7 +376,7 @@ def test_load_asset_with_both_labels_and_latest_label(self): ] # Create mock side effect for different URLs - async def mock_download(url): + async def mock_download(url, client): if "label1" in url: return {"data": "label1"} elif "label2" in url: