2424import os
2525import pathlib
2626
27+ import numpy as np
2728import pandas as pd
2829import xarray as xr
2930from xcube .core .store import DatasetDescriptor , VariableDescriptor
@@ -45,6 +46,10 @@ def __init__(self):
4546 "Monthly drought indices from 1940–present derived "
4647 "from ERA5 reanalysis (main run)"
4748 ),
49+ "derived-drought-historical-monthly:ensemble_members" : (
50+ "Monthly drought indices from 1940–present derived "
51+ "from ERA5 ensemble (10 members)"
52+ ),
4853 }
4954 self ._variable_names = [
5055 "standardised_precipitation_index" ,
@@ -114,13 +119,18 @@ def describe_data(self, data_id: str) -> DatasetDescriptor:
114119 var_name , accum_period
115120 )
116121
122+ if data_id .endswith ("reanalysis" ):
123+ dims = ("time" , "lat" , "lon" )
124+ else :
125+ dims = ("time" , "number" , "lat" , "lon" )
126+
117127 variable_descriptors = []
118128 for var_name , attrs in mapping_varname_attrs .items ():
119129 variable_descriptors .append (
120130 VariableDescriptor (
121131 name = var_name ,
122132 dtype = "float64" ,
123- dims = ( "time" , "lat" , "lon" ) ,
133+ dims = dims ,
124134 attrs = attrs ,
125135 )
126136 )
@@ -177,33 +187,72 @@ def read_file(
177187 zip_ref .extractall (path_temp )
178188 file_paths = glob .glob (f"{ path_temp } /*" )
179189 dss = []
180- for var_name in open_params ["variable_names" ]:
181- for accum_period in open_params ["accumulation_periods" ]:
182- pattern = self ._get_filepath_pattern (var_name , accum_period )
183- file_sel = [path for path in file_paths if pattern in path ]
184- file_sel = sorted (file_sel )
185- ds = xr .open_mfdataset (
186- file_sel ,
187- engine = "netcdf4" ,
188- chunks = "auto" ,
189- combine_attrs = "drop_conflicts" ,
190- )
191- if "standardised_precipitation" in var_name :
192- ds = ds .sel (
193- time = slice (
194- open_params ["time_range" ][0 ], open_params ["time_range" ][1 ]
195- )
190+
191+ if cds_api_params ["product_type" ] == ["reanalysis" ]:
192+ for var_name in open_params ["variable_names" ]:
193+ for accum_period in open_params ["accumulation_periods" ]:
194+ pattern = self ._get_filepath_pattern (var_name , accum_period )
195+ file_sel = [path for path in file_paths if pattern in path ]
196+ file_sel = sorted (file_sel )
197+ ds = xr .open_mfdataset (
198+ file_sel ,
199+ engine = "netcdf4" ,
200+ chunks = "auto" ,
201+ combine_attrs = "drop_conflicts" ,
196202 )
197- else :
198- ds = self ._resample_quality_ds (ds , open_params ["time_range" ])
199- assert len (ds .data_vars ) == 1
200- ds_varname = self ._get_varname (var_name , accum_period )
201- ds = ds .rename ({list (ds .data_vars .keys ())[0 ]: ds_varname })
202- dss .append (ds )
203- ds_final = xr .merge (dss , join = "outer" , combine_attrs = "drop_conflicts" )
204- ds_final = ds_final .sel (
205- time = slice (open_params ["time_range" ][0 ], open_params ["time_range" ][1 ])
206- )
203+ if "standardised_precipitation" in var_name :
204+ ds = ds .sel (
205+ time = slice (
206+ open_params ["time_range" ][0 ],
207+ open_params ["time_range" ][1 ],
208+ )
209+ )
210+ else :
211+ ds = self ._resample_quality_ds (ds , open_params ["time_range" ])
212+ assert len (ds .data_vars ) == 1
213+ ds_varname = self ._get_varname (var_name , accum_period )
214+ ds = ds .rename ({list (ds .data_vars .keys ())[0 ]: ds_varname })
215+ dss .append (ds )
216+ ds_final = xr .merge (dss , join = "outer" , combine_attrs = "drop_conflicts" )
217+ ds_final = ds_final .sel (
218+ time = slice (open_params ["time_range" ][0 ], open_params ["time_range" ][1 ])
219+ )
220+ else :
221+ for var_name in open_params ["variable_names" ]:
222+ for accum_period in open_params ["accumulation_periods" ]:
223+ pattern = self ._get_filepath_pattern (var_name , accum_period )
224+ file_sel = [path for path in file_paths if pattern in path ]
225+ file_sel = sorted (file_sel )
226+ dss_inner = []
227+ for path in file_sel :
228+ ds = xr .open_dataset (path , engine = "netcdf4" , chunks = "auto" )
229+ time_axis = ds .time
230+ # The data from the backend uses the confusing name `time` for the
231+ # ensemble member index. We rename it to `number` to be consistent
232+ # with other ERA5 datasets, and to free up the name `time` for the actual
233+ # time.
234+ ds = ds .rename ({"time" : "number" })
235+ ds = ds .assign_coords (number = np .arange (10 ))
236+ ds = ds .expand_dims (time = [time_axis [0 ].values ])
237+ dss_inner .append (ds )
238+ ds = xr .concat (dss_inner , "time" , combine_attrs = "drop_conflicts" )
239+ if "standardised_precipitation" in var_name :
240+ ds = ds .sel (
241+ time = slice (
242+ open_params ["time_range" ][0 ],
243+ open_params ["time_range" ][1 ],
244+ )
245+ )
246+ else :
247+ ds = self ._resample_quality_ds (ds , open_params ["time_range" ])
248+ assert len (ds .data_vars ) == 1
249+ ds_varname = self ._get_varname (var_name , accum_period )
250+ ds = ds .rename ({list (ds .data_vars .keys ())[0 ]: ds_varname })
251+ dss .append (ds )
252+ ds_final = xr .merge (dss , join = "outer" , combine_attrs = "drop_conflicts" )
253+ ds_final = ds_final .sel (
254+ time = slice (open_params ["time_range" ][0 ], open_params ["time_range" ][1 ])
255+ )
207256 return ds_final
208257
209258 @staticmethod
0 commit comments