|
| 1 | +"""SuperSTAC Query Manager""" |
| 2 | + |
| 3 | +from typing import List, Optional, Dict, Any |
| 4 | +from pystac_client import Client |
| 5 | +from pystac import Item |
| 6 | +from superstac._logging import logger |
| 7 | +from superstac.utils import BAND_MAP |
| 8 | + |
| 9 | + |
| 10 | +# todo - support auth and headers for client.open() in query_catalog_with_pystac param. |
| 11 | +# test resolving band alias |
| 12 | + |
| 13 | + |
| 14 | +def query_catalog_with_pystac( |
| 15 | + name: str, |
| 16 | + url: str, |
| 17 | + required_assets: Optional[List[str]] = None, |
| 18 | + max_items: int = 100, |
| 19 | + **search_kwargs: Any, |
| 20 | +) -> List[Dict[str, Any]]: |
| 21 | + """ |
| 22 | + Search a single catalog using pystac-client and normalize the assets. |
| 23 | +
|
| 24 | + Args: |
| 25 | + name: Unique catalog name. |
| 26 | + url: STAC API URL. |
| 27 | + required_assets: Band aliases to filter and rename assets. |
| 28 | + max_items: Maximum number of results. |
| 29 | + **search_kwargs: All valid parameters supported by pystac-client's search(). |
| 30 | +
|
| 31 | + Returns: |
| 32 | + List of STAC items (as dicts), normalized with filtered assets and catalog_name. |
| 33 | + """ |
| 34 | + try: |
| 35 | + client = Client.open(url) |
| 36 | + logger.debug(f"Querying STAC with: {search_kwargs.items()}") |
| 37 | + search = client.search(max_items=max_items, **search_kwargs) |
| 38 | + items: List[Item] = list(search.items()) |
| 39 | + |
| 40 | + collection_id = None |
| 41 | + collections = search_kwargs.get("collections") |
| 42 | + if isinstance(collections, list) and collections: |
| 43 | + collection_id = collections[0] |
| 44 | + elif isinstance(collections, str): |
| 45 | + collection_id = collections |
| 46 | + |
| 47 | + band_map = BAND_MAP.get(collection_id, {}) |
| 48 | + normalized = [] |
| 49 | + |
| 50 | + for item in items: |
| 51 | + item_assets = item.assets |
| 52 | + normalized_assets = {} |
| 53 | + |
| 54 | + for alias, actual in band_map.items(): |
| 55 | + if not required_assets or alias in required_assets: |
| 56 | + if actual in item_assets: |
| 57 | + normalized_assets[alias] = item_assets[actual].to_dict() |
| 58 | + |
| 59 | + if normalized_assets: |
| 60 | + item_dict = item.to_dict() |
| 61 | + item_dict["assets"] = normalized_assets |
| 62 | + item_dict["catalog_name"] = name |
| 63 | + normalized.append(item_dict) |
| 64 | + |
| 65 | + logger.info( |
| 66 | + f"[{name}] Returned {len(normalized)} items for collection '{collection_id}'" |
| 67 | + ) |
| 68 | + return normalized |
| 69 | + |
| 70 | + except Exception as e: |
| 71 | + logger.warning(f"[{name}] Catalog query failed: {e}") |
| 72 | + return [] |
| 73 | + |
| 74 | + |
| 75 | +if __name__ == "__main__": |
| 76 | + results = query_catalog_with_pystac( |
| 77 | + name="aws", |
| 78 | + url="https://earth-search.aws.element84.com/v1", |
| 79 | + collections=["sentinel-2-l2a"], |
| 80 | + bbox=[6.0, 49.0, 7.0, 50.0], |
| 81 | + datetime="2024-01-01/2024-01-31", |
| 82 | + query={"eo:cloud_cover": {"lt": 20}}, |
| 83 | + required_assets=["red", "nir"], |
| 84 | + sortby=[{"field": "properties.datetime", "direction": "desc"}], |
| 85 | + ) |
| 86 | + print(results) |
0 commit comments