11from __future__ import annotations
22
3+ import warnings
34import logging
45from functools import cached_property
5- from typing import Any , Callable , List , Optional , Type , Union
6+ from typing import Any , Callable , List , Optional , Sequence , Type , Union , Dict , Generator
67
78import croniter
89from mapchete import Bounds
1718from mapchete .path import MPath
1819from mapchete .tile import BufferedTile
1920from mapchete .types import MPathLike , NodataVal , NodataVals
20- from pydantic import BaseModel
21+ from pydantic import BaseModel , model_validator
22+ from pystac import Item
2123from rasterio .enums import Resampling
2224from rasterio .features import geometry_mask
2325from shapely .geometry import mapping
2426from shapely .geometry .base import BaseGeometry
2527
26- from mapchete_eo .archives .base import Archive
2728from mapchete_eo .exceptions import CorruptedProductMetadata , PreprocessingNotFinished
2829from mapchete_eo .io import (
2930 products_to_np_array ,
3031 products_to_xarray ,
3132 read_levelled_cube_to_np_array ,
3233 read_levelled_cube_to_xarray ,
3334)
35+ from mapchete_eo .source import Source
3436from mapchete_eo .product import EOProduct
3537from mapchete_eo .protocols import EOProductProtocol
36- from mapchete_eo .search .stac_static import STACStaticCatalog
3738from mapchete_eo .settings import mapchete_eo_settings
3839from mapchete_eo .sort import SortMethodConfig , TargetDateSort
3940from mapchete_eo .time import to_datetime
4445
class BaseDriverConfig(BaseModel):
    """Driver configuration validated from the ``abstract`` input parameters.

    ``source`` accepts either a single source definition or a sequence of
    them; a single value is expanded to a one-element list before field
    validation. The deprecated ``cat_baseurl`` field is still accepted and
    is translated into a static-catalog source entry.
    """

    format: str
    # one or more sources to be searched for items
    source: Sequence[Source]
    time: Optional[Union[TimeRange, List[TimeRange]]] = None
    # deprecated: translated into a static-catalog source entry
    cat_baseurl: Optional[str] = None
    cache: Optional[Any] = None
    footprint_buffer: float = 0
    area: Optional[Union[MPathLike, dict, type[BaseGeometry]]] = None
    preprocessing_tasks: bool = False
    search_kwargs: Optional[Dict[str, Any]] = None

    @model_validator(mode="before")
    @classmethod
    def to_list(cls, values: Any) -> Any:
        """Expands source to list.

        A list or tuple is passed through untouched so that an already
        sequence-like ``source`` is not nested inside another list. Input
        that is not a dict is returned unchanged and left to pydantic's own
        validation. The caller's mapping is never modified; a shallow copy is
        returned when a change is required.
        """
        if not isinstance(values, dict):
            return values
        source = values.get("source")
        if source is None or isinstance(source, (list, tuple)):
            return values
        return {**values, "source": [source]}

    @model_validator(mode="before")
    @classmethod
    def deprecate_cat_baseurl(cls, values: Any) -> Any:
        """Translate the deprecated ``cat_baseurl`` field into a source entry.

        Emits a ``DeprecationWarning`` when ``cat_baseurl`` is set and adds a
        ``source`` list holding one static-catalog definition that points to
        it.

        Raises:
            ValueError: if ``cat_baseurl`` is given together with a non-empty
                ``source``.
        """
        if not isinstance(values, dict):
            return values
        cat_baseurl = values.get("cat_baseurl")
        if cat_baseurl:  # pragma: no cover
            warnings.warn(
                "'cat_baseurl' will be deprecated soon. Please use 'catalog_type=static' in the source.",
                category=DeprecationWarning,
                stacklevel=2,
            )
            if values.get("source"):
                raise ValueError(
                    "deprecated cat_baseurl field found alongside sources."
                )
            # return a copy instead of mutating the caller's mapping
            return {
                **values,
                "source": [dict(collection=cat_baseurl, catalog_type="static")],
            }
        return values
