Skip to content

Commit bb31de7

Browse files
mer-a-omran and others authored
breaking down PR/660 into smaller pieces -- part2, empty obs (#669)
* fetch empty obs and clean save obs diag * Update src/swell/tasks/get_observations.py Co-authored-by: Michael Anstett <michael.anstett@nasa.gov> * better handling of obs with some empty files * address test failure * write missing obs in jedi.yaml --------- Co-authored-by: Michael Anstett <michael.anstett@nasa.gov>
1 parent 52bc305 commit bb31de7

File tree

2 files changed

+121
-118
lines changed

2 files changed

+121
-118
lines changed

src/swell/tasks/get_observations.py

Lines changed: 116 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
# --------------------------------------------------------------------------------------------------
99

1010
import isodate
11+
import netCDF4 as nc
1112
import numpy as np
1213
import os
1314
import r2d2
14-
import netCDF4 as nc
15+
import shutil
1516
from typing import Union
1617

1718
from datetime import timedelta, datetime as dt
@@ -180,8 +181,22 @@ def execute(self) -> None:
180181
try:
181182
r2d2.fetch(**fetch_criteria)
182183
self.logger.info(f"Successfully fetched {target_file}")
183-
except Exception as e:
184-
self.logger.info(f"Failed to fetch {target_file}: {str(e)}")
184+
except Exception:
185+
self.logger.info(
186+
f"Failed to fetch {target_file}. "
187+
"Fetch empty observation instead."
188+
)
189+
190+
# fetch empty obs
191+
r2d2.fetch(
192+
item='observation',
193+
provider='empty_provider',
194+
observation_type='empty_type',
195+
file_extension='nc4',
196+
window_start='19700101T030000Z',
197+
window_length='PT6H',
198+
target_file=target_file,
199+
)
185200

186201
# Check how many of the combine_input_files exist in the cycle directory.
187202
# If all of them are missing proceed without creating an observation input
@@ -441,85 +456,106 @@ def read_and_combine(self, input_filenames: list, output_filename: str) -> None:
441456
existing_files = [f for f in input_filenames if os.path.exists(f)]
442457
input_filenames = existing_files
443458

444-
# Loop through the input files and get the total dimension size for each dimension
445-
# Location requires special handling to get the cumulative sum of the dimension size
446-
# ---------------------------------------------------------------------------------
447-
out_dim_size = {'Location': 0}
448-
for input_filename in input_filenames:
449-
with nc.Dataset(input_filename, 'r') as ds:
450-
for dim_name, dim in ds.dimensions.items():
451-
if dim_name == 'Location':
452-
out_dim_size[dim_name] += dim.size
459+
# Remove empty files from input_filenames
460+
# -------------------------------------------------------------
461+
valid_files = []
462+
463+
for fname in input_filenames:
464+
try:
465+
with nc.Dataset(fname, 'r') as ds:
466+
if 'Location' in ds.dimensions and ds.dimensions['Location'].size > 0:
467+
valid_files.append(fname)
453468
else:
454-
out_dim_size[dim_name] = dim.size
455-
456-
with nc.Dataset(output_filename, 'w') as out_ds:
457-
# Open the input NetCDF files for reading
458-
# ---------------------------------------
459-
self.logger.info(f"Combining files {input_filenames} ")
460-
461-
# Create an output file template based on the first input file
462-
# ------------------------------------------------------------
463-
with nc.Dataset(input_filenames[0], 'r') as ds:
464-
# Access groups and create dimensions
465-
# -----------------------------------
466-
input_groups = ds.groups.keys()
467-
468-
for dim_name, dim in ds.dimensions.items():
469-
out_ds.createDimension(dim_name, out_dim_size[dim_name])
470-
471-
# Loop through groups and process variables
472-
# -----------------------------------------
473-
for group_name in input_groups:
474-
group = ds[group_name]
475-
476-
# Create the groups in output file
477-
# --------------------------------
478-
out_group = out_ds.createGroup(group_name)
479-
480-
# Access variables within a group
481-
# -------------------------------
482-
variables_in_group = group.variables.keys()
483-
484-
# Loop over variables from input files, combine, and write to the new file
485-
# ------------------------------------------------------------------------
486-
for var_name in variables_in_group:
487-
list_data = []
488-
489-
# Get the dimensions of the variable
490-
# ----------------------------------
491-
var_dims = group[var_name].dimensions
492-
493-
# Loop over all the files and combine the variable data into a list
494-
# Channel dimensions remain the same, so we can break the loop
495-
# ----------------------------------------------------------------
496-
for input_file in input_filenames:
497-
list_data.append(self.get_data(input_file, group_name, var_name))
498-
# Only break if the first dimension is Channel
499-
if var_dims[0] == 'Channel':
500-
break
501-
502-
# Concatenate the masked arrays along the first dimension
503-
# --------------------------------------------------------
504-
variable_data = np.ma.concatenate(list_data, axis=0)
505-
506-
# Fill value needs to be assigned while creating variables
507-
# --------------------------------------------------------
508-
subset_var = out_group.createVariable(
509-
var_name,
510-
variable_data.dtype,
511-
var_dims,
512-
fill_value=group[var_name].getncattr('_FillValue')
513-
)
514-
for attr_name in group[var_name].ncattrs():
515-
if attr_name == '_FillValue':
516-
continue
517-
subset_var.setncattr(
518-
attr_name, group[var_name].getncattr(attr_name)
519-
)
469+
empty_template = fname
470+
except OSError:
471+
continue
520472

521-
# Write subset data to the new file
473+
input_filenames = valid_files
474+
475+
if input_filenames:
476+
# Loop through the input files and get the total dimension size for each dimension
477+
# Location requires special handling to get the cumulative sum of the dimension size
478+
# ---------------------------------------------------------------------------------
479+
out_dim_size = {'Location': 0}
480+
for input_filename in input_filenames:
481+
with nc.Dataset(input_filename, 'r') as ds:
482+
for dim_name, dim in ds.dimensions.items():
483+
if dim_name == 'Location':
484+
out_dim_size[dim_name] += dim.size
485+
else:
486+
out_dim_size[dim_name] = dim.size
487+
488+
with nc.Dataset(output_filename, 'w') as out_ds:
489+
# Open the input NetCDF files for reading
490+
# ---------------------------------------
491+
self.logger.info(f"Combining files {input_filenames} ")
492+
493+
# Create an output file template based on the first input file
494+
# ------------------------------------------------------------
495+
with nc.Dataset(input_filenames[0], 'r') as ds:
496+
# Access groups and create dimensions
497+
# -----------------------------------
498+
input_groups = ds.groups.keys()
499+
500+
for dim_name, dim in ds.dimensions.items():
501+
out_ds.createDimension(dim_name, out_dim_size[dim_name])
502+
503+
# Loop through groups and process variables
504+
# -----------------------------------------
505+
for group_name in input_groups:
506+
group = ds[group_name]
507+
508+
# Create the groups in output file
522509
# --------------------------------
523-
subset_var[:] = variable_data
510+
out_group = out_ds.createGroup(group_name)
511+
512+
# Access variables within a group
513+
# -------------------------------
514+
variables_in_group = group.variables.keys()
515+
516+
# Loop over variables from input files, combine, and write to the new file
517+
# ------------------------------------------------------------------------
518+
for var_name in variables_in_group:
519+
list_data = []
520+
521+
# Get the dimensions of the variable
522+
# ----------------------------------
523+
var_dims = group[var_name].dimensions
524+
525+
# Loop over all the files and combine the variable data into a list
526+
# Channel dimensions remain the same, so we can break the loop
527+
# ----------------------------------------------------------------
528+
for input_file in input_filenames:
529+
list_data.append(self.get_data(input_file, group_name, var_name))
530+
# Only break if the first dimension is Channel
531+
if var_dims[0] == 'Channel':
532+
break
533+
534+
# Concatenate the masked arrays along the first dimension
535+
# --------------------------------------------------------
536+
variable_data = np.ma.concatenate(list_data, axis=0)
537+
538+
# Fill value needs to be assigned while creating variables
539+
# --------------------------------------------------------
540+
subset_var = out_group.createVariable(
541+
var_name,
542+
variable_data.dtype,
543+
var_dims,
544+
fill_value=group[var_name].getncattr('_FillValue')
545+
)
546+
for attr_name in group[var_name].ncattrs():
547+
if attr_name == '_FillValue':
548+
continue
549+
subset_var.setncattr(
550+
attr_name, group[var_name].getncattr(attr_name)
551+
)
552+
553+
# Write subset data to the new file
554+
# --------------------------------
555+
subset_var[:] = variable_data
556+
557+
else:
524558

559+
# If all the files are empty, copy one of them as the output file
560+
shutil.copyfile(empty_template, output_filename)
525561
# ----------------------------------------------------------------------------------------------

src/swell/tasks/save_obs_diags.py

Lines changed: 5 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
# --------------------------------------------------------------------------------------------------
99

10-
import os
1110
import r2d2
1211
from swell.tasks.base.task_base import taskBase
1312
from swell.utilities.r2d2 import create_r2d2_config
@@ -61,51 +60,19 @@ def execute(self) -> None:
6160
self.logger.info(f'Checking input observation file: {input_obs_file}')
6261

6362
use_obs = check_obs(self.jedi_rendering.observing_system_records_path, observation,
64-
observation_dict, self.cycle_time_dto())
63+
observation_dict, self.cycle_time_dto(), input_and_output=True)
6564

