33
44import pandas as pd
55import xarray as xr
6+ from intake_esgf import ESGFCatalog
67
78
89class DataRequest (Protocol ):
@@ -14,11 +15,15 @@ class DataRequest(Protocol):
1415 differently to generate the sample data.
1516 """
1617
17- facets : dict [str , str | tuple [str , ...]]
18- remove_ensembles : bool
19- time_span : tuple [str , str ]
def fetch_datasets(self) -> pd.DataFrame:
    """
    Retrieve the datasets for this request from their source.

    The returned dataframe carries the metadata of the fetched datasets
    together with the paths to them.
    """
    ...
2025
21- def decimate_dataset (self , dataset : xr .Dataset , time_span : tuple [ str , str ] | None ) -> xr .Dataset | None :
def decimate_dataset(self, dataset: xr.Dataset) -> xr.Dataset | None:
    """Reduce the given dataset to a smaller size."""
    ...
2429
@@ -27,3 +32,28 @@ def generate_filename(
2732 ) -> pathlib .Path :
2833 """Create the output filename for the dataset."""
2934 ...
35+
36+
class IntakeESGFDataRequest(DataRequest):
    """
    Data request whose datasets are located and fetched through intake-esgf.

    The attributes below are declared but not assigned here; whoever builds
    an instance is expected to set them before calling ``fetch_datasets``.
    """

    # Search facets, expanded as keyword arguments to ``ESGFCatalog.search``.
    facets: dict[str, str | tuple[str, ...]]
    # When truthy, ``remove_ensembles()`` is applied to the catalog after the search.
    remove_ensembles: bool
    # Pair of values copied into the ``time_start`` / ``time_end`` result columns.
    time_span: tuple[str, str]

    def fetch_datasets(self) -> pd.DataFrame:
        """
        Search ESGF with the configured facets and collect the matching files.

        Returns the catalog dataframe joined with a ``files`` column built
        from the catalog's path dictionary. When ``time_span`` is truthy,
        ``time_start`` and ``time_end`` columns are added holding its first
        and second entries.
        """
        catalog = ESGFCatalog()
        catalog.search(**self.facets)
        if self.remove_ensembles:
            catalog.remove_ensembles()

        # NOTE(review): with prefer_streaming=False this presumably resolves
        # to local file paths rather than remote URLs; confirm against the
        # intake-esgf documentation.
        paths_by_key = catalog.to_path_dict(
            prefer_streaming=False,
            minimal_keys=False,
            quiet=True,
        )
        file_column = pd.Series(paths_by_key, name="files")

        # Default (inner) merge: catalog rows whose "key" has no entry in the
        # path dictionary do not appear in the result.
        datasets = catalog.df.merge(file_column, left_on="key", right_index=True)

        if not self.time_span:
            return datasets

        datasets["time_start"] = self.time_span[0]
        datasets["time_end"] = self.time_span[1]
        return datasets
0 commit comments