3 changes: 2 additions & 1 deletion pysensors/optimizers/__init__.py
@@ -1,5 +1,6 @@
from ._ccqr import CCQR, qr_reflector
from ._gqr import GQR
from ._qr import QR
from ._tpgr import TPGR

__all__ = ["CCQR", "QR", "GQR"]
__all__ = ["CCQR", "QR", "GQR", "TPGR"]
92 changes: 92 additions & 0 deletions pysensors/optimizers/_tpgr.py
@@ -0,0 +1,92 @@
import warnings

import numpy as np
from sklearn.base import BaseEstimator
from sklearn.utils.validation import check_is_fitted


class TPGR(BaseEstimator):
"""
2-Point Greedy Algorithm for Sensor Selection.
See the following reference for more information
Klishin, Andrei A., et. al.
Data-Induced Interactions of Sparse Sensors. 2023.
arXiv:2307.11838 [cond-mat.stat-mech]
"""

def __init__(self, n_sensors=None, noise=None, prior="decreasing"):
self.n_sensors = n_sensors
self.noise = noise
self.sensors_ = None
self.prior = prior

    def fit(self, basis_matrix, singular_values=None):
        if isinstance(self.prior, str) and self.prior == "decreasing":
            computed_prior = singular_values
        elif isinstance(self.prior, np.ndarray):
            if self.prior.ndim != 1:
                raise ValueError("prior must be a 1D array")
            if self.prior.shape[0] != basis_matrix.shape[1]:
                raise ValueError(
                    f"prior must be of shape {(basis_matrix.shape[1],)},"
                    f" but got {self.prior.shape}"
                )
            computed_prior = self.prior
        else:
            raise ValueError("prior must be 'decreasing' or a 1D numpy array")
        if self.noise is None:
            warnings.warn(
                "noise is None. noise will be set to the average of the prior"
            )
            self.noise = computed_prior.mean()
        G = basis_matrix @ np.diag(computed_prior)
        self.G = G
        n = G.shape[0]
        mask = np.ones(n, dtype=bool)
        one_pt_energies = self._one_pt_energy(G)
        # Seed the selection with the sensor of lowest one-point energy
        i = np.argmin(one_pt_energies)
        self.sensors_ = [i]
        mask[i] = False
        G_selected = G[[i], :]
        # Greedily add the remaining candidate with the lowest total
        # (one-point plus pairwise two-point) energy
        while G_selected.shape[0] < self.n_sensors:
            G_remaining = G[mask]
            q = np.argmin(
                self._one_pt_energy(G_remaining)
                + 2 * self._two_pt_energy(G_selected, G_remaining)
            )
            remaining_indices = np.where(mask)[0]
            selected_index = remaining_indices[q]
            self.sensors_.append(selected_index)
            mask[selected_index] = False
            G_selected = np.vstack(
                (G_selected, G[selected_index : selected_index + 1, :])
            )
        return self

    def _one_pt_energy(self, G):
        """
        Compute the 1-pt energy
        """
        return -np.log(1 + np.einsum("ij,ij->i", G, G) / self.noise**2)

    def _two_pt_energy(self, G_selected, G_remaining):
        """
        Compute the 2-pt energy
        """
        J = 0.5 * np.sum(
            ((G_remaining @ G_selected.T) ** 2)
            / (
                np.outer(
                    1 + (np.sum(G_remaining**2, axis=1)) / self.noise**2,
                    1 + (np.sum(G_selected**2, axis=1)) / self.noise**2,
                )
                * self.noise**4
            ),
            axis=1,
        )
        return J

    def get_sensors(self):
        check_is_fitted(self, "sensors_")
        return self.sensors_
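For reference when reading `_tpgr.py` above, the quantities it implements can be written as follows (my notation, read off the code rather than quoted from the paper). With $G = \Phi\,\mathrm{diag}(\mathrm{prior})$, $G_i$ the $i$-th row of $G$, and $\eta$ the noise magnitude:

$$
E_1(i) = -\log\!\left(1 + \frac{\lVert G_i \rVert^2}{\eta^2}\right),
\qquad
J(i, j) = \frac{(G_i \cdot G_j)^2}
{2\,\eta^4 \left(1 + \lVert G_i \rVert^2 / \eta^2\right)\left(1 + \lVert G_j \rVert^2 / \eta^2\right)}.
$$

The first sensor is the row minimizing $E_1$; each subsequent sensor $q$ minimizes $E_1(q) + 2 \sum_{s \in \text{selected}} J(q, s)$.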
Contributor: Looks good to merge
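A minimal usage sketch of the new optimizer, assuming only the interfaces visible in this diff (`TPGR`, `SSPOR`, `selected_sensors`); the data, shapes, and parameter values are illustrative, not taken from the PR:

```python
import numpy as np

from pysensors import SSPOR
from pysensors.optimizers import TPGR

# Illustrative training data: 80 snapshots over 500 spatial locations
rng = np.random.default_rng(0)
x_train = rng.standard_normal((80, 500))

# TPGR needs its own n_sensors; "decreasing" uses the normalized singular
# values that SSPOR.fit computes and passes to the optimizer
optimizer = TPGR(n_sensors=10, noise=0.1, prior="decreasing")
model = SSPOR(optimizer=optimizer, n_sensors=10)
model.fit(x_train)

print(model.selected_sensors)  # indices of the 10 chosen locations
```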

212 changes: 197 additions & 15 deletions pysensors/reconstruction/_sspor.py
@@ -6,7 +6,7 @@
from sklearn.utils.validation import check_is_fitted

from ..basis import Identity
from ..optimizers import CCQR, QR
from ..optimizers import CCQR, QR, TPGR
from ..utils import validate_input

INT_DTYPES = (int, np.int64, np.int32, np.int16, np.int8)
@@ -150,11 +150,19 @@ def fit(self, x, quiet=False, prefit_basis=False, seed=None, **optimizer_kws):
        # Check that n_sensors doesn't exceed dimension of basis vectors and
        # that it doesn't exceed the number of samples when using the CCQR optimizer.
        self._validate_n_sensors()

        # Calculate the singular values
        X_proj = x @ self.basis_matrix_
        # Normalized singular values
        self.singular_values = np.linalg.norm(X_proj, axis=0) / np.sqrt(x.shape[0])
        # Find sparse sensor locations
        self.ranked_sensors_ = self.optimizer.fit(
            self.basis_matrix_, **optimizer_kws
        ).get_sensors()
        if isinstance(self.optimizer, TPGR):
            self.ranked_sensors_ = self.optimizer.fit(
                self.basis_matrix_, self.singular_values, **optimizer_kws
            ).get_sensors()
        else:
            self.ranked_sensors_ = self.optimizer.fit(
                self.basis_matrix_, **optimizer_kws
            ).get_sensors()

        # Randomly shuffle sensors after self.basis.n_basis_modes
        rng = np.random.default_rng(seed)
@@ -165,7 +173,7 @@ def fit(self, x, quiet=False, prefit_basis=False, seed=None, **optimizer_kws):

        return self
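Aside, not part of the diff: the `singular_values` stored above are the column norms of the projected data,

$$
\sigma_k = \frac{\lVert X \Phi_{:,k} \rVert_2}{\sqrt{n_{\text{samples}}}}, \qquad k = 1, \dots, n_{\text{basis modes}},
$$

and they serve both as the "decreasing" prior handed to `TPGR` and as the default prior for the regularized prediction below.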

    def predict(self, x, **solve_kws):
    def predict(self, x, method=None, noise=None, prior="decreasing", **solve_kws):
        """
        Predict values at all positions given measurements at sensor locations.

@@ -199,16 +207,72 @@ def predict(self, x, **solve_kws):
"n_sensors exceeds dimension of basis modes. Performance may be poor"
)

        # Square matrix
        if self.n_sensors == self.basis_matrix_.shape[1]:
            return self._square_predict(
                x, self.ranked_sensors_[: self.n_sensors], **solve_kws
            )
        # Rectangular matrix
        if method is None:
            if isinstance(prior, str) and prior == "decreasing":
                computed_prior = self.singular_values
            elif isinstance(prior, np.ndarray):
                if prior.ndim != 1:
                    raise ValueError("prior must be a 1D array")
                if prior.shape[0] != self.basis_matrix_.shape[1]:
                    raise ValueError(
                        f"prior must be of shape {(self.basis_matrix_.shape[1],)},"
                        f" but got {prior.shape}"
                    )
                computed_prior = prior
            else:
                raise ValueError("prior must be 'decreasing' or a 1D numpy array")
            if noise is None:
                warnings.warn(
                    "noise is None. noise will be set to the "
                    "average of the normalized prior"
                )
                noise = computed_prior.mean()
            return self._regularized_reconstruction(x, computed_prior, noise)
        elif method == "unregularized":
            # Square matrix
            if self.n_sensors == self.basis_matrix_.shape[1]:
                return self._square_predict(
                    x, self.ranked_sensors_[: self.n_sensors], **solve_kws
                )
            # Rectangular matrix
            else:
                return self._rectangular_predict(
                    x, self.ranked_sensors_[: self.n_sensors], **solve_kws
                )
        else:
            return self._rectangular_predict(
                x, self.ranked_sensors_[: self.n_sensors], **solve_kws
            )
            raise NotImplementedError(
                f"method must be None or 'unregularized', got {method!r}"
            )

    def _regularized_reconstruction(self, x, prior, noise):
        """
        Reconstruct the state using regularized reconstruction.

        See the following reference for more information:
            Klishin, Andrei A., et al.
            "Data-Induced Interactions of Sparse Sensors." 2023.
            arXiv:2307.11838 [cond-mat.stat-mech]

        Parameters
        ----------
        x: numpy array, shape (n_sensors, n_samples)
            Measurements at the selected sensor locations.

        prior: numpy array, shape (n_basis_modes,)
            Prior variance of the basis-mode coefficients.

        noise: float (default None)
            Magnitude of the Gaussian, uncorrelated sensor measurement noise.
        """
        if noise is None:
            noise = 1
        if not isinstance(prior, np.ndarray):
            raise ValueError("prior must be a numpy array")
        sq_inv_prior = 1 / (prior**2)
        low_rank_selection_matrix = self.basis_matrix_[self.selected_sensors, :]
        composite_matrix = np.diag(sq_inv_prior) + (
            low_rank_selection_matrix.T @ low_rank_selection_matrix
        ) / (noise**2)
        rhs = low_rank_selection_matrix.T @ x
        reconstructed_state = self.basis_matrix_ @ np.linalg.solve(
            composite_matrix, rhs / noise**2
        )
        return reconstructed_state.T
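As a reading aid (my notation, not from the PR): with $\Phi$ = `basis_matrix_`, $\Phi_s$ its rows at the selected sensors, prior variances $p_k^2$ and noise magnitude $\eta$, the method above computes

$$
\hat{\beta} = \left(\operatorname{diag}(p^{-2}) + \frac{\Phi_s^{\top}\Phi_s}{\eta^{2}}\right)^{-1} \frac{\Phi_s^{\top} x}{\eta^{2}},
\qquad
\hat{x}_{\text{full}} = \Phi\,\hat{\beta},
$$

i.e. a ridge-type (posterior-mean) estimate of the basis coefficients, lifted back to the full state.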

    def _square_predict(self, x, sensors, **solve_kws):
        """Get prediction when the problem is square."""
@@ -511,3 +575,121 @@ def _validate_n_sensors(self):
"Number of sensors exceeds number of samples, which may cause CCQR to "
"select sensors in constrained regions."
)

    def std(self, prior, noise=None):
        """
        Compute the standard deviation of the noise in each pixel of the
        reconstructed state.

        See the following reference for more information:
            Klishin, Andrei A., et al.
            "Data-Induced Interactions of Sparse Sensors." 2023.
            arXiv:2307.11838 [cond-mat.stat-mech]

        Parameters
        ----------
        prior: "decreasing" or np.ndarray, shape (n_basis_modes,)
            Prior covariance vector, typically a scaled identity vector or a
            vector containing the normalized singular values.

        noise: float (default None)
            Magnitude of the Gaussian, uncorrelated sensor measurement noise.

        Returns
        -------
        sigma: numpy array, shape (n_features,)
            Level of uncertainty of each pixel of the reconstructed state.
        """
        check_is_fitted(self, "basis_matrix_")
        if noise is None:
            noise = 1
        if noise <= 0:
            raise ValueError("noise must be positive")
        if isinstance(prior, str) and prior == "decreasing":
            computed_prior = self.singular_values
        elif isinstance(prior, np.ndarray):
            if prior.ndim != 1:
                raise ValueError("prior must be a 1D array")
            if prior.shape[0] != self.basis_matrix_.shape[1]:
                raise ValueError(
                    f"prior must be of shape {(self.basis_matrix_.shape[1],)},"
                    f" but got {prior.shape}"
                )
            computed_prior = prior
        else:
            raise ValueError("prior must be 'decreasing' or a 1D numpy array")
        sq_inv_prior = 1.0 / (computed_prior**2)
        low_rank_selection_matrix = self.basis_matrix_[self.selected_sensors, :]
        composite_matrix = np.diag(sq_inv_prior) + (
            low_rank_selection_matrix.T @ low_rank_selection_matrix
        ) / (noise**2)
        diag_cov_matrix = (
            self.basis_matrix_
            @ np.linalg.inv(composite_matrix)
            @ low_rank_selection_matrix.T
            / (noise**2)
        )
        sigma = noise * np.sqrt(np.sum(diag_cov_matrix**2, axis=1))
        return sigma
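A hedged usage sketch for the uncertainty estimate (continuing the hypothetical `model` from the example further up; the noise value is illustrative):

```python
# Per-pixel standard deviation of the regularized reconstruction,
# using the normalized singular values as the prior
sigma = model.std(prior="decreasing", noise=0.1)
print(sigma.shape)  # (n_features,): one value per spatial location
```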

    def one_pt_energy_landscape(self, prior="decreasing", noise=None):
        """
        Compute the one-point energy landscape over all candidate sensor
        locations. Lower values correspond to locations favored by TPGR.
        """
        check_is_fitted(self, "optimizer")
        if isinstance(prior, str) and prior == "decreasing":
            computed_prior = self.singular_values
        elif isinstance(prior, np.ndarray):
            if prior.ndim != 1:
                raise ValueError("prior must be a 1D array")
            if prior.shape[0] != self.basis_matrix_.shape[1]:
                raise ValueError(
                    f"prior must be of shape {(self.basis_matrix_.shape[1],)},"
                    f" but got {prior.shape}"
                )
            computed_prior = prior
        else:
            raise ValueError("prior must be 'decreasing' or a 1D numpy array")
        if noise is None:
            warnings.warn(
                "noise is None. noise will be set to the "
                "average of the normalized prior"
            )
            noise = computed_prior.mean()
        G = self.basis_matrix_ @ np.diag(computed_prior)
        return -np.log(1 + np.einsum("ij,ij->i", G, G) / noise**2)

    def two_pt_energy_landscape(self, selected_sensors, prior="decreasing", noise=None):
        """
        Compute the two-point energy of each remaining candidate location with
        respect to the already selected sensors. Entries at the selected
        locations are returned as NaN.
        """
        check_is_fitted(self, "optimizer")
        if isinstance(prior, str) and prior == "decreasing":
            computed_prior = self.singular_values
        elif isinstance(prior, np.ndarray):
            if prior.ndim != 1:
                raise ValueError("prior must be a 1D array")
            if prior.shape[0] != self.basis_matrix_.shape[1]:
                raise ValueError(
                    f"prior must be of shape {(self.basis_matrix_.shape[1],)},"
                    f" but got {prior.shape}"
                )
            computed_prior = prior
        else:
            raise ValueError("prior must be 'decreasing' or a 1D numpy array")
        if noise is None:
            warnings.warn(
                "noise is None. noise will be set to the "
                "average of the normalized prior"
            )
            noise = computed_prior.mean()
        G = self.basis_matrix_ @ np.diag(computed_prior)
        mask = np.ones(G.shape[0], dtype=bool)
        mask[selected_sensors] = False
        # Handle a single selected sensor (fancy indexing may return a 1D row)
        G_selected = np.atleast_2d(G[selected_sensors, :])
        G_remaining = G[mask, :]
        J = 0.5 * np.sum(
            ((G_remaining @ G_selected.T) ** 2)
            / (
                np.outer(
                    1 + (np.sum(G_remaining**2, axis=1)) / noise**2,
                    1 + (np.sum(G_selected**2, axis=1)) / noise**2,
                )
                * noise**4
            ),
            axis=1,
        )
        J_full = np.full(G.shape[0], np.nan)
        J_full[mask] = J
        return J_full
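One possible way to inspect the new energy-landscape diagnostics (a sketch building on the hypothetical `model` above; the matplotlib plotting is illustrative and not part of this PR):

```python
import matplotlib.pyplot as plt

# One-point energy over every candidate location
E1 = model.one_pt_energy_landscape(prior="decreasing", noise=0.1)

# Two-point energy of the remaining locations relative to the
# first two selected sensors (NaN at the selected locations)
J2 = model.two_pt_energy_landscape(
    list(model.selected_sensors[:2]), prior="decreasing", noise=0.1
)

plt.plot(E1, label="one-point energy")
plt.plot(E1 + 2 * J2, label="one- + two-point energy")
plt.xlabel("candidate sensor index")
plt.legend()
plt.show()
```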
Contributor: Looks good to merge
