Skip to content

Commit d4878b8

Browse files
Padeanu and jgaucher-cs authored
RSPY578: fix cadip limit (#816)
* First implementation * Fix UT * Revert search tests * Implemented parallelisation? * Revert this config * fix fmt * Sort after regroup * Fixed limit value * removed unnecessary sort * Review refactor * Sonar fix * Removed indexing * Moved some func * clean: code * fix: cadip Item formatting * fix: cadip Item formatting --------- Co-authored-by: Julien Gaucher <[email protected]>
1 parent 37b1e5f commit d4878b8

File tree

7 files changed

+194
-100
lines changed

7 files changed

+194
-100
lines changed

services/adgs/rs_server_adgs/api/adgs_search.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ def process_search(
100100
limit: int,
101101
page: int,
102102
) -> stac_pydantic.ItemCollection:
103-
"""Do the search for the given collection and OData parameters."""
103+
"""Search adgs products for the given collection and OData parameters."""
104104

105105
return process_product_search(
106106
collection.get("station", "adgs"),

services/cadip/rs_server_cadip/api/cadip_search.py

Lines changed: 72 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
It includes an API endpoint, utility functions, and initialization for accessing EODataAccessGateway.
1919
"""
2020

21-
# pylint: disable=redefined-builtin
2221
import json
22+
import threading
2323
import traceback
24+
25+
# pylint: disable=redefined-builtin
26+
from collections import defaultdict
2427
from typing import Annotated, List, Literal, Union
2528

2629
import requests
@@ -37,6 +40,7 @@
3740
from rs_server_cadip.cadip_utils import (
3841
CADIP_CONFIG,
3942
cadip_map_mission,
43+
cadip_stac_mapper,
4044
link_assets_to_session,
4145
prepare_collection,
4246
read_conf,
@@ -50,6 +54,7 @@
5054
set_eodag_auth_token,
5155
)
5256
from rs_server_common.data_retrieval.provider import CreateProviderFailed
57+
from rs_server_common.rspy_models import Item
5358
from rs_server_common.stac_api_common import (
5459
CollectionType,
5560
DateTimeType,
@@ -97,15 +102,15 @@ def __init__(self, request: Request | None = None, readwrite: Literal["r", "w"]
97102
self.sortby = "-published"
98103

99104
@handle_exceptions
100-
def process_search(
105+
def process_search( # type: ignore
101106
self,
102107
collection: dict,
103108
odata_params: dict,
104109
limit: int,
105110
page: int,
106111
) -> stac_pydantic.ItemCollection:
107-
"""Do the search for the given collection and OData parameters."""
108-
session_data = process_session_search(
112+
"""Search cadip sessions for the given collection and OData parameters."""
113+
return process_session_search(
109114
self.request,
110115
collection.get("station", "cadip"),
111116
odata_params.get("SessionId", []),
@@ -116,18 +121,30 @@ def process_search(
116121
limit,
117122
page,
118123
)
119-
if not session_data.features:
120-
# If there are no sessions, don't proceed to assets allocation
121-
return session_data
122124

123-
# To be updated with proper ('')
124-
features_ids = ", ".join(feature.id for feature in session_data.features)
125+
@handle_exceptions
126+
def process_asset_search(
127+
self,
128+
station: str,
129+
session_features: list[Item],
130+
):
131+
"""
132+
Search cadip files for each input cadip session and their associated station.
133+
Update input session assets with their associated files.
134+
135+
Args:
136+
station (str): station identifier
137+
session_features (list[Item]): sessions as Item objects
138+
"""
139+
140+
# Join session ids with ', '
141+
features_ids = ", ".join(feature.id for feature in session_features)
125142

126143
assets: list[dict] = []
127144
page = 1
128145
while True:
129146
chunked_assets = process_files_search(
130-
collection.get("station", "cadip"),
147+
station,
131148
features_ids,
132149
map_to_session=True,
133150
page=page,
@@ -141,8 +158,49 @@ def process_search(
141158
# If assets are equal to maximum limit, then send another request for the next page
142159
page += 1
143160

144-
with open(CADIP_CONFIG / "cadip_stac_mapper.json", encoding="utf-8") as mapper:
145-
return link_assets_to_session(session_data, assets, json.loads(mapper.read()))
161+
# Update input session items with assets
162+
link_assets_to_session(session_features, assets)
163+
164+
def process_files(self, empty_sessions_data: dict) -> dict:
165+
"""
166+
Search cadip files for each input cadip session. Update the sessions data with their files data.
167+
168+
Args:
169+
empty_sessions_data (dict): dict representation of an ItemCollection
170+
171+
Returns:
172+
dict: updated input dict.
173+
"""
174+
175+
# Convert input dict into stac object
176+
item_collection = stac_pydantic.ItemCollection.model_validate(empty_sessions_data)
177+
178+
# Group sessions coming from the same collection: {col1: [item1, item2], col2: [item3]}
179+
grouped_sessions = defaultdict(list)
180+
for session in item_collection.features:
181+
grouped_sessions[session.collection].append(session)
182+
183+
# Update input session assets with their associated files, in separate threads
184+
file_threads = [
185+
threading.Thread(
186+
target=self.process_asset_search,
187+
args=(
188+
self.select_config(collection_id)["station"],
189+
session_features,
190+
),
191+
)
192+
for collection_id, session_features in grouped_sessions.items()
193+
]
194+
for thread in file_threads:
195+
thread.start()
196+
for thread in file_threads:
197+
thread.join()
198+
199+
# Convert back the stac object into dict.
200+
# We implemented some custom Item formatting, so we do a back and forth conversion
201+
# to apply the formatting, then finally return a dict.
202+
formatted = [Item.model_validate(feature.model_dump()) for feature in item_collection.features]
203+
return stac_pydantic.ItemCollection(features=formatted, type=item_collection.type).model_dump()
146204

147205

148206
def auth_validation(request: Request, collection_id: str, access_type: str):
@@ -455,15 +513,12 @@ def process_session_search( # type: ignore # pylint: disable=too-many-arguments
455513
products = validate_products(products)
456514
feature_template_path = CADIP_CONFIG / "cadip_session_ODataToSTAC_template.json"
457515
stac_mapper_path = CADIP_CONFIG / "cadip_sessions_stac_mapper.json"
458-
expanded_session_mapper_path = CADIP_CONFIG / "cadip_stac_mapper.json"
459516
with (
460517
open(feature_template_path, encoding="utf-8") as template,
461518
open(stac_mapper_path, encoding="utf-8") as stac_map,
462-
open(expanded_session_mapper_path, encoding="utf-8") as expanded_session_mapper,
463519
):
464520
feature_template = json.loads(template.read())
465521
stac_mapper = json.loads(stac_map.read())
466-
expanded_session_mapper = json.loads(expanded_session_mapper.read())
467522
collection = create_stac_collection(products, feature_template, stac_mapper)
468523
return prepare_collection(collection)
469524

@@ -574,14 +629,9 @@ def process_files_search( # pylint: disable=too-many-locals
574629
if kwargs.get("deprecated", False):
575630
write_search_products_to_db(CadipDownloadStatus, products)
576631
feature_template_path = CADIP_CONFIG / "ODataToSTAC_template.json"
577-
stac_mapper_path = CADIP_CONFIG / "cadip_stac_mapper.json"
578-
with (
579-
open(feature_template_path, encoding="utf-8") as template,
580-
open(stac_mapper_path, encoding="utf-8") as stac_map,
581-
):
632+
with open(feature_template_path, encoding="utf-8") as template:
582633
feature_template = json.loads(template.read())
583-
stac_mapper = json.loads(stac_map.read())
584-
cadip_item_collection = create_stac_collection(products, feature_template, stac_mapper)
634+
cadip_item_collection = create_stac_collection(products, feature_template, cadip_stac_mapper())
585635
logger.info("Succesfully listed and processed products from CADIP station")
586636
if kwargs.get("map_to_session", False):
587637
return [product.properties for product in products]

services/cadip/rs_server_cadip/cadip_utils.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import eodag
3131
import yaml
3232
from fastapi import HTTPException, status
33+
from rs_server_common.rspy_models import Item
3334
from rs_server_common.stac_api_common import map_stac_platform
3435
from rs_server_common.utils.logging import Logging
3536
from stac_pydantic import ItemCollection, ItemProperties
@@ -48,7 +49,15 @@ def read_conf():
4849
cadip_search_config = os.environ.get("RSPY_CADIP_SEARCH_CONFIG", str(search_yaml.absolute()))
4950
with open(cadip_search_config, encoding="utf-8") as search_conf:
5051
config = yaml.safe_load(search_conf)
51-
return config # WARNING: if the caller wants to modify this cached object, it must deepcopy it first
52+
return config # WARNING: if the caller wants to modify this cached object, he must deepcopy it first
53+
54+
55+
@lru_cache()
56+
def cadip_stac_mapper():
57+
"""Read the cadip_stac_mapper.json config; the result is cached by lru_cache after the first call."""
58+
with open(CADIP_CONFIG / "cadip_stac_mapper.json", encoding="utf-8") as mapper:
59+
config = json.loads(mapper.read())
60+
return config # WARNING: if the caller wants to modify this cached object, he must deepcopy it first
5261

5362

5463
def select_config(configuration_id: str) -> dict | None:
@@ -187,14 +196,16 @@ def cadip_reverse_map_mission(platform: Union[str, None]) -> Tuple[Union[str, No
187196
return None, None
188197

189198

190-
def link_assets_to_session(session_data, assets_dict, mapper):
191-
"""Function used to allocate assets to propper session item based on session id property."""
199+
def link_assets_to_session(session_features: list[Item], asset_items: list[dict]):
200+
"""Update input session items with associated assets based on session id property."""
192201
# Validity check to be later added.
193-
for feature in session_data.features:
194-
matching_assets = [asset_item for asset_item in assets_dict if feature.id == asset_item["SessionId"]]
202+
for feature in session_features:
203+
matching_assets = [asset_item for asset_item in asset_items if feature.id == asset_item["SessionId"]]
195204
for asset_item in matching_assets:
196205
asset_dict = {
197-
map_key: asset_item[map_value] for map_key, map_value in mapper.items() if map_value in asset_item
206+
map_key: asset_item[map_value]
207+
for map_key, map_value in cadip_stac_mapper().items()
208+
if map_value in asset_item
198209
}
199210
asset: Asset = Asset(title=asset_dict.pop("id"), roles=["cadu"], **asset_dict)
200211
if asset.title:
@@ -223,7 +234,6 @@ def link_assets_to_session(session_data, assets_dict, mapper):
223234
except ValueError as e:
224235
logger.warning(f"Cannot update start/end datetime for {feature.id}: {e}")
225236
continue
226-
return session_data
227237

228238

229239
def prepare_collection(collection: ItemCollection) -> ItemCollection:

services/common/rs_server_common/rspy_models.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
Module used to overwrite stac_pydantic with RSPY types.
1717
"""
1818

19+
from datetime import datetime
20+
1921
# mypy: ignore-errors
20-
from typing import Optional, Sequence
22+
from typing import Any, Optional, Sequence
2123

2224
import stac_pydantic
2325
from geojson_pydantic import FeatureCollection
@@ -47,6 +49,16 @@ class ItemProperties(WrapStacCommonMetadata):
4749

4850
model_config = ConfigDict(extra="allow")
4951

52+
def __init__(self, **data: Any):
53+
"""Convert any datetime keyword value to a 'YYYY-MM-DDTHH:MM:SS.mmmZ' string before calling the parent initializer."""
54+
data = {
55+
key: (value.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" if isinstance(value, datetime) else value)
56+
for key, value in data.items()
57+
}
58+
59+
# Call the parent class's initializer
60+
super().__init__(**data)
61+
5062

5163
class Item(stac_pydantic.item.Item):
5264
"""

services/common/rs_server_common/stac_api_common.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,11 @@ def read_cql(filt: dict):
570570
data = ItemCollection(features=list(all_items.values()), type="FeatureCollection")
571571
dict_data: Dict[str, Any] = self.paginate(data)
572572

573+
# In cadip, we retrieved the sessions data.
574+
# We need to fill their assets with the session files data.
575+
if self.cadip:
576+
dict_data = self.process_files(dict_data)
577+
573578
# Handle pagination links.
574579
if len(dict_data["features"]) > 0:
575580
# Don't create next page if the current one does not have features
@@ -667,7 +672,7 @@ def process_collection( # pylint: disable=too-many-locals, too-many-branches, t
667672
# Don't forward limit value for /search endpoints
668673
# request enough results to cover every page up to the current one (limit * page), page is always 1
669674
if "/search" in self.request.url.path:
670-
search_limit = SEARCH_LIMIT
675+
search_limit = self.limit * self.page
671676
search_page = 1
672677

673678
# Do the search for this collection
@@ -704,6 +709,25 @@ def process_search(
704709
) -> ItemCollection:
705710
"""Do the search for the given collection and OData parameters."""
706711

712+
def process_asset_search(
713+
self,
714+
station: str,
715+
session_features: list[Item],
716+
):
717+
"""
718+
Implemented only by cadip.
719+
Search cadip files for each input cadip session and their associated station.
720+
Update input session assets with their associated files.
721+
"""
722+
raise NotImplementedError
723+
724+
def process_files(self, empty_sessions_data: dict) -> dict:
725+
"""
726+
Implemented only by cadip.
727+
Search cadip files for each input cadip session. Update the sessions data with their files data.
728+
"""
729+
raise NotImplementedError
730+
707731

708732
def create_collection(collection: dict) -> stac_pydantic.Collection:
709733
"""Used to create stac_pydantic Model Collection based on given collection data."""

0 commit comments

Comments
 (0)