diff --git a/docs/src/api.rst b/docs/src/api.rst index 51074f1dc..7e62884fe 100644 --- a/docs/src/api.rst +++ b/docs/src/api.rst @@ -37,6 +37,25 @@ Classes BaseVariableImportance BasePerturbation LOCO + CRT + ConditionalRandimizationTest CFI PFI D0CRT + +Helper Classes +============== +.. autosummary:: + :toctree: ./generated/api/class/ + :template: class.rst + + statistical_tools.gaussian_distribution.GaussianDistribution + +Helper Functions +================ +.. autosummary:: + :toctree: ./generated/api/class/ + :template: function.rst + + statistical_tools.lasso_test.lasso_statistic + diff --git a/examples/plot_pitfalls_permutation_importance.py b/examples/plot_pitfalls_permutation_importance.py index af4deb83e..adc144a62 100644 --- a/examples/plot_pitfalls_permutation_importance.py +++ b/examples/plot_pitfalls_permutation_importance.py @@ -26,7 +26,7 @@ from sklearn.preprocessing import StandardScaler from hidimstat import CFI, PFI -from hidimstat.conditional_sampling import ConditionalSampler +from hidimstat.statistical_tools.conditional_sampling import ConditionalSampler rng = np.random.RandomState(0) diff --git a/src/hidimstat/__init__.py b/src/hidimstat/__init__.py index 81d5a0cce..3649e0e7e 100644 --- a/src/hidimstat/__init__.py +++ b/src/hidimstat/__init__.py @@ -15,6 +15,8 @@ ) from .distilled_conditional_randomization_test import d0crt, D0CRT from .conditional_feature_importance import CFI +from .conditional_randomization_test import ConditionalRandimizationTest +from .conditional_randomization_test import ConditionalRandimizationTest as CRT from .knockoffs import ( model_x_knockoff, model_x_knockoff_pvalue, diff --git a/src/hidimstat/base_variable_importance.py b/src/hidimstat/base_variable_importance.py index b4f539024..911435a8e 100644 --- a/src/hidimstat/base_variable_importance.py +++ b/src/hidimstat/base_variable_importance.py @@ -3,6 +3,9 @@ from sklearn.base import BaseEstimator import numpy as np +from hidimstat.statistical_tools.multiple_testing import fdr_threshold +from hidimstat.statistical_tools.aggregation import quantile_aggregation + class BaseVariableImportance(BaseEstimator): """ @@ -34,24 +37,35 @@ def __init__(self): self.importances_ = None self.pvalues_ = None self.selections_ = None + self.test_scores_ = None + self.threshold_fdr_ = None + self.aggregated_pval_ = None + self.aggregated_eval_ = None + + def _check_importance(self): + """ + Checks if the importance scores and p-values have been computed. + """ + if self.importances_ is None: + raise ValueError( + "The importances need to be called before calling this method" + ) def selection( self, k_best=None, percentile=None, threshold=None, threshold_pvalue=None ): """ Selects features based on variable importance. - In case several arguments are different from None, - the returned selection is the conjunction of all of them. Parameters ---------- - k_best : int, optional, default=None + k_best : int, default=None Selects the top k features based on importance scores. - percentile : float, optional, default=None + percentile : float, default=None Selects features based on a specified percentile of importance scores. - threshold : float, optional, default=None + threshold : float, default=None Selects features with importance scores above the specified threshold. - threshold_pvalue : float, optional, default=None + threshold_pvalue : float, default=None Selects features with p-values below the specified threshold. Returns @@ -61,17 +75,23 @@ def selection( """ self._check_importance() if k_best is not None: - if not isinstance(k_best, str) and k_best > self.importances_.shape[1]: + if not isinstance(k_best, str) and k_best > self.importances_.shape[0]: warnings.warn( - f"k={k_best} is greater than n_features={self.importances_.shape[1]}. " + f"k={k_best} is greater than n_features={self.importances_.shape[0]}. " "All the features will be returned." ) - assert k_best > 0, "k_best needs to be positive and not null" + if isinstance(k_best, str): + assert k_best == "all" + else: + assert k_best >= 0, "k_best needs to be positive or null" if percentile is not None: assert ( - 0 < percentile and percentile < 100 + 0 <= percentile and percentile <= 100 ), "percentile needs to be between 0 and 100" if threshold_pvalue is not None: + assert ( + self.pvalues_ is not None + ), "This method doesn't support a threshold on p-values" assert ( 0 < threshold_pvalue and threshold_pvalue < 1 ), "threshold_pvalue needs to be between 0 and 1" @@ -96,9 +116,9 @@ def selection( elif percentile == 0: mask_percentile = np.zeros(len(self.importances_), dtype=bool) elif percentile is not None: - threshold = np.percentile(self.importances_, 100 - percentile) - mask_percentile = self.importances_ > threshold - ties = np.where(self.importances_ == threshold)[0] + threshold_percentile = np.percentile(self.importances_, 100 - percentile) + mask_percentile = self.importances_ > threshold_percentile + ties = np.where(self.importances_ == threshold_percentile)[0] if len(ties): max_feats = int(len(self.importances_) * percentile / 100) kept_ties = ties[: max_feats - mask_percentile.sum()] @@ -123,11 +143,177 @@ def selection( return self.selections_ - def _check_importance(self): + def selection_fdr( + self, + fdr, + fdr_control="bhq", + evalues=False, + reshaping_function=None, + adaptive_aggregation=False, + gamma=0.5, + ): """ - Checks if the importance scores and p-values have been computed. + Performs feature selection based on False Discovery Rate (FDR) control. + + This method selects features by controlling the FDR using either p-values or e-values + derived from test scores. It supports different FDR control methods and optional + adaptive aggregation of the statistical values. + + Parameters + ---------- + fdr : float, default=None + The target false discovery rate level (between 0 and 1) + fdr_control: string, default="bhq" + The FDR control method to use. Options are: + - "bhq": Benjamini-Hochberg procedure + - 'bhy': Benjamini-Hochberg-Yekutieli procedure + - "ebh": e-BH procedure (only for e-values) + evalues: boolean, default=False + If True, uses e-values for selection. If False, uses p-values. + reshaping_function: callable, default=None + Reshaping function for BHY method, default uses sum of reciprocals + adaptive_aggregation: boolean, default=False + If True, uses adaptive weights for p-value aggregation. + Only applicable when evalues=False. + gamma: boolean, default=0.5 + The gamma parameter for quantile aggregation of p-values. + Only used when evalues=False. + + Returns + ------- + numpy.ndarray + Boolean array indicating selected features (True for selected, False for not selected) + + Raises + ------ + AssertionError + If test_scores\_ is None or if incompatible combinations of parameters are provided """ - if self.importances_ is None: - raise ValueError( - "The importances need to be called before calling this method" + self._check_importance() + assert ( + self.test_scores_ is not None + ), "this method doesn't support selection base on FDR" + + if self.test_scores_.shape[0] == 1: + self.threshold_fdr_ = _estimated_threshold(self.test_scores_, fdr=fdr) + selected = self.test_scores_[0] >= self.threshold_fdr_ + elif not evalues: + assert fdr_control != "ebh", "for p-value, the fdr control can't be 'ebh'" + pvalues = np.array( + [_empirical_pval(test_score) for test_score in self.test_scores_] ) + self.aggregated_pval_ = quantile_aggregation( + pvalues, gamma=gamma, adaptive=adaptive_aggregation + ) + self.threshold_fdr_ = fdr_threshold( + self.aggregated_pval_, + fdr=fdr, + method=fdr_control, + reshaping_function=reshaping_function, + ) + selected = self.aggregated_pval_ <= self.threshold_fdr_ + else: + assert fdr_control == "ebh", "for e-value, the fdr control need to be 'ebh'" + evalues = [] + for test_score in self.test_scores_: + ko_threshold = _estimated_threshold(test_score, fdr=fdr) + evalues.append(_empirical_eval(test_score, ko_threshold)) + self.aggregated_eval_ = np.mean(evalues, axis=0) + self.threshold_fdr_ = fdr_threshold( + self.aggregated_eval_, + fdr=fdr, + method=fdr_control, + reshaping_function=reshaping_function, + ) + selected = self.aggregated_eval_ >= self.threshold_fdr_ + return selected + + +def _estimated_threshold(test_score, fdr=0.1): + """ + Calculate the threshold based on the procedure stated in the knockoff article. + Original code: + https://github.com/msesia/knockoff-filter/blob/master/R/knockoff/R/knockoff_filter.R + Parameters + ---------- + test_score : 1D ndarray, shape (n_features, ) + Vector of test statistic. + fdr : float + Desired controlled FDR (false discovery rate) level. + Returns + ------- + threshold : float or np.inf + Threshold level. + """ + offset = 1 # Offset equals 1 is the knockoff+ procedure. + + threshold_mesh = np.sort(np.abs(test_score[test_score != 0])) + np.concatenate( + [[0], threshold_mesh, [np.inf]] + ) # if there is no solution, the threshold is inf + # find the right value of t for getting a good fdr + # Equation 1.8 of barber2015controlling and 3.10 in Candès 2018 + threshold = 0.0 + for threshold in threshold_mesh: + false_pos = np.sum(test_score <= -threshold) + selected = np.sum(test_score >= threshold) + if (offset + false_pos) / np.maximum(selected, 1) <= fdr: + break + return threshold + + +def _empirical_pval(test_score): + """ + Compute the empirical p-values from the test based on knockoff+. + Parameters + ---------- + test_score : 1D ndarray, shape (n_features, ) + Vector of test statistics. + Returns + ------- + pvals : 1D ndarray, shape (n_features, ) + Vector of empirical p-values. + """ + pvals = [] + n_features = test_score.size + + offset = 1 # Offset equals 1 is the knockoff+ procedure. + + test_score_inv = -test_score + for i in range(n_features): + if test_score[i] <= 0: + pvals.append(1) + else: + pvals.append( + (offset + np.sum(test_score_inv >= test_score[i])) / n_features + ) + + return np.array(pvals) + + +def _empirical_eval(test_score, ko_threshold): + """ + Compute the empirical e-values from the test based on knockoff. + Parameters + ---------- + test_score : 1D ndarray, shape (n_features, ) + Vector of test statistics. + ko_threshold : float + Threshold level. + Returns + ------- + evals : 1D ndarray, shape (n_features, ) + Vector of empirical e-values. + """ + evals = [] + n_features = test_score.size + + offset = 1 # Offset equals 1 is the knockoff+ procedure. + + for i in range(n_features): + if test_score[i] < ko_threshold: + evals.append(0) + else: + evals.append(n_features / (offset + np.sum(test_score <= -ko_threshold))) + + return np.array(evals) diff --git a/src/hidimstat/conditional_feature_importance.py b/src/hidimstat/conditional_feature_importance.py index 9b0e7905f..8c9686334 100644 --- a/src/hidimstat/conditional_feature_importance.py +++ b/src/hidimstat/conditional_feature_importance.py @@ -5,7 +5,7 @@ from sklearn.utils.validation import check_random_state from hidimstat.base_perturbation import BasePerturbation -from hidimstat.conditional_sampling import ConditionalSampler +from hidimstat.statistical_tools.conditional_sampling import ConditionalSampler class CFI(BasePerturbation): diff --git a/src/hidimstat/conditional_randomization_test.py b/src/hidimstat/conditional_randomization_test.py new file mode 100644 index 000000000..26be41e45 --- /dev/null +++ b/src/hidimstat/conditional_randomization_test.py @@ -0,0 +1,282 @@ +from copy import deepcopy +from itertools import product +import warnings + +import numpy as np +from joblib import Parallel, delayed +from sklearn.covariance import LedoitWolf +from sklearn.preprocessing import StandardScaler +from sklearn.utils.validation import check_memory +from tqdm import tqdm + +from hidimstat._utils.docstring import _aggregate_docstring +from hidimstat.statistical_tools.gaussian_distribution import GaussianDistribution +from hidimstat.statistical_tools.lasso_test import lasso_statistic +from hidimstat.base_variable_importance import BaseVariableImportance + + +class ConditionalRandimizationTest(BaseVariableImportance): + """ + Implements conditional randomization test (CRT). + The Conditional Randomization Test :footcite:t:`candes2018panning` is a method + for statistical variable importance testing (see algorithm 2). + + Parameters + ---------- + generator : object, default=GaussianGenerator(cov_estimator=LedoitWolf(assume_centered=True)) + Generator object for simulating null distributions + statistical_test : callable, default=lasso_statistic + Function that computes test statistic + n_permutation : int, default=10 + Number of permutations for the test + n_jobs : int, default=1 + Number of parallel jobs + memory : str or object, default=None + Used for caching + joblib_verbose : int, default=0 + Verbosity level for parallel jobs + + Attributes + ---------- + importances_ : ndarray of shape (n_features,) + Feature importance scores + pvalues_ : ndarray of shape (n_features,) + P-values for each feature + + Notes + ----- + The CRT tests feature importance by comparing observed test statistics against + a conditional null distribution generated through simulation. + + References + ---------- + .. footbibliography:: + """ + + def __init__( + self, + generator=GaussianDistribution(cov_estimator=LedoitWolf(assume_centered=True)), + statistical_test=lasso_statistic, + centered=True, + n_repeat=10, + n_jobs=1, + memory=None, + joblib_verbose=0, + ): + self.generator = generator + assert n_repeat > 0, "n_samplings must be positive" + self.n_repeat = n_repeat + self.n_jobs = n_jobs + self.memory = check_memory(memory) + self.joblib_verbose = joblib_verbose + self.statistical_test = statistical_test + self.centered = centered + + def fit(self, X, y=None): + """ + Fit the CRT model by training the generator. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Training data matrix where n_samples is the number of samples and + n_features is the number of features. + y : array-like of shape (n_samples,), default=None + Target values. Not used in this method. + + Returns + ------- + self : object + Returns the instance itself. + + Notes + ----- + The fit method only trains the generator component. The target values y + are not used in this step. + """ + if y is not None: + warnings.warn("y won't be used") + + if self.centered: + X_ = StandardScaler().fit_transform(X) + else: + X_ = X + self.generator.fit(X_) + return self + + def _check_fit(self): + try: + self.generator._check_fit() + except ValueError as exc: + raise ValueError( + "The CRT requires to be fitted before computing importance" + ) from exc + + def importance(self, X, y): + """ + Calculate p-values and identify significant features using the CRT test statistics. + This function processes the results from Conditional Randomization Test (CRT) to identify + statistically significant features. It computes p-values by comparing a reference test + statistic to test statistics from permuted data. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Training data matrix where n_samples is the number of samples and + n_features is the number of features. + y : array-like of shape (n_samples,) + Target values. + + Returns + ------- + importances_ : ndarray of shape (n_features,) + Feature importance scores for each feature. + Higher values indicate higher importance. + + Notes + ----- + The p-values are calculated using the formula: + (1 + #(T_perm >= T_obs)) / (n_permutations + 1) + where T_perm are the test statistics from permuted data and T_obs is the + reference test statistic. + """ + self._check_fit() + if self.centered: + X_ = StandardScaler().fit_transform(X) + else: + X_ = X + reference_value = self.statistical_test(X_, y) + + parallel = Parallel(self.n_jobs, verbose=self.joblib_verbose) + + self.test_scores_ = np.array( + parallel( + delayed(joblib_statistic_test)( + index, X_, self.generator.sample(), y, self.statistical_test + ) + for repeat_i, index in tqdm( + product(range(self.n_repeat), range(X_.shape[1])) + ) + ) + ) + self.test_scores_ = reference_value - np.array(self.test_scores_).reshape( + self.n_repeat, -1 + ) + + self.importances_ = np.mean(np.abs(self.test_scores_), axis=0) + # see equation of p-value in algorithm 2 + self.pvalues_ = ( + 1 + + np.sum( + self.test_scores_ >= 0, + axis=0, + ) + ) / (self.n_repeat + 1) + return self.importances_ + + def fit_importance(self, X, y, cv=None): + """ + Fits the model to the data and computes feature importance. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The input data matrix where n_samples is the number of samples and + n_features is the number of features. + y : array-like of shape (n_samples,) + The target values. + cv : None or cross-validation generator, default=None + Cross-validation parameter. Not used in this method. + A warning will be issued if provided. + + Returns + ------- + importances_ : ndarray of shape (n_features,) + Feature importance scores (p-values) for each feature. + Lower values indicate higher importance. Values range from 0 to 1. + + Notes + ----- + This method combines the fit and importance computation steps. + It first fits the generator to X and then computes importance scores + by comparing observed test statistics against permuted ones. + + See Also + -------- + fit : Method for fitting the generator only + importance : Method for computing importance scores only + """ + if cv is not None: + warnings.warn("cv won't be used") + + self.fit(X) + return self.importance(X, y) + + +def joblib_statistic_test(index, X, X_sample, y, statistic_test): + """ + Compute test statistic for a single feature with permuted data. + + Parameters + ---------- + index : int + Index of the feature to test + X : array-like of shape (n_samples, n_features) + Original input data matrix + X_sample : array-like of shape (n_samples, n_features) + Permuted data matrix + y : array-like of shape (n_samples,) + Target values + statistic_test : callable + Function that computes the test statistic + + Returns + ------- + float + Test statistic value for the specified feature + """ + X_tmp = deepcopy(X) + X_tmp[:, index] = deepcopy(X_sample[:, index]) + return statistic_test(X_tmp, y)[index] + + +def crt( + X, + y, + generator=GaussianDistribution(cov_estimator=LedoitWolf(assume_centered=True)), + statistical_test=lasso_statistic, + n_repeat=10, + n_jobs=1, + memory=None, + joblib_verbose=0, +): + crt = ConditionalRandimizationTest( + generator=generator, + statistical_test=statistical_test, + n_repeat=n_repeat, + n_jobs=n_jobs, + memory=memory, + joblib_verbose=joblib_verbose, + ) + return crt.fit_importance(X, y) + + +# use the docstring of the class for the function +crt.__doc__ = _aggregate_docstring( + [ + ConditionalRandimizationTest.__doc__, + ConditionalRandimizationTest.__init__.__doc__, + ConditionalRandimizationTest.fit_importance.__doc__, + ConditionalRandimizationTest.selection.__doc__, + ], + """ + Returns + ------- + selection: binary array-like of shape (n_features) + Binary array of the seleted features + importance : array-like of shape (n_features) + The computed feature importance scores. + pvalues : array-like of shape (n_features) + The computed significant of feature for the prediction. + """, +) diff --git a/src/hidimstat/gaussian_knockoff.py b/src/hidimstat/gaussian_knockoff.py deleted file mode 100644 index 886312c7e..000000000 --- a/src/hidimstat/gaussian_knockoff.py +++ /dev/null @@ -1,191 +0,0 @@ -import warnings - -import numpy as np - - -def gaussian_knockoff_generation(X, mu, sigma, seed=None, tol=1e-14): - """ - Generate second-order knockoff variables using the equi-correlated method. - - This function generates knockoff variables for a given design matrix X, - using the equi-correlated method described in :footcite:t:`barber2015controlling` - and :footcite:t:`candes2018panning`. The function takes as input the design matrix - X, the vector of empirical mean values mu, and the empirical covariance - matrix sigma. It returns the knockoff variables X_tilde. - - The original implementation can be found at - https://github.com/msesia/knockoff-filter/blob/master/R/knockoff/R/create_gaussian.R - - Parameters - ---------- - X: 2D ndarray (n_samples, n_features) - The original design matrix. - - mu : 1D ndarray (n_features, ) - A vector of empirical mean values. - - sigma : 2D ndarray (n_samples, n_features) - The empirical covariance matrix. - - seed : int, optional - A random seed for generating the uniform noise used to create - the knockoff variables. - - tol : float, default=1.e-14 - A tolerance value used for numerical stability in the calculation - of the Cholesky decomposition. - - Returns - ------- - X_tilde : 2D ndarray (n_samples, n_features) - The knockoff variables. - - mu_tilde : 2D ndarray (n_samples, n_features) - The mean matrix used for generating knockoffs. - - sigma_tilde_decompose : 2D ndarray (n_features, n_features) - The Cholesky decomposition of the covariance matrix. - - References - ---------- - .. footbibliography:: - """ - n_samples, n_features = X.shape - - # create a uniform noise for all the data - rng = np.random.RandomState(seed) - u_tilde = rng.randn(n_samples, n_features) - - diag_s = np.diag(_s_equi(sigma, tol=tol)) - - sigma_inv_s = np.linalg.solve(sigma, diag_s) - - # First part on the RHS of equation 1.4 in barber2015controlling - mu_tilde = X - np.dot(X - mu, sigma_inv_s) - # To calculate the Cholesky decomposition later on - sigma_tilde = 2 * diag_s - diag_s.dot(sigma_inv_s) - # test is the matrix is positive definite - while not np.all(np.linalg.eigvalsh(sigma_tilde) > tol): - sigma_tilde += 1e-10 * np.eye(n_features) - warnings.warn( - "The conditional covariance matrix for knockoffs is not positive " - "definite. Adding minor positive value to the matrix.", - UserWarning, - ) - - # Equation 1.4 in barber2015controlling - sigma_tilde_decompose = np.linalg.cholesky(sigma_tilde) - X_tilde = mu_tilde + np.dot(u_tilde, sigma_tilde_decompose) - - return X_tilde, mu_tilde, sigma_tilde_decompose - - -def repeat_gaussian_knockoff_generation(mu_tilde, sigma_tilde_decompose, seed): - """ - Generate additional knockoff variables using pre-computed values. - - This function generates additional knockoff variables using pre-computed - values returned by the gaussian_knockoff_generation function - with repeat=True. It takes as input mu_tilde and sigma_tilde_decompose, - which were returned by gaussian_knockoff_generation, and a random seed. - It returns the new knockoff variables X_tilde. - - Parameters - ---------- - mu_tilde : 2D ndarray (n_samples, n_features) - The matrix of means used to generate the knockoff variables, - returned by gaussian_knockoff_generation. - - sigma_tilde_decompose : 2D ndarray (n_features, n_features) - The Cholesky decomposition of the covariance matrix used - to generate the knockoff variables,returned by - gaussian_knockoff_generation. - - seed : int - A random seed for generating the uniform noise used to create - the knockoff variables. - - Returns - ------- - X_tilde : 2D ndarray (n_samples, n_features) - The knockoff variables. - """ - n_samples, n_features = mu_tilde.shape - - # create a uniform noise for all the data - rng = np.random.RandomState(seed) - u_tilde = rng.randn(n_samples, n_features) - - X_tilde = mu_tilde + np.dot(u_tilde, sigma_tilde_decompose) - return X_tilde - - -def _s_equi(sigma, tol=1e-14): - """ - Estimate the diagonal matrix of correlation between real - and knockoff variables using the equi-correlated equation. - - This function estimates the diagonal matrix of correlation - between real and knockoff variables using the equi-correlated - equation described in :footcite:t:`barber2015controlling` and - :footcite:t:`candes2018panning`. It takes as input the empirical - covariance matrix sigma and a tolerance value tol, - and returns a vector of diagonal values of the estimated - matrix diag{s}. - - Parameters - ---------- - sigma : 2D ndarray (n_features, n_features) - The empirical covariance matrix calculated from - the original design matrix. - - tol : float, optional - A tolerance value used for numerical stability in the calculation - of the eigenvalues of the correlation matrix. - - Returns - ------- - 1D ndarray (n_features, ) - A vector of diagonal values of the estimated matrix diag{s}. - - Raises - ------ - Exception - If the covariance matrix is not positive-definite. - """ - n_features = sigma.shape[0] - - # Convert covariance matrix to correlation matrix - # as example see cov2corr from statsmodels - features_std = np.sqrt(np.diag(sigma)) - scale = np.outer(features_std, features_std) - corr_matrix = sigma / scale - - eig_value = np.linalg.eigvalsh(corr_matrix) - lambda_min = np.min(eig_value[0]) - # check if the matrix is positive-defined - if lambda_min <= 0: - raise Exception("The covariance matrix is not positive-definite.") - - s = np.ones(n_features) * min(2 * lambda_min, 1) - - psd = np.all(np.linalg.eigvalsh(2 * corr_matrix - np.diag(s)) > tol) - s_eps = 0 - while not psd: - if s_eps == 0: - s_eps = np.finfo(type(s[0])).eps # avoid zeros - else: - s_eps *= 10 - # if all eigval > 0 then the matrix is positive define - psd = np.all( - np.linalg.eigvalsh(2 * corr_matrix - np.diag(s * (1 - s_eps))) > tol - ) - warnings.warn( - "The equi-correlated matrix for knockoffs is not positive " - f"definite. Reduce the value of distance by {s_eps}.", - UserWarning, - ) - - s = s * (1 - s_eps) - - return s * np.diag(sigma) diff --git a/src/hidimstat/knockoffs.py b/src/hidimstat/knockoffs.py index 7dab07aeb..e7461e8b9 100644 --- a/src/hidimstat/knockoffs.py +++ b/src/hidimstat/knockoffs.py @@ -7,10 +7,7 @@ from sklearn.utils import check_random_state from sklearn.utils.validation import check_memory -from hidimstat.gaussian_knockoff import ( - gaussian_knockoff_generation, - repeat_gaussian_knockoff_generation, -) +from hidimstat.statistical_tools.gaussian_distribution import GaussianDistribution from hidimstat.statistical_tools.multiple_testing import fdr_threshold from hidimstat.statistical_tools.aggregation import quantile_aggregation @@ -199,29 +196,12 @@ def model_x_knockoff( if centered: X = StandardScaler().fit_transform(X) - # estimation of X distribution - # original implementation: - # https://github.com/msesia/knockoff-filter/blob/master/R/knockoff/R/create_second_order.R - mu = X.mean(axis=0) - sigma = cov_estimator.fit(X).covariance_ - # Create knockoff variables - X_tilde, mu_tilde, sigma_tilde_decompose = memory.cache( - gaussian_knockoff_generation - )(X, mu, sigma, seed=seed_list[0], tol=tol_gauss) - - if n_bootstraps == 1: - X_tildes = [X_tilde] - else: - X_tildes = parallel( - delayed(repeat_gaussian_knockoff_generation)( - mu_tilde, - sigma_tilde_decompose, - seed=seed, - ) - for seed in seed_list[1:] - ) - X_tildes.insert(0, X_tilde) + conditionnal_sampler = GaussianDistribution( + cov_estimator, random_state=seed_list[0], tol=tol_gauss + ) + conditionnal_sampler.fit(X) + X_tildes = [conditionnal_sampler.sample() for i in range(n_bootstraps)] results = parallel( delayed(memory.cache(_stat_coefficient_diff))( diff --git a/src/hidimstat/statistical_tools/aggregation.py b/src/hidimstat/statistical_tools/aggregation.py index a9a85a4e3..21aa44f3c 100644 --- a/src/hidimstat/statistical_tools/aggregation.py +++ b/src/hidimstat/statistical_tools/aggregation.py @@ -1,7 +1,7 @@ import numpy as np -def quantile_aggregation(pvals, gamma=0.05, adaptive=False): +def quantile_aggregation(pvals, gamma=0.5, adaptive=False): """ Implements the quantile aggregation method for p-values. @@ -15,7 +15,7 @@ def quantile_aggregation(pvals, gamma=0.05, adaptive=False): pvals : ndarray of shape (n_sampling*2, n_test) Matrix of p-values to aggregate. Each row represents a sampling instance and each column a hypothesis test. - gamma : float, default=0.05 + gamma : float, default=0.5 Quantile level for aggregation. Must be in range (0,1]. adaptive : bool, default=False If True, uses adaptive quantile aggregation which optimizes over multiple gamma values. diff --git a/src/hidimstat/conditional_sampling.py b/src/hidimstat/statistical_tools/conditional_sampling.py similarity index 100% rename from src/hidimstat/conditional_sampling.py rename to src/hidimstat/statistical_tools/conditional_sampling.py diff --git a/src/hidimstat/statistical_tools/gaussian_distribution.py b/src/hidimstat/statistical_tools/gaussian_distribution.py new file mode 100644 index 000000000..a03d82e28 --- /dev/null +++ b/src/hidimstat/statistical_tools/gaussian_distribution.py @@ -0,0 +1,197 @@ +import warnings + +import numpy as np +from sklearn.utils import check_random_state + + +class GaussianDistribution: + """ + Generator for second-order Gaussian variables using the equi-correlated method. + Creates synthetic variables that preserve the covariance structure of the original + variables while ensuring conditional independence between the original and synthetic data. + + Parameters + ---------- + cov_estimator : object + Estimator for computing the covariance matrix. Must implement fit and + have a covariance\_ attribute after fitting. + random_state : int or None, default=None + Random seed for generating synthetic data. + tol : float, default=1e-14 + Tolerance for numerical stability in matrix computations. + + Attributes + ---------- + mu_tilde_ : ndarray of shape (n_samples, n_features) + Mean matrix for generating synthetic variables. + sigma_tilde_decompose_ : ndarray of shape (n_features, n_features) + Cholesky decomposition of the synthetic covariance matrix. + + References + ---------- + .. footbibliography:: + """ + + def __init__(self, cov_estimator, random_state=None, tol=1e-14): + self.cov_estimator = cov_estimator + self.tol = tol + self.rng = check_random_state(random_state) + + def fit(self, X): + """ + Fit the Gaussian synthetic variable generator. + This method estimates the parameters needed to generate Gaussian synthetic variables + based on the input data. It follows a methodology for creating second-order + synthetic variables that preserve the covariance structure. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The input samples used to estimate the parameters for synthetic variable generation. + The data is assumed to follow a Gaussian distribution. + + Returns + ------- + self : object + Returns the instance itself. + + Notes + ----- + The method implements the following steps: + 1. Centers and scales the data if specified + 2. Estimates mean and covariance of input data + 3. Computes parameters for synthetic variable generation + """ + _, n_features = X.shape + + # estimation of X distribution + # original implementation: + # https://github.com/msesia/knockoff-filter/blob/master/R/knockoff/R/create_second_order.R + mu = X.mean(axis=0) + sigma = self.cov_estimator.fit(X).covariance_ + + diag_s = np.diag(_s_equi(sigma, tol=self.tol)) + + sigma_inv_s = np.linalg.solve(sigma, diag_s) + + # First part on the RHS of equation 1.4 in barber2015controlling + self.mu_tilde_ = X - np.dot(X - mu, sigma_inv_s) + # To calculate the Cholesky decomposition later on + sigma_tilde = 2 * diag_s - diag_s.dot(sigma_inv_s) + # test is the matrix is positive definite + while not np.all(np.linalg.eigvalsh(sigma_tilde) > self.tol): + sigma_tilde += 1e-10 * np.eye(n_features) + warnings.warn( + "The conditional covariance matrix for knockoffs is not positive " + "definite. Adding minor positive value to the matrix.", + UserWarning, + ) + + self.sigma_tilde_decompose_ = np.linalg.cholesky(sigma_tilde) + + return self + + def _check_fit(self): + """ + Check if the model has been fit before performing analysis. + + Raises + ------ + ValueError + If any of the required attributes are missing, indicating the model + hasn't been fit before generating synthetic variables. + """ + if not hasattr(self, "mu_tilde_") or not hasattr( + self, "sigma_tilde_decompose_" + ): + raise ValueError("The GaussianGenerator requires to be fit before simulate") + + def sample(self): + """ + Generate synthetic variables. + This function generates synthetic variables that preserve the covariance structure + of the original data while ensuring conditional independence. + + Returns + ------- + X_tilde : 2D ndarray (n_samples, n_features) + The synthetic variables. + """ + self._check_fit() + n_samples, n_features = self.mu_tilde_.shape + + # create a uniform noise for all the data + u_tilde = self.rng.randn(n_samples, n_features) + + # Equation 1.4 in barber2015controlling + X_tilde = self.mu_tilde_ + np.dot(u_tilde, self.sigma_tilde_decompose_) + return X_tilde + + +def _s_equi(sigma, tol=1e-14): + """ + Estimate the diagonal matrix of correlation between real + and knockoff variables using the equi-correlated equation. + This function estimates the diagonal matrix of correlation + between real and knockoff variables using the equi-correlated + equation described in :footcite:t:`barber2015controlling` and + :footcite:t:`candes2018panning`. It takes as input the empirical + covariance matrix sigma and a tolerance value tol, + and returns a vector of diagonal values of the estimated + matrix diag{s}. + + Parameters + ---------- + sigma : 2D ndarray (n_features, n_features) + The empirical covariance matrix calculated from + the original design matrix. + tol : float, optional + A tolerance value used for numerical stability in the calculation + of the eigenvalues of the correlation matrix. + + Returns + ------- + 1D ndarray (n_features, ) + A vector of diagonal values of the estimated matrix diag{s}. + + Raises + ------ + Exception + If the covariance matrix is not positive-definite. + """ + n_features = sigma.shape[0] + + # Convert covariance matrix to correlation matrix + # as example see cov2corr from statsmodels + features_std = np.sqrt(np.diag(sigma)) + scale = np.outer(features_std, features_std) + corr_matrix = sigma / scale + + eig_value = np.linalg.eigvalsh(corr_matrix) + lambda_min = np.min(eig_value[0]) + # check if the matrix is positive-defined + if lambda_min <= 0: + raise Exception("The covariance matrix is not positive-definite.") + + s = np.ones(n_features) * min(2 * lambda_min, 1) + + psd = np.all(np.linalg.eigvalsh(2 * corr_matrix - np.diag(s)) > tol) + s_eps = 0 + while not psd: + if s_eps == 0: + s_eps = np.finfo(type(s[0])).eps # avoid zeros + else: + s_eps *= 10 + # if all eigval > 0 then the matrix is positive define + psd = np.all( + np.linalg.eigvalsh(2 * corr_matrix - np.diag(s * (1 - s_eps))) > tol + ) + warnings.warn( + "The equi-correlated matrix for knockoffs is not positive " + f"definite. Reduce the value of distance by {s_eps}.", + UserWarning, + ) + + s = s * (1 - s_eps) + + return s * np.diag(sigma) diff --git a/src/hidimstat/statistical_tools/lasso_test.py b/src/hidimstat/statistical_tools/lasso_test.py new file mode 100644 index 000000000..bf66a4c92 --- /dev/null +++ b/src/hidimstat/statistical_tools/lasso_test.py @@ -0,0 +1,54 @@ +import numpy as np +from sklearn.linear_model import LassoCV +from sklearn.model_selection import KFold + + +def lasso_statistic( + X, + y, + lasso=LassoCV( + n_jobs=1, + verbose=0, + max_iter=200000, + cv=KFold(n_splits=5, shuffle=True, random_state=0), + tol=1e-6, + ), + n_alphas=0, +): + """ + Compute Lasso statistic using feature coefficients. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The input data matrix. + y : array-like of shape (n_samples,) + The target values. + lasso : estimator, default=LassoCV(n_jobs=None, verbose=0, max_iter=200000, cv=KFold(n_splits=5, shuffle=True, random_state=0), tol=1e-6) + The Lasso estimator to use for computing the test statistic. + n_alphas : int, default=0 + Number of alpha values to test for Lasso regularization path. + If 0, uses the default alpha sequence from the estimator. + + Returns + ------- + coef : ndarray + Lasso coefficients for each feature. + + Raises + ------ + TypeError + If the provided estimator does not have coef\_ attribute or is not linear. + """ + if n_alphas != 0: + alpha_max = np.max(np.dot(X.T, y)) / (X.shape[1]) + alphas = np.linspace(alpha_max * np.exp(-n_alphas), alpha_max, n_alphas) + lasso.alphas = alphas + lasso.fit(X, y) + if hasattr(lasso, "coef_"): + coef = np.ravel(lasso.coef_) + elif hasattr(lasso, "best_estimator_") and hasattr(lasso.best_estimator_, "coef_"): + coef = np.ravel(lasso.best_estimator_.coef_) # for CV object + else: + raise TypeError("estimator should be linear") + return coef diff --git a/test/test_conditional_sampling.py b/test/statistical_tools/test_conditional_sampling.py similarity index 99% rename from test/test_conditional_sampling.py rename to test/statistical_tools/test_conditional_sampling.py index d42f1f11c..7c347ff50 100644 --- a/test/test_conditional_sampling.py +++ b/test/statistical_tools/test_conditional_sampling.py @@ -11,7 +11,7 @@ from sklearn.multiclass import OneVsRestClassifier from sklearn.preprocessing import StandardScaler -from hidimstat.conditional_sampling import ConditionalSampler +from hidimstat.statistical_tools.conditional_sampling import ConditionalSampler def test_continuous_case(): diff --git a/test/statistical_tools/test_gaussian_distribution.py b/test/statistical_tools/test_gaussian_distribution.py new file mode 100644 index 000000000..ba586dc1b --- /dev/null +++ b/test/statistical_tools/test_gaussian_distribution.py @@ -0,0 +1,82 @@ +import numpy as np +import pytest +from hidimstat.statistical_tools.gaussian_distribution import ( + _s_equi, + GaussianDistribution, +) +from hidimstat._utils.scenario import multivariate_simulation +from sklearn.covariance import LedoitWolf + + +def test_gaussian_equi(): + """test function of gaussian""" + seed = 42 + n = 100 + p = 50 + X, y, beta, noise = multivariate_simulation(n, p, seed=seed) + generator = GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), + random_state=seed * 2, + ) + generator.fit(X=X) + X_tilde = generator.sample() + assert X_tilde.shape == (n, p) + + +def test_gaussian_center(): + """test function of gaussian""" + seed = 42 + n = 100 + p = 50 + X, y, beta, noise = multivariate_simulation(n, p, seed=seed) + generator = GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), + random_state=seed * 2, + ) + generator.fit(X=X) + X_tilde = generator.sample() + assert X_tilde.shape == (n, p) + + +def test_gaussian_error(): + """test function error""" + seed = 42 + n = 100 + p = 50 + X, y, beta, noise = multivariate_simulation(n, p, seed=seed) + generator = GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), + random_state=seed * 2, + ) + with pytest.raises( + ValueError, match="The GaussianGenerator requires to be fit before simulate" + ): + generator.sample() + + +def test_s_equi_not_define_positive(): + """test the warning and error of s_equi function""" + n = 10 + tol = 1e-7 + seed = 42 + + # random positive matrix + rgn = np.random.RandomState(seed) + a = rgn.randn(n, n) + a -= np.min(a) + with pytest.raises( + Exception, match="The covariance matrix is not positive-definite." + ): + _s_equi(a) + + # matrix with positive eigenvalues, positive diagonal + while not np.all(np.linalg.eigvalsh(a) > tol): + a += 0.1 * np.eye(n) + with pytest.warns(UserWarning, match="The equi-correlated matrix"): + _s_equi(a) + + # positive definite matrix + u, s, vh = np.linalg.svd(a) + d = np.eye(n) + sigma = u * d * u.T + _s_equi(sigma) diff --git a/test/statistical_tools/test_lasso_test.py b/test/statistical_tools/test_lasso_test.py new file mode 100644 index 000000000..3a0d64e56 --- /dev/null +++ b/test/statistical_tools/test_lasso_test.py @@ -0,0 +1,13 @@ +import numpy as np +import pytest +from sklearn.svm import SVR + +from hidimstat.statistical_tools.lasso_test import ( + lasso_statistic, +) + + +def test_error_lasso_statistic(): + """Test error lasso statistic""" + with pytest.raises(TypeError, match="estimator should be linear"): + lasso_statistic(X=np.random.rand(10, 10), y=np.random.rand(10), lasso=SVR()) diff --git a/test/test_base_variable_importance.py b/test/test_base_variable_importance.py new file mode 100644 index 000000000..5f154f91a --- /dev/null +++ b/test/test_base_variable_importance.py @@ -0,0 +1,252 @@ +import pytest +import numpy as np + +from hidimstat import BaseVariableImportance + + +@pytest.fixture +def set_BaseVariableImportance(pvalues, test_score, seed): + """Create a BaseVariableImportance instance with test data for testing purposes. + + Parameters + ---------- + pvalues : bool + If True, generate random p-values for testing. + test_score : bool + If True, generate random test scores for testing. + seed : int + Random seed for reproducibility. + + Returns + ------- + BaseVariableImportance + A BaseVariableImportance instance with test data. + """ + nb_features = 100 + rng = np.random.RandomState(seed) + vi = BaseVariableImportance() + vi.importances_ = np.arange(nb_features) + rng.shuffle(vi.importances_) + if pvalues or test_score: + vi.pvalues_ = np.sort(rng.rand(nb_features))[vi.importances_] + if test_score: + # TODO: this can be improved. + vi.test_scores_ = [] + for i in range(10): + score = np.random.rand(nb_features) * 30 + vi.test_scores_.append(score) + for i in range(1, 30): + score = np.random.rand(nb_features) + 1 + score[-i:] = np.arange(30 - i, 30) * 2 + score[:i] = -np.arange(30 - i, 30) + vi.test_scores_.append(score[vi.importances_]) + vi.test_scores_ = np.array(vi.test_scores_) + return vi + + +@pytest.mark.parametrize( + "pvalues, test_score, seed", + [(False, False, 0), (True, False, 1), (True, True, 2)], + ids=["only importance", "p-value", "test-score"], +) +class TestSelection: + """Test selection based on importance""" + + def test_selection_k_best(self, set_BaseVariableImportance): + "test selection of the k_best" + vi = set_BaseVariableImportance + true_value = vi.importances_ >= 95 + selection = vi.selection(k_best=5) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_k_best_all(self, set_BaseVariableImportance): + "test selection to all base on string" + vi = set_BaseVariableImportance + true_value = np.ones_like(vi.importances_, dtype=bool) + selection = vi.selection(k_best="all") + np.testing.assert_array_equal(true_value, selection) + + def test_selection_k_best_none(self, set_BaseVariableImportance): + "test selection when there none" + vi = set_BaseVariableImportance + true_value = np.zeros_like(vi.importances_, dtype=bool) + selection = vi.selection(k_best=0) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_percentile(self, set_BaseVariableImportance): + "test selection bae on percentile" + vi = set_BaseVariableImportance + true_value = vi.importances_ >= 50 + selection = vi.selection(percentile=50) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_percentile_all(self, set_BaseVariableImportance): + "test selection when percentile is 100" + vi = set_BaseVariableImportance + true_value = np.ones_like(vi.importances_, dtype=bool) + selection = vi.selection(percentile=100) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_percentile_none(self, set_BaseVariableImportance): + "test selection when percentile is 0" + vi = set_BaseVariableImportance + true_value = np.zeros_like(vi.importances_, dtype=bool) + selection = vi.selection(percentile=0) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_percentile_threshols_value(self, set_BaseVariableImportance): + "test selection when percentile when the percentile equal on value" + vi = set_BaseVariableImportance + mask = np.ones_like(vi.importances_, dtype=bool) + mask[np.where(vi.importances_ == 99)] = False + vi.importances_ = vi.importances_[mask] + true_value = vi.importances_ >= 50 + selection = vi.selection(percentile=50) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_threshold(self, set_BaseVariableImportance): + "test threshold on importance" + vi = set_BaseVariableImportance + true_value = vi.importances_ < 5 + selection = vi.selection(threshold=5) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_threshold_pvalue(self, set_BaseVariableImportance): + "test threshold vbse on pvalues" + vi = set_BaseVariableImportance + if vi.pvalues_ is not None: + true_value = vi.importances_ < 5 + print(vi.pvalues_) + selection = vi.selection( + threshold_pvalue=vi.pvalues_[np.argsort(vi.importances_)[5]] + ) + np.testing.assert_array_equal(true_value, selection) + + +@pytest.mark.parametrize( + "pvalues, test_score, seed", [(True, True, 10)], ids=["default"] +) +class TestSelectionFDR: + """Test selection based on fdr""" + + def test_selection_fdr_default(self, set_BaseVariableImportance): + "test selection of the default" + vi = set_BaseVariableImportance + true_value = vi.importances_ >= 85 + selection = vi.selection_fdr(0.2) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_fdr_default_1(self, set_BaseVariableImportance): + "test selection of the default" + vi = set_BaseVariableImportance + vi.test_scores_ = np.array([vi.test_scores_[0, :]]) + true_value = vi.importances_ > -1 # all selected + selection = vi.selection_fdr(0.2) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_fdr_adaptation(self, set_BaseVariableImportance): + "test selection of the adaptation" + vi = set_BaseVariableImportance + true_value = vi.importances_ >= 85 + selection = vi.selection_fdr(0.2, adaptive_aggregation=True) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_fdr_bhy(self, set_BaseVariableImportance): + "test selection with bhy" + vi = set_BaseVariableImportance + true_value = vi.importances_ >= 85 + selection = vi.selection_fdr(0.8, fdr_control="bhy") + np.testing.assert_array_equal(true_value, selection) + + def test_selection_fdr_ebh(self, set_BaseVariableImportance): + "test selection with e-values" + vi = set_BaseVariableImportance + true_value = vi.importances_ >= 2 + selection = vi.selection_fdr(0.037, fdr_control="ebh", evalues=True) + np.testing.assert_array_equal(true_value, selection) + + +@pytest.mark.parametrize( + "pvalues, test_score, seed", + [(False, False, 0), (True, False, 0), (True, True, 0)], + ids=["only importance", "p-value", "test-score"], +) +class TestBVIExceptions: + """Test class for BVI Exception""" + + def test_not_fit(self, pvalues, test_score, seed): + "test detection unfit" + vi = BaseVariableImportance() + with pytest.raises( + ValueError, + match="The importances need to be called before calling this method", + ): + vi._check_importance() + with pytest.raises( + ValueError, + match="The importances need to be called before calling this method", + ): + vi.selection() + with pytest.raises( + ValueError, + match="The importances need to be called before calling this method", + ): + vi.selection_fdr(0.1) + + def test_selection_k_best(self, set_BaseVariableImportance): + "test selection k_best wrong" + vi = set_BaseVariableImportance + with pytest.raises(AssertionError, match="k_best needs to be positive or null"): + vi.selection(k_best=-10) + with pytest.warns(Warning, match="k=1000 is greater than n_features="): + vi.selection(k_best=1000) + + def test_selection_percentile(self, set_BaseVariableImportance): + "test selection percentile wrong" + vi = set_BaseVariableImportance + with pytest.raises( + AssertionError, match="percentile needs to be between 0 and 100" + ): + vi.selection(percentile=-1) + with pytest.raises( + AssertionError, match="percentile needs to be between 0 and 100" + ): + vi.selection(percentile=102) + + def test_selection_threshold(self, set_BaseVariableImportance): + "test selection threshold wrong" + vi = set_BaseVariableImportance + if vi.pvalues_ is None: + with pytest.raises( + AssertionError, + match="This method doesn't support a threshold on p-values", + ): + vi.selection(threshold_pvalue=-1) + else: + with pytest.raises( + AssertionError, match="threshold_pvalue needs to be between 0 and 1" + ): + vi.selection(threshold_pvalue=-1) + with pytest.raises( + AssertionError, match="threshold_pvalue needs to be between 0 and 1" + ): + vi.selection(threshold_pvalue=1.1) + + def test_selection_fdr_fdr_control(self, set_BaseVariableImportance): + "test selection fdr_control wrong" + vi = set_BaseVariableImportance + if vi.test_scores_ is None: + with pytest.raises( + AssertionError, + match="this method doesn't support selection base on FDR", + ): + vi.selection_fdr(fdr=0.1) + else: + with pytest.raises( + AssertionError, match="for e-value, the fdr control need to be 'ebh'" + ): + vi.selection_fdr(fdr=0.1, evalues=True) + with pytest.raises( + AssertionError, match="for p-value, the fdr control can't be 'ebh'" + ): + vi.selection_fdr(fdr=0.1, fdr_control="ebh", evalues=False) diff --git a/test/test_conditional_randomization_test.py b/test/test_conditional_randomization_test.py new file mode 100644 index 000000000..85df8bad2 --- /dev/null +++ b/test/test_conditional_randomization_test.py @@ -0,0 +1,459 @@ +import numpy as np +import pytest + +from sklearn.covariance import LedoitWolf, GraphicalLassoCV +from sklearn.model_selection import GridSearchCV +from sklearn.linear_model import Lasso +from sklearn.model_selection import KFold + +from hidimstat import CRT +from hidimstat.conditional_randomization_test import crt +from hidimstat.statistical_tools.gaussian_distribution import GaussianDistribution +from hidimstat.statistical_tools.multiple_testing import fdp_power +from hidimstat.statistical_tools.lasso_test import lasso_statistic + + +def configure_linear_categorial_crt(X, y, n_repeat, seed, fdr): + """ + Configure Conditional Randomize Test (CRT) model with linear regression + for feature importance analysis. + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Input data matrix where each column represents a feature + and each row a sample. + y : array-like of shape (n_samples,) + Target variable array. + n_repeat : int + Number of permutations to perform for the CRT analysis. + seed : int + Random seed for reproducibility. + Returns + ------- + importance : array-like + Array containing importance scores for each feature. + Higher values indicate greater feature importance in predicting + the target variable. + Notes + ----- + The function performs the following steps: + 1. Configures CRT + 2. Calculates feature importance + The CRT method uses permutation-based importance scoring with linear + regression as the base model. + """ + # instantiate CRT model with linear regression imputer + crt = CRT( + n_repeat=n_repeat, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=seed + ), + ) + # fit the model using the training set + importance = crt.fit_importance(X, y) + selected = crt.selection_fdr(fdr=fdr) + return importance, selected + + +############################################################################## +## tests crt on different type of data +parameter_exact = [ + ("Dim", 300, 20, 5, 0.0, 42, 1.0, np.inf, 0.0), + ("Dim with noise", 300, 20, 5, 0.0, 42, 1.0, 10.0, 0.0), + ("Dim with correlated noise", 300, 20, 5, 0.0, 42, 1.0, 10.0, 0.2), + ("Dim with correlated features", 300, 20, 5, 0.2, 42, 1.0, np.inf, 0.0), + ("Dim high level noise", 300, 20, 5, 0.2, 42, 1.0, 1.0, 0.0), + ("Dim with correlated features and noise", 300, 20, 5, 0.2, 42, 1, 10, 0), + ( + "Dim with correlated features and correlated noise", + 300, + 20, + 5, + 0.2, + 42, + 1.0, + 10, + 0.2, + ), +] + + +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + zip(*(list(zip(*parameter_exact))[1:])), + ids=list(zip(*parameter_exact))[0], +) +@pytest.mark.parametrize( + "crt_n_sampling, crt_seed, crt_fdr", [(5, 5, 0.4)], ids=["default_crt"] +) +def test_crt_linear_data_exact(data_generator, crt_n_sampling, crt_seed, crt_fdr): + """Tests the method on linear cases with noise and correlation""" + X, y, important_features, _ = data_generator + importance, selected = configure_linear_categorial_crt( + X, y, crt_n_sampling, crt_seed, crt_fdr + ) + # check that importance scores are defined for each feature + assert importance.shape == (X.shape[1],) + # check that important features have the highest importance scores + assert np.all([int(i) in important_features for i in np.argsort(importance)[-5:]]) + + # test the selection part + fdp, power = fdp_power(np.where(selected)[0], important_features) + assert fdp < crt_fdr + assert power == 1.0 + + +## tests crt no power +parameter_no_power = [ + ("Dim", 200, 20, 3, 0.0, 42, 1.0, np.inf, 0.0), + ("Dim with noise", 200, 20, 3, 0.0, 42, 1.0, 10.0, 0.0), + ("Dim with correlated noise", 200, 20, 3, 0.0, 42, 1.0, 10.0, 0.2), + ("Dim with correlated features", 200, 20, 3, 0.2, 42, 1.0, np.inf, 0.0), + ("Dim high level noise", 200, 20, 3, 0.2, 42, 1.0, 1.0, 0.0), + ("Dim with correlated features and noise", 200, 20, 3, 0.2, 42, 1, 10, 0), + ( + "Dim with correlated features and correlated noise", + 200, + 20, + 3, + 0.2, + 42, + 1.0, + 10, + 0.2, + ), +] + + +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + zip(*(list(zip(*parameter_no_power))[1:])), + ids=list(zip(*parameter_no_power))[0], +) +@pytest.mark.parametrize( + "crt_n_sampling, crt_seed, crt_fdr", [(5, 5, 0.4)], ids=["default_crt"] +) +def test_crt_linear_no_power(data_generator, crt_n_sampling, crt_seed, crt_fdr): + """Tests the method on linear cases with noise and correlation""" + X, y, important_features, _ = data_generator + importance, selected = configure_linear_categorial_crt( + X, y, crt_n_sampling, crt_seed, crt_fdr + ) + # check that importance scores are defined for each feature + assert importance.shape == (X.shape[1],) + # check that important features have the highest importance scores + assert np.all([int(i) in important_features for i in np.argsort(importance)[-3:]]) + + # test the selection part + fdp, power = fdp_power(np.where(selected)[0], important_features) + assert fdp <= crt_fdr + assert power == 0.0 + + +## tests crt on different type of data +parameter_bad_detection = [ + ("No information", 300, 20, 5, 0.0, 42, 1.0, 0.0, 0.0), +] + + +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + zip(*(list(zip(*parameter_bad_detection))[1:])), + ids=list(zip(*parameter_bad_detection))[0], +) +@pytest.mark.parametrize( + "crt_n_sampling, crt_seed, crt_fdr", [(5, 5, 0.4)], ids=["default_crt"] +) +def test_crt_linear_fail(data_generator, crt_n_sampling, crt_seed, crt_fdr): + """Tests the method on linear cases with noise and correlation""" + X, y, important_features, _ = data_generator + importance, selected = configure_linear_categorial_crt( + X, y, crt_n_sampling, crt_seed, crt_fdr + ) + # check that importance scores are defined for each feature + assert importance.shape == (X.shape[1],) + # Verify that not all important features are detected + assert np.sum( + [int(i) in important_features for i in np.argsort(importance)[-4:]] + ) != len(important_features) + + # test the selection part + fdp, power = fdp_power(np.where(selected)[0], important_features) + assert fdp == 0.0 + assert power == 0.0 + + +############################################################################## +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + [(300, 20, 5, 0.0, 42, 1.0, np.inf, 0.0)], + ids=["default data"], +) +class TestCRTClass: + """Test the element of the class""" + + def test_crt_init(self, data_generator): + """Test CRT initialization""" + X, y, _, _ = data_generator + crt = CRT() + assert crt.n_jobs == 1 + assert crt.n_repeat == 10 + assert crt.generator is not None + assert crt.statistical_test is not None + + def test_crt_fit(self, data_generator): + """Test fitting CRT""" + X, y, _, _ = data_generator + crt = CRT() + crt.fit(X) + # check if the generator is fitted + crt.generator._check_fit() + + def test_crt_importance(self, data_generator): + """Test importance of CRT""" + X, y, important_features, _ = data_generator + crt = CRT(n_repeat=5) + crt.fit(X) + importance = crt.importance(X, y) + + # check that importance scores are defined for each feature + assert importance.shape == (X.shape[1],) + # check that important features have the highest importance scores + assert np.all( + [int(i) in important_features for i in np.argsort(importance)[-5:]] + ) + + def test_crt_categorical( + self, + n_samples, + n_features, + support_size, + rho, + seed, + value, + signal_noise_ratio, + rho_serial, + ): + """Test CRT with categorical variables""" + rng = np.random.default_rng(seed) + X_cont = rng.random((n_samples, 2)) + X_cat = rng.integers(low=0, high=3, size=(n_samples, 1)) + X = np.hstack([X_cont, X_cat]) + y = rng.random((n_samples)) + + crt = CRT(centered=False) + + with pytest.warns( + Warning, + match="The conditional covariance matrix for knockoffs is not positive definite.", + ): + with pytest.warns( + Warning, + match="The equi-correlated matrix for knockoffs is not positive definite.", + ): + importances = crt.fit_importance(X, y) + assert len(importances) == 3 + assert np.all(importances >= 0) + + def test_crt_CV_estimator(self, data_generator, seed): + """Test CRT with a crossvalidation estimator""" + fdr = 0.5 + X, y, important_features, _ = data_generator + + def lasso_statistic_gen(X, y): + return lasso_statistic( + X, + y, + lasso=GridSearchCV( + Lasso(), param_grid={"alpha": np.linspace(0.2, 0.3, 5)} + ), + n_alphas=5, + ) + + crt = CRT( + n_repeat=1, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=seed + 2 + ), + statistical_test=lasso_statistic_gen, + ) + crt.fit_importance(X, y) + selected = crt.selection_fdr(fdr=fdr) + + fdp, power = fdp_power(np.where(selected)[0], important_features) + + assert fdp <= fdr + assert power > 0.7 + + def test_estimate_distribution(self, data_generator, seed): + """Test different estimation of the covariance""" + fdr = 0.2 + X, y, important_features, _ = data_generator + crt = CRT( + n_repeat=1, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=seed + 1 + ), + ) + crt.fit_importance(X, y) + selected = crt.selection_fdr(fdr=fdr) + for i in np.where(selected)[0]: + assert np.any(i == important_features) + + crt = CRT( + n_repeat=1, + generator=GaussianDistribution( + cov_estimator=GraphicalLassoCV( + alphas=[1e-3, 1e-2, 1e-1, 1], + cv=KFold(n_splits=5, shuffle=True, random_state=0), + ), + random_state=seed + 2, + ), + ) + crt.fit_importance(X, y) + selected = crt.selection_fdr(fdr=fdr) + for i in np.where(selected)[0]: + assert np.any(i == important_features) + + def test_crt_selection(self, data_generator): + """Test the selection of variable from knockoff""" + fdr = 0.5 + n_repeat = 5 + X, y, important_features, _ = data_generator + crt = CRT(n_repeat=n_repeat) + crt.fit_importance(X, y) + selected = crt.selection_fdr(fdr=fdr) + + fdp, power = fdp_power(np.where(selected)[0], important_features) + assert fdp <= 0.2 + assert power > 0.7 + assert np.all(0 <= crt.pvalues_) or np.all(crt.pvalues_ <= 1) + + def test_crt_repeat_quantile(self, data_generator, n_features): + """Test crt selection""" + fdr = 0.5 + n_repeat = 5 + X, y, important_features, _ = data_generator + crt = CRT(n_repeat=n_repeat) + crt.fit_importance(X, y) + selected = crt.selection_fdr(fdr=fdr) + + fdp, power = fdp_power(np.where(selected)[0], important_features) + + assert crt.test_scores_.shape == (n_repeat, n_features) + assert fdp < 0.5 + assert power == 1.0 + + def test_crt_repeat_e_values(self, data_generator, n_features): + """Test crt selection with e-values""" + fdr = 0.5 + n_repeat = 5 + X, y, important_features, _ = data_generator + crt = CRT(n_repeat=n_repeat) + crt.fit_importance(X, y) + selected = crt.selection_fdr(fdr=fdr / 2, evalues=True, fdr_control="ebh") + + fdp, power = fdp_power(np.where(selected)[0], important_features) + + assert crt.test_scores_.shape == (n_repeat, n_features) + assert fdp < 0.5 + assert power == 1.0 + + def test_crt_invariant_with_bootstrap(self, data_generator): + """Test repeat invariance""" + fdr = 0.5 + X, y, important_features, _ = data_generator + + # Single AKO (or vanilla KO) (verbose vs no verbose) + crt_repeat = CRT( + n_repeat=5, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=5 + ), + ) + crt_repeat.fit_importance(X, y) + selected_repeat = crt_repeat.selection_fdr(fdr=fdr) + + crt_no_repeat = CRT( + n_repeat=1, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=5 + ), + ) + crt_no_repeat.fit(X).importance(X, y) + selected_no_repeat = crt_no_repeat.selection_fdr(fdr=fdr) + + fdp_repeat, power_repeat = fdp_power( + np.where(selected_repeat)[0], important_features + ) + assert fdp_repeat < 0.5 + assert power_repeat == 1.0 + fdp_no_repeat, power_no_repeat = fdp_power( + np.where(selected_no_repeat)[0], important_features + ) + assert fdp_no_repeat < 0.5 + assert power_no_repeat == 1.0 + + np.testing.assert_array_equal( + crt_repeat.test_scores_[0], + crt_no_repeat.test_scores_[0], + ) + # test that the selection no boostract should be lower than with boostrap + for i in np.where(selected_repeat)[0]: + assert selected_no_repeat[i] + # np.testing.assert_array_equal( + # crt_bootstrap.importances_, crt_no_bootstrap.importances_ + # ) + # np.testing.assert_array_equal(crt_bootstrap.pvalues_, crt_no_bootstrap.pvalues_) + # np.testing.assert_array_equal(selected_bootstrap, selected_no_bootstrap) + + def test_crt_function(self, data_generator): + """Test the function crt""" + X, y, important_features, _ = data_generator + importance = crt(X, y, n_repeat=5) + # check that importance scores are defined for each feature + assert importance.shape == (X.shape[1],) + # check that important features have the highest importance scores + assert np.all( + [int(i) in important_features for i in np.argsort(importance)[-5:]] + ) + + +############################################################################## +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + [(300, 20, 5, 0.0, 42, 1.0, np.inf, 0.0)], + ids=["default data"], +) +class TestCRTExceptions: + """Test class for CRT exceptions""" + + def test_warning(self, data_generator): + """Test if some warning are raised""" + X, y, _, _ = data_generator + crt = CRT(n_repeat=5) + with pytest.warns(Warning, match="y won't be used"): + crt.fit(X, y) + with pytest.warns(Warning, match="cv won't be used"): + crt.fit_importance(X, y, cv="test") + + def test_unfitted_importance(self, data_generator): + """Test importance method with unfitted model""" + X, y, _, _ = data_generator + crt = CRT( + n_repeat=5, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=5 + ), + ) + + with pytest.raises( + ValueError, + match="The CRT requires to be fitted before computing importance", + ): + crt.importance(X, y) + + def test_invalid_n_samplings(self, data_generator): + """Test when invalid number of permutations is provided""" + with pytest.raises(AssertionError, match="n_samplings must be positive"): + CRT(n_repeat=-1) diff --git a/test/test_knockoff.py b/test/test_knockoff.py index 3b0e86aa1..8ec615772 100644 --- a/test/test_knockoff.py +++ b/test/test_knockoff.py @@ -4,7 +4,6 @@ model_x_knockoff_bootstrap_e_value, model_x_knockoff_bootstrap_quantile, ) -from hidimstat.gaussian_knockoff import gaussian_knockoff_generation, _s_equi from hidimstat._utils.scenario import multivariate_simulation from hidimstat.statistical_tools.multiple_testing import fdp_power import numpy as np @@ -235,72 +234,3 @@ def test_estimate_distribution(): ) for i in selected: assert np.any(i == non_zero) - - -def test_gaussian_knockoff_equi(): - """test function of gaussian knockoff""" - seed = 42 - n = 100 - p = 50 - X, y, beta, noise = multivariate_simulation(n, p, seed=seed) - non_zero = np.where(beta)[0] - mu = X.mean(axis=0) - sigma = LedoitWolf(assume_centered=True).fit(X).covariance_ - - X_tilde, mu_tilde, sigma_tilde_decompose = gaussian_knockoff_generation( - X, mu, sigma, seed=seed * 2 - ) - - assert X_tilde.shape == (n, p) - - -def test_gaussian_knockoff_equi_warning(): - "test warning in guassian knockoff" - seed = 42 - n = 100 - p = 50 - tol = 1e-14 - rgn = np.random.RandomState(seed) - X = rgn.randn(n, p) - mu = X.mean(axis=0) - # create a positive definite matrix - u, s, vh = np.linalg.svd(rgn.randn(p, p)) - d = np.eye(p) * tol / 10 - sigma = u * d * u.T - with pytest.warns( - UserWarning, - match="The conditional covariance matrix for knockoffs is not positive", - ): - X_tilde, mu_tilde, sigma_tilde_decompose = gaussian_knockoff_generation( - X, mu, sigma, seed=seed * 2, tol=tol - ) - - assert X_tilde.shape == (n, p) - - -def test_s_equi_not_define_positive(): - """test the warning and error of s_equi function""" - n = 10 - tol = 1e-7 - seed = 42 - - # random positive matrix - rgn = np.random.RandomState(seed) - a = rgn.randn(n, n) - a -= np.min(a) - with pytest.raises( - Exception, match="The covariance matrix is not positive-definite." - ): - _s_equi(a) - - # matrix with positive eigenvalues, positive diagonal - while not np.all(np.linalg.eigvalsh(a) > tol): - a += 0.1 * np.eye(n) - with pytest.warns(UserWarning, match="The equi-correlated matrix"): - _s_equi(a) - - # positive definite matrix - u, s, vh = np.linalg.svd(a) - d = np.eye(n) - sigma = u * d * u.T - _s_equi(sigma)