diff --git a/docs/src/references/decomposition.rst b/docs/src/references/decomposition.rst
index 0ee5caf9c..00640bb3c 100644
--- a/docs/src/references/decomposition.rst
+++ b/docs/src/references/decomposition.rst
@@ -54,3 +54,19 @@ Kernel PCovR
     .. automethod:: predict
     .. automethod:: inverse_transform
     .. automethod:: score
+
+.. _KPCovC-api:
+
+Kernel PCovC
+------------
+
+.. autoclass:: skmatter.decomposition.KernelPCovC
+    :show-inheritance:
+    :special-members:
+
+    .. automethod:: fit
+    .. automethod:: transform
+    .. automethod:: predict
+    .. automethod:: inverse_transform
+    .. automethod:: decision_function
+    .. automethod:: score
\ No newline at end of file
diff --git a/src/skmatter/decomposition/__init__.py b/src/skmatter/decomposition/__init__.py
index 4fbb6d92c..ddea1975c 100644
--- a/src/skmatter/decomposition/__init__.py
+++ b/src/skmatter/decomposition/__init__.py
@@ -33,15 +33,20 @@
 """
 
 from ._pcov import _BasePCov
+from ._kpcov import _BaseKPCov
 from ._pcovr import PCovR
 from ._pcovc import PCovC
 from ._kernel_pcovr import KernelPCovR
+from ._kernel_pcovc import KernelPCovC
+
 
 __all__ = [
     "_BasePCov",
+    "_BaseKPCov",
     "PCovR",
     "PCovC",
     "KernelPCovR",
+    "KernelPCovC",
 ]
diff --git a/src/skmatter/decomposition/_kernel_pcovc.py b/src/skmatter/decomposition/_kernel_pcovc.py
new file mode 100644
index 000000000..1437d2161
--- /dev/null
+++ b/src/skmatter/decomposition/_kernel_pcovc.py
@@ -0,0 +1,402 @@
+import numpy as np
+import scipy.sparse as sp
+
+from sklearn.linear_model._base import LinearClassifierMixin
+from sklearn.svm import SVC, LinearSVC
+from sklearn.utils import check_array
+from sklearn.utils.multiclass import check_classification_targets, type_of_target
+from sklearn.utils.validation import check_is_fitted, validate_data
+
+from skmatter.decomposition import _BaseKPCov
+from skmatter.preprocessing import KernelNormalizer
+from skmatter.utils._pcovc_utils import check_svc_fit
+
+
+class KernelPCovC(LinearClassifierMixin, _BaseKPCov):
+    r"""Kernel Principal Covariates Classification, a modification of the Principal
+    Covariates Classification proposed in [Jorgensen2025]_, determines a latent-space
+    projection :math:`\mathbf{T}` which minimizes a combined loss in supervised and
+    unsupervised tasks in the reproducing kernel Hilbert space (RKHS).
+
+    This projection is determined by the eigendecomposition of a modified kernel
+    matrix :math:`\mathbf{\tilde{K}}`
+
+    .. math::
+        \mathbf{\tilde{K}} = \alpha \mathbf{K} +
+        (1 - \alpha) \mathbf{Z}\mathbf{Z}^T
+
+    where :math:`\alpha` is a mixing parameter,
+    :math:`\mathbf{K}` is the input kernel of shape :math:`(n_{samples}, n_{samples})`
+    and :math:`\mathbf{Z}` is a matrix of class confidence scores of shape
+    :math:`(n_{samples}, n_{classes})`.
+
+    Parameters
+    ----------
+    mixing : float, default=0.5
+        mixing parameter, as described in PCovC as :math:`{\alpha}`
+
+    n_components : int, float or str, default=None
+        Number of components to keep.
+        if n_components is not set, all components are kept::
+
+            n_components == n_samples
+
+    svd_solver : {'auto', 'full', 'arpack', 'randomized'}, default='auto'
+        If auto :
+            The solver is selected by a default policy based on `X.shape` and
+            `n_components`: if the input data is larger than 500x500 and the
+            number of components to extract is lower than 80% of the smallest
+            dimension of the data, then the more efficient 'randomized'
+            method is enabled. Otherwise the exact full SVD is computed and
+            optionally truncated afterwards.
+        If full :
+            run exact full SVD calling the standard LAPACK solver via
+            `scipy.linalg.svd` and select the components by postprocessing
+        If arpack :
+            run SVD truncated to n_components calling ARPACK solver via
+            `scipy.sparse.linalg.svds`. It requires strictly
+            0 < n_components < min(X.shape)
+        If randomized :
+            run randomized SVD by the method of Halko et al.
+
+    classifier : instance of `sklearn.svm.SVC`, default=None
+        The classifier to use for computing
+        the class confidence scores :math:`{\mathbf{Z}}`.
+        A pre-fitted classifier may be provided, in which case its kernel
+        parameters must match those used to initialize KernelPCovC.
+
+        If None, ``sklearn.svm.SVC()`` initialized with the kernel
+        parameters of KernelPCovC is used as the classifier.
+
+    kernel : {"linear", "poly", "rbf", "sigmoid", "cosine", "precomputed"}, default="rbf"
+        Kernel.
+
+    gamma : {'scale', 'auto'} or float, default='scale'
+        Kernel coefficient for rbf, poly and sigmoid kernels. Ignored by other
+        kernels.
+
+    degree : int, default=3
+        Degree for poly kernels. Ignored by other kernels.
+
+    coef0 : float, default=0.0
+        Independent term in poly and sigmoid kernels.
+        Ignored by other kernels.
+
+    kernel_params : mapping of str to any, default=None
+        Parameters (keyword arguments) and values for kernel passed as
+        callable object. Ignored by other kernels.
+
+    center : bool, default=False
+        Whether to center any computed kernels.
+
+    fit_inverse_transform : bool, default=False
+        Learn the inverse transform for non-precomputed kernels.
+        (i.e. learn to find the pre-image of a point)
+
+    tol : float, default=1e-12
+        Tolerance for singular values computed by svd_solver == 'arpack'
+        and for matrix inversions.
+        Must be of range [0.0, infinity).
+
+    n_jobs : int, default=None
+        The number of parallel jobs to run.
+        :obj:`None` means 1 unless in a :obj:`joblib.parallel_backend` context.
+        ``-1`` means using all processors.
+
+    iterated_power : int or 'auto', default='auto'
+        Number of iterations for the power method computed by
+        svd_solver == 'randomized'.
+        Must be of range [0, infinity).
+
+    random_state : int, :class:`numpy.random.RandomState` instance or None, default=None
+        Used when the 'arpack' or 'randomized' solvers are used. Pass an int
+        for reproducible results across multiple function calls.
+
+    Attributes
+    ----------
+    classifier : estimator object
+        The classifier passed for fitting. If pre-fitted, it is assumed
+        to be fit on a precomputed kernel K and Y.
+
+    z_classifier_ : estimator object
+        The classifier fit between the computed kernel K and Y.
+
+    classifier_ : estimator object
+        The linear classifier fit between T and Y.
+
+    pt__: numpy.ndarray of size :math:`({n_{components}, n_{components}})`
+        pseudo-inverse of the latent-space projection, which
+        can be used to construct projectors from the latent space
+
+    pkt_: numpy.ndarray of size :math:`({n_{samples}, n_{components}})`
+        the projector, or weights, from the input kernel :math:`\mathbf{K}`
+        to the latent-space projection :math:`\mathbf{T}`
+
+    pkz_: numpy.ndarray of size :math:`({n_{samples}, })` or :math:`({n_{samples}, n_{classes}})`
+        the projector, or weights, from the input kernel :math:`\mathbf{K}`
+        to the class confidence scores :math:`\mathbf{Z}`
+
+    ptz_: numpy.ndarray of size :math:`({n_{components}, })` or :math:`({n_{components}, n_{classes}})`
+        the projector, or weights, from the latent-space projection
+        :math:`\mathbf{T}` to the class confidence scores :math:`\mathbf{Z}`
+
+    ptx_: numpy.ndarray of size :math:`({n_{components}, n_{features}})`
+        the projector, or weights, from the latent-space projection
+        :math:`\mathbf{T}` to the feature matrix :math:`\mathbf{X}`
+
+    X_fit_: numpy.ndarray of shape (n_samples, n_features)
+        The data used to fit the model. This attribute is used to build kernels
+        from new data.
+
+    Examples
+    --------
+    >>> import numpy as np
+    >>> from skmatter.decomposition import KernelPCovC
+    >>> from sklearn.preprocessing import StandardScaler
+    >>> X = np.array([[-2, 3, -1, 0], [2, 0, -3, 1], [3, 0, -1, 3], [2, -2, 1, 0]])
+    >>> scaler = StandardScaler()
+    >>> X = scaler.fit_transform(X)
+    >>> Y = np.array([[2], [0], [1], [2]])
+    >>> kpcovc = KernelPCovC(
+    ...     mixing=0.1,
+    ...     n_components=2,
+    ...     kernel="rbf",
+    ...     gamma=1,
+    ... )
+    >>> kpcovc.fit(X, Y)
+    KernelPCovC(gamma=1, mixing=0.1, n_components=2)
+    >>> kpcovc.transform(X)
+    array([[ 1.91713954,  2.52318389],
+           [ 2.95581573,  0.78491499],
+           [ 3.00977646, -1.1252421 ],
+           [ 2.45390525, -1.5365844 ]])
+    >>> kpcovc.predict(X)
+    array([2, 0, 1, 2])
+    >>> kpcovc.score(X, Y)
+    1.0
+    """  # NoQa: E501
+
+    def __init__(
+        self,
+        mixing=0.5,
+        n_components=None,
+        svd_solver="auto",
+        classifier=None,
+        kernel="rbf",
+        gamma="scale",
+        degree=3,
+        coef0=0.0,
+        kernel_params=None,
+        center=False,
+        fit_inverse_transform=False,
+        tol=1e-12,
+        n_jobs=None,
+        iterated_power="auto",
+        random_state=None,
+    ):
+        super().__init__(
+            mixing=mixing,
+            n_components=n_components,
+            svd_solver=svd_solver,
+            tol=tol,
+            iterated_power=iterated_power,
+            random_state=random_state,
+            center=center,
+            kernel=kernel,
+            gamma=gamma,
+            degree=degree,
+            coef0=coef0,
+            kernel_params=kernel_params,
+            n_jobs=n_jobs,
+            fit_inverse_transform=fit_inverse_transform,
+        )
+
+        self.classifier = classifier
+
+    def fit(self, X, Y):
+        r"""Fit the model with X and Y, where Y contains the class labels."""
+        X, Y = validate_data(self, X, Y, y_numeric=False)
+        check_classification_targets(Y)
+        self.classes_ = np.unique(Y)
+
+        # from BaseSVC - we only do this once since we don't want to recompute
+        # gamma for each _get_kernel call (this would then fail
+        # check_methods_subset_invariance)
+        sparse = sp.issparse(X)
+        if self.gamma == "scale":
+            X_var = (X.multiply(X)).mean() - (X.mean()) ** 2 if sparse else X.var()
+            self.gamma_ = 1.0 / (X.shape[1] * X_var) if X_var != 0 else 1.0
+        elif self.gamma == "auto":
+            self.gamma_ = 1.0 / X.shape[1]
+        else:
+            self.gamma_ = self.gamma
+
+        super().fit(X)
+
+        K = super()._get_kernel(X)
+
+        if self.center:
+            self.centerer_ = KernelNormalizer()
+            K = self.centerer_.fit_transform(K)
+
+        if self.classifier and not isinstance(
+            self.classifier,
+            SVC,
+        ):
+            raise ValueError("Classifier must be an instance of `SVC`")
+
+        if self.classifier is None:
+            classifier = SVC(
+                kernel=self.kernel,
+                gamma=self.gamma,
+                degree=self.degree,
+                coef0=self.coef0,
+            )
+        else:
+            classifier = self.classifier
+            kernel_attrs = ["kernel", "gamma", "degree", "coef0"]
+            if not all(
+                [
+                    getattr(self, attr) == getattr(classifier, attr)
+                    for attr in kernel_attrs
+                ]
+            ):
+                raise ValueError(
+                    "Kernel parameter mismatch: the classifier has kernel "
+                    "parameters {%s} and KernelPCovC was initialized with kernel "
+                    "parameters {%s}"
+                    % (
+                        ", ".join(
+                            [
+                                "%s: %r" % (attr, getattr(classifier, attr))
+                                for attr in kernel_attrs
+                            ]
+                        ),
+                        ", ".join(
+                            [
+                                "%s: %r" % (attr, getattr(self, attr))
+                                for attr in kernel_attrs
+                            ]
+                        ),
+                    )
+                )
+            if classifier.decision_function_shape != "ovr":
+                raise ValueError(
+                    "Classifier must have parameter `decision_function_shape` "
+                    f"set to 'ovr' but was initialized with "
+                    f"'{classifier.decision_function_shape}'"
+                )
+
+        # Check if classifier is fitted; if not, fit with precomputed K
+        # to avoid needing to compute the kernel a second time
+        self.z_classifier_ = check_svc_fit(classifier, K, X, Y)
+
+        # if we have fit the classifier on a precomputed K, we obtain Z
+        # from K, otherwise obtain it from X
+        if self.z_classifier_.kernel == "precomputed":
+            Z = self.z_classifier_.decision_function(K)
+        else:
+            Z = self.z_classifier_.decision_function(X)
+
+        super()._fit_covariance(K, Z)  # gives us T, Pkt, self.pt__
+
+        if self.fit_inverse_transform:
+            self.ptx_ = self.pt__ @ X
+
+        self.classifier_ = LinearSVC().fit(K @ self.pkt_, Y)
+
+        self.ptz_ = self.classifier_.coef_.T
+        self.pkz_ = self.pkt_ @ self.ptz_
+
+        if len(Y.shape) == 1 and type_of_target(Y) == "binary":
+            self.pkz_ = self.pkz_.reshape(
+                K.shape[1],
+            )
+            self.ptz_ = self.ptz_.reshape(
+                self.n_components_,
+            )
+
+        self.components_ = self.pkt_.T  # for sklearn compatibility
+        return self
+
+    def predict(self, X=None, T=None):
+        """Predicts the class labels using classification on X or T."""
+        check_is_fitted(self, ["pkz_", "ptz_"])
+
+        if X is None and T is None:
+            raise ValueError("Either X or T must be supplied.")
+
+        if X is not None:
+            X = validate_data(self, X, reset=False)
+            K = self._get_kernel(X, self.X_fit_)
+            if self.center:
+                K = self.centerer_.transform(K)
+
+            return self.classifier_.predict(K @ self.pkt_)
+        else:
+            return self.classifier_.predict(T)
+
+    def transform(self, X):
+        """Apply dimensionality reduction to X.
+
+        ``X`` is projected on the first principal components as determined by the
+        modified Kernel PCovC distances.
+
+        Parameters
+        ----------
+        X : numpy.ndarray, shape (n_samples, n_features)
+            New data, where n_samples is the number of samples
+            and n_features is the number of features.
+        """
+        return super().transform(X)
+
+    def inverse_transform(self, T):
+        r"""Transform input data back to its original space.
+
+        .. math::
+            \mathbf{\hat{X}} = \mathbf{T} \mathbf{P}_{TX}
+                             = \mathbf{K} \mathbf{P}_{KT} \mathbf{P}_{TX}
+
+        Similar to KPCA, the original features are not always recoverable,
+        as the projection is computed from the kernel features, not the original
+        features, and the mapping between the original and kernel features
+        is not one-to-one.
+
+        Parameters
+        ----------
+        T : numpy.ndarray, shape (n_samples, n_components)
+            Projected data, where n_samples is the number of samples and n_components is
+            the number of components.
+
+        Returns
+        -------
+        X_original : numpy.ndarray, shape (n_samples, n_features)
+        """
+        return super().inverse_transform(T)
+
+    def decision_function(self, X=None, T=None):
+        """Predicts confidence scores from X or T."""
+        check_is_fitted(self, attributes=["pkz_", "ptz_"])
+
+        if X is None and T is None:
+            raise ValueError("Either X or T must be supplied.")
+
+        if X is not None:
+            X = validate_data(self, X, reset=False)
+            K = self._get_kernel(X, self.X_fit_)
+            if self.center:
+                K = self.centerer_.transform(K)
+
+            # equivalent to self.classifier_.decision_function(K @ self.pkt_)
+            return K @ self.pkz_ + self.classifier_.intercept_
+
+        else:
+            T = check_array(T)
+            return T @ self.ptz_ + self.classifier_.intercept_
diff --git a/src/skmatter/decomposition/_kernel_pcovr.py b/src/skmatter/decomposition/_kernel_pcovr.py
index e47e771fb..9634b635f 100644
--- a/src/skmatter/decomposition/_kernel_pcovr.py
+++ b/src/skmatter/decomposition/_kernel_pcovr.py
@@ -1,25 +1,34 @@
-import numbers
+from sklearn.exceptions import NotFittedError
+from sklearn.kernel_ridge import KernelRidge
+from sklearn.utils.validation import _check_n_features, check_is_fitted, validate_data
+
+from skmatter.decomposition import _BaseKPCov
+from skmatter.utils import check_krr_fit
 
 import numpy as np
 from scipy import linalg
 from scipy.sparse.linalg import svds
 from sklearn.decomposition._base import _BasePCA
 from sklearn.decomposition._pca import _infer_dimension
-from sklearn.exceptions import NotFittedError
-from sklearn.kernel_ridge import KernelRidge
 from sklearn.linear_model._base import LinearModel
-from sklearn.metrics.pairwise import pairwise_kernels
 from sklearn.utils import check_random_state
 from sklearn.utils._arpack import _init_arpack_v0
 from sklearn.utils.extmath import randomized_svd, stable_cumsum, svd_flip
-from sklearn.utils.validation import _check_n_features, check_is_fitted, validate_data
 
-from ..preprocessing import KernelNormalizer
-from ..utils import check_krr_fit, pcovr_kernel
+from skmatter.preprocessing import KernelNormalizer
 
 
-class KernelPCovR(_BasePCA, LinearModel):
-    r"""Kernel Principal Covariates Regression, as described in [Helfrecht2020]_
+class KernelPCovR(_BaseKPCov):
+    r"""Kernel Principal Covariates Regression, as described in [Helfrecht2020]_,
     determines a latent-space projection :math:`\mathbf{T}` which minimizes a combined
     loss in supervised and unsupervised tasks in the reproducing kernel Hilbert space
     (RKHS).
@@ -76,8 +85,8 @@ class KernelPCovR(_BasePCA, LinearModel):
         If `precomputed`, we assume that the `y` passed to the `fit` function
         is the regressed form of the targets :math:`{\mathbf{\hat{Y}}}`.
 
-    kernel : "linear" | "poly" | "rbf" | "sigmoid" | "cosine" | "precomputed"
-        Kernel. Default="linear".
+    kernel : {"linear", "poly", "rbf", "sigmoid", "cosine", "precomputed"}, default="linear"
+        Kernel.
 
     gamma : float, default=None
         Kernel coefficient for rbf, poly and sigmoid kernels. Ignored by other
@@ -95,7 +104,7 @@ class KernelPCovR(_BasePCA, LinearModel):
         callable object. Ignored by other kernels.
center : bool, default=False - Whether to center any computed kernels + Whether to center any computed kernels fit_inverse_transform : bool, default=False Learn the inverse transform for non-precomputed kernels. @@ -123,24 +132,24 @@ class KernelPCovR(_BasePCA, LinearModel): Attributes ---------- pt__: numpy.darray of size :math:`({n_{components}, n_{components}})` - pseudo-inverse of the latent-space projection, which - can be used to contruct projectors from latent-space + pseudo-inverse of the latent-space projection, which + can be used to contruct projectors from latent-space pkt_: numpy.ndarray of size :math:`({n_{samples}, n_{components}})` - the projector, or weights, from the input kernel :math:`\mathbf{K}` - to the latent-space projection :math:`\mathbf{T}` + the projector, or weights, from the input kernel :math:`\mathbf{K}` + to the latent-space projection :math:`\mathbf{T}` pky_: numpy.ndarray of size :math:`({n_{samples}, n_{properties}})` - the projector, or weights, from the input kernel :math:`\mathbf{K}` - to the properties :math:`\mathbf{Y}` + the projector, or weights, from the input kernel :math:`\mathbf{K}` + to the properties :math:`\mathbf{Y}` pty_: numpy.ndarray of size :math:`({n_{components}, n_{properties}})` - the projector, or weights, from the latent-space projection - :math:`\mathbf{T}` to the properties :math:`\mathbf{Y}` + the projector, or weights, from the latent-space projection + :math:`\mathbf{T}` to the properties :math:`\mathbf{Y}` ptx_: numpy.ndarray of size :math:`({n_{components}, n_{features}})` - the projector, or weights, from the latent-space projection - :math:`\mathbf{T}` to the feature matrix :math:`\mathbf{X}` + the projector, or weights, from the latent-space projection + :math:`\mathbf{T}` to the feature matrix :math:`\mathbf{X}` X_fit_: numpy.ndarray of shape (n_samples, n_features) The data used to fit the model. 
This attribute is used to build kernels @@ -198,59 +207,23 @@ def __init__( iterated_power="auto", random_state=None, ): - self.mixing = mixing - self.n_components = n_components - - self.svd_solver = svd_solver - self.tol = tol - self.iterated_power = iterated_power - self.random_state = random_state - self.center = center - - self.kernel = kernel - self.gamma = gamma - self.degree = degree - self.coef0 = coef0 - self.kernel_params = kernel_params - - self.n_jobs = n_jobs - - self.fit_inverse_transform = fit_inverse_transform - - self.regressor = regressor - - def _get_kernel(self, X, Y=None): - if callable(self.kernel): - params = self.kernel_params or {} - else: - params = {"gamma": self.gamma, "degree": self.degree, "coef0": self.coef0} - return pairwise_kernels( - X, Y, metric=self.kernel, filter_params=True, n_jobs=self.n_jobs, **params + super().__init__( + mixing=mixing, + n_components=n_components, + svd_solver=svd_solver, + tol=tol, + iterated_power=iterated_power, + random_state=random_state, + center=center, + kernel=kernel, + gamma=gamma, + degree=degree, + coef0=coef0, + kernel_params=kernel_params, + n_jobs=n_jobs, + fit_inverse_transform=fit_inverse_transform, ) - - def _fit(self, K, Yhat, W): - """Fit the model with the computed kernel and approximated properties.""" - K_tilde = pcovr_kernel(mixing=self.mixing, X=K, Y=Yhat, kernel="precomputed") - - if self._fit_svd_solver == "full": - _, S, Vt = self._decompose_full(K_tilde) - elif self._fit_svd_solver in ["arpack", "randomized"]: - _, S, Vt = self._decompose_truncated(K_tilde) - else: - raise ValueError( - "Unrecognized svd_solver='{0}'" "".format(self._fit_svd_solver) - ) - - U = Vt.T - - P = (self.mixing * np.eye(K.shape[0])) + (1.0 - self.mixing) * (W @ Yhat.T) - - S_inv = np.array([1.0 / s if s > self.tol else 0.0 for s in S]) - - self.pkt_ = P @ U @ np.sqrt(np.diagflat(S_inv)) - - T = K @ self.pkt_ - self.pt__ = np.linalg.lstsq(T, np.eye(T.shape[0]), rcond=self.tol)[0] + self.regressor = regressor def fit(self, X, Y, W=None): r"""Fit the model with X and Y. @@ -284,29 +257,20 @@ def fit(self, X, Y, W=None): self: object Returns the instance itself. 
""" - if self.regressor not in ["precomputed", None] and not isinstance( - self.regressor, KernelRidge - ): - raise ValueError("Regressor must be an instance of `KernelRidge`") - X, Y = validate_data(self, X, Y, y_numeric=True, multi_output=True) - self.X_fit_ = X.copy() - if self.n_components is None: - if self.svd_solver != "arpack": - self.n_components_ = X.shape[0] - else: - self.n_components_ = X.shape[0] - 1 - else: - self.n_components_ = self.n_components + super().fit(X) - K = self._get_kernel(X) + K = super()._get_kernel(X) if self.center: self.centerer_ = KernelNormalizer() K = self.centerer_.fit_transform(K) - self.n_samples_in_, self.n_features_in_ = X.shape + if self.regressor not in ["precomputed", None] and not isinstance( + self.regressor, KernelRidge + ): + raise ValueError("Regressor must be an instance of `KernelRidge`") if self.regressor != "precomputed": if self.regressor is None: @@ -349,12 +313,12 @@ def fit(self, X, Y, W=None): # Check if regressor is fitted; if not, fit with precomputed K # to avoid needing to compute the kernel a second time self.regressor_ = check_krr_fit(regressor, K, X, Y) - W = self.regressor_.dual_coef_.reshape(self.n_samples_in_, -1) # Use this instead of `self.regressor_.predict(K)` # so that we can handle the case of the pre-fitted regressor Yhat = K @ W + # When we have an unfitted regressor, # we fit it with a precomputed K # so we must subsequently "reset" it so that @@ -372,24 +336,7 @@ def fit(self, X, Y, W=None): if W is None: W = np.linalg.lstsq(K, Yhat, self.tol)[0] - # Handle svd_solver - self._fit_svd_solver = self.svd_solver - if self._fit_svd_solver == "auto": - # Small problem or self.n_components_ == 'mle', just call full PCA - if ( - max(self.n_samples_in_, self.n_features_in_) <= 500 - or self.n_components_ == "mle" - ): - self._fit_svd_solver = "full" - elif self.n_components_ >= 1 and self.n_components_ < 0.8 * max( - self.n_samples_in_, self.n_features_in_ - ): - self._fit_svd_solver = "randomized" - # This is also the case of self.n_components_ in (0,1) - else: - self._fit_svd_solver = "full" - - self._fit(K, Yhat, W) + super()._fit_gram(K, Yhat, W) self.ptk_ = self.pt__ @ K self.pty_ = self.pt__ @ Y @@ -425,15 +372,7 @@ def transform(self, X): New data, where n_samples is the number of samples and n_features is the number of features. """ - check_is_fitted(self, ["pkt_", "X_fit_"]) - - X = validate_data(self, X, reset=False) - K = self._get_kernel(X, self.X_fit_) - - if self.center: - K = self.centerer_.transform(K) - - return K @ self.pkt_ + return super().transform(X) def inverse_transform(self, T): r"""Transform input data back to its original space. 
@@ -457,7 +396,7 @@ def inverse_transform(self, T): ------- X_original : numpy.ndarray, shape (n_samples, n_features) """ - return T @ self.ptx_ + return super().inverse_transform(T) def score(self, X, y): r"""Computes the (negative) loss values for KernelPCovR on the given predictor @@ -519,118 +458,3 @@ def score(self, X, y): Lkpca = np.trace(K_VV - 2 * K_VN @ w + w.T @ K_VV @ w) / np.trace(K_VV) return -sum([Lkpca, Lkrr]) - - def _decompose_truncated(self, mat): - if not 1 <= self.n_components_ <= self.n_samples_in_: - raise ValueError( - "n_components=%r must be between 1 and " - "n_samples=%r with " - "svd_solver='%s'" - % ( - self.n_components_, - self.n_samples_in_, - self.svd_solver, - ) - ) - elif not isinstance(self.n_components_, numbers.Integral): - raise ValueError( - "n_components=%r must be of type int " - "when greater than or equal to 1, was of type=%r" - % (self.n_components_, type(self.n_components_)) - ) - elif self.svd_solver == "arpack" and self.n_components_ == self.n_samples_in_: - raise ValueError( - "n_components=%r must be strictly less than " - "n_samples=%r with " - "svd_solver='%s'" - % ( - self.n_components_, - self.n_samples_in_, - self.svd_solver, - ) - ) - - random_state = check_random_state(self.random_state) - - if self._fit_svd_solver == "arpack": - v0 = _init_arpack_v0(min(mat.shape), random_state) - U, S, Vt = svds(mat, k=self.n_components_, tol=self.tol, v0=v0) - # svds doesn't abide by scipy.linalg.svd/randomized_svd - # conventions, so reverse its outputs. - S = S[::-1] - # flip eigenvectors' sign to enforce deterministic output - U, Vt = svd_flip(U[:, ::-1], Vt[::-1]) - - # We have already eliminated all other solvers, so this must be "randomized" - else: - # sign flipping is done inside - U, S, Vt = randomized_svd( - mat, - n_components=self.n_components_, - n_iter=self.iterated_power, - flip_sign=True, - random_state=random_state, - ) - - U[:, S < self.tol] = 0.0 - Vt[S < self.tol] = 0.0 - S[S < self.tol] = 0.0 - - return U, S, Vt - - def _decompose_full(self, mat): - if self.n_components_ != "mle": - if not (0 <= self.n_components_ <= self.n_samples_in_): - raise ValueError( - "n_components=%r must be between 1 and " - "n_samples=%r with " - "svd_solver='%s'" - % ( - self.n_components_, - self.n_samples_in_, - self.svd_solver, - ) - ) - elif self.n_components_ >= 1: - if not isinstance(self.n_components_, numbers.Integral): - raise ValueError( - "n_components=%r must be of type int " - "when greater than or equal to 1, " - "was of type=%r" - % (self.n_components_, type(self.n_components_)) - ) - - U, S, Vt = linalg.svd(mat, full_matrices=False) - U[:, S < self.tol] = 0.0 - Vt[S < self.tol] = 0.0 - S[S < self.tol] = 0.0 - - # flip eigenvectors' sign to enforce deterministic output - U, Vt = svd_flip(U, Vt) - - # Get variance explained by singular values - explained_variance_ = (S**2) / (self.n_samples_in_ - 1) - total_var = explained_variance_.sum() - explained_variance_ratio_ = explained_variance_ / total_var - - # Postprocess the number of components required - if self.n_components_ == "mle": - self.n_components_ = _infer_dimension( - explained_variance_, self.n_samples_in_ - ) - elif 0 < self.n_components_ < 1.0: - # number of components for which the cumulated explained - # variance percentage is superior to the desired threshold - # side='right' ensures that number of features selected - # their variance is always greater than self.n_components_ float - # passed. 
More discussion in issue: #15669
-            ratio_cumsum = stable_cumsum(explained_variance_ratio_)
-            self.n_components_ = (
-                np.searchsorted(ratio_cumsum, self.n_components_, side="right") + 1
-            )
-
-        return (
-            U[:, : self.n_components_],
-            S[: self.n_components_],
-            Vt[: self.n_components_],
-        )
diff --git a/src/skmatter/decomposition/_kpcov.py b/src/skmatter/decomposition/_kpcov.py
new file mode 100644
index 000000000..5bf5715b3
--- /dev/null
+++ b/src/skmatter/decomposition/_kpcov.py
@@ -0,0 +1,316 @@
+import numbers
+
+from abc import ABCMeta, abstractmethod
+
+import numpy as np
+from numpy.linalg import LinAlgError
+from scipy import linalg
+from scipy.linalg import sqrtm as MatrixSqrt
+from scipy.sparse.linalg import svds
+from sklearn.decomposition._base import _BasePCA
+from sklearn.decomposition._pca import _infer_dimension
+from sklearn.exceptions import NotFittedError
+from sklearn.linear_model._base import LinearModel
+from sklearn.metrics.pairwise import pairwise_kernels
+from sklearn.utils import check_random_state
+from sklearn.utils._arpack import _init_arpack_v0
+from sklearn.utils.extmath import randomized_svd, stable_cumsum, svd_flip
+from sklearn.utils.validation import check_is_fitted, validate_data
+
+from skmatter.utils import pcovr_covariance, pcovr_kernel
+
+
+class _BaseKPCov(_BasePCA, LinearModel, metaclass=ABCMeta):
+
+    @abstractmethod
+    def __init__(
+        self,
+        mixing=0.5,
+        n_components=None,
+        svd_solver="auto",
+        kernel="linear",
+        gamma=None,
+        degree=3,
+        coef0=1,
+        kernel_params=None,
+        center=False,
+        fit_inverse_transform=False,
+        tol=1e-12,
+        n_jobs=None,
+        iterated_power="auto",
+        random_state=None,
+    ):
+        self.mixing = mixing
+        self.n_components = n_components
+        self.svd_solver = svd_solver
+        self.kernel = kernel
+        self.gamma = gamma
+        self.degree = degree
+        self.coef0 = coef0
+        self.kernel_params = kernel_params
+        self.center = center
+        self.fit_inverse_transform = fit_inverse_transform
+        self.tol = tol
+        self.n_jobs = n_jobs
+        self.iterated_power = iterated_power
+        self.random_state = random_state
+
+    def _get_kernel(self, X, Y=None):
+        if callable(self.kernel):
+            params = self.kernel_params or {}
+        else:
+            params = {
+                "gamma": getattr(self, "gamma_", self.gamma),
+                "degree": self.degree,
+                "coef0": self.coef0,
+            }
+
+        return pairwise_kernels(
+            X, Y, metric=self.kernel, filter_params=True, n_jobs=self.n_jobs, **params
+        )
+
+    @abstractmethod
+    def fit(self, X):
+        """This contains the common functionality for KPCovR and KPCovC fit methods,
+        but leaves the rest of the fit functionality to the subclass.
+        """
+        self.X_fit_ = X.copy()
+
+        if self.n_components is None:
+            if self.svd_solver != "arpack":
+                self.n_components_ = X.shape[0]
+            else:
+                self.n_components_ = X.shape[0] - 1
+        else:
+            self.n_components_ = self.n_components
+
+        self.n_samples_in_, self.n_features_in_ = X.shape
+
+        # Handle svd_solver
+        self.fit_svd_solver_ = self.svd_solver
+        if self.fit_svd_solver_ == "auto":
+            # Small problem or self.n_components_ == 'mle', just call full PCA
+            if (
+                max(self.n_samples_in_, self.n_features_in_) <= 500
+                or self.n_components_ == "mle"
+            ):
+                self.fit_svd_solver_ = "full"
+            elif self.n_components_ >= 1 and self.n_components_ < 0.8 * max(
+                self.n_samples_in_, self.n_features_in_
+            ):
+                self.fit_svd_solver_ = "randomized"
+            # This is also the case of self.n_components_ in (0,1)
+            else:
+                self.fit_svd_solver_ = "full"
+
+    def _fit_covariance(self, K, Z):
+        """
+        Fit the model with the computed kernel and approximated properties,
+        using the covariance of the kernel features.
+        """
+        Ct, iCsqrt = pcovr_covariance(
+            mixing=self.mixing,
+            X=K,
+            Y=Z,
+            rcond=self.tol,
+            return_isqrt=True,
+        )
+        try:
+            Csqrt = np.linalg.lstsq(iCsqrt, np.eye(len(iCsqrt)), rcond=None)[0]
+        except LinAlgError:
+            # inverting iCsqrt is cheaper when it works, but it can fail on a
+            # singular matrix, in which case we compute the square root directly
+            Csqrt = np.real(MatrixSqrt(K.T @ K))
+
+        if self.fit_svd_solver_ == "full":
+            U, S, Vt = self._decompose_full(Ct)
+        elif self.fit_svd_solver_ in ["arpack", "randomized"]:
+            U, S, Vt = self._decompose_truncated(Ct)
+        else:
+            raise ValueError(f"Unrecognized svd_solver='{self.fit_svd_solver_}'")
+
+        S_sqrt = np.diagflat([np.sqrt(s) if s > self.tol else 0.0 for s in S])
+        S_sqrt_inv = np.diagflat([1.0 / np.sqrt(s) if s > self.tol else 0.0 for s in S])
+
+        self.pkt_ = np.linalg.multi_dot([iCsqrt, Vt.T, S_sqrt])
+        self.ptk_ = np.linalg.multi_dot([S_sqrt_inv, Vt, Csqrt])
+
+        T = K @ self.pkt_
+        self.pt__ = np.linalg.lstsq(T, np.eye(T.shape[0]), rcond=self.tol)[0]
+
+    def _fit_gram(self, K, Yhat, W):
+        """
+        Fit the model with the computed kernel and approximated properties,
+        using the Gram (sample-space) route.
+        """
+        K_tilde = pcovr_kernel(mixing=self.mixing, X=K, Y=Yhat, kernel="precomputed")
+
+        if self.fit_svd_solver_ == "full":
+            _, S, Vt = self._decompose_full(K_tilde)
+        elif self.fit_svd_solver_ in ["arpack", "randomized"]:
+            _, S, Vt = self._decompose_truncated(K_tilde)
+        else:
+            raise ValueError(
+                "Unrecognized svd_solver='{0}'".format(self.fit_svd_solver_)
+            )
+
+        U = Vt.T
+
+        P = (self.mixing * np.eye(K.shape[0])) + (1.0 - self.mixing) * (W @ Yhat.T)
+
+        S_inv = np.array([1.0 / s if s > self.tol else 0.0 for s in S])
+
+        self.pkt_ = P @ U @ np.sqrt(np.diagflat(S_inv))
+
+        T = K @ self.pkt_
+        self.pt__ = np.linalg.lstsq(T, np.eye(T.shape[0]), rcond=self.tol)[0]
+
+    def transform(self, X=None):
+        check_is_fitted(self, ["pkt_", "X_fit_"])
+
+        X = validate_data(self, X, reset=False)
+        K = self._get_kernel(X, self.X_fit_)
+
+        if self.center:
+            K = self.centerer_.transform(K)
+
+        return K @ self.pkt_
+
+    def inverse_transform(self, T):
+        if not self.fit_inverse_transform:
+            raise NotFittedError(
+                "The fit_inverse_transform parameter was not"
+                " set to True when instantiating and hence "
+                "the inverse transform is not available."
+            )
+
+        return T @ self.ptx_
+
+    def _decompose_truncated(self, mat):
+        if not 1 <= self.n_components_ <= self.n_samples_in_:
+            raise ValueError(
+                "n_components=%r must be between 1 and "
+                "n_samples=%r with "
+                "svd_solver='%s'"
+                % (
+                    self.n_components_,
+                    self.n_samples_in_,
+                    self.svd_solver,
+                )
+            )
+        elif not isinstance(self.n_components_, numbers.Integral):
+            raise ValueError(
+                "n_components=%r must be of type int "
+                "when greater than or equal to 1, was of type=%r"
+                % (self.n_components_, type(self.n_components_))
+            )
+        elif self.svd_solver == "arpack" and self.n_components_ == self.n_samples_in_:
+            raise ValueError(
+                "n_components=%r must be strictly less than "
+                "n_samples=%r with "
+                "svd_solver='%s'"
+                % (
+                    self.n_components_,
+                    self.n_samples_in_,
+                    self.svd_solver,
+                )
+            )
+
+        random_state = check_random_state(self.random_state)
+
+        if self.fit_svd_solver_ == "arpack":
+            v0 = _init_arpack_v0(min(mat.shape), random_state)
+            U, S, Vt = svds(mat, k=self.n_components_, tol=self.tol, v0=v0)
+            # svds doesn't abide by scipy.linalg.svd/randomized_svd
+            # conventions, so reverse its outputs.
+ S = S[::-1] + # flip eigenvectors' sign to enforce deterministic output + U, Vt = svd_flip(U[:, ::-1], Vt[::-1]) + + # We have already eliminated all other solvers, so this must be "randomized" + else: + # sign flipping is done inside + U, S, Vt = randomized_svd( + mat, + n_components=self.n_components_, + n_iter=self.iterated_power, + flip_sign=True, + random_state=random_state, + ) + + U[:, S < self.tol] = 0.0 + Vt[S < self.tol] = 0.0 + S[S < self.tol] = 0.0 + + return U, S, Vt + + def _decompose_full(self, mat): + if self.n_components_ != "mle": + if not (0 <= self.n_components_ <= self.n_samples_in_): + raise ValueError( + "n_components=%r must be between 1 and " + "n_samples=%r with " + "svd_solver='%s'" + % ( + self.n_components_, + self.n_samples_in_, + self.svd_solver, + ) + ) + elif self.n_components_ >= 1: + if not isinstance(self.n_components_, numbers.Integral): + raise ValueError( + "n_components=%r must be of type int " + "when greater than or equal to 1, " + "was of type=%r" + % (self.n_components_, type(self.n_components_)) + ) + + U, S, Vt = linalg.svd(mat, full_matrices=False) + U[:, S < self.tol] = 0.0 + Vt[S < self.tol] = 0.0 + S[S < self.tol] = 0.0 + + # flip eigenvectors' sign to enforce deterministic output + U, Vt = svd_flip(U, Vt) + + # Get variance explained by singular values + explained_variance_ = (S**2) / (self.n_samples_in_ - 1) + total_var = explained_variance_.sum() + explained_variance_ratio_ = explained_variance_ / total_var + + # Postprocess the number of components required + if self.n_components_ == "mle": + self.n_components_ = _infer_dimension( + explained_variance_, self.n_samples_in_ + ) + elif 0 < self.n_components_ < 1.0: + # number of components for which the cumulated explained + # variance percentage is superior to the desired threshold + # side='right' ensures that number of features selected + # their variance is always greater than self.n_components_ float + # passed. More discussion in issue: #15669 + ratio_cumsum = stable_cumsum(explained_variance_ratio_) + self.n_components_ = ( + np.searchsorted(ratio_cumsum, self.n_components_, side="right") + 1 + ) + + return ( + U[:, : self.n_components_], + S[: self.n_components_], + Vt[: self.n_components_], + ) diff --git a/src/skmatter/decomposition/_pcov.py b/src/skmatter/decomposition/_pcov.py index 972f097dc..dc68d6268 100644 --- a/src/skmatter/decomposition/_pcov.py +++ b/src/skmatter/decomposition/_pcov.py @@ -1,3 +1,4 @@ +from abc import ABCMeta, abstractmethod import numbers import warnings @@ -17,7 +18,9 @@ from skmatter.utils import pcovr_covariance, pcovr_kernel -class _BasePCov(_BasePCA, LinearModel): +class _BasePCov(_BasePCA, LinearModel, metaclass=ABCMeta): + + @abstractmethod def __init__( self, mixing=0.5, @@ -38,6 +41,7 @@ def __init__( self.random_state = random_state self.whiten = whiten + @abstractmethod def fit(self, X): """Contains the common functionality for the PCovR and PCovC fit methods, but leaves the rest of the functionality to the subclass. 
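(Editorial aside, not part of the patch.) The sample-space route in ``_BaseKPCov._fit_gram`` above relies on the same construction as the existing ``pcovr_kernel`` utility: the modified kernel is a convex combination of the kernel matrix and the outer product of the approximated targets or class scores, and the latent projection comes from its truncated eigendecomposition. A minimal NumPy sketch of that idea, with placeholder names (``K``, ``Z``, ``alpha``) and none of skmatter's solver selection, centering, or sign conventions::

    import numpy as np


    def modified_kernel_projection(K, Z, alpha=0.5, n_components=2, tol=1e-12):
        """Latent projection from the eigendecomposition of alpha*K + (1-alpha)*Z@Z.T."""
        Kt = alpha * K + (1 - alpha) * Z @ Z.T
        evals, evecs = np.linalg.eigh(Kt)            # symmetric, eigenvalues ascending
        idx = np.argsort(evals)[::-1][:n_components]
        evals, evecs = evals[idx], evecs[:, idx]
        evals[evals < tol] = 0.0                     # discard numerically null directions
        return evecs * np.sqrt(evals)                # T, shape (n_samples, n_components)


    rng = np.random.default_rng(0)
    X = rng.normal(size=(6, 3))
    K = X @ X.T                                      # a linear kernel, for illustration
    Z = rng.normal(size=(6, 2))                      # stand-in for class confidence scores
    T = modified_kernel_projection(K, Z, alpha=0.5, n_components=2)
    print(T.shape)                                   # (6, 2)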
diff --git a/src/skmatter/decomposition/_pcovc.py b/src/skmatter/decomposition/_pcovc.py
index fe4434b4f..ba54c1337 100644
--- a/src/skmatter/decomposition/_pcovc.py
+++ b/src/skmatter/decomposition/_pcovc.py
@@ -14,7 +14,6 @@
 from sklearn.utils import check_array
 from sklearn.utils.multiclass import check_classification_targets, type_of_target
 from sklearn.utils.validation import check_is_fitted, validate_data
-
 from skmatter.decomposition import _BasePCov
 from skmatter.utils import check_cl_fit
 
diff --git a/src/skmatter/utils/__init__.py b/src/skmatter/utils/__init__.py
index 70e32616c..0babc0143 100644
--- a/src/skmatter/utils/__init__.py
+++ b/src/skmatter/utils/__init__.py
@@ -35,6 +35,7 @@
     "pcovr_kernel",
     "check_krr_fit",
     "check_lr_fit",
+    "check_cl_fit",
     "X_orthogonalizer",
     "Y_sample_orthogonalizer",
     "Y_feature_orthogonalizer",
diff --git a/src/skmatter/utils/_pcovc_utils.py b/src/skmatter/utils/_pcovc_utils.py
index ea55dd60a..85d4bb914 100644
--- a/src/skmatter/utils/_pcovc_utils.py
+++ b/src/skmatter/utils/_pcovc_utils.py
@@ -65,3 +65,67 @@ def check_cl_fit(classifier, X, y):
         fitted_classifier.fit(X, y)
 
     return fitted_classifier
+
+
+def check_svc_fit(classifier, K, X, y):
+    """Checks that an SVC is fitted and, if not, fits it with the provided data.
+
+    Parameters
+    ----------
+    classifier : object
+        scikit-learn SVC instance
+    K : array-like
+        Kernel matrix with which to fit the classifier if it is not already fitted
+    X : array-like
+        Feature matrix with which to fit the classifier if it is not already fitted
+    y : array-like
+        Target values with which to fit the classifier if it is not already fitted
+
+    Returns
+    -------
+    fitted_classifier : object
+        The fitted classifier.
If input classifier was already fitted and compatible + with the data, returns a deep copy. Otherwise returns a newly fitted classifier. + + Raises + ------ + ValueError + If the fitted classifiers's coefficients have a shape incompatible with the + number of support vectors of the model or the number of classes in y. + + Notes + ----- + For unfitted classifiers, sets the kernel to "precomputed" before fitting with the + provided kernel matrix K to avoid recomputation. + """ + try: + check_is_fitted(classifier) + fitted_classifier = deepcopy(classifier) + + # Check compatibility with X + validate_data(fitted_classifier, X, y, reset=False, multi_output=True) + + # Check compatibility with y + n_classes = len(np.unique(y)) - 1 + n_sv = len(fitted_classifier.support_) + dual_coef_ = fitted_classifier.dual_coef_ + + if dual_coef_.shape[0] != n_classes or dual_coef_.shape[1] != n_sv: + raise ValueError( + "Expected classifier coefficients " + "to have shape " + f"({n_classes}, {n_sv}) but got shape " + f"{dual_coef_.shape}" + ) + + except NotFittedError: + fitted_classifier = clone(classifier) + + # Use a precomputed kernel + # to avoid re-computing K + fitted_classifier.set_params(kernel="precomputed") + fitted_classifier.fit(K, y) + + return fitted_classifier diff --git a/src/skmatter/utils/_pcovr_utils.py b/src/skmatter/utils/_pcovr_utils.py index 29463b633..c6436a02f 100644 --- a/src/skmatter/utils/_pcovr_utils.py +++ b/src/skmatter/utils/_pcovr_utils.py @@ -31,7 +31,7 @@ def check_lr_fit(regressor, X, y): Raises ------ ValueError - If the fitted regressor's coefficients dimensions are incompatible with the + If the fitted regressor's coefficients have a dimension incompatible with the target space. """ try: @@ -100,7 +100,7 @@ def check_krr_fit(regressor, K, X, y): Raises ------ ValueError - If the fitted regressor's coefficients dimensions are incompatible with the + If the fitted regressor's coefficients have a dimension incompatible with the target space. Notes @@ -162,7 +162,7 @@ def pcovr_covariance( \mathbf{\hat{Y}}\mathbf{\hat{Y}}^T \mathbf{X} \left(\mathbf{X}^T \mathbf{X}\right)^{-\frac{1}{2}}\right) - where :math:`\mathbf{\hat{Y}}`` are the properties obtained by linear regression. + where :math:`\mathbf{\hat{Y}}` are the properties obtained by linear regression. 
Parameters ---------- diff --git a/tests/test_check_estimators.py b/tests/test_check_estimators.py index 1683b53f3..5cf4aae1d 100644 --- a/tests/test_check_estimators.py +++ b/tests/test_check_estimators.py @@ -1,6 +1,5 @@ from sklearn.utils.estimator_checks import parametrize_with_checks - -from skmatter.decomposition import KernelPCovR, PCovC, PCovR +from skmatter.decomposition import KernelPCovC, KernelPCovR, PCovC, PCovR from skmatter.feature_selection import CUR as fCUR from skmatter.feature_selection import FPS as fFPS from skmatter.feature_selection import PCovCUR as fPCovCUR @@ -12,6 +11,7 @@ @parametrize_with_checks( [ KernelPCovR(mixing=0.5), + KernelPCovC(mixing=0.5), PCovR(mixing=0.5), PCovC(mixing=0.5), fCUR(), diff --git a/tests/test_kernel_pcovc.py b/tests/test_kernel_pcovc.py new file mode 100644 index 000000000..ea5b93207 --- /dev/null +++ b/tests/test_kernel_pcovc.py @@ -0,0 +1,454 @@ +import unittest + +import numpy as np +from sklearn import exceptions +from sklearn.datasets import load_breast_cancer as get_dataset + +from sklearn.utils.validation import check_X_y +from sklearn.preprocessing import StandardScaler +from sklearn.linear_model import LogisticRegression +from sklearn.svm import SVC + +from sklearn.metrics.pairwise import pairwise_kernels +from sklearn.metrics import accuracy_score +from skmatter.decomposition import PCovC +from skmatter.preprocessing import KernelNormalizer +from skmatter.decomposition._kernel_pcovc import KernelPCovC + + +class KernelPCovCBaseTest(unittest.TestCase): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.random_state = np.random.RandomState(0) + + self.error_tol = 1e-6 + + self.X, self.Y = get_dataset(return_X_y=True) + + # for the sake of expedience, only use a subset of the dataset + idx = self.random_state.choice(len(self.X), 100) + self.X = self.X[idx] + self.Y = self.Y[idx] + + scaler = StandardScaler() + self.X = scaler.fit_transform(self.X) + + self.model = ( + lambda mixing=0.5, classifier=SVC(), n_components=4, **kwargs: KernelPCovC( + mixing=mixing, + classifier=classifier, + n_components=n_components, + svd_solver=kwargs.pop("svd_solver", "full"), + **kwargs, + ) + ) + + def setUp(self): + pass + + +class KernelPCovCErrorTest(KernelPCovCBaseTest): + def test_cl_with_x_errors(self): + """ + Check that KernelPCovC returns a non-null property prediction + and that the prediction error increases with `mixing` + """ + prev_error = -1.0 + + for mixing in np.linspace(0, 1, 6): + kpcovc = KernelPCovC(mixing=mixing, n_components=4, tol=1e-12) + kpcovc.fit(self.X, self.Y) + + error = ( + np.linalg.norm(self.Y - kpcovc.predict(self.X)) ** 2.0 + / np.linalg.norm(self.Y) ** 2.0 + ) + + with self.subTest(error=error): + self.assertFalse(np.isnan(error)) + with self.subTest(error=error, alpha=round(mixing, 4)): + self.assertGreaterEqual(error, prev_error - self.error_tol) + + prev_error = error + + def test_reconstruction_errors(self): + """Check that KernelPCovC returns a non-null reconstructed X and that the + reconstruction error decreases with `mixing`. 
+ """ + prev_error = 10.0 + prev_x_error = 10.0 + + x_errors = [] + errors = [] + for mixing in np.linspace(0, 1, 6): + print(mixing) + kpcovc = KernelPCovC( + mixing=mixing, + n_components=2, + fit_inverse_transform=True, + center=True, + kernel="linear", + ) + kpcovc.fit(self.X, self.Y) + + t = kpcovc.transform(self.X) + K = kpcovc._get_kernel(self.X) + x = kpcovc.inverse_transform(t) + + error = np.linalg.norm(K - t @ t.T) ** 2.0 / np.linalg.norm(K) ** 2.0 + x_error = np.linalg.norm(self.X - x) ** 2.0 / np.linalg.norm(self.X) ** 2.0 + + x_errors.append(x_error) + errors.append(error) + print(error, np.linalg.norm(K - t @ t.T) ** 2.0, np.linalg.norm(K) ** 2.0) + with self.subTest(error=error): + self.assertFalse(np.isnan(error)) + with self.subTest(error=error, alpha=round(mixing, 4)): + self.assertLessEqual(error, prev_error + self.error_tol) + + with self.subTest(error=x_error): + self.assertFalse(np.isnan(x_error)) + with self.subTest(error=x_error, alpha=round(mixing, 4)): + self.assertLessEqual(x_error, prev_x_error + self.error_tol) + + prev_error = error + prev_x_error = x_error + print(x_errors) + print(errors) + + +class KernelPCovCInfrastructureTest(KernelPCovCBaseTest): + def test_nonfitted_failure(self): + """ + Check that KernelPCovC will raise a `NonFittedError` if + `transform` is called before the model is fitted + """ + kpcovc = KernelPCovC(mixing=0.5, n_components=4, tol=1e-12) + with self.assertRaises(exceptions.NotFittedError): + _ = kpcovc.transform(self.X) + + def test_no_arg_predict(self): + """ + Check that KernelPCovC will raise a `ValueError` if + `predict` is called without arguments + """ + kpcovc = KernelPCovC(mixing=0.5, n_components=4, tol=1e-12) + kpcovc.fit(self.X, self.Y) + with self.assertRaises(ValueError): + _ = kpcovc.predict() + + def test_T_shape(self): + """ + Check that KernelPCovC returns a latent space projection + consistent with the shape of the input matrix + """ + n_components = 5 + kpcovc = KernelPCovC(mixing=0.5, n_components=n_components, tol=1e-12) + kpcovc.fit(self.X, self.Y) + T = kpcovc.transform(self.X) + self.assertTrue(check_X_y(self.X, T, multi_output=True) == (self.X, T)) + self.assertTrue(T.shape[-1] == n_components) + + def test_Z_shape(self): + """Check that KPCovC returns an evidence matrix consistent with the number of samples + and the number of classes. 
+ """ + n_components = 5 + pcovc = self.model(n_components=n_components, tol=1e-12) + pcovc.fit(self.X, self.Y) + + # Shape (n_samples, ) for binary classifcation + Z = pcovc.decision_function(self.X) + + self.assertTrue(Z.ndim == 1) + self.assertTrue(Z.shape[0] == self.X.shape[0]) + + # Modify Y so that it now contains three classes + Y_multiclass = self.Y.copy() + Y_multiclass[0] = 2 + pcovc.fit(self.X, Y_multiclass) + n_classes = len(np.unique(Y_multiclass)) + + # Shape (n_samples, n_classes) for multiclass classification + Z = pcovc.decision_function(self.X) + + self.assertTrue(Z.ndim == 2) + self.assertTrue((Z.shape[0], Z.shape[1]) == (self.X.shape[0], n_classes)) + + def test_no_centerer(self): + """Tests that when center=False, no centerer exists.""" + kpcovc = self.model(center=False) + kpcovc.fit(self.X, self.Y) + + with self.assertRaises(AttributeError): + kpcovc.centerer_ + + def test_centerer(self): + """Tests that all functionalities that rely on the centerer work properly.""" + kpcovc = self.model(center=True) + kpcovc.fit(self.X, self.Y) + + self.assertTrue(hasattr(kpcovc, "centerer_")) + _ = kpcovc.predict(self.X) + _ = kpcovc.transform(self.X) + _ = kpcovc.score(self.X, self.Y) + + def test_prefit_classifier(self): + classifier = SVC() + classifier.fit(self.X, self.Y) + + kpcovc = KernelPCovC(mixing=0.5, classifier=classifier) + kpcovc.fit(self.X, self.Y) + + Z_classifier = classifier.decision_function(self.X).reshape(self.X.shape[0], -1) + W_classifier = classifier.dual_coef_ + + Z_kpcovc = kpcovc.z_classifier_.decision_function(self.X).reshape( + self.X.shape[0], -1 + ) + W_kpcovc = kpcovc.z_classifier_.dual_coef_ + + self.assertTrue(np.allclose(Z_classifier, Z_kpcovc)) + self.assertTrue(np.allclose(W_classifier, W_kpcovc)) + + def test_classifier_modifications(self): + classifier = SVC() + kpcovc = self.model(mixing=0.5, classifier=classifier) + self.maxDiff = None + # KPCovC classifier matches the original + self.assertTrue(classifier.get_params() == kpcovc.classifier.get_params()) + + # KPCovC classifier updates its parameters + # to match the original classifier + classifier.set_params(gamma=1.0) + self.assertTrue(classifier.get_params() == kpcovc.classifier.get_params()) + + # Fitting classifier outside KPCovC fits the KPCovC classifier + classifier.fit(self.X, self.Y) + self.assertTrue(hasattr(kpcovc.classifier, "dual_coef_")) + + # Raise error during KPCovC fit since classifier and KPCovC + # kernel parameters now inconsistent + with self.assertRaises(ValueError) as cm: + kpcovc.fit(self.X, self.Y) + self.assertEqual( + str(cm.exception), + "Kernel parameter mismatch: the classifier has kernel parameters " + "{kernel: 'rbf', gamma: 1.0, degree: 3, coef0: 0.0}" + " and KernelPCovC was initialized with kernel parameters " + "{kernel: 'rbf', gamma: 'scale', degree: 3, coef0: 0.0}", + ) + + # reset classifier and try incorrect decision_function_shape + classifier.set_params(gamma=kpcovc.gamma, decision_function_shape="ovo") + + # now raise error when trying to fit KPCovC + with self.assertRaises(ValueError) as cm: + kpcovc.fit(self.X, self.Y) + self.assertEqual( + str(cm.exception), + f"Classifier must have parameter `decision_function_shape` set to 'ovr' " + f"but was initialized with '{classifier.decision_function_shape}'", + ) + + def test_incompatible_classifier(self): + classifier = LogisticRegression() + classifier.fit(self.X, self.Y) + kpcovc = self.model(mixing=0.5, classifier=classifier) + + with self.assertRaises(ValueError) as cm: + kpcovc.fit(self.X, 
self.Y) + self.assertEqual(str(cm.exception), "Classifier must be an instance of `SVC`") + + def test_none_classifier(self): + kpcovc = KernelPCovC(mixing=0.5, classifier=None) + kpcovc.fit(self.X, self.Y) + self.assertTrue(kpcovc.classifier is None) + self.assertTrue(kpcovc.classifier_ is not None) + + def test_incompatible_coef_shape(self): + classifier = SVC() + Y_multiclass = self.Y.copy() + Y_multiclass[0] = 2 + + classifier.fit(self.X, Y_multiclass) + kpcovc = KernelPCovC(classifier=classifier) + + n_classes_exp = len(np.unique(self.Y)) - 1 + n_sv_exp = len(classifier.support_) + + n_classes_act = len(np.unique(Y_multiclass)) - 1 + n_sv_act = n_sv_exp + + with self.assertRaises(ValueError) as cm: + # try fitting with binary classification + # when internal z_classifier_ is trained with multiclass + kpcovc.fit(self.X, self.Y) + self.assertEqual( + str(cm.exception), + "Expected classifier coefficients " + "to have shape " + f"({n_classes_exp}, {n_sv_exp}) but got shape " + f"({n_classes_act}, {n_sv_act})", + ) + + +class KernelTests(KernelPCovCBaseTest): + def test_kernel_types(self): + """Check that KernelPCovC can handle all kernels passable to sklearn + kernel classes, including callable kernels + """ + + def _linear_kernel(X, Y): + return X @ Y.T + + kernel_params = { + "poly": {"degree": 2, "coef0": 0.75}, + "rbf": {"gamma": "auto"}, + "sigmoid": {"gamma": 3.0, "coef0": 0.5}, + } + + for kernel in ["linear", "poly", "rbf", "sigmoid", _linear_kernel]: + with self.subTest(kernel=kernel): + kpcovc = KernelPCovC( + mixing=0.5, + n_components=2, + classifier=SVC(kernel=kernel, **kernel_params.get(kernel, {})), + kernel=kernel, + **kernel_params.get(kernel, {}), + ) + print(kpcovc.classifier.kernel) + kpcovc.fit(self.X, self.Y) + + +class KernelPCovCTestSVDSolvers(KernelPCovCBaseTest): + def test_svd_solvers(self): + """ + Check that KPCovC works with all svd_solver modes and assigns + the right n_components + """ + for solver in ["arpack", "full", "randomized", "auto"]: + with self.subTest(solver=solver): + kpcovc = self.model(tol=1e-12, n_components=None, svd_solver=solver) + kpcovc.fit(self.X, self.Y) + + if solver == "arpack": + self.assertTrue(kpcovc.n_components_ == self.X.shape[0] - 1) + else: + self.assertTrue(kpcovc.n_components_ == self.X.shape[0]) + + n_component_solvers = { + "mle": "full", + int(0.75 * max(self.X.shape)): "randomized", + 0.1: "full", + } + for n_components, solver in n_component_solvers.items(): + with self.subTest(solver=solver, n_components=n_components): + kpcovc = self.model( + tol=1e-12, n_components=n_components, svd_solver="auto" + ) + if solver == "randomized": + n_copies = (501 // max(self.X.shape)) + 1 + X = np.hstack(np.repeat(self.X.copy(), n_copies)).reshape( + self.X.shape[0] * n_copies, -1 + ) + Y = np.hstack(np.repeat(self.Y.copy(), n_copies)).reshape( + self.X.shape[0] * n_copies, -1 + ) + kpcovc.fit(X, Y) + else: + kpcovc.fit(self.X, self.Y) + + self.assertTrue(kpcovc.fit_svd_solver_ == solver) + + def test_bad_solver(self): + """ + Check that KPCovC will not work with a solver that isn't in + ['arpack', 'full', 'randomized', 'auto'] + """ + with self.assertRaises(ValueError) as cm: + kpcovc = self.model(svd_solver="bad") + kpcovc.fit(self.X, self.Y) + + self.assertEqual(str(cm.exception), "Unrecognized svd_solver='bad'" "") + + def test_good_n_components(self): + """Check that KPCovC will work with any allowed values of n_components.""" + # this one should pass + kpcovc = self.model(n_components=0.5, svd_solver="full") + 
kpcovc.fit(self.X, self.Y) + + for svd_solver in ["auto", "full"]: + # this one should pass + kpcovc = self.model(n_components=2, svd_solver=svd_solver) + kpcovc.fit(self.X, self.Y) + + # this one should pass + kpcovc = self.model(n_components="mle", svd_solver=svd_solver) + kpcovc.fit(self.X, self.Y) + + def test_bad_n_components(self): + """Check that KPCovC will not work with any prohibited values of n_components.""" + with self.subTest(type="negative_ncomponents"): + with self.assertRaises(ValueError) as cm: + kpcovc = self.model(n_components=-1, svd_solver="auto") + kpcovc.fit(self.X, self.Y) + + self.assertEqual( + str(cm.exception), + "n_components=%r must be between 1 and " + "n_samples=%r with " + "svd_solver='%s'" + % ( + kpcovc.n_components, + self.X.shape[0], + kpcovc.svd_solver, + ), + ) + with self.subTest(type="0_ncomponents"): + with self.assertRaises(ValueError) as cm: + kpcovc = self.model(n_components=0, svd_solver="randomized") + kpcovc.fit(self.X, self.Y) + + self.assertEqual( + str(cm.exception), + "n_components=%r must be between 1 and " + "n_samples=%r with " + "svd_solver='%s'" + % ( + kpcovc.n_components, + self.X.shape[0], + kpcovc.svd_solver, + ), + ) + with self.subTest(type="arpack_X_ncomponents"): + with self.assertRaises(ValueError) as cm: + kpcovc = self.model(n_components=self.X.shape[0], svd_solver="arpack") + kpcovc.fit(self.X, self.Y) + self.assertEqual( + str(cm.exception), + "n_components=%r must be strictly less than " + "n_samples=%r with " + "svd_solver='%s'" + % ( + kpcovc.n_components, + self.X.shape[0], + kpcovc.svd_solver, + ), + ) + + for svd_solver in ["auto", "full"]: + with self.subTest(type="pi_ncomponents"): + with self.assertRaises(ValueError) as cm: + kpcovc = self.model(n_components=np.pi, svd_solver=svd_solver) + kpcovc.fit(self.X, self.Y) + self.assertEqual( + str(cm.exception), + "n_components=%r must be of type int " + "when greater than or equal to 1, was of type=%r" + % (kpcovc.n_components, type(kpcovc.n_components)), + ) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/test_kernel_pcovr.py b/tests/test_kernel_pcovr.py index 9b6e0fb25..fd7c601d4 100644 --- a/tests/test_kernel_pcovr.py +++ b/tests/test_kernel_pcovr.py @@ -7,7 +7,7 @@ from sklearn.linear_model import Ridge, RidgeCV from sklearn.utils.validation import check_X_y -from skmatter.decomposition import KernelPCovR, PCovR +from skmatter.decomposition import PCovR, KernelPCovR from skmatter.preprocessing import StandardFlexibleScaler as SFS @@ -59,7 +59,6 @@ def test_lr_with_x_errors(self): for mixing in np.linspace(0, 1, 6): kpcovr = KernelPCovR(mixing=mixing, n_components=2, tol=1e-12) kpcovr.fit(self.X, self.Y) - error = ( np.linalg.norm(self.Y - kpcovr.predict(self.X)) ** 2.0 / np.linalg.norm(self.Y) ** 2.0 @@ -91,6 +90,7 @@ def test_reconstruction_errors(self): error = np.linalg.norm(K - t @ t.T) ** 2.0 / np.linalg.norm(K) ** 2.0 x_error = np.linalg.norm(self.X - x) ** 2.0 / np.linalg.norm(self.X) ** 2.0 + print(np.linalg.norm(K - t @ t.T) ** 2.0 / np.linalg.norm(K) ** 2.0) with self.subTest(error=error): self.assertFalse(np.isnan(error)) @@ -163,7 +163,7 @@ def test_T_shape(self): kpcovr = KernelPCovR(mixing=0.5, n_components=n_components, tol=1e-12) kpcovr.fit(self.X, self.Y) T = kpcovr.transform(self.X) - self.assertTrue(check_X_y(self.X, T, multi_output=True)) + self.assertTrue(check_X_y(self.X, T, multi_output=True) == (self.X, T)) self.assertTrue(T.shape[-1] == n_components) def test_no_centerer(self): @@ -219,12 +219,12 @@ def 
@@ -219,12 +219,12 @@ def test_regressor_modifications(self):
         # kernel parameters now inconsistent
         with self.assertRaises(ValueError) as cm:
             kpcovr.fit(self.X, self.Y)
-        self.assertTrue(
+        self.assertEqual(
             str(cm.exception),
             "Kernel parameter mismatch: the regressor has kernel parameters "
-            "{kernel: linear, gamma: 0.2, degree: 3, coef0: 1, kernel_params: None}"
+            "{kernel: 'rbf', gamma: 0.2, degree: 3, coef0: 1, kernel_params: None}"
             " and KernelPCovR was initialized with kernel parameters "
-            "{kernel: linear, gamma: 0.1, degree: 3, coef0: 1, kernel_params: None}",
+            "{kernel: 'rbf', gamma: 0.1, degree: 3, coef0: 1, kernel_params: None}",
         )
 
     def test_incompatible_regressor(self):
@@ -234,7 +234,7 @@ def test_incompatible_regressor(self):
 
         with self.assertRaises(ValueError) as cm:
             kpcovr.fit(self.X, self.Y)
-        self.assertTrue(
+        self.assertEqual(
             str(cm.exception),
             "Regressor must be an instance of `KernelRidge`",
         )
@@ -250,29 +250,34 @@ def test_incompatible_coef_shape(self):
         # Don't need to test X shape, since this should
         # be caught by sklearn's _validate_data
         regressor = KernelRidge(alpha=1e-8, kernel="linear")
-        regressor.fit(self.X, self.Y[:, 0][:, np.newaxis])
+        regressor.fit(self.X, self.Y[:, 0])
         kpcovr = self.model(mixing=0.5, regressor=regressor)
 
         # Dimension mismatch
+        print(self.Y.shape,np.zeros(self.Y.shape + (2,)).shape )
        with self.assertRaises(ValueError) as cm:
-            kpcovr.fit(self.X, np.zeros(self.Y.shape + (2,)))
-        self.assertTrue(
+            kpcovr.fit(self.X, self.Y)
+        self.assertEqual(
             str(cm.exception),
             "The regressor coefficients have a dimension incompatible "
             "with the supplied target space. "
             "The coefficients have dimension %d and the targets "
-            "have dimension %d" % (regressor.dual_coef_.ndim, self.Y[:, 0].ndim),
+            "have dimension %d" % (regressor.dual_coef_.ndim, self.Y.ndim),
         )
 
+        Y_double = np.column_stack((self.Y, self.Y))
+        Y_triple = np.column_stack((Y_double, self.Y))
+        regressor.fit(self.X, Y_double)
+
         # Shape mismatch (number of targets)
         with self.assertRaises(ValueError) as cm:
-            kpcovr.fit(self.X, self.Y)
-        self.assertTrue(
+            kpcovr.fit(self.X, Y_triple)
+        self.assertEqual(
             str(cm.exception),
             "The regressor coefficients have a shape incompatible "
             "with the supplied target space. "
             "The coefficients have shape %r and the targets "
-            "have shape %r" % (regressor.dual_coef_.shape, self.Y.shape),
+            "have shape %r" % (regressor.dual_coef_.shape, Y_triple.shape),
         )
 
     def test_precomputed_regression(self):
@@ -361,11 +366,13 @@ def test_linear_matches_pcovr(self):
 
         t_ref = ref_pcovr.transform(self.X)
         t = kpcovr.transform(self.X)
+        print(np.linalg.norm(t_ref-t))
 
         K = kpcovr._get_kernel(self.X)
 
         k_ref = t_ref @ t_ref.T
         k = t @ t.T
+
         lk_ref = np.linalg.norm(K - k_ref) ** 2.0 / np.linalg.norm(K) ** 2.0
         lk = np.linalg.norm(K - k) ** 2.0 / np.linalg.norm(K) ** 2.0
 
@@ -375,6 +382,8 @@ def test_linear_matches_pcovr(self):
             round(ly_ref, rounding),
         )
 
+        print(lk, lk_ref)
+
         self.assertEqual(
             round(lk, rounding),
             round(lk_ref, rounding),
@@ -419,7 +428,7 @@ def test_svd_solvers(self):
                 else:
                     kpcovr.fit(self.X, self.Y)
 
-                self.assertTrue(kpcovr._fit_svd_solver == solver)
+                self.assertTrue(kpcovr.fit_svd_solver_ == solver)
 
     def test_bad_solver(self):
         """
@@ -430,7 +439,7 @@ def test_bad_solver(self):
             kpcovr = self.model(svd_solver="bad")
             kpcovr.fit(self.X, self.Y)
 
-        self.assertTrue(str(cm.exception), "Unrecognized svd_solver='bad'" "")
+        self.assertEqual(str(cm.exception), "Unrecognized svd_solver='bad'" "")
 
     def test_good_n_components(self):
         """Check that PCovR will work with any allowed values of n_components."""
@@ -454,10 +463,10 @@ def test_bad_n_components(self):
                 kpcovr = self.model(n_components=-1, svd_solver="auto")
                 kpcovr.fit(self.X, self.Y)
 
-            self.assertTrue(
+            self.assertEqual(
                 str(cm.exception),
-                "self.n_components=%r must be between 0 and "
-                "min(n_samples, n_features)=%r with "
+                "n_components=%r must be between 1 and "
+                "n_samples=%r with "
                 "svd_solver='%s'"
                 % (
                     kpcovr.n_components,
@@ -470,10 +479,10 @@ def test_bad_n_components(self):
                 kpcovr = self.model(n_components=0, svd_solver="randomized")
                 kpcovr.fit(self.X, self.Y)
 
-            self.assertTrue(
+            self.assertEqual(
                 str(cm.exception),
-                "self.n_components=%r must be between 1 and "
-                "min(n_samples, n_features)=%r with "
+                "n_components=%r must be between 1 and "
+                "n_samples=%r with "
                 "svd_solver='%s'"
                 % (
                     kpcovr.n_components,
@@ -485,10 +494,10 @@ def test_bad_n_components(self):
             with self.assertRaises(ValueError) as cm:
                 kpcovr = self.model(n_components=self.X.shape[0], svd_solver="arpack")
                 kpcovr.fit(self.X, self.Y)
-            self.assertTrue(
+            self.assertEqual(
                 str(cm.exception),
-                "self.n_components=%r must be strictly less than "
-                "min(n_samples, n_features)=%r with "
+                "n_components=%r must be strictly less than "
+                "n_samples=%r with "
                 "svd_solver='%s'"
                 % (
                     kpcovr.n_components,
@@ -502,9 +511,9 @@ def test_bad_n_components(self):
                 with self.assertRaises(ValueError) as cm:
                     kpcovr = self.model(n_components=np.pi, svd_solver=svd_solver)
                     kpcovr.fit(self.X, self.Y)
-                self.assertTrue(
+                self.assertEqual(
                     str(cm.exception),
-                    "self.n_components=%r must be of type int "
+                    "n_components=%r must be of type int "
                     "when greater than or equal to 1, was of type=%r"
                     % (kpcovr.n_components, type(kpcovr.n_components)),
                 )
diff --git a/tests/test_pcovc.py b/tests/test_pcovc.py
index 57e501270..4933979bd 100644
--- a/tests/test_pcovc.py
+++ b/tests/test_pcovc.py
@@ -504,7 +504,6 @@ def test_classifier_modifications(self):
         self.assertTrue(classifier.get_params() != pcovc.classifier_.get_params())
 
     def test_incompatible_classifier(self):
-        self.maxDiff = None
         classifier = GaussianNB()
         classifier.fit(self.X, self.Y)
         pcovc = self.model(mixing=0.5, classifier=classifier)
diff --git a/tests/test_pcovr.py b/tests/test_pcovr.py
index 284a7e778..ddeb65f28 100644
--- a/tests/test_pcovr.py
+++ b/tests/test_pcovr.py
@@ -392,7 +392,7 @@ def test_T_shape(self):
         pcovr = self.model(n_components=n_components, tol=1e-12)
         pcovr.fit(self.X, self.Y)
         T = pcovr.transform(self.X)
-        self.assertTrue(check_X_y(self.X, T, multi_output=True))
+        self.assertTrue(check_X_y(self.X, T, multi_output=True) == (self.X, T))
         self.assertTrue(T.shape[-1] == n_components)
 
     def test_default_ncomponents(self):