1+ import os
2+ from typing import Generator , Optional , Sequence
3+ import xarray as xr
4+ import datetime as dt
5+ import requests
6+ import tempfile
7+
8+ from ...base_downloaders import URLDownloader
9+
10+ from d3tools import timestepping as ts
11+ from d3tools .timestepping .timestep import TimeStep
12+ from d3tools .timestepping .fixed_num_timestep import FixedNTimeStep
13+ from d3tools .spatial import BoundingBox , crop_to_bb
14+
class JRADownloader(URLDownloader):
    """Downloader for JRA products served by the UCAR RDA THREDDS server.

    Data is published as one NetCDF file per variable and month. For every
    requested timestep the relevant monthly file is downloaded (once) to the
    temporary folder, the timestep is extracted, cropped to the requested
    bounding box and aggregated over time with the configured method(s).
    """

    source = "JRA"
    name = "JRA_downloader"

    # NOTE(review): these two flags are interpreted by the base downloader;
    # presumably "share one temp folder" and "process one variable at a time"
    # -- confirm against URLDownloader.
    single_temp_folder = True
    separate_vars = True

    default_options = {
        "resolution": 0.375,
        "freq": 'd',
        'variables': 'precipitation',
        # None means "use the default aggregation declared for each variable
        # in available_variables" (see set_variables).
        'agg_method': None,
    }

    # grid identifier used in the remote file names, keyed by resolution
    grid_codes = {
        0.375: 'gauss',
        1.25: 'll125',
    }

    home = "https://thredds.rda.ucar.edu/thredds/fileServer/files/g/d640000/"

    available_agg_methods = ['mean', 'max', 'min', 'sum']

    available_products: dict = {
        "jra-3q": {
            "url_blank": home + "{dataset}/{month.start:%Y%m}/jra3q.{dataset}.{var_code}.{var_name}-{grid_code}.{month.start:%Y%m%d}00_{month.end:%Y%m%d}23.nc",
            "data_list": "https://thredds.rda.ucar.edu/thredds/catalog/files/g/d640000/{dataset}/catalog.html",
        }
    }

    available_variables: dict = {
        "jra-3q": {
            "precipitation": {
                "dataset": 'fcst_phy2m',
                "var_code": '0_1_52',
                "var_name": "tprate1have-sfc-fc",  # this is a rate in mm/s, will need to multiply by 3600 to get mm/h and then sum to get total precipitation
                "agg_method": 'sum',
            }
        }
    }

    # seconds to wait for the catalog server before giving up
    _catalog_timeout = 60

    # factor converting a per-second rate into a per-hour rate
    _SECONDS_PER_HOUR = 3600

    def __init__(self, product: str) -> None:
        """Create a downloader for ``product`` (a key of ``available_products``)."""
        # set_product must run first: it is expected to define self.url_blank,
        # which the base constructor needs.
        self.set_product(product)
        super().__init__(self.url_blank, protocol='http')

    def set_variables(self, variables: str | list[str]) -> None:
        """
        Set the variables to download and attach an aggregation method to each.

        The aggregation methods are read from ``self.agg_method``:

        - ``None`` (the default) uses, for every variable, the ``agg_method``
          already declared for that variable;
        - a list must have one entry per variable; each entry is either a
          method name, a list of method names, or ``None`` (variable default);
        - any other single value is treated as a one-element list.

        Raises:
            ValueError: if the number of aggregation entries does not match
                the number of variables, or if a method is not available.
        """
        if isinstance(variables, str):
            variables = [variables]
        super().set_variables(variables)

        # agg_method may not be set yet if the options have not been applied
        agg_options = getattr(self, 'agg_method', None)
        if agg_options is None:
            agg_options = [None] * len(variables)
        elif not isinstance(agg_options, list):
            agg_options = [agg_options]

        if len(agg_options) != len(variables):
            msg = 'The number of aggregation methods must be the same as the number of variables'
            self.log.error(msg)
            raise ValueError(msg)

        for agg, var in zip(agg_options, variables):
            if agg is None:
                # fall back to the default declared for this variable
                agg = self.variables[var].get('agg_method')
            agg = self.check_agg(agg)
            # NOTE(review): this updates the per-variable dict in place; if the
            # base class does not copy available_variables, the class-level
            # defaults are modified too -- confirm.
            self.variables[var].update({'agg_method': agg})

    def check_agg(self, agg):
        """
        Validate one or more aggregation methods.

        Args:
            agg: a method name or a list of method names.

        Returns:
            The validated methods, always as a list.

        Raises:
            ValueError: if any method is not in ``available_agg_methods``.
        """
        methods = agg if isinstance(agg, list) else [agg]
        for method in methods:
            if method not in self.available_agg_methods:
                msg = f'Aggregation method {method} not available'
                self.log.error(msg)
                raise ValueError(msg)
        return methods

    def get_last_published_ts(self, **kwargs) -> ts.TimeRange:
        """
        Get the last fully published timestep for the dataset.

        The timestep length is taken from ``self.freq`` (daily if not set).
        Keyword arguments are forwarded to ``get_last_published_date``.
        """
        last_date = self.get_last_published_date(**kwargs)

        # get the timestep containing the last date
        freq = getattr(self, 'freq', 'd')
        last_date_timestep = ts.TimeStep.from_unit(freq).from_date(last_date)

        # if the last date closes its timestep, that timestep is complete
        if last_date == last_date_timestep.end:
            return last_date_timestep
        # otherwise the last complete timestep is the previous one
        return last_date_timestep - 1

    def get_last_published_date(self, **kwargs) -> dt.datetime:
        """
        Get the last published date for the dataset.

        The online catalog of each selected variable is scanned for monthly
        folders (``YYYYMM``); the result is the end of the most recent month
        that is available for *all* the selected variables.

        Raises:
            ValueError: if a variable has no ``dataset`` defined, if a catalog
                lists no monthly folder, or if no variable is selected.
            requests.RequestException: if a catalog cannot be retrieved.
        """
        import re

        # this is 100% not the best way to do this, but it works for now
        month_link = re.compile(r'href="(\d{4})(\d{2})/catalog\.html"')

        last_month = None
        for variable in self.variables:
            var_info = self.variables[variable]
            if 'dataset' not in var_info:
                raise ValueError(f'Dataset not defined for variable {variable}')

            url = self.data_list.format(dataset=var_info['dataset'])
            with requests.get(url, timeout=self._catalog_timeout) as response:
                # fail on HTTP errors instead of parsing an error page
                response.raise_for_status()
                matches = month_link.findall(response.text)

            if not matches:
                msg = f'No monthly folders found in the catalog {url}'
                self.log.error(msg)
                raise ValueError(msg)

            # (year, month) are zero-padded strings, so the largest tuple is
            # the most recent month regardless of the order in the page
            year, month = max(matches)
            this_last_month = ts.Month(int(year), int(month))
            last_month = this_last_month if last_month is None else min(last_month, this_last_month)

        if last_month is None:
            raise ValueError('No variables selected: cannot determine the last published date')

        return last_month.end

    def _tmp_file_name(self, month, grid_code: str) -> str:
        """Name of the temporary monthly file for the current variable and grid."""
        # variable and grid are part of the name so that files of different
        # variables sharing the same temporary folder never get mixed up
        return f'temp_{self.product}_{self.variable}_{grid_code}_{month.year}{month.month}.nc'

    def _get_data_ts(self,
                     timestep: TimeStep,
                     space_bounds: BoundingBox,
                     tmp_path: str) -> Generator[tuple[xr.DataArray, dict], None, None]:
        """
        Yield the aggregated data of the current variable for one timestep.

        The monthly file containing ``timestep`` is downloaded to ``tmp_path``
        unless it is already there; when a new month is downloaded, the file
        of the previous month is deleted. The data is restricted to the dates
        of the timestep, cropped to ``space_bounds`` and aggregated along
        ``time`` once per configured aggregation method.

        Yields:
            ``(data, tags)`` with ``tags`` holding ``agg_method``, ``variable``
            and ``resolution`` (the resolution without the decimal point).
            Nothing is yielded if the monthly file could not be downloaded.

        Raises:
            ValueError: if an aggregation method is not recognized.
        """
        this_var = self.variables[self.variable]
        grid_code = self.grid_codes[self.resolution]
        this_month = ts.Month(timestep.year, timestep.month)

        # check if the file is not already downloaded in the tmp_path
        tmp_destination = os.path.join(tmp_path, self._tmp_file_name(this_month, grid_code))
        if not os.path.exists(tmp_destination):
            tags = {
                'dataset': this_var['dataset'],
                'var_code': this_var['var_code'],
                'var_name': this_var['var_name'],
                'grid_code': grid_code,
                'month': this_month,
            }
            # download the file
            self.download(tmp_destination, min_size=2000, missing_action='warning', **tags)

            # once we download a month, we can delete the previous month
            prev_file = os.path.join(tmp_path, self._tmp_file_name(this_month - 1, grid_code))
            if os.path.exists(prev_file):
                os.remove(prev_file)

        # the download only warns on missing data: without a file there is
        # nothing to yield for this timestep
        # NOTE(review): assumes the caller accepts a timestep with no output -- confirm.
        if not os.path.exists(tmp_destination):
            return

        # open the file; the context manager releases the handle so that the
        # file can be deleted later and handles do not pile up
        with xr.open_dataset(tmp_destination, engine='netcdf4') as raw_data:
            vardata = raw_data[f"{this_var['var_name']}-{grid_code}"]

            # only select the relevant time range
            inrange = (vardata.time.dt.date >= timestep.start.date()) & (vardata.time.dt.date <= timestep.end.date())
            vardata = vardata.sel(time=inrange)

            # crop the data and bring it in memory before the file is closed
            cropped = crop_to_bb(vardata, space_bounds).load()

        # if this is precipitation data, we need to transform it to mm/h
        if this_var['var_name'] == 'tprate1have-sfc-fc':
            cropped = cropped * self._SECONDS_PER_HOUR

        # a single method given as a string must not be iterated by character
        agg_methods = this_var['agg_method']
        if isinstance(agg_methods, str):
            agg_methods = [agg_methods]

        resolution_tag = str(self.resolution).replace('.', '')

        # aggregate the data
        for agg_method in agg_methods:
            if agg_method not in self.available_agg_methods:
                raise ValueError(f'Aggregation method {agg_method} not recognized')
            # the available methods are all DataArray reductions of the same name
            aggregated = getattr(cropped, agg_method)(dim='time')

            yield aggregated, {'agg_method': agg_method, 'variable': self.variable, 'resolution': resolution_tag}
0 commit comments