Skip to content

Commit cdfe314

Browse files
committed
feat(LAB-4123): faster exports with the SDK
1 parent c578666 commit cdfe314

File tree

11 files changed

+558
-62
lines changed

11 files changed

+558
-62
lines changed
Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
"""Formatters for assets retrieved from Kili API."""
22

33
import json
4+
from concurrent.futures import ThreadPoolExecutor, as_completed
5+
6+
import requests
47

58
from kili.adapters.http_client import HttpClient
69
from kili.core.helpers import get_response_json, is_url, log_raise_for_status
710
from kili.domain.types import ListOrTuple
811

12+
# Batch size for parallel JSON response downloads (same as export service)
13+
JSON_RESPONSE_BATCH_SIZE = 10
14+
915

1016
def load_json_from_link(link: str, http_client: HttpClient) -> dict:
1117
"""Load json from link."""
@@ -17,6 +23,72 @@ def load_json_from_link(link: str, http_client: HttpClient) -> dict:
1723
return get_response_json(response)
1824

1925

26+
def download_json_responses_parallel(
    url_to_label_mapping: list[tuple[str, dict]], http_client: HttpClient
) -> None:
    """Download JSON responses concurrently and store them on their labels.

    Every label dict is updated in place: ``jsonResponse`` receives the
    downloaded payload and the ``jsonResponseUrl`` key is removed. When a
    download fails with one of the handled errors, ``jsonResponse`` is set to
    an empty dict so all labels keep the same shape, and a warning is logged
    so the failure is not silently mistaken for an empty label.

    Args:
        url_to_label_mapping: List of tuples (url, label_dict) to download.
        http_client: HTTP client to use for downloads.
    """
    import logging

    if not url_to_label_mapping:
        return

    logger = logging.getLogger(__name__)

    # One pool bounded by JSON_RESPONSE_BATCH_SIZE caps the number of
    # concurrent connections just like per-batch pools would, without paying
    # for a new pool per batch or waiting for the slowest download of a batch
    # before starting the next one.
    worker_count = min(JSON_RESPONSE_BATCH_SIZE, len(url_to_label_mapping))
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        future_to_label = {
            executor.submit(load_json_from_link, url, http_client): label
            for url, label in url_to_label_mapping
        }

        # Assign results as soon as each download finishes.
        for future in as_completed(future_to_label):
            label = future_to_label[future]
            try:
                label["jsonResponse"] = future.result()
            except (requests.RequestException, json.JSONDecodeError, TimeoutError) as err:
                # The URL is deliberately left out of the message: it may be
                # a signed link that should not end up in logs.
                logger.warning(
                    "Could not download jsonResponse for label %s (%s); using an empty dict",
                    label.get("id"),
                    type(err).__name__,
                )
                # Empty dict keeps the response format consistent.
                label["jsonResponse"] = {}
            # Reached on success and on handled failure alike.
            label.pop("jsonResponseUrl", None)
63+
64+
65+
def _parse_label_json_response(label: dict) -> None:
66+
"""Parse jsonResponse string to dict for a single label.
67+
68+
Args:
69+
label: Label dict to update in place
70+
"""
71+
json_response_value = label.get("jsonResponse", "{}")
72+
try:
73+
label["jsonResponse"] = json.loads(json_response_value)
74+
except json.JSONDecodeError:
75+
label["jsonResponse"] = {}
76+
77+
78+
def _process_label_json_response(label: dict, url_to_label_mapping: list[tuple[str, dict]]) -> None:
    """Route one label's jsonResponse to a deferred download or to parsing.

    When the label carries a ``jsonResponseUrl`` that is a URL, the
    (url, label) pair is appended to ``url_to_label_mapping`` for a later
    download. Otherwise the label's ``jsonResponse`` is parsed right away.

    Args:
        label: Label dict to process.
        url_to_label_mapping: List that receives the (url, label) pair when a
            download is needed.
    """
    remote_location = label.get("jsonResponseUrl")
    needs_download = bool(remote_location) and is_url(remote_location)

    if not needs_download:
        _parse_label_json_response(label)
        return

    url_to_label_mapping.append((remote_location, label))
