Skip to content

Commit 2f2d764

Browse files
authored
Merge pull request #143 from c-hydro/dev
Dev
2 parents 7ac2c11 + c831b6b commit 2f2d764

File tree

7 files changed

+381
-106
lines changed

7 files changed

+381
-106
lines changed

src/door/base_downloaders.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -492,8 +492,6 @@ def check_data(self, blank_path, **kwargs) -> bool:
492492
pass
493493
return False
494494

495-
496-
497495
class APIDownloader(DOORDownloader):
498496
"""
499497
Downloader for data from an API.
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
#from .cds_downloader import CDSDownloader
2-
from .era5_downloader import ERA5Downloader
2+
from .era5_downloader import ERA5Downloader
3+
from .glofas_downloader import GLOFASDownloader

src/door/data_sources/cds/cds_downloader.py

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
11
import cdsapi
2+
import datetime as dt
3+
from typing import Generator
4+
import xarray as xr
5+
import cfgrib
6+
7+
import d3tools.timestepping as ts
8+
from d3tools.spatial import BoundingBox
9+
210
from ...base_downloaders import APIDownloader
311

412
import os
@@ -7,7 +15,6 @@ class CDSDownloader(APIDownloader):
715

816
name = "CDS_downloader"
917
apikey_env_vars = 'CDSAPI_KEY' # this should be in the form UID:API_KEY already
10-
cds_url = 'https://cds.climate.copernicus.eu/api'
1118

1219
def __init__(self, dataset) -> None:
1320

@@ -31,3 +38,117 @@ def download(self, request: dict, destination: str,
3138
"""
3239
return super().download(destination, min_size, missing_action, name = self.dataset, request = request, target = destination)
3340

def get_last_published_ts(self, ts_per_year = None, **kwargs) -> ts.TimeRange:
    """
    Return the most recent fully-published timestep for the dataset.

    ts_per_year selects the timestep granularity (365 -> daily); when None,
    the downloader's own self.ts_per_year is used.
    """
    if ts_per_year is None:
        ts_per_year = self.ts_per_year

    # resolve the timestep class matching the requested granularity
    if ts_per_year == 365:
        step_cls = ts.Day
    else:
        step_cls = ts.fixed_num_timestep.FixedNTimeStep.get_subclass(ts_per_year)

    # the timestep containing the day *after* the last published date cannot
    # be complete yet, so step back by one to get the last complete timestep
    last_published = self.get_last_published_date()
    return step_cls.from_date(last_published + dt.timedelta(days=1)) - 1
def build_request(self,
                  time: ts.TimeRange,
                  space_bounds: BoundingBox) -> dict:
    """
    Build a request payload for the CDS API.

    Parameters:
        time: the time range to download (both endpoints inclusive).
        space_bounds: bounding box of the area of interest.

    Returns:
        dict ready to be passed to the CDS API client.

    Note: the CDS API takes independent year/month/day lists, so the request
    covers the cross product of the collected values, which may be a
    superset of the requested range — callers filter the data afterwards.
    """
    variables = list(self.variables)

    # collect every distinct year/month/day touched by the range
    years, months, days = set(), set(), set()
    this_time = time.start
    while this_time <= time.end:
        years.add(this_time.year)
        months.add(this_time.month)
        days.add(this_time.day)
        this_time += dt.timedelta(days=1)

    # sort so the request is deterministic (sets have no guaranteed order);
    # helps caching, logging and reproducibility of requests
    years_str = [str(y) for y in sorted(years)]
    months_str = [str(m).zfill(2) for m in sorted(months)]
    days_str = [str(d).zfill(2) for d in sorted(days)]

    # CDS expects the area as [North, West, South, East]
    W, S, E, N = space_bounds.bbox

    request = {
        'data_format': 'grib', # we always want grib, it's smaller, then we convert
        'download_format' : 'unarchived', #TODO: change this to "zip" and handle unzipping before opening the data!
        'variable': variables,
        'year' : years_str,
        'month': months_str,
        'day' : days_str,
        'area': [N, W, S, E],
    }

    return request
