Skip to content

Commit 978b76c

Browse files
committed
feat(LAB-4123): migrate JSON response downloads from ThreadPoolExecutor to asyncio
1 parent cdfe314 commit 978b76c

File tree

3 files changed

+204
-188
lines changed

3 files changed

+204
-188
lines changed

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ dependencies = [
4444
"filelock >= 3.0.0, < 4.0.0",
4545
"pip-system-certs >= 4.0.0, < 5.0.0; platform_system=='Windows'",
4646
"pyrate-limiter >= 3, < 4",
47-
"kili-formats == 1.0.1"
47+
"kili-formats == 1.0.1",
48+
"httpx >= 0.27.0, < 1.0.0"
4849
]
4950
urls = { homepage = "https://github.com/kili-technology/kili-python-sdk" }
5051

src/kili/adapters/kili_api_gateway/asset/formatters.py

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,79 @@
11
"""Formatters for assets retrieved from Kili API."""
22

import asyncio
import json
import os

import httpx

from kili.adapters.http_client import HttpClient
from kili.core.helpers import is_url
from kili.domain.types import ListOrTuple
1111

1212
# Batch size for parallel JSON response downloads (same as export service)
1313
JSON_RESPONSE_BATCH_SIZE = 10
1414

1515

1616
def load_json_from_link(link: str, http_client: HttpClient) -> dict:
    """Load json from link (synchronous fallback for non-batch operations).

    Args:
        link: URL of the JSON payload; empty or malformed links yield ``{}``.
        http_client: HTTP client used to perform the download.

    Returns:
        The parsed JSON body, or an empty dict for an empty/invalid link.

    Raises:
        HTTPError: if the server answers with an error status.
    """
    # Guard clause: nothing to fetch for an empty or non-URL link.
    if not (link and is_url(link)):
        return {}

    resp = http_client.get(link, timeout=30)
    resp.raise_for_status()
    return resp.json()
2424

2525

26-
def download_json_responses_parallel(
27-
url_to_label_mapping: list[tuple[str, dict]], http_client: HttpClient
28-
) -> None:
29-
"""Download JSON responses in parallel and assign to labels.
26+
async def _download_json_response(url: str) -> dict:
    """Download and parse a JSON response from a URL using httpx/asyncio.

    Args:
        url: URL to download the JSON response from.

    Returns:
        Parsed JSON response as a dictionary, or an empty dict when the
        download or the JSON decoding fails (callers rely on always
        receiving a dict).
    """
    # SSL verification can be opted out of via the KILI__VERIFY_SSL env var;
    # any value other than the literal string "False" keeps verification on.
    # NOTE: requires `import os` at module level — the original commit used
    # `os` without importing it, which raised NameError at runtime.
    verify_ssl = os.getenv("KILI__VERIFY_SSL") != "False"
    try:
        async with httpx.AsyncClient(verify=verify_ssl) as client:
            response = await client.get(url, timeout=30.0)
            response.raise_for_status()
            return response.json()
    except (httpx.HTTPError, json.JSONDecodeError):
        # Return empty dict on error to ensure consistent response format
        return {}
43+
44+
45+
async def _download_json_responses_async(url_to_label_mapping: list[tuple[str, dict]]) -> None:
    """Download JSON responses in parallel using asyncio.

    Args:
        url_to_label_mapping: List of tuples (url, label_dict) to download
    """
    # Walk the mapping batch by batch so at most JSON_RESPONSE_BATCH_SIZE
    # downloads are in flight at any one time.
    total = len(url_to_label_mapping)
    start = 0
    while start < total:
        batch = url_to_label_mapping[start : start + JSON_RESPONSE_BATCH_SIZE]
        start += JSON_RESPONSE_BATCH_SIZE

        # Run every download of the batch concurrently.
        responses = await asyncio.gather(
            *(_download_json_response(url) for url, _ in batch)
        )

        # Attach each payload to its label and drop the now-redundant URL.
        for (_, label), payload in zip(batch, responses, strict=False):
            label["jsonResponse"] = payload
            label.pop("jsonResponseUrl", None)
64+
65+
66+
def download_json_responses_parallel(url_to_label_mapping: list[tuple[str, dict]]) -> None:
    """Download JSON responses in parallel and assign to labels.

    Args:
        url_to_label_mapping: List of tuples (url, label_dict) to download
    """
    if not url_to_label_mapping:
        return

    coroutine = _download_json_responses_async(url_to_label_mapping)
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop running in this thread: drive the downloads directly.
        asyncio.run(coroutine)
    else:
        # asyncio.run() raises "cannot be called from a running event loop"
        # when the SDK is used inside an async context (e.g. a Jupyter
        # notebook). Run the coroutine on its own loop in a worker thread.
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(max_workers=1) as executor:
            executor.submit(asyncio.run, coroutine).result()
6377

6478

6579
def _parse_label_json_response(label: dict) -> None:
@@ -111,6 +125,6 @@ def load_asset_json_fields(asset: dict, fields: ListOrTuple[str], http_client: H
111125
_process_label_json_response(asset["latestLabel"], url_to_label_mapping)
112126

113127
if url_to_label_mapping:
114-
download_json_responses_parallel(url_to_label_mapping, http_client)
128+
download_json_responses_parallel(url_to_label_mapping)
115129

116130
return asset

0 commit comments

Comments
 (0)