1+ import os
2+ import rioxarray as rxr
3+ import xarray as xr
4+ from typing import Generator
5+ from affine import Affine
6+
7+ import datetime as dt
8+
9+ from .cmr_downloader import CMRDownloader
10+ from d3tools .spatial import BoundingBox , crop_to_bb
11+
12+ from d3tools .timestepping .timestep import TimeStep
13+
class MERRA2Downloader(CMRDownloader):
    """
    Downloader for MERRA2 granules located through a CMR search.

    Each requested variable is read from a daily file holding hourly data,
    cropped to the requested bounding box, converted to the output unit and
    reduced along the ``time`` dimension with one or more aggregation methods.
    """

    source = 'MERRA2'
    name = 'MERRA2_downloader'

    # Products that can be requested, keyed by product name.
    available_products = {
        'tavg1_2d': {  # time-averaged (hourly), single level
            'provider': 'GES_DISC',
            'freq': 'daily',  # files are daily, data is hourly
            'version': '5.12.4'
        }
    }

    # For each product: the variables it offers, the CMR product id that
    # contains them, the variable name inside the file and the default
    # aggregation method.
    available_variables = {
        'tavg1_2d': {
            'precipitation': {'product_id': 'M2T1NXFLX', 'varname': 'PRECTOT', 'agg_method': 'sum'},
            'temperature': {'product_id': 'M2T1NXSLV', 'varname': 'T2M', 'agg_method': 'mean'}
        }
    }

    # Reductions along the time dimension supported by `_get_data_ts`.
    available_agg_methods = ['mean', 'max', 'min', 'sum']

    default_options = {
        'variables': ['precipitation'],
        'agg_method': ['sum']
    }

    file_ext = ['.nc4']

    @property
    def start(self) -> dt.datetime:
        """First date handled by this downloader (1980-01-01)."""
        return dt.datetime(1980, 1, 1)

    def set_variables(self, variables: str | list[str]) -> None:
        """
        Set the variables to download and attach an aggregation method to each.

        The aggregation methods are read from ``self.agg_method`` (presumably
        filled from the ``agg_method`` option by the base class - confirm) and
        are paired with ``variables`` by position.

        Args:
            variables: a variable name or a list of variable names.

        Raises:
            ValueError: if the number of aggregation methods differs from the
                number of variables, or if a method is not available.
        """
        if isinstance(variables, str):
            variables = [variables]
        super().set_variables(variables)

        agg_options = self.agg_method
        if not isinstance(agg_options, list):
            agg_options = [agg_options]

        if len(agg_options) != len(variables):
            msg = 'The number of aggregation methods must be the same as the number of variables'
            self.log.error(msg)
            raise ValueError(msg)

        for agg, var in zip(agg_options, variables):
            # check_agg always returns a list, so one variable may carry
            # several aggregation methods.
            agg = self.check_agg(agg)
            self.variables[var].update({'agg_method': agg})

    def check_agg(self, agg):
        """
        Validate one aggregation method (or a list of them).

        Args:
            agg: a method name or a list of method names.

        Returns:
            The method(s) as a list.

        Raises:
            ValueError: if any method is not in ``available_agg_methods``.
        """
        if not isinstance(agg, list):
            agg = [agg]
        for a in agg:
            if a not in self.available_agg_methods:
                msg = f'Aggregation method {a} not available'
                self.log.error(msg)
                raise ValueError(msg)
        return agg

    def _aggregate(self, data: xr.DataArray, agg_method: str) -> xr.DataArray:
        """
        Reduce ``data`` along its ``time`` dimension with ``agg_method``.

        Raises:
            ValueError: if ``agg_method`` is not one of mean/max/min/sum.
        """
        if agg_method == 'mean':
            return data.mean(dim='time')
        if agg_method == 'max':
            return data.max(dim='time')
        if agg_method == 'min':
            return data.min(dim='time')
        if agg_method == 'sum':
            # min_count=1 keeps cells that are missing at every time step as
            # NaN instead of turning them into a total of 0.
            return data.sum(dim='time', min_count=1)

        msg = f'Aggregation method {agg_method} not recognized'
        self.log.error(msg)
        raise ValueError(msg)

    def _get_data_ts(self,
                     timestep: TimeStep,
                     space_bounds: BoundingBox,
                     tmp_path: str) -> Generator[tuple[xr.DataArray, dict], None, None]:
        """
        Get data from the CMR.

        For every configured variable the matching granule is searched,
        downloaded into ``tmp_path`` (unless already there), cropped to
        ``space_bounds``, converted to the output unit and aggregated in time.

        Args:
            timestep: the time step to retrieve.
            space_bounds: the bounding box the data is cropped to.
            tmp_path: directory where the downloaded file is stored.

        Yields:
            ``(data, tags)`` pairs, one per variable and aggregation method,
            where ``tags`` holds the ``'variable'`` and ``'agg_method'`` used.

        Raises:
            ValueError: if an aggregation method is not recognized.
        """
        for var, varopts in self.variables.items():
            self.product_id = varopts['product_id']

            # Check the data from the CMR
            url_list = self.cmr_search(timestep, space_bounds)
            if not url_list:
                # Nothing found for this product: skip this variable only, so
                # the other variables do not depend on the iteration order.
                continue

            # download the data (only one file), unless it is already there
            file = os.path.join(tmp_path, os.path.basename(url_list[0]))
            if not os.path.exists(file):
                self.download(url_list, tmp_path)

            # Variables without a known conversion keep their values and
            # get no 'units' attribute.
            attrs = {'_FillValue': float('nan')}

            # A bare string (as stored in available_variables) counts as a
            # single method rather than being iterated character by character.
            agg_methods = varopts['agg_method']
            if isinstance(agg_methods, str):
                agg_methods = [agg_methods]

            # open the file with xarray; the context manager releases the
            # file handle once every aggregation of this variable is done
            with xr.open_dataset(file, engine='h5netcdf') as all_data:
                # ensure the latitude is descending
                sorted_data = all_data.sortby('lat', ascending=False)

                # pick the single variable we need
                data = sorted_data[varopts['varname']]

                # set spatial reference
                data = data.rio.write_crs("EPSG:4326")
                data = data.rio.set_spatial_dims(x_dim="lon", y_dim="lat")

                # crop to the bounding box
                cropped_data = crop_to_bb(data, space_bounds)

                # set the missing value
                cropped_data = cropped_data.where(cropped_data < 9.9e14, other=float('nan'))

                # set and convert the unit if needed
                if var == 'precipitation':
                    # from kg/m2/s to mm (1 kg/m2 = 1 mm of water; multiply by 3600 to get hourly total)
                    cropped_data = cropped_data * 3600.0
                    attrs['units'] = 'mm'
                elif var == 'temperature':
                    # from K to °C
                    cropped_data = cropped_data - 273.15
                    attrs['units'] = '°C'

                # Aggregate if needed
                for agg_method in agg_methods:
                    agg_data = self._aggregate(cropped_data, agg_method)

                    # Attributes are attached last: arithmetic and reductions
                    # may drop them depending on xarray's keep_attrs setting.
                    agg_data.attrs = dict(attrs)

                    yield agg_data, {'variable': var, 'agg_method': agg_method}
146+
0 commit comments