@@ -46,7 +46,8 @@ def era5_repository_update(update_settings: Era5UpdateSettings) -> RepositoryUpd
     starting_moment_of_update = datetime.now(UTC)
     cutoff_time = starting_moment_of_update + relativedelta(minutes=update_settings.maximum_runtime_in_minutes)
     logger.info(
-        f"Starting update of ERA5 data for {update_settings.era5_dataset_to_update_from} to: {update_settings.target_storage_location}"
+        f"Starting update of ERA5 data for {update_settings.era5_dataset_to_update_from} "
+        f"to: {update_settings.target_storage_location}"
     )
     logger.debug(f" - Attempting update for time range: {update_settings.repository_time_range}")
     logger.debug(f" - Factors to process: {update_settings.factors_to_process}")
@@ -78,8 +79,9 @@ def _era5_update_month_by_month(
     )
 
     while update_month > target_update_month:
+        logger.info(f" > Processing month: {update_month.year}-{update_month.month}")
         if datetime.now(UTC) + relativedelta(minutes=average_time_per_month_in_minutes) > cutoff_time:
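+            # Processing one more month would overrun the configured maximum runtime, so stop here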
-            print(
-                "MAXIMUM RUNTIME REACHED: ",
-                cutoff_time,
-                datetime.now(UTC) + relativedelta(minutes=average_time_per_month_in_minutes),
+            logger.warning(
+                f"MAXIMUM RUNTIME REACHED: cutoff was {cutoff_time}, projected finish "
+                f"{datetime.now(UTC) + relativedelta(minutes=average_time_per_month_in_minutes)}"
@@ -97,7 +99,6 @@ def _era5_update_month_by_month(
             logger.warning("More than 50% of the months failed to process. Stopping update.")
             break
 
-        update_month = update_month - relativedelta(month=1)
         average_time_per_month_in_minutes = (
             (datetime.now(UTC) - starting_moment_of_update).total_seconds() / 60 / amount_of_months_processed
         )
@@ -116,7 +117,7 @@ def _era5_update_month(update_settings: Era5UpdateSettings, update_month: dateti
 
     month_file_base = f"{update_settings.filename_prefix}_{update_month.year}_{update_month.month:02d}"
     month_file = update_settings.target_storage_location / f"{month_file_base}"
-    threshold_date = (datetime.now(UTC) - relativedelta(days=5, months=3)).replace(day=1)
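+    # First day of the month containing "now - 5 days"; months at or after this date may still be revised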
+    threshold_date = (datetime.now(UTC) - relativedelta(days=5)).replace(day=1)
 
     if file_requires_update(month_file, update_month, threshold_date):
         logger.debug(f" > File {month_file} requires update.")
@@ -132,19 +133,19 @@ def _era5_update_month(update_settings: Era5UpdateSettings, update_month: dateti
                     month=[str(update_month.month)],
                     day=[str(i) for i in list(range(1, 32))],
                     time=[f"{hour:02d}:00" for hour in range(24)],
-                    area=(7.22, 50.75, 3.2, 53.7),
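+                    # CDS expects area as (North, West, South, East); these bounds cover the Netherlands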
+                    area=(53.510403, 3.314971, 50.803721, 7.092053),
                 ),
                 target_location=str(month_file_name),
             )
 
-            print("Stored file at: ", month_file_name)
+            logger.debug(f"Stored file at: {month_file_name}")
 
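+            # The CDS API delivers a zip archive; merge its netCDF members into a single file first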
             _recombine_multiple_files(month_file_name)
 
             _format_downloaded_file(month_file_name, update_settings.factor_dictionary)
 
             month_file_name.rename(month_file.with_suffix(Era5FileSuffixes.FORMATTED))
-            print("Renamed to: ", month_file.with_suffix(Era5FileSuffixes.UNFORMATTED))
+            logger.debug(f"Renamed to: {month_file.with_suffix(Era5FileSuffixes.FORMATTED)}")
             _finalize_formatted_file(month_file, update_month, threshold_date)
 
         except Exception as e:
@@ -155,7 +156,9 @@ def _era5_update_month(update_settings: Era5UpdateSettings, update_month: dateti
 
 
 def _get_update_month(update_settings: Era5UpdateSettings) -> datetime:
-    NORMAL_FIRST_MOMENT_AVAILABLE_FOR_ERA5 = datetime.now(UTC) - relativedelta(days=5)
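+    # ERA5 data normally becomes available about 5 days behind real time; truncate to midnight for date-level comparisons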
+    NORMAL_FIRST_MOMENT_AVAILABLE_FOR_ERA5 = (datetime.now(UTC) - relativedelta(days=5)).replace(
+        hour=0, minute=0, second=0, microsecond=0
+    )
     update_moment = update_settings.repository_time_range[1]
 
     update_moment = (
@@ -184,11 +187,15 @@ def _verify_first_day_available_for_era5(update_moment: datetime, update_setting
         try:
             download_era5_data(
                 dataset=update_settings.era5_dataset_to_update_from,
-                product_type=update_settings.era5_product_type,
-                weather_factors=["stl1"],  # A factor that exists in all supported ERA5 datasets
-                years=[update_moment.year],
-                months=[update_moment.month],
-                days=[update_moment.day],
+                cds_request=CDSRequest(
+                    product_type=[update_settings.era5_product_type],
+                    variables=["stl1"],  # A factor that exists in all supported ERA5 datasets
+                    year=[str(update_moment.year)],
+                    month=[str(update_moment.month)],
+                    day=[str(update_moment.day)],
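+                    # A single day with two hours is the smallest request that still confirms availability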
+                    time=[f"{hour:02d}:00" for hour in range(2)],
+                    area=(53.510403, 3.314971, 50.803721, 7.092053),  # The Netherlands area
+                ),
                 target_location=tempfile.NamedTemporaryFile().name,
             )
             break
@@ -198,7 +205,8 @@ def _verify_first_day_available_for_era5(update_moment: datetime, update_setting
 
     if update_moment < update_settings.repository_time_range[1] - relativedelta(days=45):
         raise ValueError(
-            "The first day available for ERA5 data could not be found within 40 days of the target date. Aborting update."
+            "The first day available for ERA5 data could not be found within 45 days of the target date. "
+            "Aborting update."
         )
 
     return update_moment
@@ -221,21 +229,20 @@ def _finalize_formatted_file(file_path: Path, current_moment: date, verification
         except Exception as e:
             logger.error(f" > Failed to remove temporary file {file_path.with_suffix(file_suffix)}: {e}")
 
-    # Rename the file to its proper name:
-    if current_moment == verification_date.replace(day=1):
-        # Current month means an incomplete file
-        file_path.with_suffix(Era5FileSuffixes.FORMATTED).rename(file_path.with_suffix(Era5FileSuffixes.INCOMPLETE))
-        logger.debug(
-            f"Month [{current_moment}] was renamed to: {file_path.with_suffix(Era5FileSuffixes.INCOMPLETE)}"
-        )
-    elif permanent_month < current_moment < incomplete_month:
-        # Non-permanent file
-        file_path.with_suffix(Era5FileSuffixes.FORMATTED).rename(file_path.with_suffix(Era5FileSuffixes.TEMP))
-        logger.debug(f"Month [{current_moment}] was renamed to: {file_path.with_suffix(Era5FileSuffixes.TEMP)}")
-    else:
-        # Permanent file
-        file_path.with_suffix(Era5FileSuffixes.FORMATTED).rename(file_path.with_suffix(".nc"))
-        logger.debug(f'Month [{current_moment}] was renamed to: {file_path.with_suffix(".nc")}')
+    # Rename the file to its proper name:
+    logger.debug(
+        f"Renaming month [{current_moment}]: verification={verification_date}, "
+        f"permanent={permanent_month}, incomplete={incomplete_month}"
+    )
+    if current_moment == verification_date.replace(day=1):
+        # Current month means an incomplete file
+        file_path.with_suffix(Era5FileSuffixes.FORMATTED).rename(file_path.with_suffix(Era5FileSuffixes.INCOMPLETE))
+        logger.debug(f"Month [{current_moment}] was renamed to: {file_path.with_suffix(Era5FileSuffixes.INCOMPLETE)}")
+    elif permanent_month < current_moment < incomplete_month:
+        # Non-permanent file
+        file_path.with_suffix(Era5FileSuffixes.FORMATTED).rename(file_path.with_suffix(Era5FileSuffixes.TEMP))
+        logger.debug(f"Month [{current_moment}] was renamed to: {file_path.with_suffix(Era5FileSuffixes.TEMP)}")
+    else:
+        # Permanent file
+        file_path.with_suffix(Era5FileSuffixes.FORMATTED).rename(file_path.with_suffix(".nc"))
+        logger.debug(f'Month [{current_moment}] was renamed to: {file_path.with_suffix(".nc")}')
 
 
 def file_requires_update(file_path: Path, current_month: date, verification_date: date) -> bool:
@@ -284,8 +291,8 @@ def _format_downloaded_file(unformatted_file: Path, allowed_factors: dict) -> No
     # We remove the expver index used to denominate temporary data (5) and regular data (1) and add a field for it
     # NOTE: We removed the drop_sel version as it didn't quite have the same result as drop yet. Reverting until
     # the proper use has been validated...
-    ds_unformatted_expver5 = ds_unformatted.sel(expver=5).drop("expver").dropna("time", how="all")
-    ds_unformatted_expver1 = ds_unformatted.sel(expver=1).drop("expver").dropna("time", how="all")
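+    # Newer CDS downloads name the time dimension "valid_time" instead of "time"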
+    ds_unformatted_expver5 = ds_unformatted.sel(expver=5).drop("expver").dropna("valid_time", how="all")
+    ds_unformatted_expver1 = ds_unformatted.sel(expver=1).drop("expver").dropna("valid_time", how="all")
 
     # Recombine the data
     ds_unformatted = ds_unformatted_expver1.merge(ds_unformatted_expver5)
@@ -299,8 +306,8 @@ def _format_downloaded_file(unformatted_file: Path, allowed_factors: dict) -> No
         ds_unformatted = ds_unformatted.rename_vars({factor: allowed_factors[factor]})
 
     # Rename and encode data where needed:
-    ds_unformatted.time.encoding["units"] = "hours since 2016-01-01"
-    ds_unformatted = ds_unformatted.rename(name_dict={"latitude": "lat", "longitude": "lon"})
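+    # Encode the time axis, then rename the CDS dimension names back to the repository's "lat"/"lon"/"time"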
+    ds_unformatted.valid_time.encoding["units"] = "hours since 2016-01-01"
+    ds_unformatted = ds_unformatted.rename(name_dict={"latitude": "lat", "longitude": "lon", "valid_time": "time"})
 
     # Store the data
     ds_unformatted.to_netcdf(path=unformatted_file, format="NETCDF4", engine="netcdf4")
@@ -336,20 +343,15 @@ def _recombine_multiple_files(unformatted_file: Path) -> None:
     with zipfile.ZipFile(unformatted_file, "r") as zip_ref:
         zip_ref.extractall(temp_dir)
 
-    combined_dataset = xr.Dataset()
-    for file in Path(temp_dir).glob("*.nc"):
-        # Now use xarray to open each NetCDF file and merge them
-        new_file_dataset = xr.open_dataset(file)
-        print("PROCESSING FILE: ", file)
+    # Load the data
 
-        if not combined_dataset.time.size or combined_dataset.time.size == 0:
-            print("SETTING FILE: ", file)
-            combined_dataset = new_file_dataset.copy(deep=True)
-        else:
-            print("MERGING FILE: ", file)
-            combined_dataset = xr.merge([combined_dataset, new_file_dataset])
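+    # The downloaded zip contains one netCDF file per data stream / step type; open each explicitly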
+    data_stream_land_accum = xr.open_dataset(Path(temp_dir).joinpath("data_stream-oper_stepType-accum.nc"))
+    data_stream_land_instant = xr.open_dataset(Path(temp_dir).joinpath("data_stream-oper_stepType-instant.nc"))
+    data_stream_wave_instant = xr.open_dataset(Path(temp_dir).joinpath("data_stream-wave_stepType-instant.nc"))
 
-    combined_dataset.to_netcdf(unformatted_file, format="NETCDF4", engine="netcdf4")
+    # Merge the data
+    combined_data = xr.merge([data_stream_land_accum, data_stream_land_instant, data_stream_wave_instant])
+    combined_data.to_netcdf(unformatted_file, format="NETCDF4", engine="netcdf4")
 
 
 def download_era5_data(
@@ -358,8 +360,6 @@ def download_era5_data(
     target_location: str,
 ) -> None:
     """A function to download ERA5 data."""
-    print(cds_request.request_parameters)
-
     try:
         CDS_CLIENT.retrieve(
             dataset,