66-
self.logger.info(f'Checking observation {observation}: use_obs = {use_obs}')
65+
# use_obs is false when obs input file (or feedback file) doesn't exist or is empty.
66+
# The case when the feedback file is listed in yaml but doesn't exist never happens,
67+
# as JEDI execution fails when input obs file is missing.
6768

6869
if not use_obs:
69-
self.logger.info(f'Input observation file analysis for {observation}:')
70-
self.logger.info(f' Expected file: {input_obs_file}')
71-
# Check if file exists and is readable
72-
# ---------------------------------------
73-
try:
74-
import netCDF4 as nc
75-
dataset = nc.Dataset(input_obs_file, 'r')
76-
dims = {dim_name: dim.size for dim_name, dim in dataset.dimensions.items()}
77-
self.logger.info(f' File exists but dimensions: {dims}')
78-
dataset.close()
79-
except Exception as e:
80-
self.logger.info(f' File exists but error reading: {str(e)}')
81-
82-
self.logger.info(f' Skipping {observation}')
70+
self.logger.info(f'Empty feedback (obs diag) {input_obs_file} file. Skip saving.')
8371
continue
8472

85-
# Store diagnostic/feedback files produced by JEDI executables
86-
# (e.g., variational, hofx, localensembleda).
87-
# --------------------------------------------------------------
88-
8973
name = observation_dict['obs space']['name']
9074
obs_path_file = observation_dict['obs space']['obsdataout']['engine']['obsfile']
9175

92-
self.logger.info(f'Looking for diagnostic output file: {obs_path_file}')
93-
94-
# Check for need to add 0000 to the file
95-
if not os.path.exists(obs_path_file):
96-
obs_path_file_name, obs_path_file_ext = os.path.splitext(obs_path_file)
97-
obs_path_file_0000 = obs_path_file_name + '_0000' + obs_path_file_ext
98-
self.logger.info(f'Primary file not found, checking: {obs_path_file_0000}')
99-
100-
if not os.path.exists(obs_path_file_0000):
101-
self.logger.info(f'Diagnostic output files not found for {observation}:')
102-
self.logger.info(f' Expected: {obs_path_file}')
103-
self.logger.info(f' Expected: {obs_path_file_0000}')
104-
self.logger.info(f' RunJediVariationalExecutable did not run successfully')
105-
self.logger.info(f' Skipping storage of {observation} diagnostic file')
106-
continue
107-
obs_path_file = obs_path_file_0000
108-
10976
self.logger.info(f'Found diagnostic output file: {obs_path_file}')
11077

11178
# Store to R2D2

0 commit comments

Comments
 (0)