Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 117 additions & 17 deletions src/kili/adapters/kili_api_gateway/asset/formatters.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,121 @@
"""Formatters for assets retrieved from Kili API."""

import asyncio
import json

import requests

from kili.adapters.http_client import HttpClient
from kili.core.helpers import get_response_json, is_url, log_raise_for_status
from kili.adapters.kili_api_gateway.helpers.http_json import load_json_from_link
from kili.core.helpers import is_url
from kili.domain.types import ListOrTuple

# Batch size for parallel JSON response downloads (same as export service)
JSON_RESPONSE_BATCH_SIZE = 10


async def _download_json_response(url: str, http_client: HttpClient) -> dict:
"""Download and parse JSON response from a URL using asyncio.

Args:
url: URL to download the JSON response from
http_client: HttpClient instance with SSL verification already configured

def load_json_from_link(link: str, http_client: HttpClient) -> dict:
"""Load json from link."""
if link == "" or not is_url(link):
Returns:
Parsed JSON response as a dictionary
"""
try:
# Run synchronous requests call in a thread
response = await asyncio.to_thread(http_client.get, url, timeout=30)
response.raise_for_status()
return response.json()
except (requests.RequestException, json.JSONDecodeError):
# Return empty dict on error to ensure consistent response format
return {}

response = http_client.get(link, timeout=30)
log_raise_for_status(response)
return get_response_json(response)

async def _download_json_responses_async(
    url_to_label_mapping: list[tuple[str, dict]], http_client: HttpClient
) -> None:
    """Fetch JSON responses for labels concurrently, one batch at a time.

    Args:
        url_to_label_mapping: Pairs of (url, label_dict); each label is updated in place.
        http_client: HttpClient instance with SSL verification already configured
    """
    total = len(url_to_label_mapping)
    # Cap the number of concurrent connections by working in fixed-size batches.
    for start in range(0, total, JSON_RESPONSE_BATCH_SIZE):
        chunk = url_to_label_mapping[start : start + JSON_RESPONSE_BATCH_SIZE]

        # Launch every download in the chunk at once and wait for all of them.
        results = await asyncio.gather(
            *(_download_json_response(link, http_client) for link, _ in chunk)
        )

        # Attach each parsed payload to its label and drop the now-stale URL field.
        for (_, label), payload in zip(chunk, results, strict=False):
            label["jsonResponse"] = payload
            label.pop("jsonResponseUrl", None)


def download_json_responses_parallel(
    url_to_label_mapping: list[tuple[str, dict]], http_client: HttpClient
) -> None:
    """Download JSON responses and assign them to labels, in parallel when possible.

    Each label dict in the mapping gets its ``jsonResponse`` filled in and its
    ``jsonResponseUrl`` key removed.

    Args:
        url_to_label_mapping: Pairs of (url, label_dict) to resolve.
        http_client: HttpClient instance with SSL verification already configured
    """
    if not url_to_label_mapping:
        return

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running, so we can drive our own with asyncio.run()
        # and perform the downloads in parallel.
        asyncio.run(_download_json_responses_async(url_to_label_mapping, http_client))
        return

    # An event loop is already running (Jupyter notebook, FastAPI, ...):
    # asyncio.run() would raise RuntimeError, so fall back to sequential downloads.
    for link, label in url_to_label_mapping:
        try:
            resp = http_client.get(link, timeout=30)
            resp.raise_for_status()
            payload = resp.json()
        except (requests.RequestException, json.JSONDecodeError):
            # Keep the response format consistent even when a download fails.
            payload = {}
        label["jsonResponse"] = payload
        label.pop("jsonResponseUrl", None)


def _parse_label_json_response(label: dict) -> None:
"""Parse jsonResponse string to dict for a single label.

Args:
label: Label dict to update in place
"""
json_response_value = label.get("jsonResponse", "{}")
try:
label["jsonResponse"] = json.loads(json_response_value)
except json.JSONDecodeError:
label["jsonResponse"] = {}