def _get_data_ts(self,
                 timestep: ts.TimeStep,
                 space_bounds: BoundingBox,
                 tmp_path: str) -> 'list[xr.Dataset]':
    """
    Download one timestep of data and open the resulting grib file.

    Downloads a single grib file covering `timestep` over `space_bounds`
    into `tmp_path`, then opens it with cfgrib.

    Returns a list of xarray Datasets, one per "well-formed" cube in the
    grib file. NOTE(review): despite the generator-like name, this base
    implementation returns the opened datasets directly; subclasses
    (e.g. ERA5) consume this via super() and do the per-variable yielding.
    """

    timestep_start = timestep.start
    timestep_end = timestep.end

    # temporary grib destination, named after the dataset and the period covered
    tmp_filename = f'temp_{self.dataset}_{timestep_start:%Y%m%d}-{timestep_end:%Y%m%d}.grib2'
    tmp_destination = os.path.join(tmp_path, tmp_filename)

    request = self.build_request(timestep, space_bounds)
    # missing_action = 'e': presumably error out on a missing/too-small
    # download — confirm against APIDownloader.download; `success` is unused
    success = self.download(request, tmp_destination, min_size = 100, missing_action = 'e')

    # this will create a list of xarray datasets, one for each "well-formed" cube in the grib file,
    # this is needed because requesting multiple variables at once will return a single grib file that might contain multiple cubes
    # (if the variables have different dimensions)
    return cfgrib.open_datasets(tmp_destination)
def _aggregate_variable(self, vardata, timestep, varopts):
    """
    Aggregate one variable's data over the aggregation timesteps and yield the results.

    Parameters:
        vardata: xarray object with a 'time' coordinate, the data to aggregate.
        timestep: the downloaded timestep covering vardata; it is split into
            aggregation timesteps according to self.ts_per_year_agg.
        varopts: per-variable options; 'agg_method' lists the aggregations to
            compute ('mean', 'max', 'min', 'sum'), 'var' is the variable name.

    Yields:
        (aggdata, metadata) tuples, one per (aggregation timestep, method),
        where metadata has keys 'variable', 'agg_method' and 'timestep'.

    Raises:
        ValueError: if an entry of varopts['agg_method'] is not supported.
            (The previous if/elif chain silently yielded stale data from the
            prior iteration, or crashed with UnboundLocalError, in that case.)
    """
    # the reductions we know how to apply along the time dimension
    supported_aggs = ('mean', 'max', 'min', 'sum')

    agg_timesteps = timestep.get_timesteps_from_tsnumber(self.ts_per_year_agg)

    for agg_timestep in agg_timesteps:

        timestep_start = agg_timestep.start
        timestep_end = agg_timestep.end

        # filter data to the aggregation timestep
        inrange = (vardata.time.dt.date >= timestep_start.date()) & (vardata.time.dt.date <= timestep_end.date())
        vardata_ts = vardata.sel(time = inrange)

        # add start and end time as attributes
        vardata_ts.attrs['start_time'] = timestep_start
        vardata_ts.attrs['end_time'] = timestep_end

        # do the necessary aggregations:
        for agg in varopts['agg_method']:

            if agg not in supported_aggs:
                raise ValueError(f"Unsupported aggregation method '{agg}' (expected one of {supported_aggs})")

            vardata_ts.attrs['agg_function'] = agg
            # dispatch to the matching xarray reduction (mean/max/min/sum);
            # skipna=False so that missing values propagate into the aggregate
            aggdata = getattr(vardata_ts, agg)(dim='time', skipna = False)

            # set the spatial metadata needed to write the result as a raster
            aggdata = aggdata.rio.set_spatial_dims('longitude', 'latitude')
            aggdata = aggdata.rio.write_crs(self.spatial_ref)

            yield aggdata, {'variable': varopts.get('var'), 'agg_method': agg, 'timestep': agg_timestep}

