|
| 1 | +import keras |
| 2 | +from keras import ops |
| 3 | +from keras.saving import register_keras_serializable as serializable |
| 4 | + |
| 5 | +from bayesflow.types import Tensor |
| 6 | +from bayesflow.utils import find_network, keras_kwargs, concatenate, log_jacobian_determinant, jvp, vjp |
| 7 | + |
| 8 | +from ..inference_network import InferenceNetwork |
| 9 | + |
| 10 | + |
| 11 | +@serializable(package="networks.free_form_flow") |
| 12 | +class FreeFormFlow(InferenceNetwork): |
| 13 | + """Implements a dimensionality-preserving Free-form Flow. |
| 14 | + Incorporates ideas from [1-2]. |
| 15 | +
|
| 16 | + [1] Draxler, F., Sorrenson, P., Zimmermann, L., Rousselot, A., & Köthe, U. (2024).F |
| 17 | + ree-form flows: Make Any Architecture a Normalizing Flow. |
| 18 | + In International Conference on Artificial Intelligence and Statistics. |
| 19 | +
|
| 20 | + [2] Sorrenson, P., Draxler, F., Rousselot, A., Hummerich, S., Zimmermann, L., & |
| 21 | + Köthe, U. (2024). Lifting Architectural Constraints of Injective Flows. |
| 22 | + In International Conference on Learning Representations. |
| 23 | + """ |
| 24 | + |
| 25 | + def __init__( |
| 26 | + self, |
| 27 | + beta: float = 50.0, |
| 28 | + encoder_subnet: str | type = "mlp", |
| 29 | + decoder_subnet: str | type = "mlp", |
| 30 | + base_distribution: str = "normal", |
| 31 | + hutchinson_sampling: str = "qr", |
| 32 | + **kwargs, |
| 33 | + ): |
| 34 | + """Creates an instance of a Free-form Flow. |
| 35 | +
|
| 36 | + Parameters: |
| 37 | + ----------- |
| 38 | + beta : float, optional, default: 50.0 |
| 39 | + encoder_subnet : str or type, optional, default: "mlp" |
| 40 | + A neural network type for the flow, will be instantiated using |
| 41 | + encoder_subnet_kwargs. Will be equipped with a projector to ensure |
| 42 | + the correct output dimension and a global skip connection. |
| 43 | + decoder_subnet : str or type, optional, default: "mlp" |
| 44 | + A neural network type for the flow, will be instantiated using |
| 45 | + decoder_subnet_kwargs. Will be equipped with a projector to ensure |
| 46 | + the correct output dimension and a global skip connection. |
| 47 | + base_distribution : str, optional, default: "normal" |
| 48 | + The latent distribution |
| 49 | + hutchinson_sampling : str, optional, default: "qr |
| 50 | + One of `["sphere", "qr"]`. Select the sampling scheme for the |
| 51 | + vectors of the Hutchinson trace estimator. |
| 52 | + **kwargs : dict, optional, default: {} |
| 53 | + Additional keyword arguments |
| 54 | + """ |
| 55 | + super().__init__(base_distribution=base_distribution, **keras_kwargs(kwargs)) |
| 56 | + self.encoder_subnet = find_network(encoder_subnet, **kwargs.get("encoder_subnet_kwargs", {})) |
| 57 | + self.encoder_projector = keras.layers.Dense(units=None, bias_initializer="zeros", kernel_initializer="zeros") |
| 58 | + self.decoder_subnet = find_network(decoder_subnet, **kwargs.get("decoder_subnet_kwargs", {})) |
| 59 | + self.decoder_projector = keras.layers.Dense(units=None, bias_initializer="zeros", kernel_initializer="zeros") |
| 60 | + |
| 61 | + self.hutchinson_sampling = hutchinson_sampling |
| 62 | + self.beta = beta |
| 63 | + |
| 64 | + self.seed_generator = keras.random.SeedGenerator() |
| 65 | + |
| 66 | + # noinspection PyMethodOverriding |
| 67 | + def build(self, xz_shape, conditions_shape=None): |
| 68 | + super().build(xz_shape) |
| 69 | + self.encoder_projector.units = xz_shape[-1] |
| 70 | + self.decoder_projector.units = xz_shape[-1] |
| 71 | + |
| 72 | + # construct input shape for subnet and subnet projector |
| 73 | + input_shape = list(xz_shape) |
| 74 | + |
| 75 | + if conditions_shape is not None: |
| 76 | + input_shape[-1] += conditions_shape[-1] |
| 77 | + |
| 78 | + input_shape = tuple(input_shape) |
| 79 | + |
| 80 | + self.encoder_subnet.build(input_shape) |
| 81 | + self.decoder_subnet.build(input_shape) |
| 82 | + |
| 83 | + input_shape = self.encoder_subnet.compute_output_shape(input_shape) |
| 84 | + self.encoder_projector.build(input_shape) |
| 85 | + |
| 86 | + input_shape = self.decoder_subnet.compute_output_shape(input_shape) |
| 87 | + self.decoder_projector.build(input_shape) |
| 88 | + |
| 89 | + def _forward( |
| 90 | + self, x: Tensor, conditions: Tensor = None, density: bool = False, training: bool = False, **kwargs |
| 91 | + ) -> Tensor | tuple[Tensor, Tensor]: |
| 92 | + if density: |
| 93 | + if conditions is None: |
| 94 | + # None cannot be batched, so supply as keyword argument |
| 95 | + z, log_det = log_jacobian_determinant(x, self.encode, conditions=None, training=training, **kwargs) |
| 96 | + else: |
| 97 | + # conditions should be batched, supply as positional argument |
| 98 | + z, log_det = log_jacobian_determinant(x, self.encode, conditions, training=training, **kwargs) |
| 99 | + |
| 100 | + log_density = self.base_distribution.log_prob(z) + log_det |
| 101 | + return z, log_density |
| 102 | + |
| 103 | + z = self.encode(x, conditions, training=training, **kwargs) |
| 104 | + return z |
| 105 | + |
| 106 | + def _inverse( |
| 107 | + self, z: Tensor, conditions: Tensor = None, density: bool = False, training: bool = False, **kwargs |
| 108 | + ) -> Tensor | tuple[Tensor, Tensor]: |
| 109 | + if density: |
| 110 | + if conditions is None: |
| 111 | + # None cannot be batched, so supply as keyword argument |
| 112 | + x, log_det = log_jacobian_determinant(z, self.decode, conditions=None, training=training, **kwargs) |
| 113 | + else: |
| 114 | + # conditions should be batched, supply as positional argument |
| 115 | + x, log_det = log_jacobian_determinant(z, self.decode, conditions, training=training, **kwargs) |
| 116 | + log_density = self.base_distribution.log_prob(z) - log_det |
| 117 | + return x, log_density |
| 118 | + |
| 119 | + x = self.decode(z, conditions, training=training, **kwargs) |
| 120 | + return x |
| 121 | + |
| 122 | + def encode(self, x: Tensor, conditions: Tensor = None, training: bool = False, **kwargs) -> Tensor: |
| 123 | + if conditions is None: |
| 124 | + inp = x |
| 125 | + else: |
| 126 | + inp = concatenate(x, conditions, axis=-1) |
| 127 | + network_out = self.encoder_projector( |
| 128 | + self.encoder_subnet(inp, training=training, **kwargs), training=training, **kwargs |
| 129 | + ) |
| 130 | + return network_out + x |
| 131 | + |
| 132 | + def decode(self, z: Tensor, conditions: Tensor = None, training: bool = False, **kwargs) -> Tensor: |
| 133 | + if conditions is None: |
| 134 | + inp = z |
| 135 | + else: |
| 136 | + inp = concatenate(z, conditions, axis=-1) |
| 137 | + network_out = self.decoder_projector( |
| 138 | + self.decoder_subnet(inp, training=training, **kwargs), training=training, **kwargs |
| 139 | + ) |
| 140 | + return network_out + z |
| 141 | + |
| 142 | + def _sample_v(self, x): |
| 143 | + batch_size = ops.shape(x)[0] |
| 144 | + total_dim = ops.shape(x)[-1] |
| 145 | + match self.hutchinson_sampling: |
| 146 | + case "qr": |
| 147 | + # Use QR decomposition as described in [2] |
| 148 | + v_raw = keras.random.normal((batch_size, total_dim, 1), dtype=ops.dtype(x), seed=self.seed_generator) |
| 149 | + q = ops.reshape(ops.qr(v_raw)[0], ops.shape(x)) |
| 150 | + v = q * ops.sqrt(total_dim) |
| 151 | + case "sphere": |
| 152 | + # Sample from sphere with radius sqrt(total_dim), as implemented in [1] |
| 153 | + v_raw = keras.random.normal((batch_size, total_dim), dtype=ops.dtype(x), seed=self.seed_generator) |
| 154 | + v = v_raw * ops.sqrt(total_dim) / ops.sqrt(ops.sum(v_raw**2, axis=-1, keepdims=True)) |
| 155 | + case _: |
| 156 | + raise ValueError(f"{self.hutchinson_sampling} is not a valid value for hutchinson_sampling.") |
| 157 | + return v |
| 158 | + |
| 159 | + def compute_metrics(self, x: Tensor, conditions: Tensor = None, stage: str = "training") -> dict[str, Tensor]: |
| 160 | + base_metrics = super().compute_metrics(x, conditions=conditions, stage=stage) |
| 161 | + # sample random vector |
| 162 | + v = self._sample_v(x) |
| 163 | + |
| 164 | + def encode(x): |
| 165 | + return self.encode(x, conditions, training=stage == "training") |
| 166 | + |
| 167 | + def decode(z): |
| 168 | + return self.decode(z, conditions, training=stage == "training") |
| 169 | + |
| 170 | + # VJP computation |
| 171 | + z, vjp_fn = vjp(encode, x) |
| 172 | + v1 = vjp_fn(v)[0] |
| 173 | + # JVP computation |
| 174 | + x_pred, v2 = jvp(decode, (z,), (v,)) |
| 175 | + |
| 176 | + # equivalent: surrogate = ops.matmul(ops.stop_gradient(v2[:, None]), v1[:, :, None])[:, 0, 0] |
| 177 | + surrogate = ops.sum((ops.stop_gradient(v2) * v1), axis=-1) |
| 178 | + nll = -self.base_distribution.log_prob(z) |
| 179 | + maximum_likelihood_loss = nll - surrogate |
| 180 | + reconstruction_loss = ops.sum((x - x_pred) ** 2, axis=-1) |
| 181 | + loss = ops.mean(maximum_likelihood_loss + self.beta * reconstruction_loss) |
| 182 | + |
| 183 | + return base_metrics | {"loss": loss} |
0 commit comments