Skip to content
50 changes: 22 additions & 28 deletions pysteps/blending/linear_blending.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,28 @@
"""

import numpy as np
import xarray as xr
from pysteps import nowcasts
from pysteps.utils import conversion
from scipy.stats import rankdata

from pysteps.xarray_helpers import convert_output_to_xarray_dataset


def forecast(
precip,
precip_metadata,
velocity,
radar_dataset: xr.Dataset,
timesteps,
timestep,
nowcast_method,
precip_nwp=None,
precip_nwp_metadata=None,
model_dataset: xr.Dataset = None,
start_blending=120,
end_blending=240,
fill_nwp=True,
saliency=False,
nowcast_kwargs=None,
):
"""Generate a forecast by linearly or saliency-based blending of nowcasts with NWP data
# XR: Update docstring

Parameters
----------
Expand Down Expand Up @@ -105,31 +106,27 @@ def forecast(
if nowcast_kwargs is None:
nowcast_kwargs = dict()

# Ensure that only the most recent precip timestep is used
if len(precip.shape) == 3:
precip = precip[-1, :, :]

# First calculate the number of needed timesteps (up to end_blending) for the nowcast
# to ensure that the nowcast calculation time is limited.
timesteps_nowcast = int(end_blending / timestep)

nowcast_method_func = nowcasts.get_method(nowcast_method)

# Check if NWP data is given as input
if precip_nwp is not None:
if model_dataset is not None:
# Calculate the nowcast
precip_nowcast = nowcast_method_func(
precip,
velocity,
timesteps_nowcast,
**nowcast_kwargs,
nowcast_dataset = nowcast_method_func(
radar_dataset, timesteps_nowcast, **nowcast_kwargs
)

# Make sure that precip_nowcast and precip_nwp are in mm/h
precip_nowcast, _ = conversion.to_rainrate(
precip_nowcast, metadata=precip_metadata
)
precip_nwp, _ = conversion.to_rainrate(precip_nwp, metadata=precip_nwp_metadata)
nowcast_dataset = conversion.to_rainrate(nowcast_dataset)
nowcast_precip_var = nowcast_dataset.attrs["precip_var"]
precip_nowcast = nowcast_dataset[nowcast_precip_var].values

model_dataset = conversion.to_rainrate(model_dataset)
model_precip_var = model_dataset.attrs["precip_var"]
precip_nwp = model_dataset[model_precip_var].values

if len(precip_nowcast.shape) == 4:
n_ens_members_nowcast = precip_nowcast.shape[0]
Expand Down Expand Up @@ -261,22 +258,19 @@ def forecast(

else:
# Calculate the nowcast
precip_nowcast = nowcast_method_func(
precip,
velocity,
timesteps,
**nowcast_kwargs,
nowcast_dataset = nowcast_method_func(
radar_dataset, timesteps, **nowcast_kwargs
)

# Make sure that precip_nowcast and precip_nwp are in mm/h
precip_nowcast, _ = conversion.to_rainrate(
precip_nowcast, metadata=precip_metadata
)
nowcast_dataset = conversion.to_rainrate(nowcast_dataset)
nowcast_precip_var = nowcast_dataset.attrs["precip_var"]
precip_nowcast = nowcast_dataset[nowcast_precip_var].values

# If no NWP data is given, the blended field is simply equal to the nowcast field
precip_blended = precip_nowcast

return precip_blended
return convert_output_to_xarray_dataset(radar_dataset, timesteps, precip_blended)


def _get_slice(n_dims, ref_dim, ref_id):
Expand Down
115 changes: 86 additions & 29 deletions pysteps/blending/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@
calculate_weights_spn
blend_means_sigmas
"""
from datetime import datetime
import math
import time
from copy import copy, deepcopy
from functools import partial
from multiprocessing.pool import ThreadPool

import numpy as np
import xarray as xr
from scipy.linalg import inv
from scipy.ndimage import binary_dilation, generate_binary_structure, iterate_structure

Expand All @@ -57,6 +59,7 @@
from pysteps.postprocessing import probmatching
from pysteps.timeseries import autoregression, correlation
from pysteps.utils.check_norain import check_norain
from pysteps.xarray_helpers import convert_output_to_xarray_dataset

try:
import dask
Expand Down Expand Up @@ -412,20 +415,76 @@ class StepsBlendingState:
class StepsBlendingNowcaster:
def __init__(
self,
precip,
precip_models,
velocity,
velocity_models,
radar_dataset: xr.Dataset,
model_dataset: xr.Dataset,
time_steps,
issue_time,
issue_time: datetime,
steps_blending_config: StepsBlendingConfig,
):
"""Initializes the StepsBlendingNowcaster with inputs and configurations."""
# Store inputs
self.__precip = precip
self.__precip_models = precip_models
self.__velocity = velocity
self.__velocity_models = velocity_models
radar_precip_var = radar_dataset.attrs["precip_var"]
model_precip_var = model_dataset.attrs["precip_var"]
if issue_time != radar_dataset.time.isel(time=-1).values.astype(
"datetime64[us]"
).astype(datetime):
raise ValueError(
"Issue time should be equal to last timestep in radar dataset"
)
time_stepsize_seconds = radar_dataset.time.attrs["stepsize"]
if isinstance(time_steps, list):
# XR: validates this works or just remove the subtimestep stuff
timesteps_seconds = (
np.array(list(range(time_steps[-1] + 1))) * time_stepsize_seconds
)
else:
timesteps_seconds = (
np.array(list(range(time_steps + 1))) * time_stepsize_seconds
)
model_times = radar_dataset.time.isel(
time=-1
).values + timesteps_seconds.astype("timedelta64[s]")
model_dataset = model_dataset.sel(time=model_times)

self.__precip = radar_dataset[radar_precip_var].values
# XR: don't extract to dict but pass dataset
if model_dataset[model_precip_var].ndim == 5:
self.__precip_models = np.array(
[
[
{
"cascade_levels": model_dataset[model_precip_var]
.sel(time=time, ens_number=ens_number)
.values,
"means": model_dataset["means"]
.sel(time=time, ens_number=ens_number)
.values,
"stds": model_dataset["stds"]
.sel(time=time, ens_number=ens_number)
.values,
"domain": model_dataset[model_precip_var].attrs["domain"],
"normalized": model_dataset[model_precip_var].attrs[
"normalized"
],
"compact_output": model_dataset[model_precip_var].attrs[
"compact_output"
],
}
for time in model_dataset.time
]
for ens_number in model_dataset.ens_number
]
)
else:
self.__precip_models = model_dataset[model_precip_var].values
self.__velocity = np.array(
[radar_dataset["velocity_x"].values, radar_dataset["velocity_y"].values]
)
self.__velocity_models = np.array(
[model_dataset["velocity_x"].values, model_dataset["velocity_y"].values]
).transpose(1, 2, 0, 3, 4)
self.__original_timesteps = time_steps
self.__input_radar_dataset = radar_dataset
self.__timesteps = time_steps
self.__issuetime = issue_time

Expand All @@ -447,6 +506,7 @@ def compute_forecast(self):

Parameters
----------
# XR: fix docstring
precip: array-like
Array of shape (ar_order+1,m,n) containing the input precipitation fields
ordered by timestamp from oldest to newest. The time steps between the
Expand Down Expand Up @@ -545,7 +605,7 @@ def compute_forecast(self):

# Determine if rain is present in both radar and NWP fields
if self.__params.zero_precip_radar and self.__params.zero_precip_model_fields:
return self.__zero_precipitation_forecast()
result = self.__zero_precipitation_forecast()
else:
# Prepare the data for the zero precipitation radar case and initialize the noise correctly
if self.__params.zero_precip_radar:
Expand All @@ -572,16 +632,20 @@ def compute_forecast(self):
for j in range(self.__config.n_ens_members)
]
)
if self.__config.measure_time:
return (
self.__state.final_blended_forecast,
self.__init_time,
self.__mainloop_time,
)
else:
return self.__state.final_blended_forecast
result = self.__state.final_blended_forecast
else:
return None
result_dataset = convert_output_to_xarray_dataset(
self.__input_radar_dataset, self.__original_timesteps, result
)
if self.__config.measure_time:
return (
result_dataset,
self.__init_time,
self.__mainloop_time,
)
else:
return result_dataset

def __blended_nowcast_main_loop(self):
"""
Expand Down Expand Up @@ -2817,10 +2881,8 @@ def __measure_time(self, label, start_time):


def forecast(
precip,
precip_models,
velocity,
velocity_models,
radar_dataset,
model_dataset,
timesteps,
timestep,
issuetime,
Expand Down Expand Up @@ -2864,6 +2926,7 @@ def forecast(

Parameters
----------
# XR: fix docstring
precip: array-like
Array of shape (ar_order+1,m,n) containing the input precipitation fields
ordered by timestamp from oldest to newest. The time steps between the
Expand Down Expand Up @@ -3182,13 +3245,7 @@ def forecast(
"""
# Create an instance of the new class with all the provided arguments
blended_nowcaster = StepsBlendingNowcaster(
precip,
precip_models,
velocity,
velocity_models,
timesteps,
issuetime,
blending_config,
radar_dataset, model_dataset, timesteps, issuetime, blending_config
)

forecast_steps_nowcast = blended_nowcaster.compute_forecast()
Expand Down
Loading