Skip to content

Commit 08cc682

Browse files
committed
feat(LAB-4123): update ThreadPoolExecutor to asyncio
1 parent a5f7a3e commit 08cc682

File tree

3 files changed

+205
-188
lines changed

3 files changed

+205
-188
lines changed

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,8 @@ dependencies = [
4444
"filelock >= 3.0.0, < 4.0.0",
4545
"pip-system-certs >= 4.0.0, < 5.0.0; platform_system=='Windows'",
4646
"pyrate-limiter >= 3, < 4",
47-
"kili-formats == 1.1.0"
47+
"kili-formats == 1.1.0",
48+
"httpx >= 0.27.0, < 1.0.0"
4849
]
4950
urls = { homepage = "https://github.com/kili-technology/kili-python-sdk" }
5051

src/kili/adapters/kili_api_gateway/asset/formatters.py

Lines changed: 51 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -1,65 +1,80 @@
11
"""Formatters for assets retrieved from Kili API."""
22

3+
import asyncio
34
import json
4-
from concurrent.futures import ThreadPoolExecutor, as_completed
5+
import os
56

6-
import requests
7+
import httpx
78

89
from kili.adapters.http_client import HttpClient
9-
from kili.core.helpers import get_response_json, is_url, log_raise_for_status
10+
from kili.core.helpers import is_url
1011
from kili.domain.types import ListOrTuple
1112

1213
# Batch size for parallel JSON response downloads (same as export service)
1314
JSON_RESPONSE_BATCH_SIZE = 10
1415

1516

1617
def load_json_from_link(link: str, http_client: HttpClient) -> dict:
    """Load json from link (synchronous fallback for non-batch operations).

    Args:
        link: URL of the JSON document; an empty string or a non-URL value
            short-circuits to an empty dict.
        http_client: HTTP client used to perform the GET request.

    Returns:
        The parsed JSON payload, or an empty dict when the link is unusable.
    """
    is_fetchable = link != "" and is_url(link)
    if not is_fetchable:
        return {}

    resp = http_client.get(link, timeout=30)
    resp.raise_for_status()
    return resp.json()
2425

2526

26-
def download_json_responses_parallel(
27-
url_to_label_mapping: list[tuple[str, dict]], http_client: HttpClient
28-
) -> None:
29-
"""Download JSON responses in parallel and assign to labels.
27+
async def _download_json_response(
    url: str, client: "httpx.AsyncClient | None" = None
) -> dict:
    """Download and parse a JSON response from a URL using asyncio.

    Args:
        url: URL to download the JSON response from.
        client: Optional already-open ``httpx.AsyncClient``. Passing one lets
            callers reuse a single connection pool across many downloads
            instead of paying a fresh TCP/TLS handshake for every URL. When
            omitted, a one-shot client is created (original behavior).

    Returns:
        Parsed JSON response as a dictionary; an empty dict on HTTP or
        JSON-decoding errors so callers always receive a dict.
    """
    try:
        if client is not None:
            response = await client.get(url, timeout=30.0)
            response.raise_for_status()
            return response.json()

        # Fallback: build a throwaway client for this single request.
        # SSL verification can be disabled via the KILI__VERIFY_SSL env var.
        async with httpx.AsyncClient(
            verify=os.getenv("KILI__VERIFY_SSL") != "False"
        ) as one_shot_client:
            response = await one_shot_client.get(url, timeout=30.0)
            response.raise_for_status()
            return response.json()
    except (httpx.HTTPError, json.JSONDecodeError):
        # Swallow download/parse failures: a consistent (empty) response
        # format is preferred over propagating per-asset errors.
        return {}
44+
45+
46+
async def _download_json_responses_async(url_to_label_mapping: list[tuple[str, dict]]) -> None:
    """Download JSON responses in parallel using asyncio.

    Each label dict in the mapping is mutated in place: its "jsonResponse"
    key is set to the downloaded payload (or {} on failure) and its
    "jsonResponseUrl" key, if present, is removed.

    Args:
        url_to_label_mapping: List of tuples (url, label_dict) to download
    """
    # Process in batches to limit concurrent connections
    for i in range(0, len(url_to_label_mapping), JSON_RESPONSE_BATCH_SIZE):
        batch = url_to_label_mapping[i : i + JSON_RESPONSE_BATCH_SIZE]

        # Download all URLs in the batch in parallel using asyncio.gather
        download_tasks = [_download_json_response(url) for url, _ in batch]
        json_responses = await asyncio.gather(*download_tasks)

        # gather() returns exactly one result per task, so the lengths always
        # match; strict=True turns any violation of that invariant into a
        # loud ValueError instead of silently skipping labels.
        for (_, label), json_response in zip(batch, json_responses, strict=True):
            label["jsonResponse"] = json_response
            if "jsonResponseUrl" in label:
                del label["jsonResponseUrl"]
65+
66+
67+
def download_json_responses_parallel(url_to_label_mapping: list[tuple[str, dict]]) -> None:
    """Download JSON responses in parallel and assign to labels.

    Args:
        url_to_label_mapping: List of tuples (url, label_dict) to download;
            each label dict is mutated in place.
    """
    if not url_to_label_mapping:
        return

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop running in this thread: we can own one.
        asyncio.run(_download_json_responses_async(url_to_label_mapping))
    else:
        # asyncio.run() raises RuntimeError when a loop is already running
        # (e.g. inside a Jupyter notebook, a common SDK environment). Run the
        # coroutine on a private loop in a worker thread and block until done.
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(max_workers=1) as executor:
            executor.submit(
                asyncio.run, _download_json_responses_async(url_to_label_mapping)
            ).result()
6378

6479

6580
def _parse_label_json_response(label: dict) -> None:
@@ -111,6 +126,6 @@ def load_asset_json_fields(asset: dict, fields: ListOrTuple[str], http_client: H
111126
_process_label_json_response(asset["latestLabel"], url_to_label_mapping)
112127

113128
if url_to_label_mapping:
114-
download_json_responses_parallel(url_to_label_mapping, http_client)
129+
download_json_responses_parallel(url_to_label_mapping)
115130

116131
return asset

0 commit comments

Comments (0)