def _process_label_json_response(label: dict, url_to_label_mapping: list[tuple[str, dict]]) -> None:
    """Route a label's jsonResponse handling: remote download vs. inline parse.

    Args:
        label: Label dict to process in place
        url_to_label_mapping: Accumulator of (url, label) pairs needing a download
    """
    link = label.get("jsonResponseUrl")
    if not (link and is_url(link)):
        # No usable URL: the jsonResponse is inline and can be parsed right away.
        _parse_label_json_response(label)
        return
    # Defer the download so that all URLs can be fetched in parallel later on.
    url_to_label_mapping.append((link, label))


def load_asset_json_fields(asset: dict, fields: ListOrTuple[str], http_client: HttpClient) -> dict:
Expand All @@ -28,18 +129,17 @@ def load_asset_json_fields(asset: dict, fields: ListOrTuple[str], http_client: H
if "ocrMetadata" in fields and asset.get("ocrMetadata") is not None:
asset["ocrMetadata"] = load_json_from_link(asset.get("ocrMetadata", ""), http_client)

# Collect all URLs to download in parallel (similar to export service)
url_to_label_mapping = []

if "labels.jsonResponse" in fields:
asset_labels = asset.get("labels", [])
for label in asset_labels:
try:
label["jsonResponse"] = json.loads(label["jsonResponse"])
except json.JSONDecodeError:
label["jsonResponse"] = {}
for label in asset.get("labels", []):
_process_label_json_response(label, url_to_label_mapping)

if "latestLabel.jsonResponse" in fields and asset.get("latestLabel") is not None:
try:
asset["latestLabel"]["jsonResponse"] = json.loads(asset["latestLabel"]["jsonResponse"])
except json.JSONDecodeError:
asset["latestLabel"]["jsonResponse"] = {}
_process_label_json_response(asset["latestLabel"], url_to_label_mapping)

if url_to_label_mapping:
download_json_responses_parallel(url_to_label_mapping, http_client)

return asset
86 changes: 51 additions & 35 deletions src/kili/adapters/kili_api_gateway/asset/operations_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ def list_assets(
options: QueryOptions,
) -> Generator[dict, None, None]:
"""List assets with given options."""
has_labels_url = "labels.jsonResponseUrl" in fields
has_latest_label_url = "latestLabel.jsonResponseUrl" in fields

if "labels.jsonResponse" in fields or "latestLabel.jsonResponse" in fields:
# Check if we can get the jsonResponse of if we need to rebuild it.
project_info = get_project(
Expand All @@ -58,7 +61,10 @@ def list_assets(
"LLM_STATIC",
"GEOSPATIAL",
}:
yield from self.list_assets_split(filters, fields, options, project_info)
fetch_annotations = not (has_labels_url or has_latest_label_url)
yield from self.list_assets_split(
filters, fields, options, project_info, fetch_annotations
)
return

fragment = fragment_builder(fields)
Expand All @@ -79,7 +85,12 @@ def list_assets(
yield from assets_gen

def list_assets_split(
self, filters: AssetFilters, fields: ListOrTuple[str], options: QueryOptions, project_info
self,
filters: AssetFilters,
fields: ListOrTuple[str],
options: QueryOptions,
project_info,
fetch_annotations: bool,
) -> Generator[dict, None, None]:
"""List assets with given options."""
nb_annotations = self.count_assets_annotations(filters)
Expand All @@ -91,22 +102,23 @@ def list_assets_split(

options = QueryOptions(options.disable_tqdm, options.first, options.skip, batch_size)

inner_annotation_fragment = get_annotation_fragment()
annotation_fragment = f"""
annotations {{
{inner_annotation_fragment}
}}
"""
# Ensure 'content', 'resolution', and 'jsonContent' are present in fields
required_fields = {"content", "jsonContent", "resolution.width", "resolution.height"}
fields = list(fields)
for field in required_fields:
if field not in fields:
fields.append(field)

fragment = fragment_builder(
fields, {"labels": annotation_fragment, "latestLabel": annotation_fragment}
)
static_fragments = {}
if fetch_annotations:
inner_annotation_fragment = get_annotation_fragment()
annotation_fragment = f"""
annotations {{
{inner_annotation_fragment}
}}
"""
static_fragments = {"labels": annotation_fragment, "latestLabel": annotation_fragment}

required_fields = {"content", "jsonContent", "resolution.width", "resolution.height"}
fields = list(fields)
for field in required_fields:
if field not in fields:
fields.append(field)

fragment = fragment_builder(fields, static_fragments if static_fragments else None)
query = get_assets_query(fragment)
where = asset_where_mapper(filters)
assets_gen = PaginatedGraphQLQuery(self.graphql_client).execute_query_from_paginated_call(
Expand All @@ -115,25 +127,29 @@ def list_assets_split(
assets_gen = (
load_asset_json_fields(asset, fields, self.http_client) for asset in assets_gen
)
converter = AnnotationsToJsonResponseConverter(
json_interface=project_info["jsonInterface"],
project_input_type=project_info["inputType"],
)
is_requesting_annotations = any("annotations." in element for element in fields)
for asset in assets_gen:
if "latestLabel.jsonResponse" in fields and asset.get("latestLabel"):
converter.patch_label_json_response(
asset, asset["latestLabel"], asset["latestLabel"]["annotations"]
)
if not is_requesting_annotations:
asset["latestLabel"].pop("annotations")

if "labels.jsonResponse" in fields:
for label in asset.get("labels", []):
converter.patch_label_json_response(asset, label, label["annotations"])
if fetch_annotations:
converter = AnnotationsToJsonResponseConverter(
json_interface=project_info["jsonInterface"],
project_input_type=project_info["inputType"],
)
is_requesting_annotations = any("annotations." in element for element in fields)
for asset in assets_gen:
if "latestLabel.jsonResponse" in fields and asset.get("latestLabel"):
converter.patch_label_json_response(
asset, asset["latestLabel"], asset["latestLabel"]["annotations"]
)
if not is_requesting_annotations:
label.pop("annotations")
yield asset
asset["latestLabel"].pop("annotations")

if "labels.jsonResponse" in fields:
for label in asset.get("labels", []):
converter.patch_label_json_response(asset, label, label["annotations"])
if not is_requesting_annotations:
label.pop("annotations")
yield asset
else:
yield from assets_gen

def count_assets(self, filters: AssetFilters) -> int:
"""Send a GraphQL request calling countIssues resolver."""
Expand Down
22 changes: 22 additions & 0 deletions src/kili/adapters/kili_api_gateway/helpers/http_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""HTTP JSON helpers for downloading JSON data from URLs."""

from kili.adapters.http_client import HttpClient
from kili.core.helpers import is_url


def load_json_from_link(link: str, http_client: HttpClient) -> dict:
    """Load json from link.

    Args:
        link: URL to download JSON from
        http_client: HttpClient instance with SSL verification already configured

    Returns:
        Parsed JSON response as a dictionary, or empty dict if link is invalid
    """
    # Guard against empty or non-URL inputs before issuing any request.
    if link == "" or not is_url(link):
        return {}

    resp = http_client.get(link, timeout=30)
    resp.raise_for_status()
    return resp.json()
19 changes: 14 additions & 5 deletions src/kili/adapters/kili_api_gateway/label/formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,24 @@

import json

from kili.adapters.http_client import HttpClient
from kili.adapters.kili_api_gateway.helpers.http_json import load_json_from_link
from kili.core.helpers import is_url
from kili.domain.types import ListOrTuple


def load_label_json_fields(label: dict, fields: ListOrTuple[str]) -> dict:
def load_label_json_fields(label: dict, fields: ListOrTuple[str], http_client: HttpClient) -> dict:
"""Load json fields of a label."""
if "jsonResponse" in fields:
try:
label["jsonResponse"] = json.loads(label.get("jsonResponse", "{}"))
except json.JSONDecodeError:
label["jsonResponse"] = {}
json_response_url = label.get("jsonResponseUrl")
if json_response_url and is_url(json_response_url):
label["jsonResponse"] = load_json_from_link(json_response_url, http_client)
del label["jsonResponseUrl"]
else:
json_response_value = label.get("jsonResponse", "{}")
try:
label["jsonResponse"] = json.loads(json_response_value)
except json.JSONDecodeError:
label["jsonResponse"] = {}

return label
Loading