diff --git a/bayesflow/approximators/continuous_approximator.py b/bayesflow/approximators/continuous_approximator.py
index d1d57bb90..ed92795a7 100644
--- a/bayesflow/approximators/continuous_approximator.py
+++ b/bayesflow/approximators/continuous_approximator.py
@@ -338,7 +338,7 @@ def _sample(
             **filter_kwargs(kwargs, self.inference_network.sample),
         )
 
-    def log_prob(self, data: dict[str, np.ndarray], **kwargs) -> np.ndarray:
+    def log_prob(self, data: dict[str, np.ndarray], **kwargs) -> np.ndarray | dict[str, np.ndarray]:
         """
         Computes the log-probability of given data under the model. The `data` dictionary is preprocessed
         using the `adapter`. Log-probabilities are returned as NumPy arrays.
@@ -358,7 +358,7 @@ def log_prob(self, data: dict[str, np.ndarray], **kwargs) -> np.ndarray:
         data = self.adapter(data, strict=False, stage="inference", **kwargs)
         data = keras.tree.map_structure(keras.ops.convert_to_tensor, data)
         log_prob = self._log_prob(**data, **kwargs)
-        log_prob = keras.ops.convert_to_numpy(log_prob)
+        log_prob = keras.tree.map_structure(keras.ops.convert_to_numpy, log_prob)
 
         return log_prob
 
@@ -368,7 +368,7 @@ def _log_prob(
         inference_conditions: Tensor = None,
         summary_variables: Tensor = None,
         **kwargs,
-    ) -> Tensor:
+    ) -> Tensor | dict[str, Tensor]:
         if self.summary_network is None:
             if summary_variables is not None:
                 raise ValueError("Cannot use summary variables without a summary network.")
diff --git a/bayesflow/approximators/point_approximator.py b/bayesflow/approximators/point_approximator.py
index 836dd060c..5f1acf193 100644
--- a/bayesflow/approximators/point_approximator.py
+++ b/bayesflow/approximators/point_approximator.py
@@ -5,7 +5,7 @@
 )
 
 from bayesflow.types import Tensor
-from bayesflow.utils import filter_kwargs, split_arrays, squeeze_inner_estimates_dict
+from bayesflow.utils import filter_kwargs, split_arrays, squeeze_inner_estimates_dict, logging
 
 from .continuous_approximator import ContinuousApproximator
 
@@ -14,8 +14,9 @@ class PointApproximator(ContinuousApproximator):
     """
     A workflow for fast amortized point estimation of a conditional distribution.
 
-    The distribution is approximated by point estimators, parameterized by a feed-forward `PointInferenceNetwork`.
-    Conditions can be compressed by an optional `SummaryNetwork` or used directly as input to the inference network.
+    The distribution is approximated by point estimators, parameterized by a feed-forward
+    :class:`bayesflow.networks.PointInferenceNetwork`. Conditions can be compressed by an optional summary network
+    (inheriting from :class:`bayesflow.networks.SummaryNetwork`) or used directly as input to the inference network.
     """
 
     def estimate(
@@ -89,7 +90,7 @@ def sample(
             for the sampling process.
         split : bool, optional
             If True, the sampled arrays are split along the last axis, by default False.
-            Currently not supported for `PointApproximator`.
+            Currently not supported for :class:`PointApproximator`.
         **kwargs
             Additional keyword arguments passed to underlying processing functions.
 
@@ -111,14 +112,50 @@ def sample(
         if split:
             raise NotImplementedError("split=True is currently not supported for `PointApproximator`.")
             samples = split_arrays(samples, axis=-1)
-        # Squeeze samples if there's only one key-value pair.
-        samples = self._squeeze_samples(samples)
+        # Squeeze sample dictionary if there's only one key-value pair.
+        samples = self._squeeze_parametric_score_major_dict(samples)
 
         return samples
 
+    def log_prob(
+        self,
+        *,
+        data: dict[str, np.ndarray],
+        **kwargs,
+    ) -> np.ndarray | dict[str, np.ndarray]:
+        """
+        Computes the log-probability of the given data under the parametric distribution(s) for given input conditions.
+
+        Parameters
+        ----------
+        data : dict[str, np.ndarray]
+            A dictionary mapping variable names to arrays representing the inference conditions and variables.
+        **kwargs
+            Additional keyword arguments passed to underlying processing functions.
+
+        Returns
+        -------
+        log_prob : np.ndarray or dict[str, np.ndarray]
+            Log-probabilities of the distribution
+            `p(inference_variables | inference_conditions, h(summary_conditions))` for all parametric scoring rules.
+
+            If only one parametric score is available, the output is an array of log-probabilities.
+
+            If multiple parametric scores are available, the output is a dictionary.
+            Each key is the name of a score and each value holds the corresponding log-probabilities.
+
+            Log-probabilities have shape (num_datasets,).
+        """
+        log_prob = super().log_prob(data=data, **kwargs)
+        # Squeeze log probabilities dictionary if there's only one key-value pair.
+        log_prob = self._squeeze_parametric_score_major_dict(log_prob)
+
+        return log_prob
+
     def _prepare_conditions(self, conditions: dict[str, np.ndarray], **kwargs) -> dict[str, Tensor]:
         """Adapts and converts the conditions to tensors."""
         conditions = self.adapter(conditions, strict=False, stage="inference", **kwargs)
+        conditions.pop("inference_variables", None)
         return keras.tree.map_structure(keras.ops.convert_to_tensor, conditions)
 
     def _apply_inverse_adapter_to_estimates(
@@ -130,6 +167,12 @@ def _apply_inverse_adapter_to_estimates(
         for score_key, score_val in estimates.items():
             processed[score_key] = {}
             for head_key, estimate in score_val.items():
+                if head_key in self.inference_network.scores[score_key].NOT_TRANSFORMING_LIKE_VECTOR_WARNING:
+                    logging.warning(
+                        f"Estimate '{score_key}.{head_key}' is marked as not transforming like a vector. "
+                        f"It was treated like a vector by the adapter. Handle '{head_key}' estimates with care."
+                    )
+
                 adapted = self.adapter(
                     {"inference_variables": estimate},
                     inverse=True,
@@ -180,8 +223,10 @@ def _squeeze_estimates(
         }
         return squeezed
 
-    def _squeeze_samples(self, samples: dict[str, np.ndarray]) -> np.ndarray or dict[str, np.ndarray]:
-        """Squeezes the samples dictionary to just the value if there is only one key-value pair."""
+    def _squeeze_parametric_score_major_dict(
+        self, samples: dict[str, np.ndarray]
+    ) -> np.ndarray | dict[str, np.ndarray]:
+        """Squeezes the dictionary to just the value if there is only one key-value pair."""
         if len(samples) == 1:
             return next(iter(samples.values()))  # Extract and return the only item's value
         return samples
diff --git a/bayesflow/links/__init__.py b/bayesflow/links/__init__.py
index a32fd6c21..77913f52b 100644
--- a/bayesflow/links/__init__.py
+++ b/bayesflow/links/__init__.py
@@ -2,7 +2,7 @@
 
 from .ordered import Ordered
 from .ordered_quantiles import OrderedQuantiles
-from .positive_semi_definite import PositiveSemiDefinite
+from .positive_definite import PositiveDefinite
 
 from ..utils._docs import _add_imports_to_all
 
diff --git a/bayesflow/links/positive_definite.py b/bayesflow/links/positive_definite.py
new file mode 100644
index 000000000..616f9080d
--- /dev/null
+++ b/bayesflow/links/positive_definite.py
@@ -0,0 +1,46 @@
+import keras
+
+from keras.saving import register_keras_serializable as serializable
+
+from bayesflow.types import Tensor
+from bayesflow.utils import keras_kwargs, fill_triangular_matrix
+
+
+@serializable(package="bayesflow.links")
+class PositiveDefinite(keras.Layer):
+    """Activation function to link from flat elements of a lower triangular matrix to a positive definite matrix."""
+
+    def __init__(self, **kwargs):
+        super().__init__(**keras_kwargs(kwargs))
+        self.built = True
+
+    def call(self, inputs: Tensor) -> Tensor:
+        # Build Cholesky factor from inputs
+        L = fill_triangular_matrix(inputs, positive_diag=True)
+
+        # Calculate the positive definite matrix from the Cholesky factor
+        pd_matrix = keras.ops.matmul(
+            L,
+            keras.ops.moveaxis(L, -2, -1),  # L transposed
+        )
+        return pd_matrix
+
+    def compute_output_shape(self, input_shape):
+        m = input_shape[-1]
+        n = int((0.25 + 2.0 * m) ** 0.5 - 0.5)
+        return input_shape[:-1] + (n, n)
+
+    def compute_input_shape(self, output_shape):
+        """
+        Returns the shape of the flat parameterization of a triangular Cholesky factor.
+
+        A lower triangular n x n matrix has m = n * (n + 1) / 2 nonzero elements.
+
+        Examples
+        --------
+        >>> PositiveDefinite().compute_input_shape((None, 3, 3))
+        (None, 6)
+        """
+        n = output_shape[-1]
+        m = int(n * (n + 1) / 2)
+        return output_shape[:-2] + (m,)
diff --git a/bayesflow/links/positive_semi_definite.py b/bayesflow/links/positive_semi_definite.py
deleted file mode 100644
index a056fc3c3..000000000
--- a/bayesflow/links/positive_semi_definite.py
+++ /dev/null
@@ -1,20 +0,0 @@
-import keras
-from keras.saving import register_keras_serializable as serializable
-
-from bayesflow.types import Tensor
-from bayesflow.utils import keras_kwargs
-
-
-@serializable(package="bayesflow.links")
-class PositiveSemiDefinite(keras.Layer):
-    """Activation function to link from any square matrix to a positive semidefinite matrix."""
-
-    def __init__(self, **kwargs):
-        super().__init__(**keras_kwargs(kwargs))
-
-    def call(self, inputs: Tensor) -> Tensor:
-        # multiply M * M^T to get symmetric matrix
-        return keras.ops.einsum("...ij,...kj->...ik", inputs, inputs)
-
-    def compute_output_shape(self, input_shape):
-        return input_shape
diff --git a/bayesflow/networks/point_inference_network.py b/bayesflow/networks/point_inference_network.py
index a5447c501..7a3ed1628 100644
--- a/bayesflow/networks/point_inference_network.py
+++ b/bayesflow/networks/point_inference_network.py
@@ -132,7 +132,9 @@ def call(
         if xz is None and not self.built:
             raise ValueError("Cannot build inference network without inference variables.")
         if conditions is None:  # unconditional estimation uses a fixed input vector
-            conditions = keras.ops.convert_to_tensor([[1.0]], dtype=keras.ops.dtype(xz))
+            conditions = keras.ops.convert_to_tensor(
+                [[1.0]], dtype=keras.ops.dtype(xz) if xz is not None else "float32"
+            )
 
         # pass conditions to the shared subnet
         output = self.subnet(conditions, training=training)
@@ -165,7 +167,6 @@ def compute_metrics(
 
         return metrics | {"loss": neg_score}
 
-    # WIP: untested draft of sample method
     @allow_batch_size
     def sample(self, batch_shape: Shape, conditions: Tensor = None) -> dict[str, Tensor]:
         """
@@ -199,7 +200,6 @@ def sample(self, batch_shape: Shape, conditions: Tensor = None) -> dict[str, Ten
 
         return samples
 
-    # WIP: untested draft of log_prob method
     def log_prob(self, samples: Tensor, conditions: Tensor = None, **kwargs) -> dict[str, Tensor]:
         output = self.subnet(conditions)
         log_probs = {}
diff --git a/bayesflow/scores/multivariate_normal_score.py b/bayesflow/scores/multivariate_normal_score.py
index 66153fd34..90ccfbbf6 100644
--- a/bayesflow/scores/multivariate_normal_score.py
+++ b/bayesflow/scores/multivariate_normal_score.py
@@ -4,8 +4,7 @@
 from keras.saving import register_keras_serializable as serializable
 
 from bayesflow.types import Shape, Tensor
-from bayesflow.links import PositiveSemiDefinite
-from bayesflow.utils import logging
+from bayesflow.links import PositiveDefinite
 
 from .parametric_distribution_score import ParametricDistributionScore
 
@@ -17,14 +16,23 @@ class MultivariateNormalScore(ParametricDistributionScore):
     Scores a predicted mean and covariance matrix with the log-score of the probability of the materialized value.
     """
 
+    NOT_TRANSFORMING_LIKE_VECTOR_WARNING = ("covariance",)
+    """
+    Marks the covariance head as an exception for adapter transformations.
+
+    This variable contains the names of prediction heads that should trigger a warning when the adapter
+    is applied to them in the inverse direction.
+
+    For more information see :class:`ScoringRule`.
+ """ + def __init__(self, dim: int = None, links: dict = None, **kwargs): super().__init__(links=links, **kwargs) self.dim = dim - self.links = links or {"covariance": PositiveSemiDefinite()} - self.config = {"dim": dim} + self.links = links or {"covariance": PositiveDefinite()} - logging.warning("MultivariateNormalScore is unstable.") + self.config = {"dim": dim} def get_config(self): base_config = super().get_config() @@ -60,12 +68,12 @@ def log_prob(self, x: Tensor, mean: Tensor, covariance: Tensor) -> Tensor: A tensor containing the log probability densities for each sample in `x` under the given Gaussian distribution. """ - diff = x[:, None, :] - mean - inv_covariance = keras.ops.inv(covariance) + diff = x - mean + precision = keras.ops.inv(covariance) log_det_covariance = keras.ops.slogdet(covariance)[1] # Only take the log of the determinant part # Compute the quadratic term in the exponential of the multivariate Gaussian - quadratic_term = keras.ops.einsum("...i,...ij,...j->...", diff, inv_covariance, diff) + quadratic_term = keras.ops.einsum("...i,...ij,...j->...", diff, precision, diff) # Compute the log probability density log_prob = -0.5 * (self.dim * keras.ops.log(2 * math.pi) + log_det_covariance + quadratic_term) @@ -97,6 +105,8 @@ def sample(self, batch_shape: Shape, mean: Tensor, covariance: Tensor) -> Tensor Tensor A tensor of shape (batch_size, num_samples, D) containing the generated samples. """ + if len(batch_shape) == 1: + batch_shape = (1,) + tuple(batch_shape) batch_size, num_samples = batch_shape dim = keras.ops.shape(mean)[-1] if keras.ops.shape(mean) != (batch_size, dim): diff --git a/bayesflow/scores/parametric_distribution_score.py b/bayesflow/scores/parametric_distribution_score.py index 51cef1776..17806ef16 100644 --- a/bayesflow/scores/parametric_distribution_score.py +++ b/bayesflow/scores/parametric_distribution_score.py @@ -51,5 +51,4 @@ def score(self, estimates: dict[str, Tensor], targets: Tensor, weights: Tensor = """ scores = -self.log_prob(x=targets, **estimates) score = self.aggregate(scores, weights) - # multipy to mitigate instability due to relatively high values of parametric score - return score * 0.01 + return score diff --git a/bayesflow/scores/scoring_rule.py b/bayesflow/scores/scoring_rule.py index ef0645cc1..dd671189c 100644 --- a/bayesflow/scores/scoring_rule.py +++ b/bayesflow/scores/scoring_rule.py @@ -15,8 +15,27 @@ class ScoringRule: when sampling from the true distribution. By minimizing an expected score, estimates with different properties can be obtained. - To define a custom ``ScoringRule``, inherit from this class and overwrite the score method. + To define a custom :class:`ScoringRule`, inherit from this class and overwrite the score method. For proper serialization, any new constructor arguments must be taken care of in a `get_config` method. + + Estimates are typically parameterized by projection heads consisting of a neural network component + and a link to project into the correct output space. + + A :class:`ScoringRule` can score estimates consisting of multiple parts. See :class:`MultivariateNormalScore` + for an example of a :class:`ParametricDistributionScore`. That score evaluates an estimated mean + and covariance simultaneously. + """ + + NOT_TRANSFORMING_LIKE_VECTOR_WARNING = tuple() + """ + This variable contains names of prediction heads that should lead to a warning when the adapter is applied + in inverse direction to them. 
+
+    Prediction heads can output estimates in spaces other than the target distribution space.
+    The adapter cannot be straightforwardly applied to such estimates in the inverse direction,
+    because it is built to map vectors from the inference variable space. When subclassing
+    :class:`ScoringRule`, add the names of such heads to this tuple to warn users about difficulties
+    with that type of estimate whenever the adapter is applied to them in the inverse direction.
     """
 
     def __init__(
@@ -58,12 +77,15 @@ def get_head_shapes_from_target_shape(self, target_shape: Shape) -> dict[str, Sh
 
     def get_subnet(self, key: str) -> keras.Layer:
         """For a specified key, request a subnet to be used for projecting the shared condition embedding
-        before reshaping to the heads output shape.
+        before further projection and reshaping to the head's output shape.
+
+        If no subnet was specified for the key (e.g. upon initialization),
+        return just an instance of keras.layers.Identity.
 
         Parameters
         ----------
         key : str
-            Name of head for which to request a link.
+            Name of head for which to request a subnet.
 
         Returns
         -------
@@ -78,6 +100,8 @@ def get_link(self, key: str) -> keras.Layer:
     def get_link(self, key: str) -> keras.Layer:
         """For a specified key, request a link from network output to estimation target.
 
+        If no link was specified for the key (e.g. upon initialization), return a linear activation.
+
         Parameters
         ----------
         key : str
@@ -95,15 +119,26 @@ def get_link(self, key: str) -> keras.Layer:
         else:
             return self.links[key]
 
-    def get_head(self, key: str, shape: Shape) -> keras.Sequential:
-        """For a specified head key and shape, request corresponding head network.
+    def get_head(self, key: str, output_shape: Shape) -> keras.Sequential:
+        """For a specified head key and output shape, request the corresponding head network.
+
+        A head network has the following components that are called sequentially:
+
+        1. subnet: A keras.Layer.
+        2. dense: A trainable linear projection with as many units as are required by the next component.
+        3. reshape: Changes the shape of the projection's output to match the requirements of the next component.
+        4. link: Transforms unconstrained values into a constrained space for the final estimator.
+           See :mod:`bayesflow.links` for examples.
+
+        This method initializes the components in reverse order to meet all requirements and returns them.
 
         Parameters
         ----------
         key : str
            Name of head for which to request a link.
-        shape: Shape
-            The necessary shape for the point estimators.
+        output_shape: Shape
+            The necessary shape of estimated values for the given key as returned by
+            :func:`get_head_shapes_from_target_shape()`.
 
         Returns
         -------
@@ -111,10 +146,19 @@ def get_head(self, key: str, output_shape: Shape) -> keras.Sequential:
             Head network consisting of a learnable projection, a reshape and a link operation to parameterize
             estimates.
""" - subnet = self.get_subnet(key) - dense = keras.layers.Dense(units=math.prod(shape)) - reshape = keras.layers.Reshape(target_shape=shape) + # initialize head components back to front link = self.get_link(key) + + # link input shape can differ from output shape + if hasattr(link, "compute_input_shape"): + link_input_shape = link.compute_input_shape(output_shape) + else: + link_input_shape = output_shape + + reshape = keras.layers.Reshape(target_shape=link_input_shape) + dense = keras.layers.Dense(units=math.prod(link_input_shape)) + subnet = self.get_subnet(key) + return keras.Sequential([subnet, dense, reshape, link]) def score(self, estimates: dict[str, Tensor], targets: Tensor, weights: Tensor) -> Tensor: @@ -137,11 +181,11 @@ def score(self, estimates: dict[str, Tensor], targets: Tensor, weights: Tensor) Examples -------- - The following shows how to score estimates with a ``MeanScore``. All ``ScoringRule`` s follow this pattern, - only differing in the structure of the estimates dictionary. + The following shows how to score estimates with a :class:`MeanScore`. All :class:`ScoringRule` s + follow this pattern, only differing in the structure of the estimates dictionary. >>> import keras - ... from bayesflow.scores import MeanScore + >>> from bayesflow.scores import MeanScore >>> >>> # batch of samples from a normal distribution >>> samples = keras.random.normal(shape=(100,)) diff --git a/bayesflow/utils/__init__.py b/bayesflow/utils/__init__.py index ecb546eae..1eeb2d354 100644 --- a/bayesflow/utils/__init__.py +++ b/bayesflow/utils/__init__.py @@ -66,6 +66,7 @@ tile_axis, tree_concatenate, tree_stack, + fill_triangular_matrix, ) from .validators import check_lengths_same from .workflow_utils import find_inference_network, find_summary_network diff --git a/bayesflow/utils/tensor_utils.py b/bayesflow/utils/tensor_utils.py index b65df49a7..0dd73b67e 100644 --- a/bayesflow/utils/tensor_utils.py +++ b/bayesflow/utils/tensor_utils.py @@ -277,3 +277,78 @@ def stack(*items): return keras.ops.stack(items, axis=axis) return keras.tree.map_structure(stack, *structures) + + +def fill_triangular_matrix(x: Tensor, upper: bool = False, positive_diag: bool = False): + """ + Reshapes a batch of matrix elements into a triangular matrix (either upper or lower). + + Note: If final axis has length 1, this simply reshapes to (batch_size, 1, 1) and optionally applies softplus. + + Parameters + ---------- + x : Tensor of shape (batch_size, m) + Batch of flattened nonzero matrix elements for triangular matrix. + upper : bool + Return upper triangular matrix if True, else lower triangular matrix. Default is False. + positive_diag : bool + Whether to apply a softplus operation to diagonal elements. Default is False. + + Returns + ------- + Tensor of shape (batch_size, n, n) + Batch of triangular matrices with m = n * (n + 1) / 2 unique nonzero elements. + + Raises + ------ + ValueError + If provided nonzero elements do not correspond to possible triangular matrix shape + (n,n) with n = sqrt( 1/4 + 2 * m) - 1/2 due to m = n * (n + 1) / 2. 
+ """ + batch_shape = x.shape[:-1] + m = x.shape[-1] + + if m == 1: + y = keras.ops.reshape(x, (-1, 1, 1)) + if positive_diag: + y = keras.activations.softplus(y) + return y + + # Calculate matrix shape + n = (0.25 + 2 * m) ** 0.5 - 0.5 + if not np.isclose(np.floor(n), n): + raise ValueError(f"Input right-most shape ({m}) does not correspond to a triangular matrix.") + else: + n = int(n) + + # Trick: Create triangular matrix by concatenating with a flipped version of its tail, then reshape. + x_tail = keras.ops.take(x, indices=list(range((m - (n**2 - m)), x.shape[-1])), axis=-1) + if not upper: + y = keras.ops.concatenate([x_tail, keras.ops.flip(x, axis=-1)], axis=len(batch_shape)) + y = keras.ops.reshape(y, (-1, n, n)) + y = keras.ops.tril(y) + + if positive_diag: + y_offdiag = keras.ops.tril(y, k=-1) + # carve out diagonal, by setting upper and lower offdiagonals to zero + y_diag = keras.ops.tril( + keras.ops.triu(keras.activations.softplus(y)), # apply softplus to enforce positivity + ) + y = y_diag + y_offdiag + + else: + y = keras.ops.concatenate([x, keras.ops.flip(x_tail, axis=-1)], axis=len(batch_shape)) + y = keras.ops.reshape(y, (-1, n, n)) + y = keras.ops.triu( + y, + ) + + if positive_diag: + y_offdiag = keras.ops.triu(y, k=1) + # carve out diagonal, by setting upper and lower offdiagonals to zero + y_diag = keras.ops.tril( + keras.ops.triu(keras.activations.softplus(y)), # apply softplus to enforce positivity + ) + y = y_diag + y_offdiag + + return y diff --git a/tests/test_links/conftest.py b/tests/test_links/conftest.py index 8beb0bece..53e9eeac8 100644 --- a/tests/test_links/conftest.py +++ b/tests/test_links/conftest.py @@ -15,7 +15,7 @@ def num_variables(): @pytest.fixture() def generic_preactivation(batch_size): - return keras.ops.ones((batch_size, 4, 4)) + return keras.ops.ones((batch_size, 6)) @pytest.fixture() @@ -33,10 +33,10 @@ def ordered_quantiles(): @pytest.fixture() -def positive_semi_definite(): - from bayesflow.links import PositiveSemiDefinite +def positive_definite(): + from bayesflow.links import PositiveDefinite - return PositiveSemiDefinite() + return PositiveDefinite() @pytest.fixture() @@ -44,7 +44,7 @@ def linear(): return keras.layers.Activation("linear") -@pytest.fixture(params=["ordered", "ordered_quantiles", "positive_semi_definite", "linear"], scope="function") +@pytest.fixture(params=["ordered", "ordered_quantiles", "positive_definite", "linear"], scope="function") def link(request): return request.getfixturevalue(request.param) @@ -84,6 +84,6 @@ def unordered(batch_size, num_quantiles, num_variables): return keras.random.normal((batch_size, num_quantiles, num_variables)) -@pytest.fixture() -def random_matrix_batch(batch_size, num_variables): - return keras.random.normal((batch_size, num_variables, num_variables)) +# @pytest.fixture() +# def random_matrix_batch(batch_size, num_variables): +# return keras.random.normal((batch_size, num_variables, num_variables)) diff --git a/tests/test_links/test_links.py b/tests/test_links/test_links.py index b0ea22242..ad1be5753 100644 --- a/tests/test_links/test_links.py +++ b/tests/test_links/test_links.py @@ -3,13 +3,6 @@ import pytest -def test_link_output(link, generic_preactivation): - output_shape = link.compute_output_shape(generic_preactivation.shape) - output = link(generic_preactivation) - - assert output_shape == output.shape - - def test_invalid_shape_for_ordered_quantiles(ordered_quantiles, batch_size, num_quantiles, num_variables): with pytest.raises(AssertionError) as excinfo: 
         ordered_quantiles.build((batch_size, batch_size, num_quantiles, num_variables))
@@ -59,16 +52,21 @@ def test_quantile_ordering(quantiles, unordered):
     check_ordering(output, axis)
 
 
-def test_positive_semi_definite(random_matrix_batch):
-    from bayesflow.links import PositiveSemiDefinite
+def test_positive_definite(positive_definite, batch_size, num_variables):
+    input_shape = positive_definite.compute_input_shape((batch_size, num_variables, num_variables))
 
-    activation = PositiveSemiDefinite()
+    # Strongly negative values lead to numerical instabilities, so reduce the scale
+    random_preactivation = keras.random.normal(input_shape) * 0.1
+    output = positive_definite(random_preactivation)
+    output = keras.ops.convert_to_numpy(output)
 
-    output = activation(random_matrix_batch)
+    # Check that the output is invertible
+    np.linalg.inv(output)
 
-    output = keras.ops.convert_to_numpy(output)
+    # Calculate eigenvalues to test for positive definiteness
     eigenvalues = np.linalg.eig(output).eigenvalues
 
     assert np.all(eigenvalues.real > 0) and np.all(np.isclose(eigenvalues.imag, 0)), (
-        f"output is not positive semi-definite: real={eigenvalues.real}, imag={eigenvalues.imag}"
+        f"output is not positive definite: min(real)={np.min(eigenvalues.real)}, "
+        f"max(abs(imag))={np.max(np.abs(eigenvalues.imag))}"
     )
diff --git a/tests/test_scores/test_scores.py b/tests/test_scores/test_scores.py
index 24765688a..4e44c2ef7 100644
--- a/tests/test_scores/test_scores.py
+++ b/tests/test_scores/test_scores.py
@@ -13,15 +13,21 @@ def test_require_argument_k():
 
 def test_score_output(scoring_rule, random_conditions):
     if random_conditions is None:
-        random_conditions = keras.ops.convert_to_tensor([[1.0]])
+        random_conditions = keras.ops.convert_to_tensor([[1.0, 1.0]])
 
     # Using random random_conditions also as targets for the purpose of this test.
     head_shapes = scoring_rule.get_head_shapes_from_target_shape(random_conditions.shape)
     print(scoring_rule.get_config())
-    estimates = {
-        k: scoring_rule.get_link(k)(keras.random.normal((random_conditions.shape[0],) + head_shape))
-        for k, head_shape in head_shapes.items()
-    }
+    estimates = {}
+    for key, output_shape in head_shapes.items():
+        link = scoring_rule.get_link(key)
+        if hasattr(link, "compute_input_shape"):
+            link_input_shape = link.compute_input_shape(output_shape)
+        else:
+            link_input_shape = output_shape
+        dummy_input = keras.random.normal((random_conditions.shape[0],) + link_input_shape)
+        estimates[key] = link(dummy_input)
+
     score = scoring_rule.score(estimates, random_conditions)
     assert score.ndim == 0
 
@@ -40,3 +46,17 @@ def test_mean_score_optimality(mean_score, random_conditions):
 
     assert suboptimal_score > optimal_score
     assert keras.ops.isclose(optimal_score, 0)
+
+
+def test_unconditional_mvn(multivariate_normal_score):
+    mean = keras.ops.convert_to_tensor([[0.0, 1.0]])
+    covariance = keras.ops.convert_to_tensor([[[1.0, 0.0], [0.0, 1.0]]])
+    multivariate_normal_score.sample((10,), mean, covariance)
+
+
+def test_unconditional_mvn_value_error(multivariate_normal_score):
+    mean = keras.ops.convert_to_tensor([0.0, 1.0])
+    covariance = keras.ops.convert_to_tensor([[1.0, 0.0], [0.0, 1.0]])
+
+    with pytest.raises(ValueError):
+        multivariate_normal_score.sample((10,), mean, covariance)
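
Reviewer note (not part of the patch): the following is a minimal NumPy sketch of the construction that `fill_triangular_matrix(..., positive_diag=True)` and the new `PositiveDefinite` link implement together, i.e., mapping m = n * (n + 1) / 2 flat elements to a Cholesky factor L and returning L @ L.T. The helper name and the row-major fill order below are illustrative only; the actual utility uses a different element ordering (the concatenate-and-flip trick) and keras ops, but the shape arithmetic and the softplus-on-the-diagonal step are the same.

```python
import numpy as np


def reference_positive_definite(x_flat: np.ndarray) -> np.ndarray:
    """Illustrative reference: flat vector -> Cholesky factor -> positive definite matrix."""
    m = x_flat.shape[-1]
    n = int(round((0.25 + 2.0 * m) ** 0.5 - 0.5))  # invert m = n * (n + 1) / 2
    if n * (n + 1) // 2 != m:
        raise ValueError(f"{m} is not a triangular number")
    L = np.zeros((n, n))
    L[np.tril_indices(n)] = x_flat  # fill the lower triangle (row-major order, for illustration)
    d = np.diag_indices(n)
    L[d] = np.log1p(np.exp(L[d]))  # softplus keeps the diagonal strictly positive
    return L @ L.T  # L @ L.T is positive definite by construction


pd = reference_positive_definite(np.array([0.3, -0.2, 0.5, 0.1, 0.4, 0.7]))
assert np.all(np.linalg.eigvalsh(pd) > 0)  # all eigenvalues are strictly positive
```

Because the softplus makes the diagonal of L strictly positive, L is invertible and L @ L.T is positive definite rather than merely positive semi-definite, which is what motivates renaming the link from PositiveSemiDefinite to PositiveDefinite in this PR.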