90+
91+
2092
def load_asset_json_fields(asset: dict, fields: ListOrTuple[str], http_client: HttpClient) -> dict:
2193
"""Load json fields of an asset."""
2294
if "jsonMetadata" in fields:
@@ -28,18 +100,17 @@ def load_asset_json_fields(asset: dict, fields: ListOrTuple[str], http_client: H
28100
if "ocrMetadata" in fields and asset.get("ocrMetadata") is not None:
29101
asset["ocrMetadata"] = load_json_from_link(asset.get("ocrMetadata", ""), http_client)
30102

103+
# Collect all URLs to download in parallel (similar to export service)
104+
url_to_label_mapping = []
105+
31106
if "labels.jsonResponse" in fields:
32-
asset_labels = asset.get("labels", [])
33-
for label in asset_labels:
34-
try:
35-
label["jsonResponse"] = json.loads(label["jsonResponse"])
36-
except json.JSONDecodeError:
37-
label["jsonResponse"] = {}
107+
for label in asset.get("labels", []):
108+
_process_label_json_response(label, url_to_label_mapping)
38109

39110
if "latestLabel.jsonResponse" in fields and asset.get("latestLabel") is not None:
40-
try:
41-
asset["latestLabel"]["jsonResponse"] = json.loads(asset["latestLabel"]["jsonResponse"])
42-
except json.JSONDecodeError:
43-
asset["latestLabel"]["jsonResponse"] = {}
111+
_process_label_json_response(asset["latestLabel"], url_to_label_mapping)
112+
113+
if url_to_label_mapping:
114+
download_json_responses_parallel(url_to_label_mapping, http_client)
44115

45116
return asset

src/kili/adapters/kili_api_gateway/asset/operations_mixin.py

Lines changed: 51 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ def list_assets(
4646
options: QueryOptions,
4747
) -> Generator[dict, None, None]:
4848
"""List assets with given options."""
49+
has_labels_url = "labels.jsonResponseUrl" in fields
50+
has_latest_label_url = "latestLabel.jsonResponseUrl" in fields
51+
4952
if "labels.jsonResponse" in fields or "latestLabel.jsonResponse" in fields:
5053
# Check if we can get the jsonResponse or if we need to rebuild it.
5154
project_info = get_project(
@@ -58,7 +61,10 @@ def list_assets(
5861
"LLM_STATIC",
5962
"GEOSPATIAL",
6063
}:
61-
yield from self.list_assets_split(filters, fields, options, project_info)
64+
fetch_annotations = not (has_labels_url or has_latest_label_url)
65+
yield from self.list_assets_split(
66+
filters, fields, options, project_info, fetch_annotations
67+
)
6268
return
6369

6470
fragment = fragment_builder(fields)
@@ -79,7 +85,12 @@ def list_assets(
7985
yield from assets_gen
8086

8187
def list_assets_split(
82-
self, filters: AssetFilters, fields: ListOrTuple[str], options: QueryOptions, project_info
88+
self,
89+
filters: AssetFilters,
90+
fields: ListOrTuple[str],
91+
options: QueryOptions,
92+
project_info,
93+
fetch_annotations: bool,
8394
) -> Generator[dict, None, None]:
8495
"""List assets with given options."""
8596
nb_annotations = self.count_assets_annotations(filters)
@@ -91,22 +102,23 @@ def list_assets_split(
91102

92103
options = QueryOptions(options.disable_tqdm, options.first, options.skip, batch_size)
93104

94-
inner_annotation_fragment = get_annotation_fragment()
95-
annotation_fragment = f"""
96-
annotations {{
97-
{inner_annotation_fragment}
98-
}}
99-
"""
100-
# Ensure 'content', 'resolution', and 'jsonContent' are present in fields
101-
required_fields = {"content", "jsonContent", "resolution.width", "resolution.height"}
102-
fields = list(fields)
103-
for field in required_fields:
104-
if field not in fields:
105-
fields.append(field)
106-
107-
fragment = fragment_builder(
108-
fields, {"labels": annotation_fragment, "latestLabel": annotation_fragment}
109-
)
105+
static_fragments = {}
106+
if fetch_annotations:
107+
inner_annotation_fragment = get_annotation_fragment()
108+
annotation_fragment = f"""
109+
annotations {{
110+
{inner_annotation_fragment}
111+
}}
112+
"""
113+
static_fragments = {"labels": annotation_fragment, "latestLabel": annotation_fragment}
114+
115+
required_fields = {"content", "jsonContent", "resolution.width", "resolution.height"}
116+
fields = list(fields)
117+
for field in required_fields:
118+
if field not in fields:
119+
fields.append(field)
120+
121+
fragment = fragment_builder(fields, static_fragments if static_fragments else None)
110122
query = get_assets_query(fragment)
111123
where = asset_where_mapper(filters)
112124
assets_gen = PaginatedGraphQLQuery(self.graphql_client).execute_query_from_paginated_call(
@@ -115,25 +127,29 @@ def list_assets_split(
115127
assets_gen = (
116128
load_asset_json_fields(asset, fields, self.http_client) for asset in assets_gen
117129
)
118-
converter = AnnotationsToJsonResponseConverter(
119-
json_interface=project_info["jsonInterface"],
120-
project_input_type=project_info["inputType"],
121-
)
122-
is_requesting_annotations = any("annotations." in element for element in fields)
123-
for asset in assets_gen:
124-
if "latestLabel.jsonResponse" in fields and asset.get("latestLabel"):
125-
converter.patch_label_json_response(
126-
asset, asset["latestLabel"], asset["latestLabel"]["annotations"]
127-
)
128-
if not is_requesting_annotations:
129-
asset["latestLabel"].pop("annotations")
130130

131-
if "labels.jsonResponse" in fields:
132-
for label in asset.get("labels", []):
133-
converter.patch_label_json_response(asset, label, label["annotations"])
131+
if fetch_annotations:
132+
converter = AnnotationsToJsonResponseConverter(
133+
json_interface=project_info["jsonInterface"],
134+
project_input_type=project_info["inputType"],
135+
)
136+
is_requesting_annotations = any("annotations." in element for element in fields)
137+
for asset in assets_gen:
138+
if "latestLabel.jsonResponse" in fields and asset.get("latestLabel"):
139+
converter.patch_label_json_response(
140+
asset, asset["latestLabel"], asset["latestLabel"]["annotations"]
141+
)
134142
if not is_requesting_annotations:
135-
label.pop("annotations")
136-
yield asset
143+
asset["latestLabel"].pop("annotations")
144+
145+
if "labels.jsonResponse" in fields:
146+
for label in asset.get("labels", []):
147+
converter.patch_label_json_response(asset, label, label["annotations"])
148+
if not is_requesting_annotations:
149+
label.pop("annotations")
150+
yield asset
151+
else:
152+
yield from assets_gen
137153

138154
def count_assets(self, filters: AssetFilters) -> int:
139155
"""Send a GraphQL request calling countIssues resolver."""

src/kili/adapters/kili_api_gateway/label/formatters.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,33 @@
22

33
import json
44

5+
from kili.adapters.http_client import HttpClient
6+
from kili.core.helpers import is_url
57
from kili.domain.types import ListOrTuple
68

79

8-
def load_label_json_fields(label: dict, fields: ListOrTuple[str]) -> dict:
10+
def load_json_from_link(link: str, http_client: HttpClient) -> dict:
    """Fetch ``link`` with ``http_client`` and return the decoded JSON body.

    Returns an empty dict when ``link`` is an empty string or is not a URL.
    An HTTP error status raises through ``response.raise_for_status()``.
    """
    is_downloadable = link != "" and is_url(link)
    if not is_downloadable:
        return {}

    response = http_client.get(link, timeout=30)
    response.raise_for_status()
    payload = response.json()
    return payload
18+
19+
20+
def load_label_json_fields(label: dict, fields: ListOrTuple[str], http_client: HttpClient) -> dict:
    """Load json fields of a label.

    When ``jsonResponse`` is requested, the label is updated in place:

    * if the label has a ``jsonResponseUrl`` that is a URL, the payload is
      downloaded into ``jsonResponse`` and ``jsonResponseUrl`` is removed;
    * otherwise the ``jsonResponse`` string is parsed. A missing key is
      treated as an empty JSON object, an undecodable value (invalid JSON, or
      a non-string such as ``None``) becomes an empty dict, and a value that
      is already a dict is left untouched.

    Args:
        label: Label dict to update in place.
        fields: Fields requested for the label.
        http_client: HTTP client used to download ``jsonResponseUrl``.

    Returns:
        The same label dict.
    """
    if "jsonResponse" not in fields:
        return label

    json_response_url = label.get("jsonResponseUrl")
    if json_response_url and is_url(json_response_url):
        label["jsonResponse"] = load_json_from_link(json_response_url, http_client)
        del label["jsonResponseUrl"]
        return label

    json_response_value = label.get("jsonResponse", "{}")
    if isinstance(json_response_value, dict):
        # Already parsed; json.loads would raise TypeError on a dict.
        return label
    try:
        label["jsonResponse"] = json.loads(json_response_value)
    except (TypeError, json.JSONDecodeError):
        # TypeError covers None and other non-string values.
        label["jsonResponse"] = {}

    return label

src/kili/adapters/kili_api_gateway/label/operations_mixin.py

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ def list_labels(
5252
options: QueryOptions,
5353
) -> Generator[dict, None, None]:
5454
"""List labels."""
55+
has_json_response_url = "jsonResponseUrl" in fields
56+
5557
if "jsonResponse" in fields:
5658
if "labelOf" not in fields:
5759
fields = [*list(fields), "assetId"]
@@ -65,7 +67,10 @@ def list_labels(
6567
"LLM_INSTR_FOLLOWING",
6668
"LLM_STATIC",
6769
}:
68-
yield from self.list_labels_split(filters, fields, options, project_info)
70+
fetch_annotations = not has_json_response_url
71+
yield from self.list_labels_split(
72+
filters, fields, options, project_info, fetch_annotations
73+
)
6974
return
7075

7176
fragment = fragment_builder(fields)
@@ -74,11 +79,18 @@ def list_labels(
7479
labels_gen = PaginatedGraphQLQuery(self.graphql_client).execute_query_from_paginated_call(
7580
query, where, options, "Retrieving labels", GQL_COUNT_LABELS
7681
)
77-
labels_gen = (load_label_json_fields(label, fields) for label in labels_gen)
82+
labels_gen = (
83+
load_label_json_fields(label, fields, self.http_client) for label in labels_gen
84+
)
7885
yield from labels_gen
7986

8087
def list_labels_split(
81-
self, filters: LabelFilters, fields: ListOrTuple[str], options: QueryOptions, project_info
88+
self,
89+
filters: LabelFilters,
90+
fields: ListOrTuple[str],
91+
options: QueryOptions,
92+
project_info,
93+
fetch_annotations: bool,
8294
) -> Generator[dict, None, None]:
8395
"""List labels."""
8496
if project_info["inputType"] == "VIDEO":
@@ -87,21 +99,28 @@ def list_labels_split(
8799
)
88100

89101
fragment = fragment_builder(fields)
90-
inner_annotation_fragment = get_annotation_fragment()
91-
full_fragment = f"""
92-
{fragment}
93-
annotations {{
94-
{inner_annotation_fragment}
95-
}}
96-
"""
102+
103+
if fetch_annotations:
104+
inner_annotation_fragment = get_annotation_fragment()
105+
full_fragment = f"""
106+
{fragment}
107+
annotations {{
108+
{inner_annotation_fragment}
109+
}}
110+
"""
111+
else:
112+
full_fragment = fragment
113+
97114
query = get_labels_query(full_fragment)
98115
where = label_where_mapper(filters)
99116
labels_gen = PaginatedGraphQLQuery(self.graphql_client).execute_query_from_paginated_call(
100117
query, where, options, "Retrieving labels", GQL_COUNT_LABELS
101118
)
102-
labels_gen = (load_label_json_fields(label, fields) for label in labels_gen)
119+
labels_gen = (
120+
load_label_json_fields(label, fields, self.http_client) for label in labels_gen
121+
)
103122

104-
if "jsonResponse" in fields:
123+
if "jsonResponse" in fields and fetch_annotations:
105124
converter = AnnotationsToJsonResponseConverter(
106125
json_interface=project_info["jsonInterface"],
107126
project_input_type=project_info["inputType"],

src/kili/llm/services/export/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
"isLatestLabelForUser",
3737
"isSentBackToQueue",
3838
"jsonResponse", # This is needed to keep annotations
39+
"jsonResponseUrl",
3940
"labelType",
4041
"modelName",
4142
]
@@ -57,6 +58,7 @@
5758
"externalId",
5859
"jsonMetadata",
5960
"labels.jsonResponse",
61+
"labels.jsonResponseUrl",
6062
"labels.author.id",
6163
"labels.author.email",
6264
"labels.author.firstname",

src/kili/services/copy_project/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ def _copy_labels_legacy(self, from_project_id: str, new_project_id: str) -> None
202202
fields=[
203203
"author.email",
204204
"jsonResponse",
205+
"jsonResponseUrl",
205206
"secondsToLabel",
206207
"isLatestLabelForUser",
207208
"labelOf.externalId",

0 commit comments

Comments
 (0)