@@ -41,7 +41,7 @@ class Era5UpdateSettings(BaseModel):
4141 maximum_runtime_in_minutes : int = 2 * 60 # 2 hours
4242
4343
44- def era5_repository_update (update_settings : Era5UpdateSettings ) -> RepositoryUpdateResult :
44+ def era5_repository_update (update_settings : Era5UpdateSettings , test_mode : bool ) -> RepositoryUpdateResult :
4545 """A function to update a variant of ERA5 data into the repository."""
4646 starting_moment_of_update = datetime .now (UTC )
4747 cutoff_time = starting_moment_of_update + relativedelta (minutes = update_settings .maximum_runtime_in_minutes )
@@ -54,7 +54,7 @@ def era5_repository_update(update_settings: Era5UpdateSettings) -> RepositoryUpd
5454 logger .info (f" - Maximum runtime: { update_settings .maximum_runtime_in_minutes } minutes ({ cutoff_time } )" )
5555
5656 try :
57- _era5_update_month_by_month (update_settings , starting_moment_of_update , cutoff_time )
57+ _era5_update_month_by_month (update_settings , starting_moment_of_update , cutoff_time , test_mode )
5858 except Exception as e :
5959 logger .error (f"Failed to update ERA5 data. Reason: { e } " )
6060 return RepositoryUpdateResult .failure
@@ -67,7 +67,7 @@ def era5_repository_update(update_settings: Era5UpdateSettings) -> RepositoryUpd
6767
6868
6969def _era5_update_month_by_month (
70- update_settings : Era5UpdateSettings , starting_moment_of_update : datetime , cutoff_time : datetime
70+ update_settings : Era5UpdateSettings , starting_moment_of_update : datetime , cutoff_time : datetime , test_mode : bool
7171):
7272 """A function to update a variant of ERA5 data into the repository."""
7373 amount_of_months_processed = amount_of_months_not_processable = 0
@@ -90,7 +90,7 @@ def _era5_update_month_by_month(
9090 logger .warning ("Maximum runtime reached. Stopping update." )
9191 break
9292
93- update_result = _era5_update_month (update_settings , update_month )
93+ update_result = _era5_update_month (update_settings , update_month , test_mode )
9494 if update_result == RepositoryUpdateResult .failure :
9595 amount_of_months_not_processable += 1
9696 amount_of_months_processed += 1
@@ -111,7 +111,9 @@ def _era5_update_month_by_month(
111111 logger .info (f"Average time per month: { average_time_per_month_in_minutes } minutes" )
112112
113113
114- def _era5_update_month (update_settings : Era5UpdateSettings , update_month : datetime ) -> RepositoryUpdateResult :
114+ def _era5_update_month (
115+ update_settings : Era5UpdateSettings , update_month : datetime , test_mode : bool
116+ ) -> RepositoryUpdateResult :
115117 """A function to update a variant of ERA5 data into the repository."""
116118 logger .debug (f" > Processing month: { update_month .year } -{ update_month .month } " )
117119
@@ -123,6 +125,9 @@ def _era5_update_month(update_settings: Era5UpdateSettings, update_month: dateti
123125 logger .debug (f" > File { month_file } requires update." )
124126 month_file_name = month_file .with_suffix (Era5FileSuffixes .UNFORMATTED )
125127
128+ # Only the first day of each month in test mode, otherwise all days:
129+ day = ["1"] if test_mode else [str(i) for i in range(1, 32)]
130+
126131 try :
127132 download_era5_data (
128133 update_settings .era5_dataset_to_update_from ,
@@ -131,9 +136,8 @@ def _era5_update_month(update_settings: Era5UpdateSettings, update_month: dateti
131136 variables = update_settings .factors_to_process ,
132137 year = [str (update_month .year )],
133138 month = [str (update_month .month )],
134- day = [ str ( i ) for i in list ( range ( 1 , 32 ))] ,
139+ day = day ,
135140 time = [f"{ hour :02d} :00" for hour in range (24 )],
136- area = (53.510403 , 3.314971 , 50.803721 , 7.092053 ),
137141 ),
138142 target_location = str (month_file_name ),
139143 )
@@ -194,7 +198,6 @@ def _verify_first_day_available_for_era5(update_moment: datetime, update_setting
194198 month = [str (update_moment .month )],
195199 day = [str (update_moment .day )],
196200 time = [f"{ hour :02d} :00" for hour in range (2 )],
197- area = (53.510403 , 3.314971 , 50.803721 , 7.092053 ), # The Netherlands area
198201 ),
199202 target_location = tempfile .NamedTemporaryFile ().name ,
200203 )
@@ -230,7 +233,6 @@ def _finalize_formatted_file(file_path: Path, current_moment: date, verification
230233 logger .error (f" > Failed to remove temporary file { file_path .with_suffix (file_suffix )} : { e } " )
231234
232235 # Rename the file to its proper name:
233- print ("RENAMING FILE" , current_moment , verification_date , permanent_month , incomplete_month )
234236 if current_moment == verification_date .replace (day = 1 ):
235237 # Current month means an incomplete file
236238 file_path .with_suffix (Era5FileSuffixes .FORMATTED ).rename (file_path .with_suffix (Era5FileSuffixes .INCOMPLETE ))
@@ -271,7 +273,8 @@ def file_requires_update(file_path: Path, current_month: date, verification_date
271273 return True # An update should both clean the UNFORMATTED file and generate a proper one
272274
273275 if not file_path .with_suffix (".nc" ).exists () or file_path .with_suffix (Era5FileSuffixes .INCOMPLETE ).exists ():
274- logger .debug (" > No file exists, or it is still incomplete: UPDATE REQUIRED " )
276+ logger .debug (" > No file exists, or it is still incomplete: UPDATE REQUIRED" )
277+ logger.debug(f" > File path: {file_path}")
275278 return True # No file matching the mask or incomplete files always mean the update is required!
276279
277280 files_in_folder = glob .glob (f"{ file_path } *.nc" )
@@ -343,15 +346,34 @@ def _recombine_multiple_files(unformatted_file: Path) -> None:
343346 with zipfile .ZipFile (unformatted_file , "r" ) as zip_ref :
344347 zip_ref .extractall (temp_dir )
345348
346- # Load the data
347-
348- data_stream_land_accum = xr .open_dataset (Path (temp_dir ).joinpath ("data_stream-oper_stepType-accum.nc" ))
349- data_stream_land_instant = xr .open_dataset (Path (temp_dir ).joinpath ("data_stream-oper_stepType-instant.nc" ))
350- data_stream_wave_instant = xr .open_dataset (Path (temp_dir ).joinpath ("data_stream-wave_stepType-instant.nc" ))
349+ concatenated_dataset = xr .Dataset ()
350+ files_to_load_in_order = [
351+ "data_stream-oper_stepType-instant" ,
352+ "data_stream-oper_stepType-accum" ,
353+ # TODO: Add the following file back in when we can properly handle it
354+ # "data_stream-wave_stepType-instant", # Something about this data doesn't mesh well anymore with the rest...
355+ ]
356+
357+ # TODO: Load, convert to dataframe, merge, convert back to xarray
358+ # NOTE(review): concatenated_dataset is already initialized above; the second xr.Dataset() assignment was redundant and has been dropped
359+ for filename in files_to_load_in_order :
360+ file_path = Path (temp_dir ).joinpath (f"{ filename } .nc" )
361+ if not file_path .exists ():
362+ logger .error (f" > Required file { filename } .nc does not exist. Aborting recombination." )
363+ raise FileNotFoundError(f"Required file {filename}.nc does not exist. Aborting recombination.")
364+
365+ dataset = xr .open_dataset (file_path )
366+ dataset = dataset.drop_vars("expver", errors="raise")  # NOTE(review): errors="raise" aborts recombination if "expver" is absent — confirm that is intended
367+
368+ if not concatenated_dataset .data_vars :
369+ concatenated_dataset = dataset .copy (deep = True )
370+ else :
371+ concatenated_dataset = xr .merge (
372+ [concatenated_dataset , dataset ], join = "outer" , compat = "no_conflicts" , combine_attrs = "override"
373+ )
351374
352- # Merge the data
353- combined_data = xr .merge ([data_stream_land_accum , data_stream_land_instant , data_stream_wave_instant ])
354- combined_data .to_netcdf (unformatted_file , format = "NETCDF4" , engine = "netcdf4" )
375+ concatenated_dataset .to_netcdf (unformatted_file , format = "NETCDF4" , engine = "netcdf4" )
376+
355377
356378
357379def download_era5_data (
0 commit comments