src/door/data_sources/cds/era5_downloader.py

Lines changed: 15 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import datetime as dt
2-
import os
32
from typing import Generator
43
import xarray as xr
54
import numpy as np
@@ -9,12 +8,12 @@
98
from d3tools.spatial import BoundingBox
109
from d3tools import timestepping as ts
1110
from d3tools.timestepping.timestep import TimeStep
12-
from d3tools.timestepping.fixed_num_timestep import FixedNTimeStep
1311

1412
class ERA5Downloader(CDSDownloader):
1513

1614
source = "ERA5"
1715
name = "ERA5_downloader"
16+
cds_url = 'https://cds.climate.copernicus.eu/api'
1817

1918
available_products = ['reanalysis-era5-single-levels', 'reanalysis-era5-land']
2019

@@ -105,42 +104,17 @@ def build_request(self,
105104
"""
106105
Make a request for the CDS API.
107106
"""
108-
variables = [var for var in self.variables.keys()]
109-
110-
# get the correct timesteps
111-
start = time.start
112-
end = time.end
113-
114107
# If in the variable list we have total precipitation, we need to download the data for the next day as well
115108
if 'total_precipitation' in self.variables:
116-
end += dt.timedelta(days=1)
109+
time = time.extend(ts.TimeWindow(1, 'd'))
117110

118-
years = set()
119-
months = set()
120-
days = set()
121-
122-
this_time = start
123-
while this_time <= end:
124-
years.add(this_time.year)
125-
months.add(this_time.month)
126-
days.add(this_time.day)
127-
this_time += dt.timedelta(days=1)
111+
request = super().build_request(
112+
time, space_bounds
113+
)
128114

129-
years_str = [str(y) for y in years]
130-
months_str = [str(m).zfill(2) for m in months]
131-
days_str = [str(d).zfill(2) for d in days]
132-
133-
# Get the bounding box in the correct order
134-
W, S, E, N = space_bounds.bbox
135-
136-
request = {
115+
# add ERA5 specific parameters
116+
request.update({
137117
'product_type': 'reanalysis',
138-
'data_format': 'grib', # we always want grib, it's smaller, then we convert
139-
'download_format' : 'unarchived', #TODO: change this to "zip" and handle unzipping before opening the data!
140-
'variable': variables,
141-
'year' : years_str,
142-
'month': months_str,
143-
'day' : days_str,
144118
'time': [ # we always want all times in a day
145119
'00:00', '01:00', '02:00',
146120
'03:00', '04:00', '05:00',
@@ -151,26 +125,9 @@ def build_request(self,
151125
'18:00', '19:00', '20:00',
152126
'21:00', '22:00', '23:00',
153127
],
154-
'area': [N, W, S, E],
155-
}
128+
})
156129

157130
return request
158-
159-
def get_last_published_ts(self, ts_per_year = None, **kwargs) -> ts.TimeRange:
160-
161-
"""
162-
Get the last published date for the dataset.
163-
"""
164-
if ts_per_year is None:
165-
ts_per_year = self.ts_per_year
166-
167-
# get the last published timestep
168-
last_published = self.get_last_published_date()
169-
if ts_per_year == 365:
170-
TimeStep = ts.Day
171-
else:
172-
TimeStep = FixedNTimeStep.get_subclass(ts_per_year)
173-
return TimeStep.from_date(last_published + dt.timedelta(days=1)) - 1
174131

175132
def get_last_published_date(self, **kwargs) -> dt.datetime:
176133
now = dt.datetime.now()
@@ -182,26 +139,12 @@ def _get_data_ts(self,
182139
space_bounds: BoundingBox,
183140
tmp_path: str) -> Generator[tuple[xr.DataArray, dict], None, None]:
184141

185-
import cfgrib
186-
187-
timestep_start = timestep.start
188-
timestep_end = timestep.end
189-
190-
tmp_filename = f'temp_{self.dataset}_{timestep_start:%Y%m%d}-{timestep_end:%Y%m%d}.grib2'
191-
tmp_destination = os.path.join(tmp_path, tmp_filename)
192-
193-
194-
request = self.build_request(timestep, space_bounds)
195-
success = self.download(request, tmp_destination, min_size = 100, missing_action = 'e')
196-
197-
# this will create a list of xarray datasets, one for each "well-formed" cube in the grib file,
198-
# this is needed because requesting multiple variables at once will return a single grib file that might contain multiple cubes
199-
# (if the variable have different dimensions)
200-
all_data = cfgrib.open_datasets(tmp_destination)
142+
all_data = super()._get_data_ts(timestep, space_bounds, tmp_path)
201143

202144
# loop over the variables
203145
for var, varopts in self.variables.items():
204146
varname = varopts['varname']
147+
varopts['var'] = var
205148

206149
# find the data for the variable
207150
for this_data in all_data:
@@ -236,7 +179,7 @@ def _get_data_ts(self,
236179
vardata = vardata.assign_coords(time=valid_times)
237180

238181
# filter data to the selected days (we have to do this because the API returns data for longer periods than we actually need)
239-
inrange = (vardata.time.dt.date >= timestep_start.date()) & (vardata.time.dt.date <= timestep_end.date())
182+
inrange = (vardata.time.dt.date >= timestep.start.date()) & (vardata.time.dt.date <= timestep.end.date())
240183
vardata = vardata.sel(time = inrange)
241184

242185
# Convert Kelvin to Celsius if we are dealing with temperatures
@@ -247,8 +190,8 @@ def _get_data_ts(self,
247190
vardata = vardata.squeeze()
248191

249192
# verify that we have all the data we need (i.e. no timesteps of complete nans)!
250-
time_to_check = timestep_start
251-
while time_to_check <= timestep_end:
193+
time_to_check = timestep.start
194+
while time_to_check <= timestep.end:
252195
istoday = vardata.time.dt.date == time_to_check.date()
253196
this_data = vardata.sel(time = istoday)
254197
for time in this_data.time:
@@ -263,35 +206,5 @@ def _get_data_ts(self,
263206
if attr.startswith('GRIB'):
264207
del vardata.attrs[attr]
265208

266-
ts_as_tr = ts.TimeRange(start = timestep_start, end = timestep_end)
267-
agg_timesteps = ts_as_tr.get_timesteps_from_tsnumber(self.ts_per_year_agg)
268-
269-
for agg_timestep in agg_timesteps:
270-
timestep_start = agg_timestep.start
271-
timestep_end = agg_timestep.end
272-
273-
# filter data to the aggregation timestep
274-
inrange = (vardata.time.dt.date >= timestep_start.date()) & (vardata.time.dt.date <= timestep_end.date())
275-
vardata_ts = vardata.sel(time = inrange)
276-
277-
# add start and end time as attributes
278-
vardata_ts.attrs['start_time'] = timestep_start
279-
vardata_ts.attrs['end_time'] = timestep_end
280-
281-
# do the necessary aggregations:
282-
for agg in varopts['agg_method']:
283-
284-
vardata_ts.attrs['agg_function'] = agg
285-
if agg == 'mean':
286-
aggdata = vardata_ts.mean(dim='time', skipna = False)
287-
elif agg == 'max':
288-
aggdata = vardata_ts.max(dim='time', skipna = False)
289-
elif agg == 'min':
290-
aggdata = vardata_ts.min(dim='time', skipna = False)
291-
elif agg == 'sum':
292-
aggdata = vardata_ts.sum(dim='time', skipna = False)
293-
294-
aggdata = aggdata.rio.set_spatial_dims('longitude', 'latitude')
295-
aggdata = aggdata.rio.write_crs(self.spatial_ref)
296-
297-
yield aggdata, {'variable': var, 'agg_method': agg, 'timestep': agg_timestep}
209+
# aggregate in the superclass and yield
210+
yield from self._aggregate_variable(vardata, timestep, varopts)

0 commit comments

Comments
 (0)