Skip to content
77 changes: 74 additions & 3 deletions src/nwp_consumer/internal/entities/tensorstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from collections.abc import Mapping, MutableMapping
from typing import Any

import numpy as np
import pandas as pd
import xarray as xr
import zarr
Expand Down Expand Up @@ -321,15 +322,84 @@ def write_to_region(
self.size_kb += nbytes // 1024
return Success(nbytes)

def validate_store(self) -> ResultE[bool]:
@staticmethod
def _has_nans(store_da: xr.DataArray) -> ResultE[bool]:
"""Check the store for NaN values."""
nans_in_image_threshold: float = 0.1
images_failing_nan_check_threshold: float = 0.0
def _calc_null_percentage(data: np.typing.NDArray[np.float32]) -> float:
nulls = np.isnan(data)
if 0 in data.shape:
log.warning(
"Validation region has 0 area, check input slices correspond"
"to coordinate values in the dataset",
)
return 1.0
return float(nulls.sum() / np.prod(nulls.shape))

if "latitude" in store_da.dims:
spatial_dims: list[str] = ["latitude", "longitude"]
elif "x_osgb" in store_da.dims:
spatial_dims = ["x_osgb", "y_osgb"]
elif "x_laea" in store_da.dims:
spatial_dims = ["x_laea", "y_laea"]
else:
return Failure(ValueError(
"Store does not have expected spatial dimensions. "
"Expected: ['latitude', 'longitude'], ['x_osgb', 'y_osgb'], ['x_laea', 'y_laea']. "
f"Got: {store_da.dims}.",
))

result = xr.apply_ufunc(
_calc_null_percentage,
store_da,
input_core_dims=[spatial_dims],
vectorize=True,
dask="parallelized",
)

failed_image_count: int = (result > nans_in_image_threshold).sum().values
total_image_count: int = result.size
failed_image_percentage: float = failed_image_count / total_image_count
if failed_image_percentage > images_failing_nan_check_threshold:
log.warning(
f"Dataset failed validation. "
f"{failed_image_percentage:.2%} of images have greater than "
f"{int(nans_in_image_threshold * 100)}% null values"
f"({failed_image_count}/{total_image_count})",
)
return Success(True)
log.info(
f"{failed_image_count}/{total_image_count} "
f"({failed_image_percentage:.2%}) of images have greater than "
f"{int(nans_in_image_threshold * 100)}% null values",
)
return Success(False)

def validate_store(self) -> ResultE[None]:
"""Validate the store.

This method checks the store for the presence of all expected parameters.

Returns:
A bool indicating the result of the validation.
"""
log.debug(f"Validating store at '{self.path}'")
store_da: xr.DataArray = xr.open_dataarray(self.path, engine="zarr")

# Check for NaNs
has_nans_result = self._has_nans(store_da=store_da)
if isinstance(has_nans_result, Failure):
return has_nans_result
else:
if has_nans_result.unwrap():
return Failure(ValueError(
"Store contains NaN values. "
"Check the source data for missing values and reprocess the data.",
))

# TODO: Use consistency checks instead
"""
# Consistency check on the coordinates of the store
coords_result = NWPDimensionCoordinateMap.from_xarray(store_da)
match coords_result:
Expand All @@ -352,9 +422,10 @@ def validate_store(self) -> ResultE[bool]:
case Success(scan):
log.debug(f"Scanned parameter {param.name}: {scan.__repr__()}")
if not scan.is_valid or scan.has_nulls:
return Success(False)
return Failure(ValueError("Parameter validation failed."))
"""

return Success(True)
return Success(None)

def delete_store(self) -> ResultE[None]:
"""Delete the store."""
Expand Down
20 changes: 20 additions & 0 deletions src/nwp_consumer/internal/services/consumer_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ def consume(

missing_times_result = store.missing_times()
if isinstance(missing_times_result, Failure):
delete_store_result = store.delete_store()
if isinstance(delete_store_result, Failure):
log.error(
f"Failed to delete store after error: {delete_store_result}",
)
return missing_times_result

for n, it in enumerate(missing_times_result.unwrap()):
Expand All @@ -218,8 +223,23 @@ def consume(
functools.partial(self._fold_dataarrays_generator, store=store),
)
if isinstance(process_result, Failure):
delete_store_result = store.delete_store()
if isinstance(delete_store_result, Failure):
log.error(
f"Failed to delete store after error: {delete_store_result}",
)
return process_result

validation_result = store.validate_store()
if isinstance(validation_result, Failure):
delete_store_result = store.delete_store()
if isinstance(delete_store_result, Failure):
log.error(
f"Failed to delete store after error: {delete_store_result}",
)
return validation_result


notification_message = entities.StoreCreatedNotification(
filename=pathlib.Path(store.path).name,
size_mb=store.size_kb // 1024,
Expand Down