Skip to content

Commit 6bfe48b

Browse files
committed
feat(LAB-4123): migrate JSON response downloads from ThreadPoolExecutor to asyncio
1 parent a5f7a3e commit 6bfe48b

File tree

3 files changed

+208
-188
lines changed

3 files changed

+208
-188
lines changed

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ dependencies = [
4444
"filelock >= 3.0.0, < 4.0.0",
4545
"pip-system-certs >= 4.0.0, < 5.0.0; platform_system=='Windows'",
4646
"pyrate-limiter >= 3, < 4",
47-
"kili-formats == 1.1.0"
47+
"kili-formats == 1.1.0",
48+
"httpx >= 0.27.0, < 1.0.0"
4849
]
4950
urls = { homepage = "https://github.com/kili-technology/kili-python-sdk" }
5051

src/kili/adapters/kili_api_gateway/asset/formatters.py

Lines changed: 54 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,83 @@
11
"""Formatters for assets retrieved from Kili API."""
22

3+
import asyncio
34
import json
4-
from concurrent.futures import ThreadPoolExecutor, as_completed
5+
import os
56

6-
import requests
7+
import httpx
78

89
from kili.adapters.http_client import HttpClient
9-
from kili.core.helpers import get_response_json, is_url, log_raise_for_status
10+
from kili.core.helpers import is_url
1011
from kili.domain.types import ListOrTuple
1112

1213
# Batch size for parallel JSON response downloads (same as export service)
1314
JSON_RESPONSE_BATCH_SIZE = 10
1415

1516

1617
def load_json_from_link(link: str, http_client: HttpClient) -> dict:
    """Load json from link (synchronous fallback for non-batch operations).

    Args:
        link: URL pointing at a JSON document.
        http_client: HTTP client used to perform the GET request.

    Returns:
        The parsed JSON payload, or an empty dict when the link is empty
        or not a valid URL.
    """
    # Guard clause: nothing to fetch for blank or malformed links.
    if link == "" or not is_url(link):
        return {}

    resp = http_client.get(link, timeout=30)
    resp.raise_for_status()
    return resp.json()
2425

2526

26-
def download_json_responses_parallel(
27-
url_to_label_mapping: list[tuple[str, dict]], http_client: HttpClient
28-
) -> None:
29-
"""Download JSON responses in parallel and assign to labels.
27+
def _should_verify_ssl() -> bool:
    """Return whether SSL verification is enabled via the KILI_VERIFY env var.

    Verification defaults to on when the variable is unset; "true"/"1"/"yes"
    (case-insensitive) keep it on, anything else turns it off.
    """
    verify_env = os.getenv("KILI_VERIFY")
    return verify_env.lower() in ("true", "1", "yes") if verify_env is not None else True


async def _download_json_response(url: str, client: httpx.AsyncClient | None = None) -> dict:
    """Download and parse JSON response from a URL using asyncio.

    Args:
        url: URL to download the JSON response from
        client: Optional shared ``httpx.AsyncClient``. Passing one lets a
            caller pool connections across many downloads; when omitted, a
            one-shot client is created for this single request (previous
            behavior, which paid a fresh TLS handshake per URL).

    Returns:
        Parsed JSON response as a dictionary, or ``{}`` on any HTTP or
        JSON-decoding error, so callers always get a consistent shape.
    """
    own_client = None
    try:
        if client is None:
            # No shared client supplied: build a short-lived one. The env
            # lookup is factored into _should_verify_ssl so the parsing
            # logic lives in one place.
            client = own_client = httpx.AsyncClient(verify=_should_verify_ssl())

        response = await client.get(url, timeout=30.0)
        response.raise_for_status()
        return response.json()
    except (httpx.HTTPError, json.JSONDecodeError):
        # Swallow network/parse failures: callers rely on a dict always
        # coming back to keep the response format consistent.
        return {}
    finally:
        # Only close a client we created ourselves; a caller-supplied
        # client stays open for reuse.
        if own_client is not None:
            await own_client.aclose()
47+
48+
49+
async def _download_json_responses_async(url_to_label_mapping: list[tuple[str, dict]]) -> None:
    """Download JSON responses in parallel using asyncio.

    Mutates each label dict in place: sets ``"jsonResponse"`` to the
    downloaded payload (or ``{}`` on failure) and removes the now-redundant
    ``"jsonResponseUrl"`` key.

    Args:
        url_to_label_mapping: List of tuples (url, label_dict) to download
    """
    # Process in batches to limit concurrent connections
    for i in range(0, len(url_to_label_mapping), JSON_RESPONSE_BATCH_SIZE):
        batch = url_to_label_mapping[i : i + JSON_RESPONSE_BATCH_SIZE]

        # Download all URLs in the batch in parallel using asyncio.gather
        download_tasks = [_download_json_response(url) for url, _ in batch]
        json_responses = await asyncio.gather(*download_tasks)

        # gather() returns exactly one result per task, in submission order,
        # so the lengths always match; strict=True makes any mismatch a loud
        # ValueError instead of silently dropping labels (strict=False hid
        # that failure mode).
        for (_, label), json_response in zip(batch, json_responses, strict=True):
            label["jsonResponse"] = json_response
            if "jsonResponseUrl" in label:
                del label["jsonResponseUrl"]
68+
69+
70+
def download_json_responses_parallel(url_to_label_mapping: list[tuple[str, dict]]) -> None:
    """Download JSON responses in parallel and assign to labels.

    Safe to call both from plain synchronous code and from code that is
    already inside a running event loop (e.g. a Jupyter notebook).

    Args:
        url_to_label_mapping: List of tuples (url, label_dict) to download
    """
    if not url_to_label_mapping:
        return

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop running in this thread: safe to start our own.
        asyncio.run(_download_json_responses_async(url_to_label_mapping))
    else:
        # asyncio.run() raises RuntimeError when called from a running loop
        # (common in Jupyter for an SDK), so drive the coroutine from a
        # helper thread that owns a fresh event loop instead.
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(max_workers=1) as executor:
            executor.submit(
                asyncio.run, _download_json_responses_async(url_to_label_mapping)
            ).result()
6381

6482

6583
def _parse_label_json_response(label: dict) -> None:
@@ -111,6 +129,6 @@ def load_asset_json_fields(asset: dict, fields: ListOrTuple[str], http_client: H
111129
_process_label_json_response(asset["latestLabel"], url_to_label_mapping)
112130

113131
if url_to_label_mapping:
114-
download_json_responses_parallel(url_to_label_mapping, http_client)
132+
download_json_responses_parallel(url_to_label_mapping)
115133

116134
return asset

0 commit comments

Comments
 (0)