diff --git a/docs/src/api.rst b/docs/src/api.rst index 51074f1dc..a71bcd670 100644 --- a/docs/src/api.rst +++ b/docs/src/api.rst @@ -24,7 +24,6 @@ Functions desparsified_group_lasso_pvalue ensemble_clustered_inference ensemble_clustered_inference_pvalue - model_x_knockoff reid Classes @@ -36,6 +35,7 @@ Classes BaseVariableImportance BasePerturbation + ModelXKnockoff LOCO CFI PFI diff --git a/examples/plot_knockoff_aggregation.py b/examples/plot_knockoff_aggregation.py index 29c7c12ef..a7278cf73 100644 --- a/examples/plot_knockoff_aggregation.py +++ b/examples/plot_knockoff_aggregation.py @@ -20,16 +20,9 @@ from joblib import Parallel, delayed import matplotlib.pyplot as plt import numpy as np -from sklearn.linear_model import LassoCV -from sklearn.model_selection import KFold from sklearn.utils import check_random_state -from hidimstat.knockoffs import ( - model_x_knockoff, - model_x_knockoff_bootstrap_e_value, - model_x_knockoff_bootstrap_quantile, - model_x_knockoff_pvalue, -) +from hidimstat.knockoffs import ModelXKnockoff from hidimstat.statistical_tools.multiple_testing import fdp_power from hidimstat._utils.scenario import multivariate_simulation @@ -95,46 +88,28 @@ def single_run( non_zero_index = np.where(beta_true)[0] # Use model-X Knockoffs [1] - selected, test_scores, threshold, X_tildes = model_x_knockoff( - X, - y, - estimator=LassoCV( - n_jobs=1, - cv=KFold(n_splits=5, shuffle=True, random_state=0), - ), - n_bootstraps=1, - random_state=seed, - ) - mx_selection, _ = model_x_knockoff_pvalue(test_scores, fdr=fdr) - fdp_mx, power_mx = fdp_power(mx_selection, non_zero_index) + model_x_knockoff = ModelXKnockoff(n_repeat=1) + model_x_knockoff.fit_importance(X, y) + mx_selection = model_x_knockoff.selection_fdr(fdr=fdr) + fdp_mx, power_mx = fdp_power(np.where(mx_selection)[0], non_zero_index) # Use aggregation model-X Knockoffs [2] - selected, test_scores, threshold, X_tildes = model_x_knockoff( - X, - y, - estimator=LassoCV( - n_jobs=1, - cv=KFold(n_splits=5, shuffle=True, random_state=0), - ), - n_bootstraps=n_bootstraps, - n_jobs=1, - random_state=seed, - ) + model_x_knockoff_repeat = ModelXKnockoff(n_repeat=n_bootstraps) + model_x_knockoff_repeat.fit_importance(X, y) # Use p-values aggregation [2] - aggregated_ko_selection, _, _ = model_x_knockoff_bootstrap_quantile( - test_scores, fdr=fdr, adaptive_aggregation=True + aggregated_ko_selection = model_x_knockoff_repeat.selection_fdr( + fdr=fdr, adaptive_aggregation=True + ) + fdp_pval, power_pval = fdp_power( + np.where(aggregated_ko_selection)[0], non_zero_index ) - - fdp_pval, power_pval = fdp_power(aggregated_ko_selection, non_zero_index) # Use e-values aggregation [3] - eval_selection, _, _ = model_x_knockoff_bootstrap_e_value( - test_scores, threshold, fdr=fdr + eval_selection = model_x_knockoff_repeat.selection_fdr( + fdr=fdr, fdr_control="ebh", evalues=True ) - - fdp_eval, power_eval = fdp_power(eval_selection, non_zero_index) - + fdp_eval, power_eval = fdp_power(np.where(eval_selection)[0], non_zero_index) return fdp_mx, fdp_pval, fdp_eval, power_mx, power_pval, power_eval diff --git a/examples/plot_knockoffs_wisconsin.py b/examples/plot_knockoffs_wisconsin.py index 8839ff332..ddd66a2cb 100644 --- a/examples/plot_knockoffs_wisconsin.py +++ b/examples/plot_knockoffs_wisconsin.py @@ -139,30 +139,44 @@ # We use the Model-X Knockoff procedure to control the FDR (False Discovery Rate). The # selection of variables is based on the Lasso Coefficient Difference (LCD) statistic # :footcite:t:`candes2018panning`. -from hidimstat import model_x_knockoff - -fdr = 0.2 - -selected, test_scores, threshold, X_tildes = model_x_knockoff( +from sklearn.covariance import LedoitWolf +from hidimstat import ModelXKnockoff +from hidimstat.statistical_tools.lasso_test import lasso_statistic_with_sampling +from hidimstat.statistical_tools.gaussian_distribution import GaussianDistribution + + +def logistic_test(X, X_tilde, y): + return lasso_statistic_with_sampling( + X, + X_tilde, + y, + lasso=LogisticRegressionCV( + solver="liblinear", + penalty="l1", + Cs=np.logspace(-3, 3, 10), + random_state=rng, + tol=1e-3, + max_iter=1000, + ), + preconfigure_lasso=None, + ) + + +model_x_knockoff = ModelXKnockoff( + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=0, tol=1e-15 + ), + statistical_test=logistic_test, + n_repeat=1, +) +importance = model_x_knockoff.fit_importance( noisy_train, y_train, - estimator=LogisticRegressionCV( - solver="liblinear", - penalty="l1", - Cs=np.logspace(-3, 3, 10), - random_state=rng, - tol=1e-3, - max_iter=1000, - ), - n_bootstraps=1, - random_state=0, - tol_gauss=1e-15, - preconfigure_estimator=None, - fdr=fdr, ) +selection = model_x_knockoff.selection_fdr(fdr=0.2) # Count how many selected features are actually noise -num_false_discoveries = np.sum(selected >= p) +num_false_discoveries = np.sum(selection[p:]) print(f"Knockoffs make at least {num_false_discoveries} False Discoveries") @@ -177,11 +191,11 @@ import matplotlib.pyplot as plt import seaborn as sns -selected_mask = np.array(["not selected"] * len(test_scores)) -selected_mask[selected] = "selected" +selected_mask = np.array(["not selected"] * len(importance)) +selected_mask[selection] = "selected" df_ko = pd.DataFrame( { - "score": test_scores, + "score": importance, "variable": feature_names_noise, "selected": selected_mask, } @@ -202,7 +216,9 @@ ax=ax, palette={"selected": "tab:red", "not selected": "tab:gray"}, ) -ax.axvline(x=threshold, color="k", linestyle="--", label="Threshold") +ax.axvline( + x=model_x_knockoff.threshold_fdr_, color="k", linestyle="--", label="Threshold" +) ax.legend() ax.set_xlabel("KO statistic (LCD)") ax.set_ylabel("") diff --git a/examples/plot_pitfalls_permutation_importance.py b/examples/plot_pitfalls_permutation_importance.py index af4deb83e..adc144a62 100644 --- a/examples/plot_pitfalls_permutation_importance.py +++ b/examples/plot_pitfalls_permutation_importance.py @@ -26,7 +26,7 @@ from sklearn.preprocessing import StandardScaler from hidimstat import CFI, PFI -from hidimstat.conditional_sampling import ConditionalSampler +from hidimstat.statistical_tools.conditional_sampling import ConditionalSampler rng = np.random.RandomState(0) diff --git a/src/hidimstat/__init__.py b/src/hidimstat/__init__.py index 81d5a0cce..1d3ee0a6a 100644 --- a/src/hidimstat/__init__.py +++ b/src/hidimstat/__init__.py @@ -15,12 +15,7 @@ ) from .distilled_conditional_randomization_test import d0crt, D0CRT from .conditional_feature_importance import CFI -from .knockoffs import ( - model_x_knockoff, - model_x_knockoff_pvalue, - model_x_knockoff_bootstrap_quantile, - model_x_knockoff_bootstrap_e_value, -) +from .knockoffs import ModelXKnockoff from .leave_one_covariate_out import LOCO from .noise_std import reid from .permutation_feature_importance import PFI @@ -44,10 +39,7 @@ "desparsified_lasso_pvalue", "desparsified_group_lasso_pvalue", "reid", - "model_x_knockoff", - "model_x_knockoff_pvalue", - "model_x_knockoff_bootstrap_quantile", - "model_x_knockoff_bootstrap_e_value", + "ModelXKnockoff", "CFI", "LOCO", "PFI", diff --git a/src/hidimstat/base_variable_importance.py b/src/hidimstat/base_variable_importance.py index b4f539024..911435a8e 100644 --- a/src/hidimstat/base_variable_importance.py +++ b/src/hidimstat/base_variable_importance.py @@ -3,6 +3,9 @@ from sklearn.base import BaseEstimator import numpy as np +from hidimstat.statistical_tools.multiple_testing import fdr_threshold +from hidimstat.statistical_tools.aggregation import quantile_aggregation + class BaseVariableImportance(BaseEstimator): """ @@ -34,24 +37,35 @@ def __init__(self): self.importances_ = None self.pvalues_ = None self.selections_ = None + self.test_scores_ = None + self.threshold_fdr_ = None + self.aggregated_pval_ = None + self.aggregated_eval_ = None + + def _check_importance(self): + """ + Checks if the importance scores and p-values have been computed. + """ + if self.importances_ is None: + raise ValueError( + "The importances need to be called before calling this method" + ) def selection( self, k_best=None, percentile=None, threshold=None, threshold_pvalue=None ): """ Selects features based on variable importance. - In case several arguments are different from None, - the returned selection is the conjunction of all of them. Parameters ---------- - k_best : int, optional, default=None + k_best : int, default=None Selects the top k features based on importance scores. - percentile : float, optional, default=None + percentile : float, default=None Selects features based on a specified percentile of importance scores. - threshold : float, optional, default=None + threshold : float, default=None Selects features with importance scores above the specified threshold. - threshold_pvalue : float, optional, default=None + threshold_pvalue : float, default=None Selects features with p-values below the specified threshold. Returns @@ -61,17 +75,23 @@ def selection( """ self._check_importance() if k_best is not None: - if not isinstance(k_best, str) and k_best > self.importances_.shape[1]: + if not isinstance(k_best, str) and k_best > self.importances_.shape[0]: warnings.warn( - f"k={k_best} is greater than n_features={self.importances_.shape[1]}. " + f"k={k_best} is greater than n_features={self.importances_.shape[0]}. " "All the features will be returned." ) - assert k_best > 0, "k_best needs to be positive and not null" + if isinstance(k_best, str): + assert k_best == "all" + else: + assert k_best >= 0, "k_best needs to be positive or null" if percentile is not None: assert ( - 0 < percentile and percentile < 100 + 0 <= percentile and percentile <= 100 ), "percentile needs to be between 0 and 100" if threshold_pvalue is not None: + assert ( + self.pvalues_ is not None + ), "This method doesn't support a threshold on p-values" assert ( 0 < threshold_pvalue and threshold_pvalue < 1 ), "threshold_pvalue needs to be between 0 and 1" @@ -96,9 +116,9 @@ def selection( elif percentile == 0: mask_percentile = np.zeros(len(self.importances_), dtype=bool) elif percentile is not None: - threshold = np.percentile(self.importances_, 100 - percentile) - mask_percentile = self.importances_ > threshold - ties = np.where(self.importances_ == threshold)[0] + threshold_percentile = np.percentile(self.importances_, 100 - percentile) + mask_percentile = self.importances_ > threshold_percentile + ties = np.where(self.importances_ == threshold_percentile)[0] if len(ties): max_feats = int(len(self.importances_) * percentile / 100) kept_ties = ties[: max_feats - mask_percentile.sum()] @@ -123,11 +143,177 @@ def selection( return self.selections_ - def _check_importance(self): + def selection_fdr( + self, + fdr, + fdr_control="bhq", + evalues=False, + reshaping_function=None, + adaptive_aggregation=False, + gamma=0.5, + ): """ - Checks if the importance scores and p-values have been computed. + Performs feature selection based on False Discovery Rate (FDR) control. + + This method selects features by controlling the FDR using either p-values or e-values + derived from test scores. It supports different FDR control methods and optional + adaptive aggregation of the statistical values. + + Parameters + ---------- + fdr : float, default=None + The target false discovery rate level (between 0 and 1) + fdr_control: string, default="bhq" + The FDR control method to use. Options are: + - "bhq": Benjamini-Hochberg procedure + - 'bhy': Benjamini-Hochberg-Yekutieli procedure + - "ebh": e-BH procedure (only for e-values) + evalues: boolean, default=False + If True, uses e-values for selection. If False, uses p-values. + reshaping_function: callable, default=None + Reshaping function for BHY method, default uses sum of reciprocals + adaptive_aggregation: boolean, default=False + If True, uses adaptive weights for p-value aggregation. + Only applicable when evalues=False. + gamma: boolean, default=0.5 + The gamma parameter for quantile aggregation of p-values. + Only used when evalues=False. + + Returns + ------- + numpy.ndarray + Boolean array indicating selected features (True for selected, False for not selected) + + Raises + ------ + AssertionError + If test_scores\_ is None or if incompatible combinations of parameters are provided """ - if self.importances_ is None: - raise ValueError( - "The importances need to be called before calling this method" + self._check_importance() + assert ( + self.test_scores_ is not None + ), "this method doesn't support selection base on FDR" + + if self.test_scores_.shape[0] == 1: + self.threshold_fdr_ = _estimated_threshold(self.test_scores_, fdr=fdr) + selected = self.test_scores_[0] >= self.threshold_fdr_ + elif not evalues: + assert fdr_control != "ebh", "for p-value, the fdr control can't be 'ebh'" + pvalues = np.array( + [_empirical_pval(test_score) for test_score in self.test_scores_] ) + self.aggregated_pval_ = quantile_aggregation( + pvalues, gamma=gamma, adaptive=adaptive_aggregation + ) + self.threshold_fdr_ = fdr_threshold( + self.aggregated_pval_, + fdr=fdr, + method=fdr_control, + reshaping_function=reshaping_function, + ) + selected = self.aggregated_pval_ <= self.threshold_fdr_ + else: + assert fdr_control == "ebh", "for e-value, the fdr control need to be 'ebh'" + evalues = [] + for test_score in self.test_scores_: + ko_threshold = _estimated_threshold(test_score, fdr=fdr) + evalues.append(_empirical_eval(test_score, ko_threshold)) + self.aggregated_eval_ = np.mean(evalues, axis=0) + self.threshold_fdr_ = fdr_threshold( + self.aggregated_eval_, + fdr=fdr, + method=fdr_control, + reshaping_function=reshaping_function, + ) + selected = self.aggregated_eval_ >= self.threshold_fdr_ + return selected + + +def _estimated_threshold(test_score, fdr=0.1): + """ + Calculate the threshold based on the procedure stated in the knockoff article. + Original code: + https://github.com/msesia/knockoff-filter/blob/master/R/knockoff/R/knockoff_filter.R + Parameters + ---------- + test_score : 1D ndarray, shape (n_features, ) + Vector of test statistic. + fdr : float + Desired controlled FDR (false discovery rate) level. + Returns + ------- + threshold : float or np.inf + Threshold level. + """ + offset = 1 # Offset equals 1 is the knockoff+ procedure. + + threshold_mesh = np.sort(np.abs(test_score[test_score != 0])) + np.concatenate( + [[0], threshold_mesh, [np.inf]] + ) # if there is no solution, the threshold is inf + # find the right value of t for getting a good fdr + # Equation 1.8 of barber2015controlling and 3.10 in Candès 2018 + threshold = 0.0 + for threshold in threshold_mesh: + false_pos = np.sum(test_score <= -threshold) + selected = np.sum(test_score >= threshold) + if (offset + false_pos) / np.maximum(selected, 1) <= fdr: + break + return threshold + + +def _empirical_pval(test_score): + """ + Compute the empirical p-values from the test based on knockoff+. + Parameters + ---------- + test_score : 1D ndarray, shape (n_features, ) + Vector of test statistics. + Returns + ------- + pvals : 1D ndarray, shape (n_features, ) + Vector of empirical p-values. + """ + pvals = [] + n_features = test_score.size + + offset = 1 # Offset equals 1 is the knockoff+ procedure. + + test_score_inv = -test_score + for i in range(n_features): + if test_score[i] <= 0: + pvals.append(1) + else: + pvals.append( + (offset + np.sum(test_score_inv >= test_score[i])) / n_features + ) + + return np.array(pvals) + + +def _empirical_eval(test_score, ko_threshold): + """ + Compute the empirical e-values from the test based on knockoff. + Parameters + ---------- + test_score : 1D ndarray, shape (n_features, ) + Vector of test statistics. + ko_threshold : float + Threshold level. + Returns + ------- + evals : 1D ndarray, shape (n_features, ) + Vector of empirical e-values. + """ + evals = [] + n_features = test_score.size + + offset = 1 # Offset equals 1 is the knockoff+ procedure. + + for i in range(n_features): + if test_score[i] < ko_threshold: + evals.append(0) + else: + evals.append(n_features / (offset + np.sum(test_score <= -ko_threshold))) + + return np.array(evals) diff --git a/src/hidimstat/conditional_feature_importance.py b/src/hidimstat/conditional_feature_importance.py index 9b0e7905f..8c9686334 100644 --- a/src/hidimstat/conditional_feature_importance.py +++ b/src/hidimstat/conditional_feature_importance.py @@ -5,7 +5,7 @@ from sklearn.utils.validation import check_random_state from hidimstat.base_perturbation import BasePerturbation -from hidimstat.conditional_sampling import ConditionalSampler +from hidimstat.statistical_tools.conditional_sampling import ConditionalSampler class CFI(BasePerturbation): diff --git a/src/hidimstat/gaussian_knockoff.py b/src/hidimstat/gaussian_knockoff.py deleted file mode 100644 index 886312c7e..000000000 --- a/src/hidimstat/gaussian_knockoff.py +++ /dev/null @@ -1,191 +0,0 @@ -import warnings - -import numpy as np - - -def gaussian_knockoff_generation(X, mu, sigma, seed=None, tol=1e-14): - """ - Generate second-order knockoff variables using the equi-correlated method. - - This function generates knockoff variables for a given design matrix X, - using the equi-correlated method described in :footcite:t:`barber2015controlling` - and :footcite:t:`candes2018panning`. The function takes as input the design matrix - X, the vector of empirical mean values mu, and the empirical covariance - matrix sigma. It returns the knockoff variables X_tilde. - - The original implementation can be found at - https://github.com/msesia/knockoff-filter/blob/master/R/knockoff/R/create_gaussian.R - - Parameters - ---------- - X: 2D ndarray (n_samples, n_features) - The original design matrix. - - mu : 1D ndarray (n_features, ) - A vector of empirical mean values. - - sigma : 2D ndarray (n_samples, n_features) - The empirical covariance matrix. - - seed : int, optional - A random seed for generating the uniform noise used to create - the knockoff variables. - - tol : float, default=1.e-14 - A tolerance value used for numerical stability in the calculation - of the Cholesky decomposition. - - Returns - ------- - X_tilde : 2D ndarray (n_samples, n_features) - The knockoff variables. - - mu_tilde : 2D ndarray (n_samples, n_features) - The mean matrix used for generating knockoffs. - - sigma_tilde_decompose : 2D ndarray (n_features, n_features) - The Cholesky decomposition of the covariance matrix. - - References - ---------- - .. footbibliography:: - """ - n_samples, n_features = X.shape - - # create a uniform noise for all the data - rng = np.random.RandomState(seed) - u_tilde = rng.randn(n_samples, n_features) - - diag_s = np.diag(_s_equi(sigma, tol=tol)) - - sigma_inv_s = np.linalg.solve(sigma, diag_s) - - # First part on the RHS of equation 1.4 in barber2015controlling - mu_tilde = X - np.dot(X - mu, sigma_inv_s) - # To calculate the Cholesky decomposition later on - sigma_tilde = 2 * diag_s - diag_s.dot(sigma_inv_s) - # test is the matrix is positive definite - while not np.all(np.linalg.eigvalsh(sigma_tilde) > tol): - sigma_tilde += 1e-10 * np.eye(n_features) - warnings.warn( - "The conditional covariance matrix for knockoffs is not positive " - "definite. Adding minor positive value to the matrix.", - UserWarning, - ) - - # Equation 1.4 in barber2015controlling - sigma_tilde_decompose = np.linalg.cholesky(sigma_tilde) - X_tilde = mu_tilde + np.dot(u_tilde, sigma_tilde_decompose) - - return X_tilde, mu_tilde, sigma_tilde_decompose - - -def repeat_gaussian_knockoff_generation(mu_tilde, sigma_tilde_decompose, seed): - """ - Generate additional knockoff variables using pre-computed values. - - This function generates additional knockoff variables using pre-computed - values returned by the gaussian_knockoff_generation function - with repeat=True. It takes as input mu_tilde and sigma_tilde_decompose, - which were returned by gaussian_knockoff_generation, and a random seed. - It returns the new knockoff variables X_tilde. - - Parameters - ---------- - mu_tilde : 2D ndarray (n_samples, n_features) - The matrix of means used to generate the knockoff variables, - returned by gaussian_knockoff_generation. - - sigma_tilde_decompose : 2D ndarray (n_features, n_features) - The Cholesky decomposition of the covariance matrix used - to generate the knockoff variables,returned by - gaussian_knockoff_generation. - - seed : int - A random seed for generating the uniform noise used to create - the knockoff variables. - - Returns - ------- - X_tilde : 2D ndarray (n_samples, n_features) - The knockoff variables. - """ - n_samples, n_features = mu_tilde.shape - - # create a uniform noise for all the data - rng = np.random.RandomState(seed) - u_tilde = rng.randn(n_samples, n_features) - - X_tilde = mu_tilde + np.dot(u_tilde, sigma_tilde_decompose) - return X_tilde - - -def _s_equi(sigma, tol=1e-14): - """ - Estimate the diagonal matrix of correlation between real - and knockoff variables using the equi-correlated equation. - - This function estimates the diagonal matrix of correlation - between real and knockoff variables using the equi-correlated - equation described in :footcite:t:`barber2015controlling` and - :footcite:t:`candes2018panning`. It takes as input the empirical - covariance matrix sigma and a tolerance value tol, - and returns a vector of diagonal values of the estimated - matrix diag{s}. - - Parameters - ---------- - sigma : 2D ndarray (n_features, n_features) - The empirical covariance matrix calculated from - the original design matrix. - - tol : float, optional - A tolerance value used for numerical stability in the calculation - of the eigenvalues of the correlation matrix. - - Returns - ------- - 1D ndarray (n_features, ) - A vector of diagonal values of the estimated matrix diag{s}. - - Raises - ------ - Exception - If the covariance matrix is not positive-definite. - """ - n_features = sigma.shape[0] - - # Convert covariance matrix to correlation matrix - # as example see cov2corr from statsmodels - features_std = np.sqrt(np.diag(sigma)) - scale = np.outer(features_std, features_std) - corr_matrix = sigma / scale - - eig_value = np.linalg.eigvalsh(corr_matrix) - lambda_min = np.min(eig_value[0]) - # check if the matrix is positive-defined - if lambda_min <= 0: - raise Exception("The covariance matrix is not positive-definite.") - - s = np.ones(n_features) * min(2 * lambda_min, 1) - - psd = np.all(np.linalg.eigvalsh(2 * corr_matrix - np.diag(s)) > tol) - s_eps = 0 - while not psd: - if s_eps == 0: - s_eps = np.finfo(type(s[0])).eps # avoid zeros - else: - s_eps *= 10 - # if all eigval > 0 then the matrix is positive define - psd = np.all( - np.linalg.eigvalsh(2 * corr_matrix - np.diag(s * (1 - s_eps))) > tol - ) - warnings.warn( - "The equi-correlated matrix for knockoffs is not positive " - f"definite. Reduce the value of distance by {s_eps}.", - UserWarning, - ) - - s = s * (1 - s_eps) - - return s * np.diag(sigma) diff --git a/src/hidimstat/knockoffs.py b/src/hidimstat/knockoffs.py index 7dab07aeb..3c2ebcb67 100644 --- a/src/hidimstat/knockoffs.py +++ b/src/hidimstat/knockoffs.py @@ -1,93 +1,21 @@ +import warnings + import numpy as np from joblib import Parallel, delayed from sklearn.covariance import LedoitWolf -from sklearn.linear_model import LassoCV -from sklearn.model_selection import KFold -from sklearn.preprocessing import StandardScaler -from sklearn.utils import check_random_state from sklearn.utils.validation import check_memory +from sklearn.preprocessing import StandardScaler -from hidimstat.gaussian_knockoff import ( - gaussian_knockoff_generation, - repeat_gaussian_knockoff_generation, +from hidimstat._utils.docstring import _aggregate_docstring +from hidimstat.statistical_tools.gaussian_distribution import GaussianDistribution +from hidimstat.statistical_tools.lasso_test import lasso_statistic_with_sampling +from hidimstat.base_variable_importance import ( + BaseVariableImportance, + _empirical_pval, ) -from hidimstat.statistical_tools.multiple_testing import fdr_threshold -from hidimstat.statistical_tools.aggregation import quantile_aggregation -def preconfigure_estimator_LassoCV(estimator, X, X_tilde, y, n_alphas=20): - """ - Configure the estimator for Model-X knockoffs. - - This function sets up the regularization path for the Lasso estimator - based on the input data and the number of alphas to use. The regularization - path is defined by a sequence of alpha values, which control the amount - of shrinkage applied to the coefficient estimates. - - Parameters - ---------- - estimator : sklearn.linear_model.LassoCV - The Lasso estimator to configure. - - X : 2D ndarray (n_samples, n_features) - The original design matrix. - - X_tilde : 2D ndarray (n_samples, n_features) - The knockoff design matrix. - - y : 1D ndarray (n_samples, ) - The target vector. - - n_alphas : int, default=10 - The number of alpha values to use to instantiate the cross-validation. - - Returns - ------- - estimator : sklearn.linear_model.LassoCV - The configured estimator. - - Raises - ------ - TypeError - If estimator is not an instance of LassoCV. - - Notes - ----- - The alpha values are calculated based on the combined design matrix [X, X_tilde]. - alpha_max is set to max(X_ko.T @ y)/(2*n_features). - """ - if type(estimator).__name__ != "LassoCV": - raise TypeError("You should not use this function to configure the estimator") - - n_features = X.shape[1] - X_ko = np.column_stack([X, X_tilde]) - alpha_max = np.max(np.dot(X_ko.T, y)) / (2 * n_features) - alphas = np.linspace(alpha_max * np.exp(-n_alphas), alpha_max, n_alphas) - estimator.alphas = alphas - return estimator - - -def model_x_knockoff( - X, - y, - estimator=LassoCV( - n_jobs=None, - verbose=0, - max_iter=200000, - cv=KFold(n_splits=5, shuffle=True, random_state=0), - tol=1e-6, - ), - preconfigure_estimator=preconfigure_estimator_LassoCV, - fdr=0.1, - centered=True, - cov_estimator=LedoitWolf(assume_centered=True), - joblib_verbose=0, - n_bootstraps=1, - n_jobs=1, - random_state=None, - tol_gauss=1e-14, - memory=None, -): +class ModelXKnockoff(BaseVariableImportance): """ Model-X Knockoff @@ -181,387 +109,200 @@ def model_x_knockoff( ---------- .. footbibliography:: """ - assert n_bootstraps > 0, "the number of bootstraps should at least higher than 1" - memory = check_memory(memory) - # unnecessary to have n_jobs > number of bootstraps - n_jobs = min(n_bootstraps, n_jobs) - parallel = Parallel(n_jobs, verbose=joblib_verbose) - - # get the seed for the different run - if isinstance(random_state, (int, np.int32, np.int64)): - rng = check_random_state(random_state) - elif random_state is None: - rng = check_random_state(0) - else: - raise TypeError("Wrong type for random_state") - seed_list = rng.randint(1, np.iinfo(np.int32).max, n_bootstraps) - - if centered: - X = StandardScaler().fit_transform(X) - - # estimation of X distribution - # original implementation: - # https://github.com/msesia/knockoff-filter/blob/master/R/knockoff/R/create_second_order.R - mu = X.mean(axis=0) - sigma = cov_estimator.fit(X).covariance_ - - # Create knockoff variables - X_tilde, mu_tilde, sigma_tilde_decompose = memory.cache( - gaussian_knockoff_generation - )(X, mu, sigma, seed=seed_list[0], tol=tol_gauss) - - if n_bootstraps == 1: - X_tildes = [X_tilde] - else: - X_tildes = parallel( - delayed(repeat_gaussian_knockoff_generation)( - mu_tilde, - sigma_tilde_decompose, - seed=seed, - ) - for seed in seed_list[1:] - ) - X_tildes.insert(0, X_tilde) - - results = parallel( - delayed(memory.cache(_stat_coefficient_diff))( - X, X_tildes[i], y, estimator, fdr, preconfigure_estimator - ) - for i in range(n_bootstraps) - ) - test_scores, threshold, selected = zip(*results) - - if n_bootstraps == 1: - return selected[0], test_scores[0], threshold[0], X_tildes[0] - else: - return selected, test_scores, threshold, X_tildes - - -def model_x_knockoff_pvalue(test_score, fdr=0.1, fdr_control="bhq"): - """ - This function implements the computation of the empirical p-values - - Parameters - ---------- - test_score : 1D array, (n_features, ) - A vector of test statistics. - fdr : float, default=0.1 - The desired controlled False Discovery Rate (FDR) level. - - fdr_control : str, default="bhq" - The method used to control the False Discovery Rate. - Available methods are: - * 'bhq': Standard Benjamini-Hochberg :footcite:`benjamini1995controlling,bhy_2001` - * 'bhy': Benjamini-Hochberg-Yekutieli :footcite:p:`bhy_2001` - * 'ebh': e-Benjamini-Hochberg :footcite:`wang2022false` - - Returns - ------- - selected : 1D array, int - A vector of indices of the selected variables. - - pvals : 1D array, (n_features, ) - A vector of empirical p-values. - - Notes - ----- - This function calculates the empirical p-values based on the test statistics and the - desired FDR level. It then identifies the selected variables based on the p-values. - """ - pvals = _empirical_knockoff_pval(test_score) - threshold = fdr_threshold(pvals, fdr=fdr, method=fdr_control) - selected = np.where(pvals <= threshold)[0] - return selected, pvals - - -def model_x_knockoff_bootstrap_e_value(test_scores, ko_threshold, fdr=0.1): - """ - This function implements the computation of the empirical e-values - from knockoff test and aggregates them using the e-BH procedure. - - Parameters - ---------- - test_scores : 2D array, (n_bootstraps, n_features) - A matrix of test statistics for each bootstrap sample. - - ko_threshold : float - Threshold level. - - fdr : float, default=0.1 - The desired controlled False Discovery Rate (FDR) level. - - Returns - ------- - selected : 1D array, int - A vector of indices of the selected variables. - - aggregated_eval : 1D array, (n_features, ) - A vector of aggregated empirical e-values. - - evals : 2D array, (n_bootstraps, n_features) - A matrix of empirical e-values for each bootstrap sample. + def __init__( + self, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=0 + ), + statistical_test=lasso_statistic_with_sampling, + centered=True, + joblib_verbose=0, + n_repeat=1, + n_jobs=1, + memory=None, + ): + self.generator = generator + self.memory = check_memory(memory) + self.joblib_verbose = joblib_verbose + self.statistical_test = statistical_test + self.centered = centered + + assert n_repeat > 0, "n_samplings must be positive" + self.n_repeat = n_repeat + # unnecessary to have n_jobs > number of bootstraps + self.n_jobs = min(n_repeat, n_jobs) + + def fit(self, X, y=None): + """ + Fit the Model-X Knockoff model by training the generator. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Training data matrix where n_samples is the number of samples and + n_features is the number of features. + y : array-like of shape (n_samples,), default=None + Target values. Not used in this method. + + Returns + ------- + self : object + Returns the instance itself. + + Notes + ----- + The fit method only trains the generator component. The target values y + are not used in this step. + """ + if y is not None: + warnings.warn("y won't be used") + if self.centered: + X_ = StandardScaler().fit_transform(X) + else: + X_ = X + self.generator.fit(X_) + return self + + def _check_fit(self): + try: + self.generator._check_fit() + except ValueError as exc: + raise ValueError( + "The Model-X Knockoff requires to be fitted before computing importance" + ) from exc + + def importance(self, X, y): + """ + Calculate feature importance scores using Model-X knockoffs. + + This method generates knockoff variables and computes test statistics to measure + feature importance. For multiple repeats, the scores are averaged across repeats + to improve stability. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Training data matrix where n_samples is the number of samples and + n_features is the number of features. + y : array-like of shape (n_samples,) + Target values. + + Returns + ------- + importances_ : ndarray of shape (n_features,) + Feature importance scores for each feature. + Higher absolute values indicate higher importance. + + Notes + ----- + The method generates knockoff variables that satisfy the exchangeability property + and computes test statistics comparing original features against their knockoffs. + When n_repeat > 1, multiple sets of knockoffs are generated and results are averaged. + """ + self._check_fit() + + if self.centered: + X_ = StandardScaler().fit_transform(X) + else: + X_ = X - Notes - ----- - This function calculates the empirical e-values based on the test statistics and the - desired FDR level. It then aggregates the e-values using the e-BH procedure and identifies - the selected variables based on the aggregated e-values. - """ - n_bootstraps = len(test_scores) - evals = np.array( - [ - _empirical_knockoff_eval(test_scores[i], ko_threshold[i]) - for i in range(n_bootstraps) - ] - ) + X_tildes = [] + for i in range(self.n_repeat): + X_tildes.append(self.generator.sample()) - aggregated_eval = np.mean(evals, axis=0) - threshold = fdr_threshold(aggregated_eval, fdr=fdr, method="ebh") - selected = np.where(aggregated_eval >= threshold)[0] + parallel = Parallel(self.n_jobs, verbose=self.joblib_verbose) + self.test_scores_ = np.array( + parallel( + delayed(self.statistical_test)(X_, X_tildes[i], y) + for i in range(self.n_repeat) + ) + ) + self.test_scores_ = np.array(self.test_scores_) - return selected, aggregated_eval, evals + self.importances_ = np.mean(self.test_scores_, axis=0) + self.pvalues_ = np.mean( + [_empirical_pval(self.test_scores_[i]) for i in range(self.n_repeat)], + axis=0, + ) + return self.importances_ + + def fit_importance(self, X, y, cv=None): + """ + Fits the model to the data and computes feature importance. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The input data matrix where n_samples is the number of samples and + n_features is the number of features. + y : array-like of shape (n_samples,) + The target values. + cv : None or cross-validation generator, default=None + Cross-validation parameter. Not used in this method. + A warning will be issued if provided. + + Returns + ------- + importances_ : ndarray of shape (n_features,) + Feature importance scores (p-values) for each feature. + Lower values indicate higher importance. Values range from 0 to 1. + + Notes + ----- + This method combines the fit and importance computation steps. + It first fits the generator to X and then computes importance scores + by comparing observed test statistics against permuted ones. + + See Also + -------- + fit : Method for fitting the generator only + importance : Method for computing importance scores only + """ + if cv is not None: + warnings.warn("cv won't be used") + + self.fit(X) + return self.importance(X, y) -def model_x_knockoff_bootstrap_quantile( - test_scores, - fdr=0.1, - fdr_control="bhq", - reshaping_function=None, - adaptive_aggregation=False, - gamma=0.5, +def model_x_knockoff( + X, + y, + generator=GaussianDistribution(cov_estimator=LedoitWolf(assume_centered=True)), + statistical_test=lasso_statistic_with_sampling, + joblib_verbose=0, + n_repeat=1, + n_jobs=1, + memory=None, ): - """ - This function implements the computation of the empirical p-values - from knockoff test and aggregates them using the quantile aggregation procedure. - - Parameters - ---------- - test_scores : 2D array, (n_bootstraps, n_features) - A matrix of test statistics for each bootstrap sample. - - fdr : float, default=0.1 - The desired controlled False Discovery Rate (FDR) level. - - fdr_control : str, default="bhq" - The method used to control the False Discovery Rate. - Available methods are: - * 'bhq': Standard Benjamini-Hochberg :footcite:`benjamini1995controlling,bhy_2001` - * 'bhy': Benjamini-Hochberg-Yekutieli :footcite:p:`bhy_2001` - * 'ebh': e-Benjamini-Hochberg :footcite:`wang2022false` - - reshaping_function : function or None, default=None - A function used to reshape the aggregated p-values before controlling the FDR. - - adaptive_aggregation : bool, default=False - Whether to use adaptive quantile aggregation. - - gamma : float, default=0.5 - The quantile level (between 0 and 1) used for aggregation. - For non-adaptive aggregation, a single gamma value is used. - For adaptive aggregation, this is the starting point for the grid search - over gamma values. - - Returns - ------- - selected : 1D array, int - A vector of indices of the selected variables. - - aggregated_pval : 1D array, (n_features, ) - A vector of aggregated empirical p-values. - - pvals : 2D array, (n_bootstraps, n_features) - A matrix of empirical p-values for each bootstrap sample. - - Notes - ----- - This function calculates the empirical p-values based on the test statistics and the - desired FDR level. It then aggregates the p-values using the quantile aggregation - procedure and identifies the selected variables based on the aggregated p-values. - """ - n_bootstraps = len(test_scores) - pvals = np.array( - [_empirical_knockoff_pval(test_scores[i]) for i in range(n_bootstraps)] - ) - - aggregated_pval = quantile_aggregation( - pvals, gamma=gamma, adaptive=adaptive_aggregation - ) - - threshold = fdr_threshold( - aggregated_pval, - fdr=fdr, - method=fdr_control, - reshaping_function=reshaping_function, + model_x_knockoff = ModelXKnockoff( + generator=generator, + statistical_test=statistical_test, + n_repeat=n_repeat, + n_jobs=n_jobs, + memory=memory, + joblib_verbose=joblib_verbose, ) - selected = np.where(aggregated_pval <= threshold)[0] - - return selected, aggregated_pval, pvals - - -def _stat_coefficient_diff(X, X_tilde, y, estimator, fdr, preconfigure_estimator=None): - """ - Compute the Lasso Coefficient-Difference (LCD) statistic by comparing original and knockoff coefficients. - - This function fits a model on the concatenated original and knockoff features, then - calculates test statistics based on the difference between coefficient magnitudes. - - Parameters - ---------- - X : ndarray of shape (n_samples, n_features) - Original feature matrix. - - X_tilde : ndarray of shape (n_samples, n_features) - Knockoff feature matrix. - - y : ndarray of shape (n_samples,) - Target values. - - estimator : estimator object - Scikit-learn estimator with fit() method and coef_ attribute. - Common choices include LassoCV, LogisticRegressionCV. - - fdr : float - Target false discovery rate level between 0 and 1. - - preconfigure_estimator : callable, default=None - Optional function to configure estimator parameters before fitting. - Called with arguments (estimator, X, X_tilde, y). - - Returns - ------- - test_score : ndarray of shape (n_features,) - Feature importance scores computed as |beta_j| - |beta_j'| - where beta_j and beta_j' are original and knockoff coefficients. - - ko_thr : float - Knockoff threshold value used for feature selection. - - selected : ndarray - Indices of features with test_score >= ko_thr. - - Notes - ----- - The test statistic follows Equation 1.7 in Barber & Candès (2015) and - Equation 3.6 in Candès et al. (2018). - """ - n_samples, n_features = X.shape - X_ko = np.column_stack([X, X_tilde]) - if preconfigure_estimator is not None: - estimator = preconfigure_estimator(estimator, X, X_tilde, y) - estimator.fit(X_ko, y) - if hasattr(estimator, "coef_"): - coef = np.ravel(estimator.coef_) - elif hasattr(estimator, "best_estimator_") and hasattr( - estimator.best_estimator_, "coef_" - ): - coef = np.ravel(estimator.best_estimator_.coef_) # for CV object - else: - raise TypeError("estimator should be linear") - # Equation 1.7 in barber2015controlling or 3.6 of candes2018panning - test_score = np.abs(coef[:n_features]) - np.abs(coef[n_features:]) - - # Compute the threshold level and selecte the important variables - ko_thr = _knockoff_threshold(test_score, fdr=fdr) - selected = np.where(test_score >= ko_thr)[0] - - return test_score, ko_thr, selected - - -def _knockoff_threshold(test_score, fdr=0.1): - """ - Calculate the knockoff threshold based on the procedure stated in the article. - - Original code: - https://github.com/msesia/knockoff-filter/blob/master/R/knockoff/R/knockoff_filter.R - - Parameters - ---------- - test_score : 1D ndarray, shape (n_features, ) - Vector of test statistic. - - fdr : float - Desired controlled FDR (false discovery rate) level. - - Returns - ------- - threshold : float or np.inf - Threshold level. - """ - offset = 1 # Offset equals 1 is the knockoff+ procedure. - - threshold_mesh = np.sort(np.abs(test_score[test_score != 0])) - np.concatenate( - [[0], threshold_mesh, [np.inf]] - ) # if there is no solution, the threshold is inf - # find the right value of t for getting a good fdr - # Equation 1.8 of barber2015controlling and 3.10 in Candès 2018 - threshold = 0.0 - for threshold in threshold_mesh: - false_pos = np.sum(test_score <= -threshold) - selected = np.sum(test_score >= threshold) - if (offset + false_pos) / np.maximum(selected, 1) <= fdr: - break - return threshold - - -def _empirical_knockoff_pval(test_score): - """ - Compute the empirical p-values from the knockoff+ test. - - Parameters - ---------- - test_score : 1D ndarray, shape (n_features, ) - Vector of test statistics. + return model_x_knockoff.fit_importance(X, y) - Returns - ------- - pvals : 1D ndarray, shape (n_features, ) - Vector of empirical p-values. - """ - pvals = [] - n_features = test_score.size - - offset = 1 # Offset equals 1 is the knockoff+ procedure. - - test_score_inv = -test_score - for i in range(n_features): - if test_score[i] <= 0: - pvals.append(1) - else: - pvals.append( - (offset + np.sum(test_score_inv >= test_score[i])) / n_features - ) - - return np.array(pvals) - -def _empirical_knockoff_eval(test_score, ko_threshold): +# use the docstring of the class for the function +model_x_knockoff.__doc__ = _aggregate_docstring( + [ + ModelXKnockoff.__doc__, + ModelXKnockoff.__init__.__doc__, + ModelXKnockoff.fit_importance.__doc__, + ModelXKnockoff.selection.__doc__, + ], """ - Compute the empirical e-values from the knockoff test. - - Parameters - ---------- - test_score : 1D ndarray, shape (n_features, ) - Vector of test statistics. - - ko_threshold : float - Threshold level. - Returns ------- - evals : 1D ndarray, shape (n_features, ) - Vector of empirical e-values. - """ - evals = [] - n_features = test_score.size - - offset = 1 # Offset equals 1 is the knockoff+ procedure. - - for i in range(n_features): - if test_score[i] < ko_threshold: - evals.append(0) - else: - evals.append(n_features / (offset + np.sum(test_score <= -ko_threshold))) - - return np.array(evals) + selection: binary array-like of shape (n_features) + Binary array of the seleted features + importance : array-like of shape (n_features) + The computed feature importance scores. + pvalues : array-like of shape (n_features) + The computed significant of feature for the prediction. + """, +) diff --git a/src/hidimstat/statistical_tools/aggregation.py b/src/hidimstat/statistical_tools/aggregation.py index a9a85a4e3..21aa44f3c 100644 --- a/src/hidimstat/statistical_tools/aggregation.py +++ b/src/hidimstat/statistical_tools/aggregation.py @@ -1,7 +1,7 @@ import numpy as np -def quantile_aggregation(pvals, gamma=0.05, adaptive=False): +def quantile_aggregation(pvals, gamma=0.5, adaptive=False): """ Implements the quantile aggregation method for p-values. @@ -15,7 +15,7 @@ def quantile_aggregation(pvals, gamma=0.05, adaptive=False): pvals : ndarray of shape (n_sampling*2, n_test) Matrix of p-values to aggregate. Each row represents a sampling instance and each column a hypothesis test. - gamma : float, default=0.05 + gamma : float, default=0.5 Quantile level for aggregation. Must be in range (0,1]. adaptive : bool, default=False If True, uses adaptive quantile aggregation which optimizes over multiple gamma values. diff --git a/src/hidimstat/conditional_sampling.py b/src/hidimstat/statistical_tools/conditional_sampling.py similarity index 100% rename from src/hidimstat/conditional_sampling.py rename to src/hidimstat/statistical_tools/conditional_sampling.py diff --git a/src/hidimstat/statistical_tools/gaussian_distribution.py b/src/hidimstat/statistical_tools/gaussian_distribution.py new file mode 100644 index 000000000..e0fafeb76 --- /dev/null +++ b/src/hidimstat/statistical_tools/gaussian_distribution.py @@ -0,0 +1,186 @@ +import warnings + +import numpy as np +from sklearn.utils import check_random_state + + +class GaussianDistribution: + """ + Generator for second-order Gaussian variables using the equi-correlated method. + Creates synthetic variables that preserve the covariance structure of the original + variables while ensuring conditional independence between the original and synthetic data. + Parameters + ---------- + cov_estimator : object + Estimator for computing the covariance matrix. Must implement fit and + have a covariance_ attribute after fitting. + random_state : int or None, default=None + Random seed for generating synthetic data. + tol : float, default=1e-14 + Tolerance for numerical stability in matrix computations. + Attributes + ---------- + mu_tilde_ : ndarray of shape (n_samples, n_features) + Mean matrix for generating synthetic variables. + sigma_tilde_decompose_ : ndarray of shape (n_features, n_features) + Cholesky decomposition of the synthetic covariance matrix. + References + ---------- + .. footbibliography:: + """ + + def __init__(self, cov_estimator, random_state=None, tol=1e-14): + self.cov_estimator = cov_estimator + self.tol = tol + self.rng = check_random_state(random_state) + + def fit(self, X): + """ + Fit the Gaussian synthetic variable generator. + This method estimates the parameters needed to generate Gaussian synthetic variables + based on the input data. It follows a methodology for creating second-order + synthetic variables that preserve the covariance structure. + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The input samples used to estimate the parameters for synthetic variable generation. + The data is assumed to follow a Gaussian distribution. + Returns + ------- + self : object + Returns the instance itself. + Notes + ----- + The method implements the following steps: + 1. Centers and scales the data if specified + 2. Estimates mean and covariance of input data + 3. Computes parameters for synthetic variable generation + """ + _, n_features = X.shape + + # estimation of X distribution + # original implementation: + # https://github.com/msesia/knockoff-filter/blob/master/R/knockoff/R/create_second_order.R + mu = X.mean(axis=0) + sigma = self.cov_estimator.fit(X).covariance_ + + diag_s = np.diag(_s_equi(sigma, tol=self.tol)) + + sigma_inv_s = np.linalg.solve(sigma, diag_s) + + # First part on the RHS of equation 1.4 in barber2015controlling + self.mu_tilde_ = X - np.dot(X - mu, sigma_inv_s) + # To calculate the Cholesky decomposition later on + sigma_tilde = 2 * diag_s - diag_s.dot(sigma_inv_s) + # test is the matrix is positive definite + while not np.all(np.linalg.eigvalsh(sigma_tilde) > self.tol): + sigma_tilde += 1e-10 * np.eye(n_features) + warnings.warn( + "The conditional covariance matrix for knockoffs is not positive " + "definite. Adding minor positive value to the matrix.", + UserWarning, + ) + + self.sigma_tilde_decompose_ = np.linalg.cholesky(sigma_tilde) + + return self + + def _check_fit(self): + """ + Check if the model has been fit before performing analysis. + Raises + ------ + ValueError + If any of the required attributes are missing, indicating the model + hasn't been fit before generating synthetic variables. + """ + if not hasattr(self, "mu_tilde_") or not hasattr( + self, "sigma_tilde_decompose_" + ): + raise ValueError("The GaussianGenerator requires to be fit before simulate") + + def sample(self): + """ + Generate synthetic variables. + This function generates synthetic variables that preserve the covariance structure + of the original data while ensuring conditional independence. + Returns + ------- + X_tilde : 2D ndarray (n_samples, n_features) + The synthetic variables. + """ + self._check_fit() + n_samples, n_features = self.mu_tilde_.shape + + # create a uniform noise for all the data + u_tilde = self.rng.randn(n_samples, n_features) + + # Equation 1.4 in barber2015controlling + X_tilde = self.mu_tilde_ + np.dot(u_tilde, self.sigma_tilde_decompose_) + return X_tilde + + +def _s_equi(sigma, tol=1e-14): + """ + Estimate the diagonal matrix of correlation between real + and knockoff variables using the equi-correlated equation. + This function estimates the diagonal matrix of correlation + between real and knockoff variables using the equi-correlated + equation described in :footcite:t:`barber2015controlling` and + :footcite:t:`candes2018panning`. It takes as input the empirical + covariance matrix sigma and a tolerance value tol, + and returns a vector of diagonal values of the estimated + matrix diag{s}. + Parameters + ---------- + sigma : 2D ndarray (n_features, n_features) + The empirical covariance matrix calculated from + the original design matrix. + tol : float, optional + A tolerance value used for numerical stability in the calculation + of the eigenvalues of the correlation matrix. + Returns + ------- + 1D ndarray (n_features, ) + A vector of diagonal values of the estimated matrix diag{s}. + Raises + ------ + Exception + If the covariance matrix is not positive-definite. + """ + n_features = sigma.shape[0] + + # Convert covariance matrix to correlation matrix + # as example see cov2corr from statsmodels + features_std = np.sqrt(np.diag(sigma)) + scale = np.outer(features_std, features_std) + corr_matrix = sigma / scale + + eig_value = np.linalg.eigvalsh(corr_matrix) + lambda_min = np.min(eig_value[0]) + # check if the matrix is positive-defined + if lambda_min <= 0: + raise Exception("The covariance matrix is not positive-definite.") + + s = np.ones(n_features) * min(2 * lambda_min, 1) + + psd = np.all(np.linalg.eigvalsh(2 * corr_matrix - np.diag(s)) > tol) + s_eps = 0 + while not psd: + if s_eps == 0: + s_eps = np.finfo(type(s[0])).eps # avoid zeros + else: + s_eps *= 10 + # if all eigval > 0 then the matrix is positive define + psd = np.all( + np.linalg.eigvalsh(2 * corr_matrix - np.diag(s * (1 - s_eps))) > tol + ) + warnings.warn( + "The equi-correlated matrix for knockoffs is not positive " + f"definite. Reduce the value of distance by {s_eps}.", + UserWarning, + ) + + s = s * (1 - s_eps) + + return s * np.diag(sigma) diff --git a/src/hidimstat/statistical_tools/lasso_test.py b/src/hidimstat/statistical_tools/lasso_test.py new file mode 100644 index 000000000..d090bd65d --- /dev/null +++ b/src/hidimstat/statistical_tools/lasso_test.py @@ -0,0 +1,112 @@ +import numpy as np +from sklearn.linear_model import LassoCV +from sklearn.model_selection import KFold + + +def preconfigure_LassoCV(estimator, X, X_tilde, y, n_alphas=20): + """ + Configure the estimator for Model-X knockoffs. + This function sets up the regularization path for the Lasso estimator + based on the input data and the number of alphas to use. The regularization + path is defined by a sequence of alpha values, which control the amount + of shrinkage applied to the coefficient estimates. + Parameters + ---------- + estimator : sklearn.linear_model.LassoCV + The Lasso estimator to configure. + X : 2D ndarray (n_samples, n_features) + The original design matrix. + X_tilde : 2D ndarray (n_samples, n_features) + The knockoff design matrix. + y : 1D ndarray (n_samples, ) + The target vector. + n_alphas : int, default=10 + The number of alpha values to use to instantiate the cross-validation. + Returns + ------- + estimator : sklearn.linear_model.LassoCV + The configured estimator. + Raises + ------ + TypeError + If estimator is not an instance of LassoCV. + Notes + ----- + The alpha values are calculated based on the combined design matrix [X, X_tilde]. + alpha_max is set to max(X_ko.T @ y)/(2*n_features). + """ + if type(estimator).__name__ != "LassoCV": + raise TypeError("You should not use this function to configure the estimator") + + n_features = X.shape[1] + X_ko = np.column_stack([X, X_tilde]) + alpha_max = np.max(np.dot(X_ko.T, y)) / (2 * n_features) + alphas = np.linspace(alpha_max * np.exp(-n_alphas), alpha_max, n_alphas) + estimator.alphas = alphas + return estimator + + +def lasso_statistic_with_sampling( + X, + X_tilde, + y, + lasso=LassoCV( + n_jobs=1, + verbose=0, + max_iter=200000, + cv=KFold(n_splits=5, shuffle=True, random_state=0), + random_state=1, + tol=1e-6, + ), + preconfigure_lasso=preconfigure_LassoCV, +): + """ + Compute the Lasso Coefficient-Difference (LCD) statistic by comparing original and knockoff coefficients. + This function fits a model on the concatenated original and knockoff features, then + calculates test statistics based on the difference between coefficient magnitudes. + Model-X Knockoff + Parameters + ---------- + X : ndarray of shape (n_samples, n_features) + Original feature matrix. + X_tilde : ndarray of shape (n_samples, n_features) + Knockoff feature matrix. + y : ndarray of shape (n_samples,) + Target values. + estimator : estimator object + Scikit-learn estimator with fit() method and coef_ attribute. + Common choices include LassoCV, LogisticRegressionCV. + fdr : float + Target false discovery rate level between 0 and 1. + preconfigure_estimator : callable, default=None + Optional function to configure estimator parameters before fitting. + Called with arguments (estimator, X, X_tilde, y). + Returns + ------- + test_score : ndarray of shape (n_features,) + Feature importance scores computed as |beta_j| - |beta_j'| + where beta_j and beta_j' are original and knockoff coefficients. + ko_thr : float + Knockoff threshold value used for feature selection. + selected : ndarray + Indices of features with test_score >= ko_thr. + Notes + ----- + The test statistic follows Equation 1.7 in Barber & Candès (2015) and + Equation 3.6 in Candès et al. (2018). + """ + n_samples, n_features = X.shape + X_ko = np.column_stack([X, X_tilde]) + if preconfigure_lasso is not None: + lasso = preconfigure_lasso(lasso, X, X_tilde, y) + lasso.fit(X_ko, y) + if hasattr(lasso, "coef_"): + coef = np.ravel(lasso.coef_) + elif hasattr(lasso, "best_estimator_") and hasattr(lasso.best_estimator_, "coef_"): + coef = np.ravel(lasso.best_estimator_.coef_) # for CV object + else: + raise TypeError("estimator should be linear") + # Equation 1.7 in barber2015controlling or 3.6 of candes2018panning + test_score = np.abs(coef[:n_features]) - np.abs(coef[n_features:]) + + return test_score diff --git a/test/test_conditional_sampling.py b/test/statistical_tools/test_conditional_sampling.py similarity index 99% rename from test/test_conditional_sampling.py rename to test/statistical_tools/test_conditional_sampling.py index d42f1f11c..7c347ff50 100644 --- a/test/test_conditional_sampling.py +++ b/test/statistical_tools/test_conditional_sampling.py @@ -11,7 +11,7 @@ from sklearn.multiclass import OneVsRestClassifier from sklearn.preprocessing import StandardScaler -from hidimstat.conditional_sampling import ConditionalSampler +from hidimstat.statistical_tools.conditional_sampling import ConditionalSampler def test_continuous_case(): diff --git a/test/statistical_tools/test_gaussian_distribution.py b/test/statistical_tools/test_gaussian_distribution.py new file mode 100644 index 000000000..ba586dc1b --- /dev/null +++ b/test/statistical_tools/test_gaussian_distribution.py @@ -0,0 +1,82 @@ +import numpy as np +import pytest +from hidimstat.statistical_tools.gaussian_distribution import ( + _s_equi, + GaussianDistribution, +) +from hidimstat._utils.scenario import multivariate_simulation +from sklearn.covariance import LedoitWolf + + +def test_gaussian_equi(): + """test function of gaussian""" + seed = 42 + n = 100 + p = 50 + X, y, beta, noise = multivariate_simulation(n, p, seed=seed) + generator = GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), + random_state=seed * 2, + ) + generator.fit(X=X) + X_tilde = generator.sample() + assert X_tilde.shape == (n, p) + + +def test_gaussian_center(): + """test function of gaussian""" + seed = 42 + n = 100 + p = 50 + X, y, beta, noise = multivariate_simulation(n, p, seed=seed) + generator = GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), + random_state=seed * 2, + ) + generator.fit(X=X) + X_tilde = generator.sample() + assert X_tilde.shape == (n, p) + + +def test_gaussian_error(): + """test function error""" + seed = 42 + n = 100 + p = 50 + X, y, beta, noise = multivariate_simulation(n, p, seed=seed) + generator = GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), + random_state=seed * 2, + ) + with pytest.raises( + ValueError, match="The GaussianGenerator requires to be fit before simulate" + ): + generator.sample() + + +def test_s_equi_not_define_positive(): + """test the warning and error of s_equi function""" + n = 10 + tol = 1e-7 + seed = 42 + + # random positive matrix + rgn = np.random.RandomState(seed) + a = rgn.randn(n, n) + a -= np.min(a) + with pytest.raises( + Exception, match="The covariance matrix is not positive-definite." + ): + _s_equi(a) + + # matrix with positive eigenvalues, positive diagonal + while not np.all(np.linalg.eigvalsh(a) > tol): + a += 0.1 * np.eye(n) + with pytest.warns(UserWarning, match="The equi-correlated matrix"): + _s_equi(a) + + # positive definite matrix + u, s, vh = np.linalg.svd(a) + d = np.eye(n) + sigma = u * d * u.T + _s_equi(sigma) diff --git a/test/statistical_tools/test_lasso_test.py b/test/statistical_tools/test_lasso_test.py new file mode 100644 index 000000000..d23019f2c --- /dev/null +++ b/test/statistical_tools/test_lasso_test.py @@ -0,0 +1,47 @@ +import numpy as np +import pytest +from sklearn.linear_model import RidgeCV +from sklearn.svm import SVR + +from hidimstat.statistical_tools.lasso_test import ( + preconfigure_LassoCV, + lasso_statistic_with_sampling, +) + + +def test_preconfigure_LassoCV(): + """Test type errors""" + with pytest.raises( + TypeError, match="You should not use this function to configure the estimator" + ): + preconfigure_LassoCV( + estimator=RidgeCV(), + X=np.random.rand(10, 10), + y=np.random.rand(10), + X_tilde=np.random.rand(10, 10), + ) + + +def test_error_lasso_statistic_with_sampling_with_bad_config(): + """Test error lasso statistic""" + with pytest.raises( + TypeError, match="You should not use this function to configure the estimator" + ): + lasso_statistic_with_sampling( + X=np.random.rand(10, 10), + X_tilde=np.random.rand(10, 10), + y=np.random.rand(10), + lasso=SVR(), + ) + + +def test_error_lasso_statistic_with_sampling(): + """Test error lasso statistic""" + with pytest.raises(TypeError, match="estimator should be linear"): + lasso_statistic_with_sampling( + X=np.random.rand(10, 10), + X_tilde=np.random.rand(10, 10), + y=np.random.rand(10), + lasso=SVR(), + preconfigure_lasso=None, + ) diff --git a/test/test_base_variable_importance.py b/test/test_base_variable_importance.py new file mode 100644 index 000000000..30d901c3d --- /dev/null +++ b/test/test_base_variable_importance.py @@ -0,0 +1,244 @@ +import pytest +import numpy as np + +from hidimstat import BaseVariableImportance + + +@pytest.fixture +def set_BaseVariableImportance(pvalues, test_score, seed): + """Create a BaseVariableImportance instance with test data for testing purposes. + + Parameters + ---------- + pvalues : bool + If True, generate random p-values for testing. + test_score : bool + If True, generate random test scores for testing. + seed : int + Random seed for reproducibility. + + Returns + ------- + BaseVariableImportance + A BaseVariableImportance instance with test data. + """ + nb_features = 100 + rng = np.random.RandomState(seed) + vi = BaseVariableImportance() + vi.importances_ = np.arange(nb_features) + rng.shuffle(vi.importances_) + if pvalues or test_score: + vi.pvalues_ = np.sort(rng.rand(nb_features))[vi.importances_] + if test_score: + # TODO: this can be improved. + vi.test_scores_ = [] + for i in range(10): + score = np.random.rand(nb_features) * 30 + vi.test_scores_.append(score) + for i in range(1, 30): + score = np.random.rand(nb_features) + 1 + score[-i:] = np.arange(30 - i, 30) * 2 + score[:i] = -np.arange(30 - i, 30) + vi.test_scores_.append(score[vi.importances_]) + vi.test_scores_ = np.array(vi.test_scores_) + return vi + + +@pytest.mark.parametrize( + "pvalues, test_score, seed", + [(False, False, 0), (True, False, 1), (True, True, 2)], + ids=["only importance", "p-value", "test-score"], +) +class TestSelection: + """Test selection based on importance""" + + def test_selection_k_best(self, set_BaseVariableImportance): + "test selection of the k_best" + vi = set_BaseVariableImportance + true_value = vi.importances_ >= 95 + selection = vi.selection(k_best=5) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_k_best_all(self, set_BaseVariableImportance): + "test selection to all base on string" + vi = set_BaseVariableImportance + true_value = np.ones_like(vi.importances_, dtype=bool) + selection = vi.selection(k_best="all") + np.testing.assert_array_equal(true_value, selection) + + def test_selection_k_best_none(self, set_BaseVariableImportance): + "test selection when there none" + vi = set_BaseVariableImportance + true_value = np.zeros_like(vi.importances_, dtype=bool) + selection = vi.selection(k_best=0) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_percentile(self, set_BaseVariableImportance): + "test selection bae on percentile" + vi = set_BaseVariableImportance + true_value = vi.importances_ >= 50 + selection = vi.selection(percentile=50) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_percentile_all(self, set_BaseVariableImportance): + "test selection when percentile is 100" + vi = set_BaseVariableImportance + true_value = np.ones_like(vi.importances_, dtype=bool) + selection = vi.selection(percentile=100) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_percentile_none(self, set_BaseVariableImportance): + "test selection when percentile is 0" + vi = set_BaseVariableImportance + true_value = np.zeros_like(vi.importances_, dtype=bool) + selection = vi.selection(percentile=0) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_percentile_threshols_value(self, set_BaseVariableImportance): + "test selection when percentile when the percentile equal on value" + vi = set_BaseVariableImportance + mask = np.ones_like(vi.importances_, dtype=bool) + mask[np.where(vi.importances_ == 99)] = False + vi.importances_ = vi.importances_[mask] + true_value = vi.importances_ >= 50 + selection = vi.selection(percentile=50) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_threshold(self, set_BaseVariableImportance): + "test threshold on importance" + vi = set_BaseVariableImportance + true_value = vi.importances_ < 5 + selection = vi.selection(threshold=5) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_threshold_pvalue(self, set_BaseVariableImportance): + "test threshold vbse on pvalues" + vi = set_BaseVariableImportance + if vi.pvalues_ is not None: + true_value = vi.importances_ < 5 + print(vi.pvalues_) + selection = vi.selection( + threshold_pvalue=vi.pvalues_[np.argsort(vi.importances_)[5]] + ) + np.testing.assert_array_equal(true_value, selection) + + +@pytest.mark.parametrize( + "pvalues, test_score, seed", [(True, True, 10)], ids=["default"] +) +class TestSelectionFDR: + """Test selection based on fdr""" + + def test_selection_fdr_default(self, set_BaseVariableImportance): + "test selection of the default" + vi = set_BaseVariableImportance + true_value = vi.importances_ >= 85 + selection = vi.selection_fdr(0.2) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_fdr_adaptation(self, set_BaseVariableImportance): + "test selection of the adaptation" + vi = set_BaseVariableImportance + true_value = vi.importances_ >= 85 + selection = vi.selection_fdr(0.2, adaptive_aggregation=True) + np.testing.assert_array_equal(true_value, selection) + + def test_selection_fdr_bhy(self, set_BaseVariableImportance): + "test selection with bhy" + vi = set_BaseVariableImportance + true_value = vi.importances_ >= 85 + selection = vi.selection_fdr(0.8, fdr_control="bhy") + np.testing.assert_array_equal(true_value, selection) + + def test_selection_fdr_ebh(self, set_BaseVariableImportance): + "test selection with e-values" + vi = set_BaseVariableImportance + true_value = vi.importances_ >= 2 + selection = vi.selection_fdr(0.037, fdr_control="ebh", evalues=True) + np.testing.assert_array_equal(true_value, selection) + + +@pytest.mark.parametrize( + "pvalues, test_score, seed", + [(False, False, 0), (True, False, 0), (True, True, 0)], + ids=["only importance", "p-value", "test-score"], +) +class TestBVIExceptions: + """Test class for BVI Exception""" + + def test_not_fit(self, pvalues, test_score, seed): + "test detection unfit" + vi = BaseVariableImportance() + with pytest.raises( + ValueError, + match="The importances need to be called before calling this method", + ): + vi._check_importance() + with pytest.raises( + ValueError, + match="The importances need to be called before calling this method", + ): + vi.selection() + with pytest.raises( + ValueError, + match="The importances need to be called before calling this method", + ): + vi.selection_fdr(0.1) + + def test_selection_k_best(self, set_BaseVariableImportance): + "test selection k_best wrong" + vi = set_BaseVariableImportance + with pytest.raises(AssertionError, match="k_best needs to be positive or null"): + vi.selection(k_best=-10) + with pytest.warns(Warning, match="k=1000 is greater than n_features="): + vi.selection(k_best=1000) + + def test_selection_percentile(self, set_BaseVariableImportance): + "test selection percentile wrong" + vi = set_BaseVariableImportance + with pytest.raises( + AssertionError, match="percentile needs to be between 0 and 100" + ): + vi.selection(percentile=-1) + with pytest.raises( + AssertionError, match="percentile needs to be between 0 and 100" + ): + vi.selection(percentile=102) + + def test_selection_threshold(self, set_BaseVariableImportance): + "test selection threshold wrong" + vi = set_BaseVariableImportance + if vi.pvalues_ is None: + with pytest.raises( + AssertionError, + match="This method doesn't support a threshold on p-values", + ): + vi.selection(threshold_pvalue=-1) + else: + with pytest.raises( + AssertionError, match="threshold_pvalue needs to be between 0 and 1" + ): + vi.selection(threshold_pvalue=-1) + with pytest.raises( + AssertionError, match="threshold_pvalue needs to be between 0 and 1" + ): + vi.selection(threshold_pvalue=1.1) + + def test_selection_fdr_fdr_control(self, set_BaseVariableImportance): + "test selection fdr_control wrong" + vi = set_BaseVariableImportance + if vi.test_scores_ is None: + with pytest.raises( + AssertionError, + match="this method doesn't support selection base on FDR", + ): + vi.selection_fdr(fdr=0.1) + else: + with pytest.raises( + AssertionError, match="for e-value, the fdr control need to be 'ebh'" + ): + vi.selection_fdr(fdr=0.1, evalues=True) + with pytest.raises( + AssertionError, match="for p-value, the fdr control can't be 'ebh'" + ): + vi.selection_fdr(fdr=0.1, fdr_control="ebh", evalues=False) diff --git a/test/test_knockoff.py b/test/test_knockoff.py index 3b0e86aa1..0844ff5ea 100644 --- a/test/test_knockoff.py +++ b/test/test_knockoff.py @@ -1,306 +1,537 @@ -from hidimstat.knockoffs import ( - model_x_knockoff, - model_x_knockoff_pvalue, - model_x_knockoff_bootstrap_e_value, - model_x_knockoff_bootstrap_quantile, -) -from hidimstat.gaussian_knockoff import gaussian_knockoff_generation, _s_equi -from hidimstat._utils.scenario import multivariate_simulation -from hidimstat.statistical_tools.multiple_testing import fdp_power import numpy as np import pytest + from sklearn.covariance import LedoitWolf, GraphicalLassoCV from sklearn.model_selection import GridSearchCV from sklearn.linear_model import Lasso from sklearn.model_selection import KFold -from sklearn.tree import DecisionTreeRegressor - - -def test_knockoff_bootstrap_quantile(): - """Test bootstrap knockoof with quantile aggregation""" - n = 500 - p = 100 - signal_noise_ratio = 5 - n_bootstraps = 25 - fdr = 0.5 - X, y, beta, noise = multivariate_simulation( - n, p, signal_noise_ratio=signal_noise_ratio, seed=0 - ) - non_zero_index = np.where(beta)[0] - selected, test_scores, threshold, X_tildes = model_x_knockoff( - X, y, n_bootstraps=n_bootstraps, random_state=None, fdr=fdr - ) +from hidimstat import ModelXKnockoff +from hidimstat.knockoffs import model_x_knockoff +from hidimstat.statistical_tools.gaussian_distribution import GaussianDistribution +from hidimstat.statistical_tools.multiple_testing import fdp_power +from hidimstat.statistical_tools.lasso_test import lasso_statistic_with_sampling - selected_verbose, aggregated_pval, pvals = model_x_knockoff_bootstrap_quantile( - test_scores, fdr=fdr - ) - fdp_verbose, power_verbose = fdp_power(selected_verbose, non_zero_index) - - fdp_no_verbose, power_no_verbose = fdp_power(selected_verbose, non_zero_index) - - assert pvals.shape == (n_bootstraps, p) - assert fdp_verbose < 0.5 - assert power_verbose > 0.1 - assert fdp_no_verbose < 0.5 - assert power_no_verbose > 0.1 - - -def test_knockoff_bootstrap_e_values(): - """Test bootstrap Knockoff with e-values""" - n = 500 - p = 100 - signal_noise_ratio = 5 - n_bootstraps = 25 - fdr = 0.5 - X, y, beta, noise = multivariate_simulation( - n, p, signal_noise_ratio=signal_noise_ratio, seed=0 - ) - non_zero_index = np.where(beta)[0] - selected, test_scores, threshold, X_tildes = model_x_knockoff( - X, y, n_bootstraps=n_bootstraps, random_state=None, fdr=fdr / 2 +def configure_linear_categorial_model_x_knockoff(X, y, n_repeat, seed, fdr): + """ + Configure Model-X Knockoff model with linear regression + for feature importance analysis. + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Input data matrix where each column represents a feature + and each row a sample. + y : array-like of shape (n_samples,) + Target variable array. + n_repeat : int + Number of permutations to perform for the ModelXKnockoff analysis. + seed : int + Random seed for reproducibility. + Returns + ------- + importance : array-like + Array containing importance scores for each feature. + Higher values indicate greater feature importance in predicting + the target variable. + Notes + ----- + The function performs the following steps: + 1. Configures ModelXKnockoff + 2. Calculates feature importance + The ModelXKnockoff method uses permutation-based importance scoring with linear + regression as the base model. + """ + # instantiate ModelXKnockoff model with linear regression imputer + model_x_knockoff = ModelXKnockoff( + n_repeat=n_repeat, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=seed + ), ) - - # Using e-values aggregation (verbose vs no verbose) - selected_verbose, aggregated_eval, evals = model_x_knockoff_bootstrap_e_value( - test_scores, threshold, fdr=fdr + # fit the model using the training set + importance = model_x_knockoff.fit_importance(X, y) + selected = model_x_knockoff.selection_fdr(fdr=fdr) + return importance, selected + + +############################################################################## +## tests model_x_knockoff on different type of data +parameter_exact = [ + ("Dim", 500, 50, 10, 0.0, 42, 1.0, np.inf, 0.0), + ("Dim with noise", 500, 50, 10, 0.0, 42, 1.0, 10.0, 0.0), + ("Dim with correlated noise", 500, 50, 10, 0.0, 42, 1.0, 10.0, 0.2), + ("Dim high level noise", 500, 50, 10, 0.2, 42, 1.0, 1.0, 0.0), + ("Dim with correlated features and noise", 500, 50, 10, 0.2, 42, 1, 10, 0), + ( + "Dim with correlated features and correlated noise", + 500, + 50, + 10, + 0.2, + 42, + 1.0, + 10, + 0.2, + ), +] + + +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + zip(*(list(zip(*parameter_exact))[1:])), + ids=list(zip(*parameter_exact))[0], +) +@pytest.mark.parametrize( + "model_x_knockoff_n_sampling, model_x_knockoff_seed, model_x_knockoff_fdr", + [(10, 5, 0.4)], + ids=["default_model_x_knockoff"], +) +def test_model_x_knockoff_linear_data_exact( + data_generator, + model_x_knockoff_n_sampling, + model_x_knockoff_seed, + model_x_knockoff_fdr, +): + """Tests the method on linear cases with noise and correlation""" + X, y, important_features, _ = data_generator + importance, selected = configure_linear_categorial_model_x_knockoff( + X, y, model_x_knockoff_n_sampling, model_x_knockoff_seed, model_x_knockoff_fdr ) - fdp_verbose, power_verbose = fdp_power(selected_verbose, non_zero_index) + # check that importance scores are defined for each feature + assert importance.shape == (X.shape[1],) + # check that important features have the highest importance scores + assert np.all([int(i) in important_features for i in np.argsort(importance)[-10:]]) - fdp_no_verbose, power_no_verbose = fdp_power(selected_verbose, non_zero_index) + # test the selection part + fdp, power = fdp_power(np.where(selected)[0], important_features) + assert fdp < model_x_knockoff_fdr + assert power == 1.0 - assert fdp_verbose < 0.5 - assert power_verbose > 0.1 - assert fdp_no_verbose < 0.5 - assert power_no_verbose > 0.1 - # Checking value for offset not belonging to (0,1) - with pytest.raises(Exception): - _ = model_x_knockoff_bootstrap_e_value( - test_scores, - threshold, - offset=2, - ) +## tests model_x_knockoff on different type of data no power if correlated data +parameter_bad_FDR = [ + ("Dim with correlated features", 500, 50, 10, 0.2, 42, 1.0, np.inf, 0.0), +] -def test_invariant_with_bootstrap(): - """Test bootstrap Knockoff""" - n = 500 - p = 100 - signal_noise_ratio = 5 - fdr = 0.5 - X, y, beta, noise = multivariate_simulation( - n, p, signal_noise_ratio=signal_noise_ratio, seed=0 +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + zip(*(list(zip(*parameter_bad_FDR))[1:])), + ids=list(zip(*parameter_bad_FDR))[0], +) +@pytest.mark.parametrize( + "model_x_knockoff_n_sampling, model_x_knockoff_seed, model_x_knockoff_fdr", + [(10, 5, 0.4)], + ids=["default_model_x_knockoff"], +) +def test_linear_data_bad_FDR( + data_generator, + model_x_knockoff_n_sampling, + model_x_knockoff_seed, + model_x_knockoff_fdr, +): + """Tests the method on linear cases with noise and correlation""" + X, y, important_features, _ = data_generator + importance, selected = configure_linear_categorial_model_x_knockoff( + X, y, model_x_knockoff_n_sampling, model_x_knockoff_seed, model_x_knockoff_fdr ) - # Single AKO (or vanilla KO) (verbose vs no verbose) - ( - selected_bootstrap, - test_scores_bootstrap, - threshold_bootstrap, - X_tildes_bootstrap, - ) = model_x_knockoff(X, y, n_bootstraps=2, random_state=5, fdr=fdr) - ( - selected_no_bootstrap, - test_scores_no_bootstrap, - threshold_no_bootstrap, - X_tildes_no_bootstrap, - ) = model_x_knockoff(X, y, n_bootstraps=1, random_state=5, fdr=fdr) - - np.testing.assert_array_equal(test_scores_bootstrap[0], test_scores_no_bootstrap) - np.testing.assert_array_equal(selected_bootstrap[0], selected_no_bootstrap) - np.testing.assert_array_equal(threshold_bootstrap[0], threshold_no_bootstrap) - np.testing.assert_array_equal(X_tildes_bootstrap[0], X_tildes_no_bootstrap) - - -def test_knockoff_exception(): - """Test exception raise by Knockoff""" - n = 500 - p = 100 - signal_noise_ratio = 5 - X, y, beta, noise = multivariate_simulation( - n, p, signal_noise_ratio=signal_noise_ratio, seed=0 + # check that importance scores are defined for each feature + assert importance.shape == (X.shape[1],) + # check that important features have the highest importance scores + # assert np.all([int(i) in important_features for i in np.argsort(importance)[-10:]]) + + # test the selection part + fdp, power = fdp_power(np.where(selected)[0], important_features) + assert fdp > model_x_knockoff_fdr + assert power == 1.0 + + +## tests model_x_knockoff no power +parameter_no_power = [ + ("Dim", 50, 150, 30, 0.0, 42, 1.0, np.inf, 0.0), + ("Dim with noise", 50, 150, 30, 0.0, 42, 1.0, 10.0, 0.0), + ("Dim with correlated noise", 50, 150, 30, 0.0, 42, 1.0, 10.0, 0.2), + ("Dim with correlated features", 50, 150, 30, 0.2, 42, 1.0, np.inf, 0.0), + ("Dim high level noise", 50, 150, 30, 0.2, 42, 1.0, 1.0, 0.0), +] + + +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + zip(*(list(zip(*parameter_no_power))[1:])), + ids=list(zip(*parameter_no_power))[0], +) +@pytest.mark.parametrize( + "model_x_knockoff_n_sampling, model_x_knockoff_seed, model_x_knockoff_fdr", + [(10, 5, 0.1)], + ids=["default_model_x_knockoff"], +) +def test_model_x_knockoff_linear_no_power( + data_generator, + model_x_knockoff_n_sampling, + model_x_knockoff_seed, + model_x_knockoff_fdr, +): + """Tests the method on linear cases with noise and correlation""" + X, y, important_features, _ = data_generator + importance, selected = configure_linear_categorial_model_x_knockoff( + X, y, model_x_knockoff_n_sampling, model_x_knockoff_seed, model_x_knockoff_fdr ) - non_zero_index = np.where(beta)[0] - - # Checking wrong type for random_state - with pytest.raises(Exception): - _ = model_x_knockoff( - X, - y, - random_state="test", - ) + # check that importance scores are defined for each feature + assert importance.shape == (X.shape[1],) + # test the selection part + fdp, power = fdp_power(np.where(selected)[0], important_features) + assert fdp < model_x_knockoff_fdr + assert power == 0.0 -def test_model_x_knockoff(): - """Test the selection of variable from knockoff""" - seed = 42 - fdr = 0.2 - n = 300 - p = 300 - support_size = 18 - X, y, beta, noise = multivariate_simulation( - n, p, support_size=support_size, seed=seed - ) - non_zero = np.where(beta)[0] - selected, test_score, threshold, X_tildes = model_x_knockoff( - X, y, n_bootstraps=1, random_state=seed + 1, fdr=fdr - ) - ko_result, pvals = model_x_knockoff_pvalue(test_score, fdr=fdr) - fdp, power = fdp_power(ko_result, non_zero) - assert fdp <= 0.2 - assert power > 0.7 - assert np.all(0 <= pvals) or np.all(pvals <= 1) - - -def test_model_x_knockoff_estimator(): - """Test knockoff with a crossvalidation estimator""" - seed = 42 - fdr = 0.2 - n = 300 - p = 300 - X, y, beta, noise = multivariate_simulation(n, p, seed=seed) - non_zero = np.where(beta)[0] - selected, test_scores, threshold, X_tildes = model_x_knockoff( - X, - y, - estimator=GridSearchCV(Lasso(), param_grid={"alpha": np.linspace(0.2, 0.3, 5)}), - n_bootstraps=1, - preconfigure_estimator=None, - random_state=seed + 1, - fdr=fdr, +## tests model_x_knockoff on different type of data +parameter_bad_detection = [ + ("No information", 500, 50, 10, 0.0, 42, 1.0, 0.0, 0.0), + ("Dim with correlated features and noise", 50, 150, 30, 0.2, 42, 1, 10, 0), + ( + "Dim with correlated features and correlated noise", + 50, + 150, + 30, + 0.2, + 42, + 1.0, + 10, + 0.2, + ), +] + + +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + zip(*(list(zip(*parameter_bad_detection))[1:])), + ids=list(zip(*parameter_bad_detection))[0], +) +@pytest.mark.parametrize( + "model_x_knockoff_n_sampling, model_x_knockoff_seed, model_x_knockoff_fdr", + [(10, 5, 0.4)], + ids=["default_model_x_knockoff"], +) +def test_model_x_knockoff_linear_fail( + data_generator, + model_x_knockoff_n_sampling, + model_x_knockoff_seed, + model_x_knockoff_fdr, +): + """Tests the method on linear cases with noise and correlation""" + X, y, important_features, _ = data_generator + importance, selected = configure_linear_categorial_model_x_knockoff( + X, y, model_x_knockoff_n_sampling, model_x_knockoff_seed, model_x_knockoff_fdr ) - fdp, power = fdp_power(selected, non_zero) - - assert fdp <= fdr - assert power > 0.7 - - -def test_model_x_knockoff_exception(): - "Test the exception raise by model_x_knockoff" - n = 50 - p = 100 - seed = 45 - rgn = np.random.RandomState(seed) - X = rgn.randn(n, p) - y = rgn.randn(n) - with pytest.raises(TypeError, match="You should not use this function"): - model_x_knockoff( - X, - y, - estimator=Lasso(), - n_bootstraps=1, + # check that importance scores are defined for each feature + assert importance.shape == (X.shape[1],) + # Verify that not all important features are detected + assert np.sum( + [int(i) in important_features for i in np.argsort(importance)[-4:]] + ) != len(important_features) + + # test the selection part + fdp, power = fdp_power(np.where(selected)[0], important_features) + assert fdp == power + assert power < 0.2 + + +############################################################################## +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + [(300, 20, 5, 0.0, 42, 1.0, np.inf, 0.0)], + ids=["default data"], +) +class TestModelXKnockoffClass: + """Test the element of the class""" + + def test_model_x_knockoff_init(self, data_generator): + """Test ModelXKnockoff initialization""" + X, y, _, _ = data_generator + model_x_knockoff = ModelXKnockoff() + assert model_x_knockoff.n_jobs == 1 + assert model_x_knockoff.n_repeat == 1 + assert model_x_knockoff.generator is not None + assert model_x_knockoff.statistical_test is not None + + def test_model_x_knockoff_fit(self, data_generator): + """Test fitting ModelXKnockoff""" + X, y, _, _ = data_generator + model_x_knockoff = ModelXKnockoff() + model_x_knockoff.fit(X) + # check if the generator is fitted + model_x_knockoff.generator._check_fit() + + def test_model_x_knockoff_importance(self, data_generator): + """Test importance of ModelXKnockoff""" + X, y, important_features, _ = data_generator + model_x_knockoff = ModelXKnockoff(n_repeat=5) + model_x_knockoff.fit(X) + importance = model_x_knockoff.importance(X, y) + + # check that importance scores are defined for each feature + assert importance.shape == (X.shape[1],) + # check that important features have the highest importance scores + assert np.all( + [int(i) in important_features for i in np.argsort(importance)[-5:]] ) - with pytest.raises(TypeError, match="estimator should be linear"): - model_x_knockoff( - X, - y, - estimator=DecisionTreeRegressor(), - preconfigure_estimator=None, - n_bootstraps=1, + + def test_model_x_knockoff_categorical( + self, + n_samples, + n_features, + support_size, + rho, + seed, + value, + signal_noise_ratio, + rho_serial, + ): + """Test ModelXKnockoff with categorical variables""" + rng = np.random.default_rng(seed) + X_cont = rng.random((n_samples, 2)) + X_cat = rng.integers(low=0, high=3, size=(n_samples, 1)) + X = np.hstack([X_cont, X_cat]) + y = rng.random((n_samples)) + + model_x_knockoff = ModelXKnockoff(centered=False) + + with pytest.warns( + Warning, + match="The conditional covariance matrix for knockoffs is not positive definite.", + ): + with pytest.warns( + Warning, + match="The equi-correlated matrix for knockoffs is not positive definite.", + ): + importances = model_x_knockoff.fit_importance(X, y) + assert len(importances) == 3 + assert np.all(importances >= 0) + + def test_model_x_knockoff_CV_estimator(self, data_generator, seed): + """Test ModelXKnockoff with a crossvalidation estimator""" + fdr = 0.7 + X, y, important_features, _ = data_generator + + def lasso_statistic_gen(X, X_tilde, y): + return lasso_statistic_with_sampling( + X, + X_tilde, + y, + lasso=GridSearchCV( + Lasso(), param_grid={"alpha": np.linspace(0.2, 0.3, 5)} + ), + preconfigure_lasso=None, + ) + + model_x_knockoff = ModelXKnockoff( + n_repeat=1, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=seed + 2 + ), + statistical_test=lasso_statistic_gen, + ) + model_x_knockoff.fit_importance(X, y) + selected = model_x_knockoff.selection_fdr(fdr=fdr) + + fdp, power = fdp_power(np.where(selected)[0], important_features) + + assert fdp <= fdr + assert power == 1.0 + + def test_estimate_distribution(self, data_generator, seed): + """Test different estimation of the covariance""" + fdr = 0.3 + X, y, important_features, _ = data_generator + model_x_knockoff = ModelXKnockoff( + n_repeat=1, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=seed + 1 + ), + ) + model_x_knockoff.fit_importance(X, y) + selected = model_x_knockoff.selection_fdr(fdr=fdr) + for i in important_features: + assert selected[i] + + model_x_knockoff = ModelXKnockoff( + n_repeat=1, + generator=GaussianDistribution( + cov_estimator=GraphicalLassoCV( + alphas=[1e-3, 1e-2, 1e-1, 1], + cv=KFold(n_splits=5, shuffle=True, random_state=seed + 2), + ), + random_state=seed + 3, + ), + ) + model_x_knockoff.fit_importance(X, y) + selected = model_x_knockoff.selection_fdr(fdr=fdr) + for i in important_features: + assert selected[i] + + def test_model_x_knockoff_selection(self, data_generator): + """Test the selection of variable from knockoff""" + fdr = 0.5 + n_repeat = 10 + X, y, important_features, _ = data_generator + model_x_knockoff = ModelXKnockoff( + n_repeat=n_repeat, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=0 + ), + ) + model_x_knockoff.fit_importance(X, y) + selected = model_x_knockoff.selection_fdr(fdr=fdr) + fdp, power = fdp_power(np.where(selected)[0], important_features) + assert fdp <= 0.5 + assert power > 0.7 + assert np.all(0 <= model_x_knockoff.pvalues_) or np.all( + model_x_knockoff.pvalues_ <= 1 ) + def test_model_x_knockoff_repeat_quantile(self, data_generator, n_features): + """Test ModelXKnockoff selection""" + fdr = 0.5 + n_repeat = 10 + X, y, important_features, _ = data_generator + model_x_knockoff = ModelXKnockoff( + n_repeat=n_repeat, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=0 + ), + ) + model_x_knockoff.fit_importance(X, y) + selected = model_x_knockoff.selection_fdr(fdr=fdr) + + fdp, power = fdp_power(np.where(selected)[0], important_features) + + assert model_x_knockoff.test_scores_.shape == (n_repeat, n_features) + assert fdp < 0.5 + assert power == 1.0 + + def test_model_x_knockoff_repeat_e_values(self, data_generator, n_features): + """Test ModelXKnockoff selection with e-values""" + fdr = 0.75 + n_repeat = 20 + X, y, important_features, _ = data_generator + model_x_knockoff = ModelXKnockoff( + n_repeat=n_repeat, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=0 + ), + ) + model_x_knockoff.fit_importance(X, y) + selected = model_x_knockoff.selection_fdr( + fdr=fdr, evalues=True, fdr_control="ebh" + ) -def test_estimate_distribution(): - """ - test different estimation of the covariance - """ - seed = 42 - fdr = 0.1 - n = 100 - p = 50 - X, y, beta, noise = multivariate_simulation(n, p, seed=seed) - non_zero = np.where(beta)[0] - selected, test_scores, threshold, X_tildes = model_x_knockoff( - X, - y, - cov_estimator=LedoitWolf(assume_centered=True), - n_bootstraps=1, - random_state=seed + 1, - fdr=fdr, - ) - for i in selected: - assert np.any(i == non_zero) - selected, test_scores, threshold, X_tildes = model_x_knockoff( - X, - y, - cov_estimator=GraphicalLassoCV( - alphas=[1e-3, 1e-2, 1e-1, 1], - cv=KFold(n_splits=5, shuffle=True, random_state=0), - ), - n_bootstraps=1, - random_state=seed + 2, - fdr=fdr, - ) - for i in selected: - assert np.any(i == non_zero) - - -def test_gaussian_knockoff_equi(): - """test function of gaussian knockoff""" - seed = 42 - n = 100 - p = 50 - X, y, beta, noise = multivariate_simulation(n, p, seed=seed) - non_zero = np.where(beta)[0] - mu = X.mean(axis=0) - sigma = LedoitWolf(assume_centered=True).fit(X).covariance_ - - X_tilde, mu_tilde, sigma_tilde_decompose = gaussian_knockoff_generation( - X, mu, sigma, seed=seed * 2 - ) + fdp, power = fdp_power(np.where(selected)[0], important_features) - assert X_tilde.shape == (n, p) - - -def test_gaussian_knockoff_equi_warning(): - "test warning in guassian knockoff" - seed = 42 - n = 100 - p = 50 - tol = 1e-14 - rgn = np.random.RandomState(seed) - X = rgn.randn(n, p) - mu = X.mean(axis=0) - # create a positive definite matrix - u, s, vh = np.linalg.svd(rgn.randn(p, p)) - d = np.eye(p) * tol / 10 - sigma = u * d * u.T - with pytest.warns( - UserWarning, - match="The conditional covariance matrix for knockoffs is not positive", - ): - X_tilde, mu_tilde, sigma_tilde_decompose = gaussian_knockoff_generation( - X, mu, sigma, seed=seed * 2, tol=tol + assert model_x_knockoff.test_scores_.shape == (n_repeat, n_features) + assert fdp <= 0.75 + assert power == 1.0 + + def test_model_x_knockoff_invariant_with_bootstrap(self, data_generator): + """Test repeat invariance""" + fdr = 0.5 + X, y, important_features, _ = data_generator + + # Single AKO (or vanilla KO) (verbose vs no verbose) + model_x_knockoff_repeat = ModelXKnockoff( + n_repeat=10, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=0 + ), + ) + model_x_knockoff_repeat.fit_importance(X, y) + selected_repeat = model_x_knockoff_repeat.selection_fdr(fdr=fdr) + + model_x_knockoff_no_repeat = ModelXKnockoff( + n_repeat=1, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=0 + ), + ) + model_x_knockoff_no_repeat.fit(X).importance(X, y) + selected_no_repeat = model_x_knockoff_no_repeat.selection_fdr(fdr=fdr) + + fdp_repeat, power_repeat = fdp_power( + np.where(selected_repeat)[0], important_features ) + assert fdp_repeat < 0.5 + assert power_repeat == 1.0 + fdp_no_repeat, power_no_repeat = fdp_power( + np.where(selected_no_repeat)[0], important_features + ) + assert fdp_no_repeat < 0.5 + assert power_no_repeat == 1.0 - assert X_tilde.shape == (n, p) + np.testing.assert_array_equal( + model_x_knockoff_repeat.test_scores_[0], + model_x_knockoff_no_repeat.test_scores_[0], + ) + # test that the selection no boostract should be lower than with boostrap + for i in np.where(selected_repeat)[0]: + assert selected_no_repeat[i] + # np.testing.assert_array_equal( + # model_x_knockoff_repeat.importances_, + # model_x_knockoff_no_repeat.importances_, + # ) + # np.testing.assert_array_equal( + # model_x_knockoff_repeat.pvalues_, model_x_knockoff_no_repeat.pvalues_ + # ) + # np.testing.assert_array_equal(selected_repeat, selected_no_repeat) + + def test_model_x_knockoff_function(self, data_generator): + """Test the function ModelXKnockoff""" + X, y, important_features, _ = data_generator + importance = model_x_knockoff(X, y, n_repeat=5) + # check that importance scores are defined for each feature + assert importance.shape == (X.shape[1],) + # check that important features have the highest importance scores + assert np.all( + [int(i) in important_features for i in np.argsort(importance)[-5:]] + ) -def test_s_equi_not_define_positive(): - """test the warning and error of s_equi function""" - n = 10 - tol = 1e-7 - seed = 42 +############################################################################## +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + [(300, 20, 5, 0.0, 42, 1.0, np.inf, 0.0)], + ids=["default data"], +) +class TestModelXKnockoffExceptions: + """Test class for ModelXKnockoff exceptions""" + + def test_warning(self, data_generator): + """Test if some warning are raised""" + X, y, _, _ = data_generator + model_x_knockoff = ModelXKnockoff(n_repeat=5) + with pytest.warns(Warning, match="y won't be used"): + model_x_knockoff.fit(X, y) + with pytest.warns(Warning, match="cv won't be used"): + model_x_knockoff.fit_importance(X, y, cv="test") + + def test_unfitted_importance(self, data_generator): + """Test importance method with unfitted model""" + X, y, _, _ = data_generator + model_x_knockoff = ModelXKnockoff( + n_repeat=5, + generator=GaussianDistribution( + cov_estimator=LedoitWolf(assume_centered=True), random_state=0 + ), + ) - # random positive matrix - rgn = np.random.RandomState(seed) - a = rgn.randn(n, n) - a -= np.min(a) - with pytest.raises( - Exception, match="The covariance matrix is not positive-definite." - ): - _s_equi(a) - - # matrix with positive eigenvalues, positive diagonal - while not np.all(np.linalg.eigvalsh(a) > tol): - a += 0.1 * np.eye(n) - with pytest.warns(UserWarning, match="The equi-correlated matrix"): - _s_equi(a) - - # positive definite matrix - u, s, vh = np.linalg.svd(a) - d = np.eye(n) - sigma = u * d * u.T - _s_equi(sigma) + with pytest.raises( + ValueError, + match="The Model-X Knockoff requires to be fitted before computing importance", + ): + model_x_knockoff.importance(X, y) + + def test_invalid_n_samplings(self, data_generator): + """Test when invalid number of permutations is provided""" + with pytest.raises(AssertionError, match="n_samplings must be positive"): + ModelXKnockoff(n_repeat=-1)