diff --git a/examples/steps_blended_forecast.py b/examples/steps_blended_forecast.py index 741679d3e..b67f6ef39 100644 --- a/examples/steps_blended_forecast.py +++ b/examples/steps_blended_forecast.py @@ -195,7 +195,7 @@ n_leadtimes = len(leadtimes_min) for n, leadtime in enumerate(leadtimes_min): # Nowcast with blending into NWP - plt.subplot(n_leadtimes, 2, n * 2 + 1) + ax1 = plt.subplot(n_leadtimes, 2, n * 2 + 1) plot_precip_field( precip_forecast[0, int(leadtime / timestep) - 1, :, :], geodata=radar_metadata, @@ -204,10 +204,11 @@ colorscale="STEPS-NL", colorbar=False, ) + ax1.axis("off") # Raw NWP forecast plt.subplot(n_leadtimes, 2, n * 2 + 2) - plot_precip_field( + ax2 = plot_precip_field( nwp_precip_mmh[0, int(leadtime / timestep) - 1, :, :], geodata=nwp_metadata, title=f"NWP +{leadtime} min", @@ -215,26 +216,27 @@ colorscale="STEPS-NL", colorbar=False, ) + ax2.axis("off") plt.tight_layout() plt.show() ############################################################################### -# It is also possible to blend a deterministic external nowcast (e.g. a pre- -# made nowcast or a deterministic AI-based nowcast) with NWP using the STEPS -# algorithm. In that case, we add a `precip_nowcast` to `blending.steps.forecast`. -# By providing an external nowcast, the STEPS blending method will omit the -# autoregression and advection step for the extrapolation cascade and use the -# existing external nowcast instead (which will thus be decomposed into -# multiplicative cascades!). The weights determination and possible post- -# processings steps will remain the same. +# It is also possible to blend a deterministic or probabilistic external nowcast +# (e.g. a pre-made nowcast or a deterministic AI-based nowcast) with NWP using +# the STEPS algorithm. In that case, we add a `precip_nowcast` to +# `blending.steps.forecast`. By providing an external nowcast, the STEPS blending +# method will omit the autoregression and advection step for the extrapolation +# cascade and use the existing external nowcast instead (which will thus be +# decomposed into multiplicative cascades!). The weights determination and +# possible post-processings steps will remain the same. # # Start with creating an external nowcast # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # We go for a simple advection-only nowcast for the example, but this setup can -# be replaced with any external deterministic nowcast. +# be replaced with any external deterministic or probabilistic nowcast. extrapolate = nowcasts.get_method("extrapolation") radar_precip_to_advect = radar_precip.copy() radar_metadata_to_advect = radar_metadata.copy() @@ -258,8 +260,8 @@ ################################################################################ -# Blend the external nowcast with NWP -# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Blend the external nowcast with NWP - deterministic mode +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ precip_forecast = blending.steps.forecast( precip=radar_precip, @@ -311,7 +313,7 @@ idx = int(leadtime / timestep) - 1 # Blended nowcast - plt.subplot(n_leadtimes, 3, n * 3 + 1) + ax1 = plt.subplot(n_leadtimes, 3, n * 3 + 1) plot_precip_field( precip_forecast[0, idx, :, :], geodata=radar_metadata, @@ -320,9 +322,10 @@ colorscale="STEPS-NL", colorbar=False, ) + ax1.axis("off") # Raw extrapolated nowcast - plt.subplot(n_leadtimes, 3, n * 3 + 2) + ax2 = plt.subplot(n_leadtimes, 3, n * 3 + 2) plot_precip_field( fc_lagrangian_extrapolation_mmh[idx, :, :], geodata=radar_metadata, @@ -331,10 +334,11 @@ colorscale="STEPS-NL", colorbar=False, ) + ax2.axis("off") # Raw NWP forecast plt.subplot(n_leadtimes, 3, n * 3 + 3) - plot_precip_field( + ax3 = plot_precip_field( nwp_precip_mmh[0, idx, :, :], geodata=nwp_metadata, title=f"NWP +{leadtime} min", @@ -342,6 +346,106 @@ colorscale="STEPS-NL", colorbar=False, ) + ax3.axis("off") + + +################################################################################ +# Blend the external nowcast with NWP - ensemble mode +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +precip_forecast = blending.steps.forecast( + precip=radar_precip, + precip_nowcast=fc_lagrangian_extrapolation, + precip_models=nwp_precip, + velocity=velocity_radar, + velocity_models=velocity_nwp, + timesteps=18, + timestep=timestep, + issuetime=date_radar, + n_ens_members=5, + precip_thr=radar_metadata["threshold"], + kmperpixel=radar_metadata["xpixelsize"] / 1000.0, + noise_stddev_adj="auto", + vel_pert_method=None, + nowcasting_method="external_nowcast", + noise_method="nonparametric", + probmatching_method="cdf", + mask_method="incremental", + weights_method="bps", +) + +# Transform the data back into mm/h +precip_forecast, _ = converter(precip_forecast, radar_metadata) +radar_precip_mmh, _ = converter(radar_precip, radar_metadata) +fc_lagrangian_extrapolation_mmh, _ = converter( + fc_lagrangian_extrapolation, radar_metadata_to_advect +) +nwp_precipfc_lagrangian_extrapolation_mmh_mmh, _ = converter(nwp_precip, nwp_metadata) + + +################################################################################ +# Visualize the output +# ~~~~~~~~~~~~~~~~~~~~ + +fig = plt.figure(figsize=(5, 12)) + +leadtimes_min = [30, 60, 90, 120, 150, 180] +n_leadtimes = len(leadtimes_min) + +for n, leadtime in enumerate(leadtimes_min): + idx = int(leadtime / timestep) - 1 + + # Blended nowcast member 1 + ax1 = plt.subplot(n_leadtimes, 4, n * 4 + 1) + plot_precip_field( + precip_forecast[0, idx, :, :], + geodata=radar_metadata, + title="Blend Mem. 1", + axis="off", + colorscale="STEPS-NL", + colorbar=False, + ) + ax1.axis("off") + + # Blended nowcast member 5 + ax2 = plt.subplot(n_leadtimes, 4, n * 4 + 2) + plot_precip_field( + precip_forecast[4, idx, :, :], + geodata=radar_metadata, + title="Blend Mem. 5", + axis="off", + colorscale="STEPS-NL", + colorbar=False, + ) + ax2.axis("off") + + # Raw extrapolated nowcast + ax3 = plt.subplot(n_leadtimes, 4, n * 4 + 3) + plot_precip_field( + fc_lagrangian_extrapolation_mmh[0, idx, :, :], + geodata=radar_metadata, + title=f"NWC + {leadtime} min", + axis="off", + colorscale="STEPS-NL", + colorbar=False, + ) + ax3.axis("off") + + # Raw NWP forecast + ax4 = plt.subplot(n_leadtimes, 4, n * 4 + 4) + plot_precip_field( + nwp_precip_mmh[0, idx, :, :], + geodata=nwp_metadata, + title=f"NWP + {leadtime} min", + axis="off", + colorscale="STEPS-NL", + colorbar=False, + ) + ax4.axis("off") + +plt.show() + +print("Done.") ################################################################################ diff --git a/pysteps/blending/steps.py b/pysteps/blending/steps.py index 0e8f28785..7cde87fd4 100644 --- a/pysteps/blending/steps.py +++ b/pysteps/blending/steps.py @@ -361,6 +361,10 @@ class StepsBlendingState: # Extrapolation states mean_extrapolation: np.ndarray | None = None std_extrapolation: np.ndarray | None = None + mean_nowcast: np.ndarray | None = None + std_nowcast: np.ndarray | None = None + mean_nowcast_timestep: np.ndarray | None = None + std_nowcast_timestep: np.ndarray | None = None rho_extrap_cascade_prev: np.ndarray | None = None rho_extrap_cascade: np.ndarray | None = None precip_cascades_prev_subtimestep: np.ndarray | None = None @@ -633,6 +637,9 @@ def __blended_nowcast_main_loop(self): final_blended_forecast_all_members_one_timestep = [ None for _ in range(self.__config.n_ens_members) ] + if self.__config.nowcasting_method == "external_nowcast": + self.__state.mean_nowcast_timestep = self.__state.mean_nowcast[:, :, t] + self.__state.std_nowcast_timestep = self.__state.std_nowcast[:, :, t] def worker(j): worker_state = copy(self.__state) @@ -795,7 +802,12 @@ def __check_inputs(self): "precip_models must be either a two-dimensional array containing dictionaries with decomposed model fields" "or a four-dimensional array containing the original (NWP) model forecasts" ) - + if self.__precip_nowcast is not None: + precip_nowcast_dim = self.__precip_nowcast.ndim + if precip_nowcast_dim != 4: + raise ValueError( + "precip_nowcast must be a four-dimensional array containing the externally calculated nowcast" + ) if self.__config.extrapolation_kwargs is None: self.__state.extrapolation_kwargs = dict() else: @@ -893,7 +905,7 @@ def __print_forecast_info(self): ) if self.__precip_nowcast is not None: print( - f"input dimensions pre-computed nowcast: {self.__precip_nowcast.shape[1]}x{self.__precip_nowcast.shape[2]}" + f"input dimensions pre-computed nowcast: {self.__precip_nowcast.shape[2]}x{self.__precip_nowcast.shape[3]}" ) if self.__config.kmperpixel is not None: print(f"km/pixel: {self.__config.kmperpixel}") @@ -1039,8 +1051,8 @@ def __prepare_radar_and_NWP_fields(self): # we need to know the zerovalue of precip to replace the mask when decomposing after # extrapolation + self.__params.nowcast_zerovalue = np.nanmin(self.__precip_nowcast) self.__params.precip_zerovalue = np.nanmin(self.__precip) - # 1. Start with the radar rainfall fields. We want the fields in a Lagrangian # space. Advect the previous precipitation fields to the same position with # the most recent one (i.e. transform them into the Lagrangian coordinates). @@ -1083,11 +1095,14 @@ def transform_to_lagrangian(precip, i): ) if self.__precip_nowcast is not None: self.__precip_nowcast = self.__precip_nowcast.copy() - # TODO: check this function for ensemble nowcasts - for i in range(self.__precip_nowcast.shape[0]): - self.__precip_nowcast[i, ~np.isfinite(self.__precip_nowcast[i, :])] = ( - np.nanmin(self.__precip_nowcast[i, :]) - ) + # TODO: check this function for ensemble nowcasts -> now made it always work with 4 dimentions + for ens_mem in range(self.__precip_nowcast.shape[0]): + for t in range(self.__precip_nowcast.shape[1]): + self.__precip_nowcast[ + ens_mem, + t, + ~np.isfinite(self.__precip_nowcast[ens_mem, t, :, :]), + ] = np.nanmin(self.__precip_nowcast[ens_mem, t, :, :]) # Perform the cascade decomposition for the input precip fields and, # if necessary, for the (NWP) model fields @@ -1106,30 +1121,6 @@ def transform_to_lagrangian(precip, i): ) precip_forecast_decomp.append(precip_forecast) - # Decompose precomputed nowcasts and rearange them again into the required components - # TODO: check if this also works if an ensemble nowcast is provided! - if self.__precip_nowcast is not None: - precip_nowcast_decomp = [] - for i in range(self.__precip_nowcast.shape[0]): - cascades = self.__params.decomposition_method( - self.__precip_nowcast[i], - self.__params.bandpass_filter, - n_levels=self.__config.n_cascade_levels, - mask=self.__params.mask_threshold, - method="fft", - fft_method=self.__params.fft, - output_domain=self.__config.domain, - compute_stats=True, - normalize=True, - compact_output=True, - ) - - precip_nowcast_decomp.append(cascades) - - self.__state.precip_nowcast_cascades = nowcast_utils.stack_cascades( - precip_nowcast_decomp, self.__config.n_cascade_levels - ) - # Rearrange the cascaded into a four-dimensional array of shape # (n_cascade_levels,ar_order+1,m,n) for the autoregressive model self.__state.precip_cascades = nowcast_utils.stack_cascades( @@ -1140,6 +1131,27 @@ def transform_to_lagrangian(precip, i): self.__state.mean_extrapolation = np.array(precip_forecast_decomp["means"]) self.__state.std_extrapolation = np.array(precip_forecast_decomp["stds"]) + # Decompose precomputed nowcasts and rearange them again into the required components + if self.__precip_nowcast is not None: + if self.__precip_nowcast.shape[0] == 1: + results = [self.__decompose_member(self.__precip_nowcast[0])] + else: + with ThreadPool(self.__config.num_workers) as pool: + results = pool.map( + partial(self.__decompose_member), + list(self.__precip_nowcast), + ) + + self.__state.precip_nowcast_cascades = np.array( + [result["precip_nowcast_decomp"] for result in results] + ).swapaxes(1, 2) + self.__state.mean_nowcast = np.array( + [result["precip_nowcast_means"] for result in results] + ).swapaxes(1, 2) + self.__state.std_nowcast = np.array( + [result["precip_nowcast_stds"] for result in results] + ).swapaxes(1, 2) + # If necessary, recompose (NWP) model forecasts self.__state.precip_models_cascades = None @@ -1159,13 +1171,14 @@ def transform_to_lagrangian(precip, i): self.__precip_models = np.stack(temp_precip_models) - # Check for zero input fields in the radar and NWP data. + # Check for zero input fields in the radar, nowcast and NWP data. self.__params.zero_precip_radar = check_norain( self.__precip, self.__params.precip_threshold, self.__config.norain_threshold, self.__params.noise_kwargs["win_fun"], ) + # The norain fraction threshold used for nwp is the default value of 0.0, # since nwp does not suffer from clutter. self.__params.zero_precip_model_fields = check_norain( @@ -1175,6 +1188,35 @@ def transform_to_lagrangian(precip, i): self.__params.noise_kwargs["win_fun"], ) + def __decompose_member(self, member_field): + """Loop over timesteps for a single ensemble member.""" + results_decomp = [] + means = [] + stds = [] + for t in range(member_field.shape[0]): # loop over timesteps + res = self.__params.decomposition_method( + field=member_field[t, :, :], + bp_filter=self.__params.bandpass_filter, + n_levels=self.__config.n_cascade_levels, + mask=self.__params.mask_threshold, + method="fft", + fft_method=self.__params.fft, + output_domain=self.__config.domain, + compute_stats=True, + normalize=True, + compact_output=True, + ) + results_decomp.append(res["cascade_levels"]) + means.append(res["means"]) + stds.append(res["stds"]) + results = { + "precip_nowcast_decomp": results_decomp, + "precip_nowcast_means": means, + "precip_nowcast_stds": stds, + } + + return results + def __zero_precipitation_forecast(self): """ Generate a zero-precipitation forecast (filled with the minimum precip value) @@ -1371,7 +1413,6 @@ def __estimate_ar_parameters_radar(self): predefined climatological values. Adjust coefficients if necessary and estimate AR model parameters. """ - # If there are values in the radar fields, compute the auto-correlations GAMMA = np.empty((self.__config.n_cascade_levels, self.__config.ar_order)) if not self.__params.zero_precip_radar: @@ -1719,6 +1760,7 @@ def __find_nowcast_NWP_combination(self, t): With the way it is implemented at this moment: n_ens_members of the output equals the maximum number of (ensemble) members in the input (either the nowcasts or NWP). """ + self.__state.velocity_models_timestep = self.__velocity_models[ :, t, :, :, : ].astype(np.float64, copy=False) @@ -1734,6 +1776,192 @@ def __find_nowcast_NWP_combination(self, t): if self.__config.blend_nwp_members: self.__state.mapping_list_NWP_member_to_ensemble_member = None + elif self.__config.nowcasting_method == "external_nowcast": + self.__state.precip_nowcast_timestep = self.__precip_nowcast[ + :, t, :, : + ].astype(np.float64, copy=False) + + n_ens_members_provided = self.__precip_nowcast.shape[0] + if n_ens_members_provided > self.__config.n_ens_members: + raise ValueError( + "The number of nowcast ensemble members provided is larger than the given number of ensemble members requested. n_ens_members_provided <= n_ens_members." + ) + + n_ens_members_max = self.__config.n_ens_members + n_ens_members_min = min(n_ens_members_provided, n_model_members) + + # Also make a list of the model index numbers. These indices are needed + # for indexing the right climatological skill file when pysteps calculates + # the blended forecast in parallel. + if n_model_members > 1: + self.__state.mapping_list_NWP_member_to_ensemble_member = np.arange( + n_model_members + ) + else: + self.__state.mapping_list_NWP_member_to_ensemble_member = [0] + + def repeat_precip_to_match_ensemble_size(repeats, model_type): + if model_type == "nwp": + print("Repeating the NWP model for all ensemble members") + self.__state.precip_models_cascades_timestep = np.repeat( + self.__state.precip_models_cascades_timestep, + repeats, + axis=0, + ) + self.__state.mean_models_timestep = np.repeat( + self.__state.mean_models_timestep, repeats, axis=0 + ) + self.__state.std_models_timestep = np.repeat( + self.__state.std_models_timestep, repeats, axis=0 + ) + self.__state.velocity_models_timestep = np.repeat( + self.__state.velocity_models_timestep, repeats, axis=0 + ) + # For the prob. matching + self.__state.precip_models_timestep = np.repeat( + self.__state.precip_models_timestep, repeats, axis=0 + ) + # Finally, for the model indices + self.__state.mapping_list_NWP_member_to_ensemble_member = np.repeat( + self.__state.mapping_list_NWP_member_to_ensemble_member, + repeats, + axis=0, + ) + if model_type == "nowcast": + print("Repeating the nowcast for all ensemble members") + self.__state.precip_nowcast_cascades = np.repeat( + self.__state.precip_nowcast_cascades, + repeats, + axis=0, + ) + self.__precip_nowcast = np.repeat( + self.__precip_nowcast, + repeats, + axis=0, + ) + self.__state.mean_nowcast = np.repeat( + self.__state.mean_nowcast, repeats, axis=0 + ) + self.__state.std_nowcast = np.repeat( + self.__state.std_nowcast, repeats, axis=0 + ) + # For the prob. matching + self.__state.precip_nowcast_timestep = np.repeat( + self.__state.precip_nowcast_timestep, repeats, axis=0 + ) + + # Now, repeat the nowcast ensemble members or the nwp models/members until + # it has the same amount of members as n_ens_members_max. For instance, if + # you have 10 ensemble nowcasts members and 3 NWP members, the output will + # be an ensemble of 10 members. Hence, the three NWP members are blended + # with the first three members of the nowcast (member one with member one, + # two with two, etc.), subsequently, the same NWP members are blended with + # the next three members (NWP member one with member 4, NWP member 2 with + # member 5, etc.), until 10 is reached. + if n_ens_members_min != n_ens_members_max: + if n_model_members == 1: + repeat_precip_to_match_ensemble_size(n_ens_members_max, "nwp") + # print("Repeating the NWP model for all ensemble members") + # self.__state.precip_models_cascades_timestep = np.repeat( + # self.__state.precip_models_cascades_timestep, + # n_ens_members_max, + # axis=0, + # ) + # self.__state.mean_models_timestep = np.repeat( + # self.__state.mean_models_timestep, n_ens_members_max, axis=0 + # ) + # self.__state.std_models_timestep = np.repeat( + # self.__state.std_models_timestep, n_ens_members_max, axis=0 + # ) + # self.__state.velocity_models_timestep = np.repeat( + # self.__state.velocity_models_timestep, n_ens_members_max, axis=0 + # ) + # # For the prob. matching + # self.__state.precip_models_timestep = np.repeat( + # self.__state.precip_models_timestep, n_ens_members_max, axis=0 + # ) + # # Finally, for the model indices + # self.__state.mapping_list_NWP_member_to_ensemble_member = np.repeat( + # self.__state.mapping_list_NWP_member_to_ensemble_member, + # n_ens_members_max, + # axis=0, + # ) + if n_ens_members_provided == 1: + repeat_precip_to_match_ensemble_size(n_ens_members_max, "nowcast") + # print("Repeating the nowcast for all ensemble members") + # self.__state.precip_nowcast_cascades = np.repeat( + # self.__state.precip_nowcast_cascades, + # n_ens_members_max, + # axis=0, + # ) + # self.__state.mean_nowcast = np.repeat( + # self.__state.mean_nowcast, n_ens_members_max, axis=0 + # ) + # self.__state.std_nowcast = np.repeat( + # self.__state.std_nowcast, n_ens_members_max, axis=0 + # ) + # # For the prob. matching + # self.__state.precip_nowcast_timestep = np.repeat( + # self.__state.precip_nowcast_timestep, n_ens_members_max, axis=0 + # ) + + if n_model_members == n_ens_members_min and n_model_members != 1: + print("Repeating the NWP model for all ensemble members") + repeats = [ + (n_ens_members_max + i) // n_ens_members_min + for i in range(n_ens_members_min) + ] + repeat_precip_to_match_ensemble_size(repeats, "nwp") + # self.__state.precip_models_cascades_timestep = np.repeat( + # self.__state.precip_models_cascades_timestep, + # repeats, + # axis=0, + # ) + # self.__state.mean_models_timestep = np.repeat( + # self.__state.mean_models_timestep, repeats, axis=0 + # ) + # self.__state.std_models_timestep = np.repeat( + # self.__state.std_models_timestep, repeats, axis=0 + # ) + # self.__state.velocity_models_timestep = np.repeat( + # self.__state.velocity_models_timestep, repeats, axis=0 + # ) + # # For the prob. matching + # self.__state.precip_models_timestep = np.repeat( + # self.__state.precip_models_timestep, repeats, axis=0 + # ) + # # Finally, for the model indices + # self.__state.mapping_list_NWP_member_to_ensemble_member = np.repeat( + # self.__state.mapping_list_NWP_member_to_ensemble_member, + # repeats, + # axis=0, + # ) + if ( + n_ens_members_provided == n_ens_members_min + and n_ens_members_provided != 1 + ): + repeat_precip_to_match_ensemble_size(repeats, "nowcast") + # print("Repeating the nowcast for all ensemble members") + # repeats = [ + # (n_ens_members_max + i) // n_ens_members_min + # for i in range(n_ens_members_min) + # ] + # self.__state.precip_nowcast_cascades = np.repeat( + # self.__state.precip_nowcast_cascades, + # repeats, + # axis=0, + # ) + # self.__state.mean_nowcast = np.repeat( + # self.__state.mean_nowcast, repeats, axis=0 + # ) + # self.__state.std_nowcast = np.repeat( + # self.__state.std_nowcast, repeats, axis=0 + # ) + # # For the prob. matching + # self.__state.precip_nowcast_timestep = np.repeat( + # self.__state.precip_nowcast_timestep, repeats, axis=0 + # ) + else: # Start with determining the maximum and mimimum number of members/models # in both input products @@ -2010,23 +2238,15 @@ def __regress_extrapolation_and_noise_cascades(self, j, worker_state, t): # Regress the extrapolation component to the subsequent time step. # Iterate the AR(p) model for each cascade level - - # If nowcast method seleced is external_nowcast, n_ens members has to be 1 if self.__config.nowcasting_method == "external_nowcast": - print("Using nowcasting method:", self.__config.nowcasting_method) - if self.__config.n_ens_members != 1: - raise ValueError( - "Currently only 1 ensemble member supported when using (deterministic) external_nowcast as input." - ) for i in range(self.__config.n_cascade_levels): # Use a deterministic Externally computed nowcasting model worker_state.precip_cascades[j][i] = ( - self.__state.precip_nowcast_cascades[i][t] + self.__state.precip_nowcast_cascades[j][i][t] ) # Follow the 'standard' STEPS blending approach as described in :cite:`Imhoff2023` elif self.__config.nowcasting_method == "steps": - print("Using nowcasting method:", self.__config.nowcasting_method) for i in range(self.__config.n_cascade_levels): # apply AR(p) process to extrapolation cascade level if ( @@ -2090,35 +2310,12 @@ def __perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep( worker_state.noise_extrapolated_decomp = [] worker_state.precip_extrapolated_probability_matching = [] - if ( - self.__config.nowcasting_method == "external_nowcast" - and self.__config.n_ens_members == 1 - ): - for i in range(self.__config.n_cascade_levels): - precip_extrapolated_decomp = worker_state.precip_cascades[j][i][-1, :] - worker_state.time_prev_timestep[j] = t + 1 - - worker_state.precip_extrapolated_decomp.append( - precip_extrapolated_decomp.copy() - ) - worker_state.precip_extrapolated_probability_matching.append( - precip_extrapolated_decomp.copy() - ) - - worker_state.precip_extrapolated_decomp = np.stack( - worker_state.precip_extrapolated_decomp - )[None, :] - - worker_state.precip_extrapolated_probability_matching = np.stack( - worker_state.precip_extrapolated_probability_matching - ) # [None, :] - - else: - # Extrapolate per sub time step - for t_sub in worker_state.subtimesteps: - if t_sub > 0: - t_diff_prev_subtimestep_int = t_sub - int(t_sub) - if t_diff_prev_subtimestep_int > 0.0: + # Extrapolate per sub time step + for t_sub in worker_state.subtimesteps: + if t_sub > 0: + t_diff_prev_subtimestep_int = t_sub - int(t_sub) + if t_diff_prev_subtimestep_int > 0.0: + if self.__config.nowcasting_method == "steps": precip_forecast_cascade_subtimestep = [ (1.0 - t_diff_prev_subtimestep_int) * worker_state.precip_cascades_prev_subtimestep[j][i][-1, :] @@ -2126,6 +2323,7 @@ def __perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep( * worker_state.precip_cascades[j][i][-1, :] for i in range(self.__config.n_cascade_levels) ] + if self.__config.noise_method is not None: noise_cascade_subtimestep = [ (1.0 - t_diff_prev_subtimestep_int) * worker_state.cascade_noise_prev_subtimestep[j][i][-1, :] @@ -2134,74 +2332,79 @@ def __perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep( for i in range(self.__config.n_cascade_levels) ] - else: + else: + if self.__config.nowcasting_method == "steps": precip_forecast_cascade_subtimestep = [ worker_state.precip_cascades_prev_subtimestep[j][i][-1, :] for i in range(self.__config.n_cascade_levels) ] + if self.__config.noise_method is not None: noise_cascade_subtimestep = [ worker_state.cascade_noise_prev_subtimestep[j][i][-1, :] for i in range(self.__config.n_cascade_levels) ] + if self.__config.nowcasting_method == "steps": precip_forecast_cascade_subtimestep = np.stack( precip_forecast_cascade_subtimestep ) + if self.__config.noise_method is not None: noise_cascade_subtimestep = np.stack(noise_cascade_subtimestep) - t_diff_prev_subtimestep = t_sub - worker_state.time_prev_timestep[j] - worker_state.leadtime_since_start_forecast[ - j - ] += t_diff_prev_subtimestep - - # compute the perturbed motion field - include the NWP - # velocities and the weights. Note that we only perturb - # the extrapolation velocity field, as the NWP velocity - # field is present per time step - if self.__config.velocity_perturbation_method is not None: - velocity_perturbations_extrapolation = ( - self.__velocity - + self.__params.generate_velocity_noise( - self.__params.velocity_perturbations[j], - worker_state.leadtime_since_start_forecast[j] - * self.__config.timestep, - ) - ) + t_diff_prev_subtimestep = t_sub - worker_state.time_prev_timestep[j] + worker_state.leadtime_since_start_forecast[j] += t_diff_prev_subtimestep - # Stack the perturbed extrapolation and the NWP velocities - if self.__config.blend_nwp_members: - velocity_stack_all = np.concatenate( - ( - velocity_perturbations_extrapolation[None, :, :, :], - worker_state.velocity_models_timestep, - ), - axis=0, - ) - else: - velocity_models = worker_state.velocity_models_timestep[j] - velocity_stack_all = np.concatenate( - ( - velocity_perturbations_extrapolation[None, :, :, :], - velocity_models[None, :, :, :], - ), - axis=0, + # compute the perturbed motion field - include the NWP + # velocities and the weights. Note that we only perturb + # the extrapolation velocity field, as the NWP velocity + # field is present per time step + if self.__config.velocity_perturbation_method is not None: + velocity_perturbations_extrapolation = ( + self.__velocity + + self.__params.generate_velocity_noise( + self.__params.velocity_perturbations[j], + worker_state.leadtime_since_start_forecast[j] + * self.__config.timestep, ) - velocity_models = None - - # Obtain a blended optical flow, using the weights of the - # second cascade following eq. 24 in BPS2006 - velocity_blended = blending.utils.blend_optical_flows( - flows=velocity_stack_all, - weights=worker_state.weights[ - :-1, 1 - ], # [(extr_field, n_model_fields), cascade_level=2] ) - # Extrapolate both cascades to the next time step - # First recompose the cascade, advect it and decompose it again - # This is needed to remove the interpolation artifacts. - # In addition, the number of extrapolations is greatly reduced - # A. Radar Rain + # Stack the perturbed extrapolation and the NWP velocities + if self.__config.blend_nwp_members: + velocity_stack_all = np.concatenate( + ( + velocity_perturbations_extrapolation[None, :, :, :], + worker_state.velocity_models_timestep, + ), + axis=0, + ) + else: + velocity_models = worker_state.velocity_models_timestep[j] + velocity_stack_all = np.concatenate( + ( + velocity_perturbations_extrapolation[None, :, :, :], + velocity_models[None, :, :, :], + ), + axis=0, + ) + velocity_models = None + + # Obtain a blended optical flow, using the weights of the + # second cascade following eq. 24 in BPS2006 + velocity_blended = blending.utils.blend_optical_flows( + flows=velocity_stack_all, + weights=worker_state.weights[ + :-1, 1 + ], # [(extr_field, n_model_fields), cascade_level=2] + ) + + # Extrapolate both cascades to the next time step + # First recompose the cascade, advect it and decompose it again + # This is needed to remove the interpolation artefacts. + # In addition, the number of extrapolations is greatly reduced + + # A. The extrapolation component + if self.__config.nowcasting_method == "steps": + # First, recompose the cascades into one forecast precip_forecast_recomp_subtimestep = ( blending.utils.recompose_cascade( combined_cascade=precip_forecast_cascade_subtimestep, @@ -2239,10 +2442,11 @@ def __perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep( precip_forecast_extrapolated_recomp_subtimestep_temp[0].copy() ) temp_mask = ~np.isfinite(precip_extrapolated_recomp_subtimestep) - # TODO: WHERE DO CAN I FIND THIS -15.0 + # Set non-finite values to the zerovalue precip_extrapolated_recomp_subtimestep[ ~np.isfinite(precip_extrapolated_recomp_subtimestep) ] = self.__params.precip_zerovalue + # Decompose the forecast again into multiplicative cascades precip_extrapolated_decomp = self.__params.decomposition_method( precip_extrapolated_recomp_subtimestep, self.__params.bandpass_filter, @@ -2264,7 +2468,21 @@ def __perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep( ) for i in range(self.__config.n_cascade_levels): precip_extrapolated_decomp[i][temp_mask] = np.nan - # B. Noise + + # Append the results to the output lists + worker_state.precip_extrapolated_decomp.append( + precip_extrapolated_decomp.copy() + ) + + precip_forecast_cascade_subtimestep = None + precip_forecast_recomp_subtimestep = None + precip_forecast_extrapolated_recomp_subtimestep_temp = None + precip_extrapolated_recomp_subtimestep = None + precip_extrapolated_decomp = None + + # B. The noise component + if self.__config.noise_method is not None: + # First, recompose the cascades into one forecast noise_cascade_subtimestep_recomp = blending.utils.recompose_cascade( combined_cascade=noise_cascade_subtimestep, combined_mean=worker_state.precip_mean_noise[j], @@ -2285,6 +2503,7 @@ def __perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep( **extrap_kwargs_noise, ) noise_extrapolated_recomp = noise_extrapolated_recomp_temp[0].copy() + # Decompose the noise component again into multiplicative cascades noise_extrapolated_decomp = self.__params.decomposition_method( noise_extrapolated_recomp, self.__params.bandpass_filter, @@ -2301,27 +2520,21 @@ def __perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep( ] # Append the results to the output lists - worker_state.precip_extrapolated_decomp.append( - precip_extrapolated_decomp.copy() - ) worker_state.noise_extrapolated_decomp.append( noise_extrapolated_decomp.copy() ) - precip_forecast_cascade_subtimestep = None - precip_forecast_recomp_subtimestep = None - precip_forecast_extrapolated_recomp_subtimestep_temp = None - precip_extrapolated_recomp_subtimestep = None - precip_extrapolated_decomp = None + noise_cascade_subtimestep = None noise_cascade_subtimestep_recomp = None noise_extrapolated_recomp_temp = None noise_extrapolated_recomp = None noise_extrapolated_decomp = None - # Finally, also extrapolate the initial radar rainfall - # field. This will be blended with the rainfall field(s) - # of the (NWP) model(s) for Lagrangian blended prob. matching - # min_R = np.min(precip) + # Finally, also extrapolate the initial radar rainfall field. This will be + # blended with the rainfall field(s) of the (NWP) model(s) for Lagrangian + # blended prob. matching min_R = np.min(precip). If we use an external + # nowcast, this variable will be set later in this function. + if self.__config.nowcasting_method == "steps": extrap_kwargs_pb["displacement_prev"] = ( worker_state.previous_displacement_prob_matching[j] ) @@ -2346,75 +2559,78 @@ def __perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep( precip_forecast_extrapolated_probability_matching_temp[0] ) - worker_state.time_prev_timestep[j] = t_sub + worker_state.time_prev_timestep[j] = t_sub - if len(worker_state.precip_extrapolated_decomp) > 0: + if len(worker_state.precip_extrapolated_decomp) > 0: + if self.__config.nowcasting_method == "steps": worker_state.precip_extrapolated_decomp = np.stack( worker_state.precip_extrapolated_decomp ) - worker_state.noise_extrapolated_decomp = np.stack( - worker_state.noise_extrapolated_decomp - ) worker_state.precip_extrapolated_probability_matching = np.stack( worker_state.precip_extrapolated_probability_matching ) + if len(worker_state.noise_extrapolated_decomp) > 0: + if self.__config.noise_method is not None: + worker_state.noise_extrapolated_decomp = np.stack( + worker_state.noise_extrapolated_decomp + ) - # advect the forecast field by one time step if no subtimesteps in the - # current interval were found - if not worker_state.subtimesteps: - t_diff_prev_subtimestep = t + 1 - worker_state.time_prev_timestep[j] - worker_state.leadtime_since_start_forecast[j] += t_diff_prev_subtimestep - - # compute the perturbed motion field - include the NWP - # velocities and the weights - if self.__config.velocity_perturbation_method is not None: - velocity_perturbations_extrapolation = ( - self.__velocity - + self.__params.generate_velocity_noise( - self.__params.velocity_perturbations[j], - worker_state.leadtime_since_start_forecast[j] - * self.__config.timestep, - ) - ) + # advect the forecast field by one time step if no subtimesteps in the + # current interval were found + if not worker_state.subtimesteps: + t_diff_prev_subtimestep = t + 1 - worker_state.time_prev_timestep[j] + worker_state.leadtime_since_start_forecast[j] += t_diff_prev_subtimestep - # Stack the perturbed extrapolation and the NWP velocities - if self.__config.blend_nwp_members: - velocity_stack_all = np.concatenate( - ( - velocity_perturbations_extrapolation[None, :, :, :], - worker_state.velocity_models_timestep, - ), - axis=0, - ) - else: - velocity_models = worker_state.velocity_models_timestep[j] - velocity_stack_all = np.concatenate( - ( - velocity_perturbations_extrapolation[None, :, :, :], - velocity_models[None, :, :, :], - ), - axis=0, + # compute the perturbed motion field - include the NWP + # velocities and the weights + if self.__config.velocity_perturbation_method is not None: + velocity_perturbations_extrapolation = ( + self.__velocity + + self.__params.generate_velocity_noise( + self.__params.velocity_perturbations[j], + worker_state.leadtime_since_start_forecast[j] + * self.__config.timestep, ) - velocity_models = None - - # Obtain a blended optical flow, using the weights of the - # second cascade following eq. 24 in BPS2006 - velocity_blended = blending.utils.blend_optical_flows( - flows=velocity_stack_all, - weights=worker_state.weights[ - :-1, 1 - ], # [(extr_field, n_model_fields), cascade_level=2] ) - # Extrapolate the extrapolation and noise cascade - extrap_kwargs_["displacement_prev"] = ( - worker_state.previous_displacement[j] + # Stack the perturbed extrapolation and the NWP velocities + if self.__config.blend_nwp_members: + velocity_stack_all = np.concatenate( + ( + velocity_perturbations_extrapolation[None, :, :, :], + worker_state.velocity_models_timestep, + ), + axis=0, ) - extrap_kwargs_noise["displacement_prev"] = ( - worker_state.previous_displacement_noise_cascade[j] + else: + velocity_models = worker_state.velocity_models_timestep[j] + velocity_stack_all = np.concatenate( + ( + velocity_perturbations_extrapolation[None, :, :, :], + velocity_models[None, :, :, :], + ), + axis=0, ) - extrap_kwargs_noise["map_coordinates_mode"] = "wrap" + velocity_models = None + + # Obtain a blended optical flow, using the weights of the + # second cascade following eq. 24 in BPS2006 + velocity_blended = blending.utils.blend_optical_flows( + flows=velocity_stack_all, + weights=worker_state.weights[ + :-1, 1 + ], # [(extr_field, n_model_fields), cascade_level=2] + ) + # Extrapolate the extrapolation and noise cascade + extrap_kwargs_["displacement_prev"] = worker_state.previous_displacement[j] + extrap_kwargs_noise["displacement_prev"] = ( + worker_state.previous_displacement_noise_cascade[j] + ) + extrap_kwargs_noise["map_coordinates_mode"] = "wrap" + + # Extrapolate the extrapolation cascade + if self.__config.nowcasting_method == "steps": ( _, worker_state.previous_displacement[j], @@ -2425,7 +2641,8 @@ def __perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep( allow_nonfinite_values=True, **extrap_kwargs_, ) - + # Extrapolate the noise cascade + if self.__config.noise_method is not None: ( _, worker_state.previous_displacement_noise_cascade[j], @@ -2436,9 +2653,9 @@ def __perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep( allow_nonfinite_values=True, **extrap_kwargs_noise, ) - - # Also extrapolate the radar observation, used for the probability - # matching and post-processing steps + # Also extrapolate the radar observation, used for the probability + # matching and post-processing steps + if self.__config.nowcasting_method == "steps": extrap_kwargs_pb["displacement_prev"] = ( worker_state.previous_displacement_prob_matching[j] ) @@ -2453,8 +2670,36 @@ def __perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep( **extrap_kwargs_pb, ) + worker_state.time_prev_timestep[j] = t + 1 + + # If an external nowcast is provided, precip_extrapolated_decomp and + # precip_extrapolated_probability_matching have been omitted so far. + # Fill them in with the external nowcast information now. + if self.__config.nowcasting_method == "external_nowcast": + for i in range(self.__config.n_cascade_levels): + precip_extrapolated_decomp = worker_state.precip_cascades[j][i][-1, :] + worker_state.time_prev_timestep[j] = t + 1 + worker_state.precip_extrapolated_decomp.append( + precip_extrapolated_decomp.copy() + ) + + # Also update the probability matching fields + precip_extrapolated = self.__precip_nowcast[j][t][:, :] + worker_state.precip_extrapolated_probability_matching.append( + precip_extrapolated.copy() + ) + + # Stack it for the output + worker_state.precip_extrapolated_decomp = np.stack( + worker_state.precip_extrapolated_decomp + )[None, :] + + worker_state.precip_extrapolated_probability_matching = np.stack( + worker_state.precip_extrapolated_probability_matching + ) # [None, :] + worker_state.precip_cascades_prev_subtimestep[j] = worker_state.precip_cascades[ j ] @@ -2473,31 +2718,43 @@ def __blend_cascades(self, t_sub, j, worker_state): )[0][0] # First concatenate the cascades and the means and sigmas # precip_models = [n_models,timesteps,n_cascade_levels,m,n] - - # If external_nowcast is used, do not use and blend with the noise component if ( self.__config.blend_nwp_members and self.__config.nowcasting_method == "external_nowcast" ): - cascade_stack_all_components = np.concatenate( - ( - worker_state.precip_extrapolated_decomp[ - None, worker_state.subtimestep_index - ], - worker_state.precip_models_cascades_timestep, - ), - axis=0, - ) # [(extr_field, n_model_fields), n_cascade_levels, ...] + if self.__config.noise_method is None: + cascade_stack_all_components = np.concatenate( + ( + worker_state.precip_extrapolated_decomp[ + None, worker_state.subtimestep_index + ], + worker_state.precip_models_cascades_timestep, + ), + axis=0, + ) # [(extr_field, n_model_fields), n_cascade_levels, ...] + else: + cascade_stack_all_components = np.concatenate( + ( + worker_state.precip_extrapolated_decomp[ + None, worker_state.subtimestep_index + ], + worker_state.precip_models_cascades_timestep, + worker_state.noise_extrapolated_decomp[ + None, worker_state.subtimestep_index + ], + ), + axis=0, + ) # [(extr_field, n_model_fields), n_cascade_levels, ...] means_stacked = np.concatenate( ( - worker_state.mean_extrapolation[None, :], + worker_state.mean_nowcast_timestep[None, j, :], worker_state.mean_models_timestep, ), axis=0, ) sigmas_stacked = np.concatenate( ( - worker_state.std_extrapolation[None, :], + worker_state.std_nowcast_timestep[None, j, :], worker_state.std_models_timestep, ), axis=0, @@ -2535,26 +2792,40 @@ def __blend_cascades(self, t_sub, j, worker_state): ) elif self.__config.nowcasting_method == "external_nowcast": - cascade_stack_all_components = np.concatenate( - ( - worker_state.precip_extrapolated_decomp[ - None, worker_state.subtimestep_index - ], - worker_state.precip_models_cascades_timestep[None, j], - ), - axis=0, - ) # [(extr_field, n_model_fields), n_cascade_levels, ...] + if self.__config.noise_method is None: + cascade_stack_all_components = np.concatenate( + ( + worker_state.precip_extrapolated_decomp[ + None, worker_state.subtimestep_index + ], + worker_state.precip_models_cascades_timestep[None, j], + ), + axis=0, + ) # [(extr_field, n_model_fields), n_cascade_levels, ...] + else: + cascade_stack_all_components = np.concatenate( + ( + worker_state.precip_extrapolated_decomp[ + None, worker_state.subtimestep_index + ], + worker_state.precip_models_cascades_timestep[None, j], + worker_state.noise_extrapolated_decomp[ + None, worker_state.subtimestep_index + ], + ), + axis=0, + ) # [(extr_field, n_model_fields), n_cascade_levels, ...] means_stacked = np.concatenate( ( - worker_state.mean_extrapolation[None, :], - worker_state.mean_models_timestep, + worker_state.mean_nowcast_timestep[None, j, :], + worker_state.mean_models_timestep[None, j], ), axis=0, ) sigmas_stacked = np.concatenate( ( - worker_state.std_extrapolation[None, :], - worker_state.std_models_timestep, + worker_state.std_nowcast_timestep[None, j, :], + worker_state.std_models_timestep[None, j], ), axis=0, ) @@ -2622,12 +2893,13 @@ def __blend_cascades(self, t_sub, j, worker_state): ) if ( self.__config.nowcasting_method == "external_nowcast" - and self.__config.n_ens_members == 1 + and self.__config.noise_method is None ): # First determine the weights without noise worker_state.weights = worker_state.weights[:-1, :] / np.sum( worker_state.weights[:-1, :], axis=0 ) + worker_state.weights_model_only = worker_state.weights_model_only[ :-1, : ] / np.sum(worker_state.weights_model_only[:-1, :], axis=0) @@ -2840,7 +3112,6 @@ def __post_process_output( ], axis=0, ) - precip_forecast_probability_matching_blended = np.nansum( [ precip_forecast_probability_matching_blended * mask_radar, @@ -2970,9 +3241,11 @@ def __post_process_output( worker_state.subtimestep_index ] ) + # Adjust the CDF of the forecast to match the resampled distribution combined from # extrapolation and model fields. # Rainfall outside the pure extrapolation domain is not taken into account. + if np.any(np.isfinite(worker_state.final_blended_forecast_recomposed)): worker_state.final_blended_forecast_recomposed = ( probmatching.nonparam_match_empirical_cdf( @@ -2982,6 +3255,7 @@ def __post_process_output( ) ) precip_forecast_probability_matching_resampled = None + elif self.__config.probmatching_method == "mean": # Use R_pm_blended as benchmark field and mean_probabiltity_matching_forecast = np.mean( @@ -3109,10 +3383,8 @@ def forecast( velocity_models: array-like Array of shape (n_models,timestep,2,m,n) containing the x- and y-components of the advection field for the (NWP) model field per forecast lead time. - All values are required to be finite. - - To reduce memory usage, this array - can be given as float32. They will then be converted to float64 before computations + All values are required to be finite. To reduce memory usage, this array can + be given as float32. They will then be converted to float64 before computations to minimize loss in precision. timesteps: int or list of floats Number of time steps to forecast or a list of time steps for which the diff --git a/pysteps/tests/test_blending_steps.py b/pysteps/tests/test_blending_steps.py index 8ab996ce4..f914638f7 100644 --- a/pysteps/tests/test_blending_steps.py +++ b/pysteps/tests/test_blending_steps.py @@ -55,18 +55,21 @@ (5, 3, 5, 6,'steps', "incremental", "cdf", False, "bps", False, 5, False, False, 80, False, None, 40), (5, 3, 5, 6,'steps', "incremental", "cdf", False, "bps", False, 5, False, False, 80, False, None, 60), #Test the externally provided nowcast - (1, 10, 1, 8,'external_nowcast', None, None, False, "spn", True, 1, False, False, 0, False, None, None), - (1, 10, 1, 8,'external_nowcast', "incremental", None, False, "bps", True, 1, False, False, 0, False, None, None), - (1, 10, 1, 8,'external_nowcast', "incremental", None, False, "spn", True, 1, False, False, 80, False, None, None), - (1, 10, 1, 8,'external_nowcast', "incremental", None, False, "bps", True, 1, True, False, 0, False, None, None), - (1, 10, 1, 8,'external_nowcast', "incremental", None, False, "spn", True, 1, False, True, 0, False, None, None), - (1, 10, 1, 8,'external_nowcast', "incremental", None, False, "bps", True, 1, True, True, 0, False, None, None), - (1, 10, 1, 8,'external_nowcast', "incremental", "cdf", False, "spn", True, 1, False, False, 0, True, None, None), - (1, 10, 1, 8,'external_nowcast', "incremental", "obs", False, "bps", True, 1, False, False, 0, False, None, None), - (5, 10, 5, 8,'external_nowcast_det', "incremental", None, False, "spn", True, 5, False, False, 0, False, None, None), + (1, 10, 1, 8,'external_nowcast_det', None, None, False, "spn", True, 1, False, False, 0, False, None, None), + (1, 10, 1, 8,'external_nowcast_det', "incremental", None, False, "bps", True, 1, False, False, 0, False, None, None), + (1, 10, 1, 8,'external_nowcast_det', "incremental", None, False, "spn", True, 1, False, False, 80, False, None, None), + (1, 10, 1, 8,'external_nowcast_det', "incremental", None, False, "bps", True, 1, True, False, 0, False, None, None), + (1, 10, 1, 8,'external_nowcast_det', "incremental", None, False, "spn", True, 1, False, True, 0, False, None, None), + (1, 10, 1, 8,'external_nowcast_det', "incremental", None, False, "bps", True, 1, True, True, 0, False, None, None), + (1, 10, 1, 8,'external_nowcast_det', "incremental", "cdf", False, "spn", True, 1, False, False, 0, True, None, None), + (1, 10, 1, 8,'external_nowcast_det', "incremental", "obs", False, "bps", True, 1, False, False, 0, False, None, None), (5, 10, 5, 8,'external_nowcast_ens', "incremental", None, False, "spn", True, 5, False, False, 0, False, None, None), - (1, 10, 5, 8,'external_nowcast_det', "incremental", None, False, "spn", True, 5, False, False, 0, False, None, None), + (5, 10, 5, 8,'external_nowcast_ens', "incremental", None, False, "spn", True, 5, False, False, 0, False, None, None), + (1, 10, 5, 8,'external_nowcast_ens', "incremental", None, False, "spn", True, 5, False, False, 0, False, None, None), + (1, 10, 1, 8,'external_nowcast_ens', "incremental", "cdf", False, "bps", True, 5, False, False, 0, False, None, None), + (5, 10, 1, 8,'external_nowcast_ens', "incremental", "obs", False, "spn", True, 5, False, False, 0, False, None, None) ] + # fmt:on steps_arg_names = ( @@ -213,8 +216,10 @@ def test_steps_blending( precip_nowcast[ n_ens_member, i, 30:165, 41 + 1 * (i + 1) * n_ens_member ] = 0.1 + if n_ens_members < expected_n_ens_members: + n_ens_members = expected_n_ens_members - elif nowcasting_method[:16] == "external_nowcast": + elif nowcasting_method == "external_nowcast_det": nowcasting_method = "external_nowcast" for i in range(precip_nowcast.shape[1]): precip_nowcast[0, i, 30:165, 30 + 1 * i] = 0.1 @@ -390,9 +395,11 @@ def test_steps_blending( ) assert precip_forecast.ndim == 4, "Wrong amount of dimensions in forecast output" + assert ( precip_forecast.shape[0] == expected_n_ens_members ), "Wrong amount of output ensemble members in forecast output" + assert ( precip_forecast.shape[1] == n_timesteps ), "Wrong amount of output time steps in forecast output" @@ -403,9 +410,11 @@ def test_steps_blending( assert ( precip_forecast.ndim == 4 ), "Wrong amount of dimensions in converted forecast output" + assert ( precip_forecast.shape[0] == expected_n_ens_members ), "Wrong amount of output ensemble members in converted forecast output" + assert ( precip_forecast.shape[1] == n_timesteps ), "Wrong amount of output time steps in converted forecast output"