5481
5582
5683class EODataCube (base .InputTile ):
@@ -63,7 +90,7 @@ class EODataCube(base.InputTile):
6390
6491 tile : BufferedTile
6592 eo_bands : dict
66- time : List [TimeRange ]
93+ time : Optional [ List [TimeRange ] ]
6794 area : BaseGeometry
6895 area_pixelbuffer : int = 0
6996
@@ -72,7 +99,7 @@ def __init__(
7299 tile : BufferedTile ,
73100 products : Optional [List [EOProductProtocol ]],
74101 eo_bands : dict ,
75- time : List [TimeRange ],
102+ time : Optional [ List [TimeRange ]] = None ,
76103 input_key : Optional [str ] = None ,
77104 area : Optional [BaseGeometry ] = None ,
78105 ** kwargs ,
@@ -314,27 +341,25 @@ def filter_products(
314341 """
315342 Return a filtered list of input products.
316343 """
317- if any ([start_time , end_time , timestamps ]):
344+ if any ([start_time , end_time , timestamps ]): # pragma: no cover
318345 raise NotImplementedError ("time subsets are not yet implemented" )
319346
320347 if time_pattern :
321348 # filter products by time pattern
322- tz = tzutc ()
323- coord_time = [
324- t .replace (tzinfo = tz )
325- for t in croniter .croniter_range (
326- to_datetime (self .start_time ),
327- to_datetime (self .end_time ),
328- time_pattern ,
329- )
330- ]
331349 return [
332350 product
333351 for product in self .products
334- if product .item .datetime in coord_time
352+ if product .item .datetime
353+ in [
354+ t .replace (tzinfo = tzutc ())
355+ for t in croniter .croniter_range (
356+ to_datetime (self .start_time ),
357+ to_datetime (self .end_time ),
358+ time_pattern ,
359+ )
360+ ]
335361 ]
336- else :
337- return self .products
362+ return self .products
338363
339364 def is_empty (self ) -> bool : # pragma: no cover
340365 """
@@ -358,16 +383,16 @@ def default_read_values(
358383 nodatavals = self .default_read_nodataval
359384 merge_products_by = merge_products_by or self .default_read_merge_products_by
360385 merge_method = merge_method or self .default_read_merge_method
361- if resampling is None :
362- resampling = self .default_read_resampling
363- else :
364- resampling = (
365- resampling
366- if isinstance (resampling , Resampling )
367- else Resampling [resampling ]
368- )
369386 return dict (
370- resampling = resampling ,
387+ resampling = (
388+ self .default_read_resampling
389+ if resampling is None
390+ else (
391+ resampling
392+ if isinstance (resampling , Resampling )
393+ else Resampling [resampling ]
394+ )
395+ ),
371396 nodatavals = nodatavals ,
372397 merge_products_by = merge_products_by ,
373398 merge_method = merge_method ,
@@ -401,8 +426,7 @@ class InputData(base.InputData):
401426 default_preprocessing_task : Callable = staticmethod (EOProduct .from_stac_item )
402427 driver_config_model : Type [BaseDriverConfig ] = BaseDriverConfig
403428 params : BaseDriverConfig
404- archive : Archive
405- time : Union [TimeRange , List [TimeRange ]]
429+ time : Optional [Union [TimeRange , List [TimeRange ]]]
406430 area : BaseGeometry
407431 _products : Optional [IndexedFeatures ] = None
408432
@@ -421,6 +445,8 @@ def __init__(
421445 self .standalone = standalone
422446
423447 self .params = self .driver_config_model (** input_params ["abstract" ])
448+ self .conf_dir = input_params .get ("conf_dir" )
449+
424450 # we have to make sure, the cache path is absolute
425451 # not quite fond of this solution
426452 if self .params .cache :
@@ -429,14 +455,18 @@ def __init__(
429455 ).absolute_path (base_dir = input_params .get ("conf_dir" ))
430456 self .area = self ._init_area (input_params )
431457 self .time = self .params .time
432- if self .readonly : # pragma: no cover
433- return
434458
435- self .set_archive (base_dir = input_params ["conf_dir" ])
459+ self .eo_bands = [
460+ eo_band
461+ for source in self .params .source
462+ for eo_band in source .eo_bands (base_dir = self .conf_dir )
463+ ]
436464
465+ if self .readonly : # pragma: no cover
466+ return
437467 # don't use preprocessing tasks for Sentinel-2 products:
438468 if self .params .preprocessing_tasks or self .params .cache is not None :
439- for item in self .archive . items ():
469+ for item in self .source_items ():
440470 self .add_preprocessing_task (
441471 self .default_preprocessing_task ,
442472 fargs = (item ,),
@@ -455,7 +485,7 @@ def __init__(
455485 self .default_preprocessing_task (
456486 item , cache_config = self .params .cache , cache_all = True
457487 )
458- for item in self .archive . items ()
488+ for item in self .source_items ()
459489 ]
460490 )
461491
@@ -481,20 +511,30 @@ def _init_area(self, input_params: dict) -> BaseGeometry:
481511 )
482512 return process_area
483513
484- def set_archive (self , base_dir : MPath ):
485- # this only works with some static archive:
486- if self .params .cat_baseurl :
487- self .archive = Archive (
488- catalog = STACStaticCatalog (
489- baseurl = MPath (self .params .cat_baseurl ).absolute_path (
490- base_dir = base_dir
491- ),
492- ),
493- area = self .bbox (mapchete_eo_settings .default_catalog_crs ),
494- time = self .time ,
def source_items(self) -> Generator[Item, None, None]:
    """Yield the items found in all configured sources, each item ID only once.

    For every source, ``self.area`` is reprojected from ``self.crs`` into the
    source's ``catalog_crs``; sources whose reprojected area is empty are not
    searched. Every yielded item gets the source it came from stored under
    the ``"mapchete_eo:source"`` key of its properties. Once all sources are
    exhausted, the number of yielded items is logged at debug level.
    """
    seen_ids = set()
    for source in self.params.source:
        search_area = reproject_geometry(
            self.area,
            src_crs=self.crs,
            dst_crs=source.catalog_crs,
        )
        if not search_area.is_empty:
            found_items = source.search(
                time=self.time,
                area=search_area,
                base_dir=self.conf_dir,
            )
            for item in found_items:
                # only items not already provided by a previous source pass
                if item.id not in seen_ids:
                    seen_ids.add(item.id)
                    item.properties["mapchete_eo:source"] = source
                    yield item
    logger.debug("returned set of %s items", len(seen_ids))
498538
499539 def bbox (self , out_crs : Optional [str ] = None ) -> BaseGeometry :
500540 """Return data bounding box."""
@@ -517,15 +557,15 @@ def products(self) -> IndexedFeatures:
517557 return self ._products
518558
519559 # TODO: copied it from mapchete_satellite, not yet sure which use case this is
520- elif self .standalone :
560+ elif self .standalone : # pragma: no cover
521561 raise NotImplementedError ()
522562
523563 # if preprocessing tasks are ready, index them for further use
524564 elif self .preprocessing_tasks_results :
525565 return IndexedFeatures (
526566 [
527567 self .get_preprocessing_task_result (item .id )
528- for item in self .archive . items ()
568+ for item in self .source_items ()
529569 if not isinstance (item , CorruptedProductMetadata )
530570 ],
531571 crs = self .crs ,
@@ -557,7 +597,7 @@ def open(self, tile, **kwargs) -> EODataCube:
557597 return self .input_tile_cls (
558598 tile ,
559599 products = tile_products ,
560- eo_bands = self .archive . catalog . eo_bands ,
600+ eo_bands = self .eo_bands ,
561601 time = self .time ,
562602 # passing on the input key is essential so dependent preprocessing tasks can be found!
563603 input_key = self .input_key ,
0 commit comments