diff --git a/examples/plot_conditional_vs_marginal_xor_data.py b/examples/plot_conditional_vs_marginal_xor_data.py index e9ea09ec9..55a095fc5 100644 --- a/examples/plot_conditional_vs_marginal_xor_data.py +++ b/examples/plot_conditional_vs_marginal_xor_data.py @@ -117,7 +117,7 @@ random_state=0, ) vim.fit(X_train, y_train) - importances.append(vim.importance(X_test, y_test)["importance"]) + importances.append(vim.importance(X_test, y_test)) importances = np.array(importances).T diff --git a/examples/plot_diabetes_variable_importance_example.py b/examples/plot_diabetes_variable_importance_example.py index 0340d9a3d..17e933802 100644 --- a/examples/plot_diabetes_variable_importance_example.py +++ b/examples/plot_diabetes_variable_importance_example.py @@ -184,14 +184,14 @@ def compute_pval(vim): # ------------------- -cfi_vim_arr = np.array([x["importance"] for x in cfi_importance_list]) / 2 +cfi_vim_arr = np.array(cfi_importance_list) / 2 cfi_pval = compute_pval(cfi_vim_arr) vim = [ pd.DataFrame( { "var": np.arange(cfi_vim_arr.shape[1]), - "importance": x["importance"], + "importance": x, "fold": i, "pval": cfi_pval, "method": "CFI", @@ -200,14 +200,14 @@ def compute_pval(vim): for x in cfi_importance_list ] -loco_vim_arr = np.array([x["importance"] for x in loco_importance_list]) +loco_vim_arr = np.array(loco_importance_list) loco_pval = compute_pval(loco_vim_arr) vim += [ pd.DataFrame( { "var": np.arange(loco_vim_arr.shape[1]), - "importance": x["importance"], + "importance": x, "fold": i, "pval": loco_pval, "method": "LOCO", @@ -216,14 +216,14 @@ def compute_pval(vim): for x in loco_importance_list ] -pfi_vim_arr = np.array([x["importance"] for x in pfi_importance_list]) +pfi_vim_arr = np.array(pfi_importance_list) pfi_pval = compute_pval(pfi_vim_arr) vim += [ pd.DataFrame( { "var": np.arange(pfi_vim_arr.shape[1]), - "importance": x["importance"], + "importance": x, "fold": i, "pval": pfi_pval, "method": "PFI", diff --git a/examples/plot_importance_classification_iris.py b/examples/plot_importance_classification_iris.py index eb92d7abf..9a2be2b72 100644 --- a/examples/plot_importance_classification_iris.py +++ b/examples/plot_importance_classification_iris.py @@ -93,7 +93,7 @@ def run_one_fold(X, y, model, train_index, test_index, vim_name="CFI", groups=No ) vim.fit(X[train_index], y[train_index], groups=groups) - importance = vim.importance(X[test_index], y[test_index])["importance"] + importance = vim.importance(X[test_index], y[test_index]) return pd.DataFrame( { diff --git a/examples/plot_model_agnostic_importance.py b/examples/plot_model_agnostic_importance.py index 9ace442d4..03f8ceded 100644 --- a/examples/plot_model_agnostic_importance.py +++ b/examples/plot_model_agnostic_importance.py @@ -108,10 +108,8 @@ vim_linear.fit(X[train], y[train]) vim_non_linear.fit(X[train], y[train]) - importances_linear.append(vim_linear.importance(X[test], y[test])["importance"]) - importances_non_linear.append( - vim_non_linear.importance(X[test], y[test])["importance"] - ) + importances_linear.append(vim_linear.importance(X[test], y[test])) + importances_non_linear.append(vim_non_linear.importance(X[test], y[test])) ################################################################################ diff --git a/examples/plot_pitfalls_permutation_importance.py b/examples/plot_pitfalls_permutation_importance.py index af4deb83e..e7a041d8e 100644 --- a/examples/plot_pitfalls_permutation_importance.py +++ b/examples/plot_pitfalls_permutation_importance.py @@ -132,7 +132,7 @@ ) pfi.fit(X_test, y_test) - 
permutation_importances.append(pfi.importance(X_test, y_test)["importance"])
+    permutation_importances.append(pfi.importance(X_test, y_test))
 permutation_importances = np.stack(permutation_importances)
 pval_pfi = ttest_1samp(
     permutation_importances, 0.0, axis=0, alternative="greater"
@@ -200,7 +200,7 @@
     )
     cfi.fit(X_test, y_test)
-    conditional_importances.append(cfi.importance(X_test, y_test)["importance"])
+    conditional_importances.append(cfi.importance(X_test, y_test))


 cfi_pval = ttest_1samp(
diff --git a/src/hidimstat/__init__.py b/src/hidimstat/__init__.py
index 3bb1ef1bf..d282e5027 100644
--- a/src/hidimstat/__init__.py
+++ b/src/hidimstat/__init__.py
@@ -12,16 +12,16 @@
     desparsified_group_lasso_pvalue,
 )
 from .distilled_conditional_randomization_test import d0crt, D0CRT
-from .conditional_feature_importance import CFI
+from .conditional_feature_importance import cfi, CFI
 from .knockoffs import (
     model_x_knockoff,
     model_x_knockoff_pvalue,
     model_x_knockoff_bootstrap_quantile,
     model_x_knockoff_bootstrap_e_value,
 )
-from .leave_one_covariate_out import LOCO
+from .leave_one_covariate_out import loco, LOCO
 from .noise_std import reid
-from .permutation_feature_importance import PFI
+from .permutation_feature_importance import pfi, PFI
 from .statistical_tools.aggregation import quantile_aggregation


@@ -47,6 +47,9 @@
     "model_x_knockoff_bootstrap_quantile",
     "model_x_knockoff_bootstrap_e_value",
     "CFI",
+    "cfi",
     "LOCO",
+    "loco",
     "PFI",
+    "pfi",
 ]
diff --git a/src/hidimstat/_utils/utils.py b/src/hidimstat/_utils/utils.py
index df9585241..bc38b6217 100644
--- a/src/hidimstat/_utils/utils.py
+++ b/src/hidimstat/_utils/utils.py
@@ -25,3 +25,34 @@ def _check_vim_predict_method(method):
             "The method {} is not a valid method "
             "for variable importance measure prediction".format(method)
         )
+
+
+def get_generated_attributes(cls):
+    """
+    Get all attributes of a class that end with a single underscore
+    and do not start with an underscore.
+
+    Parameters
+    ----------
+    cls : class or instance
+        The object to inspect for attributes.
+
+    Returns
+    -------
+    list
+        A list of attribute names that end with a single underscore but not a double underscore.
+    """
+    # Get all attributes and methods of the class
+    all_attributes = dir(cls)
+
+    # Filter out attributes that start with an underscore
+    filtered_attributes = [attr for attr in all_attributes if not attr.startswith("_")]
+
+    # Keep only attributes that end with a single underscore, not a double one
+    result = [
+        attr
+        for attr in filtered_attributes
+        if attr.endswith("_") and not attr.endswith("__")
+    ]
+
+    return result
diff --git a/src/hidimstat/base_perturbation.py b/src/hidimstat/base_perturbation.py
index ef3c58343..3c5d0bc06 100644
--- a/src/hidimstat/base_perturbation.py
+++ b/src/hidimstat/base_perturbation.py
@@ -1,48 +1,68 @@
+import warnings
+
 import numpy as np
 import pandas as pd
 from joblib import Parallel, delayed
-from sklearn.base import check_is_fitted
+from scipy.stats import ttest_1samp
+from sklearn.base import check_is_fitted, clone
 from sklearn.metrics import root_mean_squared_error
-import warnings
+from sklearn.model_selection import KFold

 from hidimstat._utils.utils import _check_vim_predict_method
 from hidimstat._utils.exception import InternalError
 from hidimstat.base_variable_importance import BaseVariableImportance
+from hidimstat._utils.utils import get_generated_attributes


 class BasePerturbation(BaseVariableImportance):
+    """
+    Base class for model-agnostic variable importance measures based on
+    perturbation.
+
+    Parameters
+    ----------
+    estimator : sklearn compatible estimator
+        The estimator to use for the prediction.
+    method : str, default="predict"
+        The method used for making predictions. This determines the predictions
+        passed to the loss function. Supported methods are "predict",
+        "predict_proba", "decision_function", "transform".
+    loss : callable, default=root_mean_squared_error
+        Loss function to compute difference between original and perturbed predictions.
+    n_permutations : int, default=50
+        Number of permutations to perform for calculating variable importance.
+        Higher values give more stable results but increase computation time.
+    n_jobs : int, default=1
+        Number of parallel jobs to run. -1 means using all processors.
+
+    Attributes
+    ----------
+    features_groups : dict
+        Mapping of feature groups identified during fit.
+    importances_ : ndarray
+        Computed importance scores for each feature group.
+    loss_reference_ : float
+        Loss of the original model without perturbation.
+    loss_ : dict
+        Loss values for each perturbed feature group.
+    pvalues_ : float or None
+        P-value for the importance scores (None for methods that do not
+        provide p-values).
+
+    Notes
+    -----
+    This is an abstract base class. Concrete implementations must override
+    the _permutation method.
+    """
+
     def __init__(
         self,
         estimator,
+        method: str = "predict",
         loss: callable = root_mean_squared_error,
         n_permutations: int = 50,
-        method: str = "predict",
         n_jobs: int = 1,
     ):
-        """
-        Base class for model-agnostic variable importance measures based on
-        perturbation.
-
-        Parameters
-        ----------
-        estimator : sklearn compatible estimator, optional
-            The estimator to use for the prediction.
-        loss : callable, default=root_mean_squared_error
-            The function to compute the loss when comparing the perturbed model
-            to the original model.
-        n_permutations : int, default=50
-            This parameter is relevant only for PFI or CFI.
-            Specifies the number of times the variable group (residual for CFI) is
-            permuted. For each permutation, the perturbed model's loss is calculated
-            and averaged over all permutations.
-        method : str, default="predict"
-            The method used for making predictions. This determines the predictions
-            passed to the loss function. Supported methods are "predict",
-            "predict_proba", "decision_function", "transform".
-        n_jobs : int, default=1
-            The number of parallel jobs to run. Parallelization is done over the
-            variables or groups of variables.
-        """
         super().__init__()
         check_is_fitted(estimator)
         assert n_permutations > 0, "n_permutations must be positive"
@@ -50,9 +70,16 @@ def __init__(
         self.loss = loss
         _check_vim_predict_method(method)
         self.method = method
-        self.n_jobs = n_jobs
         self.n_permutations = n_permutations
-        self.n_groups = None
+        self.n_jobs = n_jobs
+        # variable set in fit
+        self.features_groups = None
+        # variables set in importance
+        self.loss_reference_ = None
+        self.loss_ = None
+        # internal variables
+        self._n_groups = None
+        self._groups_ids = None

     def fit(self, X, y=None, groups=None):
         """Base fit method for perturbation-based methods. Identifies the groups.
@@ -69,28 +96,37 @@
             identified based on the columns of X.
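+
+        Examples
+        --------
+        A minimal sketch of grouped fitting, where ``vim`` is an instance of a
+        concrete subclass (the group names and column indices are illustrative)::
+
+            vim.fit(X, groups={"demographics": [0, 1], "measurements": [2, 3]})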
""" if groups is None: - self.n_groups = X.shape[1] - self.groups = {j: [j] for j in range(self.n_groups)} - self._groups_ids = np.array(list(self.groups.values()), dtype=int) + self._n_groups = X.shape[1] + self.features_groups = {j: [j] for j in range(self._n_groups)} + self._groups_ids = np.array(list(self.features_groups.values()), dtype=int) elif isinstance(groups, dict): - self.n_groups = len(groups) - self.groups = groups + self._n_groups = len(groups) + self.features_groups = groups if isinstance(X, pd.DataFrame): self._groups_ids = [] - for group_key in self.groups.keys(): + for group_key in self.features_groups.keys(): self._groups_ids.append( [ i for i, col in enumerate(X.columns) - if col in self.groups[group_key] + if col in self.features_groups[group_key] ] ) else: self._groups_ids = [ - np.array(ids, dtype=int) for ids in list(self.groups.values()) + np.array(ids, dtype=int) + for ids in list(self.features_groups.values()) ] else: raise ValueError("groups needs to be a dictionnary") + return self def predict(self, X): """ @@ -113,7 +142,7 @@ def predict(self, X): # Parallelize the computation of the importance scores for each group out_list = Parallel(n_jobs=self.n_jobs)( delayed(self._joblib_predict_one_group)(X_, group_id, group_key) - for group_id, group_key in enumerate(self.groups.keys()) + for group_id, group_key in enumerate(self.features_groups.keys()) ) return np.stack(out_list, axis=0) @@ -123,43 +152,128 @@ def importance(self, X, y): Parameters ---------- - X: array-like of shape (n_samples, n_features) - The input samples. - y: array-like of shape (n_samples,) - The target values. + X : array-like of shape (n_samples, n_features) + The input samples to compute importance scores for. + y : array-like of shape (n_samples,) + + importances_ : ndarray of shape (n_groups,) + The importance scores for each group of covariates. + A higher score indicates greater importance of that group. Returns ------- - out_dict: dict - A dictionary containing the following keys: - - 'loss_reference': the loss of the model with the original data. - - 'loss': a dictionary containing the loss of the perturbed model - for each group. - - 'importance': the importance scores for each group. + importances_ : ndarray of shape (n_features,) + Importance scores for each feature. + + Attributes + ---------- + loss_reference_ : float + The loss of the model with the original (non-perturbed) data. + loss_ : dict + Dictionary with indices as keys and arrays of perturbed losses as values. + Contains the loss values for each permutation of each group. + importances_ : ndarray of shape (n_groups,) + The calculated importance scores for each group. + pvalues_ : ndarray of shape (n_groups,) + P-values from one-sided t-test testing if importance scores are + significantly greater than 0. + + Notes + ----- + The importance score for each group is calculated as the mean increase in loss + when that group is perturbed, compared to the reference loss. + A higher importance score indicates that perturbing that group leads to + worse model performance, suggesting those features are more important. 
""" self._check_fit(X) - out_dict = dict() - y_pred = getattr(self.estimator, self.method)(X) - loss_reference = self.loss(y, y_pred) - out_dict["loss_reference"] = loss_reference + self.loss_reference_ = self.loss(y, y_pred) y_pred = self.predict(X) - out_dict["loss"] = dict() + self.loss_ = dict() for j, y_pred_j in enumerate(y_pred): list_loss = [] for y_pred_perm in y_pred_j: list_loss.append(self.loss(y, y_pred_perm)) - out_dict["loss"][j] = np.array(list_loss) + self.loss_[j] = np.array(list_loss) - out_dict["importance"] = np.array( + self.importances_ = np.array( [ - np.mean(out_dict["loss"][j]) - loss_reference - for j in range(self.n_groups) + np.mean(self.loss_[j]) - self.loss_reference_ + for j in range(self._n_groups) ] ) - return out_dict + self.pvalues_ = ttest_1samp( + self.importances_, 0.0, axis=0, alternative="greater" + ).pvalue + return self.importances_ + + def fit_importance( + self, X, y, cv=KFold(n_splits=5, shuffle=True, random_state=0), **fit_kwargs + ): + """ + Compute feature importance scores using cross-validation. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Training data. + y : array-like of shape (n_samples,) + Target values. + cv : cross-validation generator or iterable, default=KFold(n_splits=5, shuffle=True, random_state=0) + Determines the cross-validation splitting strategy. + **fit_kwargs : dict + Additional arguments passed to the fit method during variable group identification. + + Returns + ------- + importances_ : ndarray + Average importance scores for each feature group across CV folds. + + Attributes + ---------- + estimators_cv_ : list + List of fitted estimators for each CV fold. + importances_cv_ : list + List of importance scores for each CV fold. + pvalues_cv_ : list + List of p-values for each CV fold. + loss_cv_ : list + List of loss values for each CV fold. + loss_reference_cv_ : list + List of reference loss values for each CV fold. + + Notes + ----- + For each CV fold: + 1. Fits a clone of the base estimator on the training fold + 2. Identifies variable groups on the training fold + 3. Computes feature importances using the test fold + 4. Stores results for each fold in respective cv\_ attributes + + Final importances\_ and pvalues\_ are averaged across all CV folds. + """ + name_attribute_save = get_generated_attributes(self) + for name in name_attribute_save: + setattr(self, name + "cv_", []) + self.estimators_cv_ = [] + + for train, test in cv.split(X): + estimator = clone(self.estimator) + estimator.fit(X[train], y[train]) + self.fit(X[train], y[train], **fit_kwargs) + self.importance(X[test], y[test]) + # save result of each cv + for name in name_attribute_save: + getattr(self, name + "cv_").append(getattr(self, name)) + setattr(self, name, None) + self.estimators_cv_.append(estimator) + self.importances_ = np.mean(self.importances_cv_, axis=0) + self.pvalues_ = ( + None if self.pvalues_cv_[0] is None else np.mean(self.pvalues_cv_, axis=0) + ) + return self.importances_ def _check_fit(self, X): """ @@ -184,9 +298,9 @@ def _check_fit(self, X): of features in the grouped variables. """ if ( - self.n_groups is None - or not hasattr(self, "groups") - or not hasattr(self, "_groups_ids") + self._n_groups is None + or self.features_groups is None + or self._groups_ids is None ): raise ValueError( "The class is not fitted. 
The fit method must be called" @@ -204,7 +318,7 @@ def _check_fit(self, X): else: raise ValueError("X should be a pandas dataframe or a numpy array.") number_columns = X.shape[1] - for index_variables in self.groups.values(): + for index_variables in self.features_groups.values(): if type(index_variables[0]) is int or np.issubdtype( type(index_variables[0]), int ): @@ -222,7 +336,7 @@ def _check_fit(self, X): "A problem with indexing has happened during the fit." ) number_unique_feature_in_groups = np.unique( - np.concatenate([values for values in self.groups.values()]) + np.concatenate([values for values in self.features_groups.values()]) ).shape[0] if X.shape[1] != number_unique_feature_in_groups: warnings.warn( @@ -231,6 +345,16 @@ def _check_fit(self, X): f"{number_unique_feature_in_groups}" ) + def _check_importance(self): + """ + Checks if the loss has been computed. + """ + super()._check_importance() + if ( + self.loss_reference_ is None and not hasattr(self, "loss_reference_cv_") + ) or (self.loss_ is None and not hasattr(self, "loss_cv_")): + raise ValueError("The importance method has not yet been called.") + def _joblib_predict_one_group(self, X, group_id, group_key): """ Compute the predictions after perturbation of the data for a given diff --git a/src/hidimstat/conditional_feature_importance.py b/src/hidimstat/conditional_feature_importance.py index 6609fd072..c11ab714f 100644 --- a/src/hidimstat/conditional_feature_importance.py +++ b/src/hidimstat/conditional_feature_importance.py @@ -2,70 +2,75 @@ from joblib import Parallel, delayed from sklearn.base import check_is_fitted, clone, BaseEstimator from sklearn.metrics import root_mean_squared_error +from sklearn.model_selection import KFold +from sklearn.linear_model import RidgeCV, LogisticRegressionCV from sklearn.utils.validation import check_random_state from hidimstat.base_perturbation import BasePerturbation from hidimstat.conditional_sampling import ConditionalSampler +from hidimstat._utils.docstring import _aggregate_docstring class CFI(BasePerturbation): + """ + Conditional Feature Importance (CFI) algorithm. + :footcite:t:`Chamma_NeurIPS2023` and for group-level see + :footcite:t:`Chamma_AAAI2024`. + + Parameters + ---------- + estimator : sklearn compatible estimator, optional + The estimator to use for the prediction. + method : str, default="predict" + The method to use for the prediction. This determines the predictions passed + to the loss function. Supported methods are "predict", "predict_proba" or + "decision_function". + loss : callable, default=root_mean_squared_error + The loss function to use when comparing the perturbed model to the full + model. + n_permutations : int, default=50 + The number of permutations to perform. For each variable/group of variables, + the mean of the losses over the `n_permutations` is computed. + imputation_model_continuous : sklearn compatible estimator, default=RidgeCV() + The model used to estimate the conditional distribution of a given + continuous variable/group of variables given the others. + imputation_model_categorical : sklearn compatible estimator, default=LogisticRegressionCV() + The model used to estimate the conditional distribution of a given + categorical variable/group of variables given the others. Binary is + considered as a special case of categorical. + categorical_max_cardinality : int, default=10 + The maximum cardinality of a variable to be considered as categorical + when the variable type is inferred (set to "auto" or not provided). 
+ random_state : int, default=None + The random state to use for sampling. + n_jobs : int, default=1 + The number of jobs to run in parallel. Parallelization is done over the + variables or groups of variables. + + References + ---------- + .. footbibliography:: + """ + def __init__( self, estimator, - loss: callable = root_mean_squared_error, method: str = "predict", - n_jobs: int = 1, + loss: callable = root_mean_squared_error, n_permutations: int = 50, - imputation_model_continuous=None, - imputation_model_categorical=None, - random_state: int = None, + imputation_model_continuous=RidgeCV(), + imputation_model_categorical=LogisticRegressionCV(), categorical_max_cardinality: int = 10, + random_state: int = None, + n_jobs: int = 1, ): - """ - Conditional Feature Importance (CFI) algorithm. - :footcite:t:`Chamma_NeurIPS2023` and for group-level see - :footcite:t:`Chamma_AAAI2024`. - Parameters - ---------- - estimator : sklearn compatible estimator, optional - The estimator to use for the prediction. - loss : callable, default=root_mean_squared_error - The loss function to use when comparing the perturbed model to the full - model. - method : str, default="predict" - The method to use for the prediction. This determines the predictions passed - to the loss function. Supported methods are "predict", "predict_proba" or - "decision_function". - n_jobs : int, default=1 - The number of jobs to run in parallel. Parallelization is done over the - variables or groups of variables. - n_permutations : int, default=50 - The number of permutations to perform. For each variable/group of variables, - the mean of the losses over the `n_permutations` is computed. - imputation_model_continuous : sklearn compatible estimator, optional - The model used to estimate the conditional distribution of a given - continuous variable/group of variables given the others. - imputation_model_categorical : sklearn compatible estimator, optional - The model used to estimate the conditional distribution of a given - categorical variable/group of variables given the others. Binary is - considered as a special case of categorical. - random_state : int, default=None - The random state to use for sampling. - categorical_max_cardinality : int, default=10 - The maximum cardinality of a variable to be considered as categorical - when the variable type is inferred (set to "auto" or not provided). - - References - ---------- - .. footbibliography:: - """ super().__init__( estimator=estimator, - loss=loss, method=method, - n_jobs=n_jobs, + loss=loss, n_permutations=n_permutations, + n_jobs=n_jobs, ) # check the validity of the inputs @@ -83,7 +88,8 @@ def __init__( self.random_state = random_state def fit(self, X, y=None, groups=None, var_type="auto"): - """Fit the imputation models. + """ + Fit the imputation models. 
Parameters ---------- @@ -107,13 +113,13 @@ def fit(self, X, y=None, groups=None, var_type="auto"): self.random_state = check_random_state(self.random_state) super().fit(X, None, groups=groups) if isinstance(var_type, str): - self.var_type = [var_type for _ in range(self.n_groups)] + var_type = [var_type for _ in range(self._n_groups)] else: - self.var_type = var_type + var_type = var_type self._list_imputation_models = [ ConditionalSampler( - data_type=self.var_type[groupd_id], + data_type=var_type[groupd_id], model_regression=( None if self.imputation_model_continuous is None @@ -127,7 +133,7 @@ def fit(self, X, y=None, groups=None, var_type="auto"): random_state=self.random_state, categorical_max_cardinality=self.categorical_max_cardinality, ) - for groupd_id in range(self.n_groups) + for groupd_id in range(self._n_groups) ] # Parallelize the fitting of the covariate estimators @@ -188,3 +194,71 @@ def _permutation(self, X, group_id): return self._list_imputation_models[group_id].sample( X_minus_j, X_j, n_samples=self.n_permutations ) + + +def cfi( + estimator, + X, + y, + cv=KFold(n_splits=5, shuffle=True, random_state=0), + groups: dict = None, + var_type: str = "auto", + method: str = "predict", + loss: callable = root_mean_squared_error, + n_permutations: int = 50, + imputation_model_continuous=None, + imputation_model_categorical=None, + categorical_max_cardinality: int = 10, + k_best=None, + percentile=None, + threshold=None, + threshold_pvalue=None, + random_state: int = None, + n_jobs: int = 1, +): + methods = CFI( + estimator=estimator, + method=method, + loss=loss, + n_permutations=n_permutations, + imputation_model_continuous=imputation_model_continuous, + imputation_model_categorical=imputation_model_categorical, + categorical_max_cardinality=categorical_max_cardinality, + random_state=random_state, + n_jobs=n_jobs, + ) + methods.fit_importance( + X, + y, + cv=cv, + groups=groups, + var_type=var_type, + ) + selection = methods.selection( + k_best=k_best, + percentile=percentile, + threshold=threshold, + threshold_pvalue=threshold_pvalue, + ) + return selection, methods.importances_, methods.pvalues_ + + +# use the docstring of the class for the function +cfi.__doc__ = _aggregate_docstring( + [ + CFI.__doc__, + CFI.__init__.__doc__, + CFI.fit_importance.__doc__, + CFI.selection.__doc__, + ], + """ + Returns + ------- + selection : ndarray of shape (n_features,) + Boolean array indicating selected features (True = selected) + importances : ndarray of shape (n_features,) + Feature importance scores/test statistics. + pvalues : ndarray of shape (n_features,) + P-values for importance scores. + """, +) diff --git a/src/hidimstat/conditional_sampling.py b/src/hidimstat/conditional_sampling.py index f8920581a..609c7ea5d 100644 --- a/src/hidimstat/conditional_sampling.py +++ b/src/hidimstat/conditional_sampling.py @@ -45,8 +45,8 @@ def __init__( model_regression=None, model_categorical=None, data_type: str = "auto", - random_state=None, categorical_max_cardinality=10, + random_state=None, ): """ Class use to sample from the conditional distribution $p(X^j | X^{-j})$. @@ -62,11 +62,11 @@ def __init__( The variable type. Supported types include "auto", "continuous", and "categorical". If "auto", the type is inferred from the cardinality of the unique values passed to the `fit` method. - random_state : int, optional - The random state to use for sampling. 
categorical_max_cardinality : int, default=10 The maximum cardinality of a variable to be considered as categorical when `data_type` is "auto". + random_state : int, optional + The random state to use for sampling. """ # check the validity of the inputs @@ -79,8 +79,8 @@ def __init__( self.data_type = data_type self.model_regression = model_regression self.model_categorical = model_categorical - self.rng = check_random_state(random_state) self.categorical_max_cardinality = categorical_max_cardinality + self.rng = check_random_state(random_state) def fit(self, X: np.ndarray, y: np.ndarray): r""" diff --git a/src/hidimstat/leave_one_covariate_out.py b/src/hidimstat/leave_one_covariate_out.py index 27bd29cbb..6deba0c9f 100644 --- a/src/hidimstat/leave_one_covariate_out.py +++ b/src/hidimstat/leave_one_covariate_out.py @@ -2,61 +2,68 @@ import pandas as pd from joblib import Parallel, delayed from sklearn.base import check_is_fitted, clone +from sklearn.model_selection import KFold from sklearn.metrics import root_mean_squared_error from hidimstat.base_perturbation import BasePerturbation +from hidimstat._utils.docstring import _aggregate_docstring class LOCO(BasePerturbation): + """ + Leave-One-Covariate-Out (LOCO) algorithm + + This method is presented in :footcite:t:`lei2018distribution` and :footcite:t:`verdinelli2024feature`. + The model is re-fitted for each variable/group of variables. The importance is + then computed as the difference between the loss of the full model and the loss + of the model without the variable/group. + + Parameters + ---------- + estimator : sklearn compatible estimator, optional + The estimator to use for the prediction. + method : str, default="predict" + The method to use for the prediction. This determines the predictions passed + to the loss function. Supported methods are "predict", "predict_proba" or + "decision_function". + loss : callable, default=root_mean_squared_error + The loss function to use when comparing the perturbed model to the full + model. + n_jobs : int, default=1 + The number of jobs to run in parallel. Parallelization is done over the + variables or groups of variables. + + Notes + ----- + :footcite:t:`Williamson_General_2023` also presented a LOCO method with an + additional data splitting strategy. + + References + ---------- + .. footbibliography:: + """ + def __init__( self, estimator, - loss: callable = root_mean_squared_error, method: str = "predict", + loss: callable = root_mean_squared_error, n_jobs: int = 1, ): - """ - Leave-One-Covariate-Out (LOCO) as presented in - :footcite:t:`lei2018distribution` and :footcite:t:`verdinelli2024feature`. - The model is re-fitted for each variable/group of variables. The importance is - then computed as the difference between the loss of the full model and the loss - of the model without the variable/group. - Parameters - ---------- - estimator : sklearn compatible estimator, optional - The estimator to use for the prediction. - loss : callable, default=root_mean_squared_error - The loss function to use when comparing the perturbed model to the full - model. - method : str, default="predict" - The method to use for the prediction. This determines the predictions passed - to the loss function. Supported methods are "predict", "predict_proba" or - "decision_function". - n_jobs : int, default=1 - The number of jobs to run in parallel. Parallelization is done over the - variables or groups of variables. 
- - Notes - ----- - :footcite:t:`Williamson_General_2023` also presented a LOCO method with an - additional data splitting strategy. - - References - ---------- - .. footbibliography:: - """ super().__init__( estimator=estimator, - loss=loss, method=method, - n_jobs=n_jobs, + loss=loss, n_permutations=1, + n_jobs=n_jobs, ) - self._list_estimators = [] + # internal variable + self._list_estimators = None def fit(self, X, y, groups=None): - """Fit a model after removing each covariate/group of covariates. + """ + Fit a model after removing each covariate/group of covariates. Parameters ---------- @@ -75,25 +82,47 @@ def fit(self, X, y, groups=None): """ super().fit(X, y, groups) # create a list of covariate estimators for each group if not provided - self._list_estimators = [clone(self.estimator) for _ in range(self.n_groups)] + self._list_estimators = [clone(self.estimator) for _ in range(self._n_groups)] # Parallelize the fitting of the covariate estimators self._list_estimators = Parallel(n_jobs=self.n_jobs)( delayed(self._joblib_fit_one_group)(estimator, X, y, key_groups) - for key_groups, estimator in zip(self.groups.keys(), self._list_estimators) + for key_groups, estimator in zip( + self.features_groups.keys(), self._list_estimators + ) ) return self + def importance(self, X, y): + """ + Compute the importance scores for each group of covariates. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The input samples to compute importance scores for. + y : array-like of shape (n_samples,) + + Returns + ------- + importances_ : ndarray of shape (n_features,) + The importance scores for each group of covariates. + A higher score indicates greater importance of that group. + """ + super().importance(X, y) + self.pvalues_ = None + return self.importances_ + def _joblib_fit_one_group(self, estimator, X, y, key_groups): """Fit the estimator after removing a group of covariates. Used in parallel.""" if isinstance(X, pd.DataFrame): - X_minus_j = X.drop(columns=self.groups[key_groups]) + X_minus_j = X.drop(columns=self.features_groups[key_groups]) else: - X_minus_j = np.delete(X, self.groups[key_groups], axis=1) + X_minus_j = np.delete(X, self.features_groups[key_groups], axis=1) estimator.fit(X_minus_j, y) return estimator - def _joblib_predict_one_group(self, X, group_id, key_groups): + def _joblib_predict_one_group(self, X, group_id, group_key): """Predict the target variable after removing a group of covariates. 
         Used in parallel."""

         X_minus_j = np.delete(X, self._groups_ids[group_id], axis=1)
@@ -107,7 +136,63 @@ def _check_fit(self, X):
         covariates."""
         super()._check_fit(X)
         check_is_fitted(self.estimator)
-        if len(self._list_estimators) == 0:
+        if self._list_estimators is None:
             raise ValueError("The estimators require to be fit before to use them")
         for m in self._list_estimators:
             check_is_fitted(m)
+
+
+def loco(
+    estimator,
+    X,
+    y,
+    cv=KFold(n_splits=5, shuffle=True, random_state=0),
+    groups: dict = None,
+    method: str = "predict",
+    loss: callable = root_mean_squared_error,
+    k_best=None,
+    percentile=None,
+    threshold=None,
+    threshold_pvalue=None,
+    n_jobs: int = 1,
+):
+    methods = LOCO(
+        estimator=estimator,
+        method=method,
+        loss=loss,
+        n_jobs=n_jobs,
+    )
+    methods.fit_importance(
+        X,
+        y,
+        cv=cv,
+        groups=groups,
+    )
+    selection = methods.selection(
+        k_best=k_best,
+        percentile=percentile,
+        threshold=threshold,
+        threshold_pvalue=threshold_pvalue,
+    )
+    return selection, methods.importances_, methods.pvalues_
+
+
+# use the docstring of the class for the function
+loco.__doc__ = _aggregate_docstring(
+    [
+        LOCO.__doc__,
+        LOCO.__init__.__doc__,
+        LOCO.fit_importance.__doc__,
+        LOCO.selection.__doc__,
+    ],
+    """
+    Returns
+    -------
+    selection : ndarray of shape (n_features,)
+        Boolean array indicating selected features (True = selected)
+    importances : ndarray of shape (n_features,)
+        Feature importance scores/test statistics.
+    pvalues : None
+        Always None, since LOCO does not provide p-values.
+    """,
+)
diff --git a/src/hidimstat/permutation_feature_importance.py b/src/hidimstat/permutation_feature_importance.py
index 7fe72a78a..6d6af8673 100644
--- a/src/hidimstat/permutation_feature_importance.py
+++ b/src/hidimstat/permutation_feature_importance.py
@@ -1,57 +1,63 @@
 import numpy as np
 from sklearn.metrics import root_mean_squared_error
+from sklearn.model_selection import KFold
 from sklearn.utils import check_random_state

 from hidimstat.base_perturbation import BasePerturbation
+from hidimstat._utils.docstring import _aggregate_docstring


 class PFI(BasePerturbation):
+    """
+    Permutation Feature Importance algorithm
+
+    This method was presented in :footcite:t:`breimanRandomForests2001`.
+    For each variable/group of variables, the importance is computed as
+    the difference between the loss of the initial model and the loss of
+    the model with the variable/group permuted.
+    The method was also used in :footcite:t:`mi2021permutation`.
+
+    Parameters
+    ----------
+    estimator : sklearn compatible estimator
+        The estimator to use for the prediction.
+    method : str, default="predict"
+        The method to use for the prediction. This determines the predictions passed
+        to the loss function. Supported methods are "predict", "predict_proba" or
+        "decision_function".
+    loss : callable, default=root_mean_squared_error
+        The loss function to use when comparing the perturbed model to the full
+        model.
+    n_permutations : int, default=50
+        The number of permutations to perform. For each variable/group of variables,
+        the mean of the losses over the `n_permutations` is computed.
+    random_state : int, default=None
+        The random state to use for sampling.
+    n_jobs : int, default=1
+        The number of jobs to run in parallel. Parallelization is done over the
+        variables or groups of variables.
+
+    References
+    ----------
+    .. footbibliography::
+    """
+
     def __init__(
         self,
         estimator,
-        loss: callable = root_mean_squared_error,
         method: str = "predict",
-        n_jobs: int = 1,
+        loss: callable = root_mean_squared_error,
         n_permutations: int = 50,
         random_state: int = None,
+        n_jobs: int = 1,
     ):
-        """
-        Permutation Feature Importance algorithm as presented in
-        :footcite:t:`breimanRandomForests2001`. For each variable/group of variables,
-        the importance is computed as the difference between the loss of the initial
-        model and the loss of the model with the variable/group permuted.
-        The method was also used in :footcite:t:`mi2021permutation`
-
-        Parameters
-        ----------
-        estimator : sklearn compatible estimator, optionals
-            The estimator to use for the prediction.
-        loss : callable, default=root_mean_squared_error
-            The loss function to use when comparing the perturbed model to the full
-            model.
-        method : str, default="predict"
-            The method to use for the prediction. This determines the predictions passed
-            to the loss function. Supported methods are "predict", "predict_proba" or
-            "decision_function".
-        n_jobs : int, default=1
-            The number of jobs to run in parallel. Parallelization is done over the
-            variables or groups of variables.
-        n_permutations : int, default=50
-            The number of permutations to perform. For each variable/group of variables,
-            the mean of the losses over the `n_permutations` is computed.
-        random_state : int, default=None
-            The random state to use for sampling.
-        References
-        ----------
-        .. footbibliography::
-        """
         super().__init__(
             estimator=estimator,
-            loss=loss,
             method=method,
-            n_jobs=n_jobs,
+            loss=loss,
             n_permutations=n_permutations,
+            n_jobs=n_jobs,
         )
         self.random_state = random_state

@@ -65,3 +71,69 @@
             ]
         )
         return X_perm_j
+
+
+def pfi(
+    estimator,
+    X,
+    y,
+    cv=KFold(n_splits=5, shuffle=True, random_state=0),
+    groups: dict = None,
+    method: str = "predict",
+    loss: callable = root_mean_squared_error,
+    n_permutations: int = 50,
+    k_best=None,
+    percentile=None,
+    threshold=None,
+    threshold_pvalue=None,
+    random_state: int = None,
+    n_jobs: int = 1,
+):
+    methods = PFI(
+        estimator=estimator,
+        method=method,
+        loss=loss,
+        n_permutations=n_permutations,
+        random_state=random_state,
+        n_jobs=n_jobs,
+    )
+    methods.fit_importance(
+        X,
+        y,
+        cv=cv,
+        groups=groups,
+    )
+    selection = methods.selection(
+        k_best=k_best,
+        percentile=percentile,
+        threshold=threshold,
+        threshold_pvalue=threshold_pvalue,
+    )
+    return selection, methods.importances_, methods.pvalues_
+
+
+# use the docstring of the class for the function
+pfi.__doc__ = _aggregate_docstring(
+    [
+        PFI.__doc__,
+        PFI.__init__.__doc__,
+        PFI.fit_importance.__doc__,
+        PFI.selection.__doc__,
+    ],
+    """
+    Returns
+    -------
+    selection : ndarray of shape (n_features,)
+        Boolean array indicating selected features (True = selected)
+    importances : ndarray of shape (n_features,)
+        Feature importance scores/test statistics.
+    pvalues : float
+        P-value from a one-sided t-test of the importance scores.
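+
+    Examples
+    --------
+    A hypothetical call; the fitted estimator and the data are illustrative::
+
+        selection, importances, pvalues = pfi(fitted_model, X, y, n_permutations=30)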
+    """,
+)
diff --git a/test/_utils/test_utils.py b/test/_utils/test_utils.py
new file mode 100644
index 000000000..192e08149
--- /dev/null
+++ b/test/_utils/test_utils.py
@@ -0,0 +1,16 @@
+from hidimstat._utils.utils import get_generated_attributes
+
+
+def test_generated_attributes():
+    """Test the collection of generated (trailing-underscore) attributes"""
+
+    class MyClass:
+        def __init__(self):
+            self.attr1 = 1
+            self.attr2_ = 2
+            self._attr3 = 3
+            self.attr4__ = 4
+            self.attr5_ = 5
+
+    attributes = get_generated_attributes(MyClass())
+    assert attributes == ["attr2_", "attr5_"]
diff --git a/test/test_base_perturbation.py b/test/test_base_perturbation.py
index dd3ff6d6c..d21cf1958 100644
--- a/test/test_base_perturbation.py
+++ b/test/test_base_perturbation.py
@@ -12,3 +12,16 @@ def test_no_implemented_methods():
     basic_class = BasePerturbation(estimator=estimator)
     with pytest.raises(NotImplementedError):
         basic_class._permutation(X, group_id=None)
+
+
+def test_check_importance():
+    """Test that selection() fails when importance() has not been called"""
+    X = np.random.randint(0, 2, size=(100, 2, 1))
+    estimator = LinearRegression()
+    estimator.fit(X[:, 0], X[:, 1])
+    basic_class = BasePerturbation(estimator=estimator)
+    basic_class.importances_ = []
+    with pytest.raises(
+        ValueError, match="The importance method has not yet been called."
+    ):
+        basic_class.selection()
diff --git a/test/test_conditional_feature_importance.py b/test/test_conditional_feature_importance.py
index 568e82bdf..cbb348b10 100644
--- a/test/test_conditional_feature_importance.py
+++ b/test/test_conditional_feature_importance.py
@@ -7,8 +7,9 @@
 from sklearn.metrics import log_loss
 from sklearn.model_selection import train_test_split
 from sklearn.metrics import root_mean_squared_error
+from sklearn.linear_model import RidgeCV, LogisticRegressionCV

-from hidimstat import CFI
+from hidimstat import cfi, CFI
 from hidimstat.base_perturbation import BasePerturbation
 from hidimstat._utils.exception import InternalError

@@ -67,8 +68,7 @@ def run_cfi(X, y, n_permutation, seed):
         var_type="auto",
     )
     # calculate feature importance using the test set
-    vim = cfi.importance(X_test, y_test)
-    importance = vim["importance"]
+    importance = cfi.importance(X_test, y_test)
     return importance


@@ -200,9 +200,8 @@ def test_group(data_generator):
     )
     # Warning expected since column names in pandas are not considered
     with pytest.warns(UserWarning, match="X does not have valid feature names, but"):
-        vim = cfi.importance(X_test_df, y_test)
+        importance = cfi.importance(X_test_df, y_test)

-    importance = vim["importance"]
     # Check if importance scores are computed for each feature
     assert importance.shape == (2,)
     # Verify that important feature group has higher score
@@ -249,8 +248,7 @@ def test_classication(data_generator):
         groups=None,
         var_type=["continuous"] * X.shape[1],
     )
-    vim = cfi.importance(X_test, y_test_clf)
-    importance = vim["importance"]
+    importance = cfi.importance(X_test, y_test_clf)
     # Check that importance scores are defined for each feature
     assert importance.shape == (X.shape[1],)
     # Check that important features have higher mean importance scores
@@ -282,8 +280,8 @@ def test_init(self, data_generator):
         assert cfi.loss == root_mean_squared_error
         assert cfi.method == "predict"
         assert cfi.categorical_max_cardinality == 10
-        assert cfi.imputation_model_categorical is None
-        assert cfi.imputation_model_continuous is None
+        assert isinstance(cfi.imputation_model_categorical, LogisticRegressionCV)
+        assert isinstance(cfi.imputation_model_continuous, RidgeCV)

    def test_fit(self, data_generator):
         """Test fitting CFI"""
@@ -298,13 +296,13 @@
         # Test fit with auto var_type
         cfi.fit(X)
         assert len(cfi._list_imputation_models) == X.shape[1]
-        assert cfi.n_groups == X.shape[1]
+        assert cfi._n_groups == X.shape[1]

         # Test fit with specified groups
         groups = {"g1": [0, 1], "g2": [2, 3, 4]}
         cfi.fit(X, groups=groups)
         assert len(cfi._list_imputation_models) == 2
-        assert cfi.n_groups == 2
+        assert cfi._n_groups == 2

     def test_categorical(
         self,
@@ -335,7 +333,7 @@
         var_type = ["continuous", "continuous", "categorical"]
         cfi.fit(X, y, var_type=var_type)

-        importances = cfi.importance(X, y)["importance"]
+        importances = cfi.importance(X, y)

         assert len(importances) == 3
         assert np.all(importances >= 0)
@@ -501,7 +499,7 @@
             ],
         }
         cfi.fit(X, groups=subgroups, var_type="auto")
-        cfi.groups["group1"] = [None for i in range(100)]
+        cfi.features_groups["group1"] = [None for i in range(100)]

         X = X.to_records(index=False)
         X = np.array(X, dtype=X.dtype.descr)
@@ -569,3 +567,22 @@
             " number of features for which importance is computed: 4",
         ):
             cfi.importance(X, y)
+
+
+@pytest.mark.parametrize(
+    "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial",
+    [(150, 200, 10, 0.2, 42, 1.0, 1.0, 0.0)],
+    ids=["high level noise"],
+)
+@pytest.mark.parametrize("n_permutation, cfi_seed", [(20, 0)], ids=["default_cfi"])
+def test_function_cfi(data_generator, n_permutation, cfi_seed):
+    """Test CFI function"""
+    X, y, _, _ = data_generator
+    cfi(
+        LinearRegression().fit(X, y),
+        X,
+        y,
+        imputation_model_continuous=LinearRegression(),
+        n_permutations=n_permutation,
+        random_state=cfi_seed,
+    )
diff --git a/test/test_leave_one_covariate_out.py b/test/test_leave_one_covariate_out.py
index e0fd2c52b..d4b79be62 100644
--- a/test/test_leave_one_covariate_out.py
+++ b/test/test_leave_one_covariate_out.py
@@ -1,3 +1,4 @@
+from copy import deepcopy
 import numpy as np
 import pandas as pd
 import pytest
@@ -5,57 +6,179 @@
 from sklearn.exceptions import NotFittedError
 from sklearn.linear_model import LinearRegression, LogisticRegression
 from sklearn.metrics import log_loss
 from sklearn.model_selection import train_test_split
-from hidimstat._utils.scenario import multivariate_simulation
+from sklearn.metrics import root_mean_squared_error

-from hidimstat import LOCO
+from hidimstat import loco, LOCO
 from hidimstat.base_perturbation import BasePerturbation
+from hidimstat._utils.exception import InternalError


-def test_loco():
-    """Test the Leave-One-Covariate-Out algorithm on a linear scenario."""
-    X, y, beta, noise = multivariate_simulation(
-        n_samples=150,
-        n_features=200,
-        support_size=10,
-        shuffle=False,
-        seed=42,
-    )
-    important_features = np.where(beta != 0)[0]
-    non_important_features = np.where(beta == 0)[0]
-
+def run_loco(X, y):
+    """
+    Run Leave One Covariate Out (LOCO) with linear regression
+    for feature importance analysis.
+    Parameters
+    ----------
+    X : array-like of shape (n_samples, n_features)
+        Input data matrix where each column represents a feature
+        and each row a sample.
+    y : array-like of shape (n_samples,)
+        Target variable array.
+    Returns
+    -------
+    importance : array-like
+        Array containing importance scores for each feature.
+        Higher values indicate greater feature importance in predicting
+        the target variable.
+    Notes
+    -----
+    The function performs the following steps:
+    1. Splits data into training and test sets
+    2. Fits a linear regression model on training data
+    3. Configures LOCO with the fitted linear regression as base estimator
+    4. Calculates feature importance using the test set
+    LOCO scores each feature by refitting the base model without it and
+    measuring the resulting increase in loss.
+    """
+    # split the data into training and test sets
     X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

+    # create and fit a linear regression model on the training set
     regression_model = LinearRegression()
     regression_model.fit(X_train, y_train)

+    # instantiate the LOCO method with the fitted regression model
     loco = LOCO(
         estimator=regression_model,
         method="predict",
         n_jobs=1,
     )
-
+    # fit the model using the training set
     loco.fit(
         X_train,
         y_train,
         groups=None,
     )
-    vim = loco.importance(X_test, y_test)
+    # calculate feature importance using the test set
+    importance = loco.importance(X_test, y_test)
+    return importance
+
+
+##############################################################################
+## tests of loco on different types of data
+parameter_exact = [
+    ("HiDim", 150, 200, 10, 0.0, 42, 1.0, np.inf, 0.0),
+    ("HiDim with correlated features", 150, 200, 10, 0.2, 42, 1.0, np.inf, 0.0),
+]

-    importance = vim["importance"]

+@pytest.mark.parametrize(
+    "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial",
+    zip(*(list(zip(*parameter_exact))[1:])),
+    ids=list(zip(*parameter_exact))[0],
+)
+def test_linear_data_exact(data_generator):
+    """Tests that the method recovers the full support on noiseless linear data"""
+    X, y, important_features, _ = data_generator
+    importance = run_loco(X, y)
+    # check that importance scores are defined for each feature
     assert importance.shape == (X.shape[1],)
+    # check that important features have the highest importance scores
+    assert np.all([int(i) in important_features for i in np.argsort(importance)[-10:]])
+
+
+parameter_partial = [
+    ("HiDim with noise", 150, 200, 10, 0.0, 42, 1.0, 10.0, 0.0),
+    ("HiDim with correlated noise", 150, 200, 10, 0.0, 42, 1.0, 10.0, 0.2),
+]
+
+
+@pytest.mark.parametrize(
+    "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial",
+    zip(*(list(zip(*parameter_partial))[1:])),
+    ids=list(zip(*parameter_partial))[0],
+)
+def test_linear_data_partial(data_generator, rho):
+    """Tests the method on noisy linear cases where only partial recovery is expected"""
+    X, y, important_features, _ = data_generator
+    importance = 
run_loco(X, y) + # check that importance scores are defined for each feature + assert importance.shape == (X.shape[1],) + # check that important features have the highest importance scores + min_rank = 0 + importance_sort = np.flip(np.argsort(importance)) + for index in important_features: + rank = np.where(importance_sort == index)[0] + if rank > min_rank: + min_rank = rank + # accept missing ranking of 5 elements + assert min_rank < 15 + + +parameter_fail = [ + ("HiDim with correlated features and noise", 150, 200, 10, 0.2, 42, 1, 10, 0), + ( + "HiDim with correlated features and correlated noise", + 150, + 200, + 10, + 0.2, + 42, + 1.0, + 10, + 0.2, + ), + ("high level noise", 150, 200, 10, 0.2, 42, 1.0, 1.0, 0.0), +] + + +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + zip(*(list(zip(*parameter_fail))[1:])), + ids=list(zip(*parameter_fail))[0], +) +def test_linear_data_fail(data_generator): + """Tests when the method doesn't identify all important features""" + X, y, important_features, not_important_features = data_generator + importance = run_loco(X, y) + # check that importance is defined for each feature + assert importance.shape == (X.shape[1],) + # check that mean importance of important features is + # higher than mean importance of other features assert ( importance[important_features].mean() - > importance[non_important_features].mean() + > importance[not_important_features].mean() ) + # Verify that not all important features are detected + assert np.sum( + [int(i) in important_features for i in np.argsort(importance)[-10:]] + ) != len(important_features) + + +############################################################################## +## Test specific options of loco +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + [(150, 200, 10, 0.0, 42, 1.0, np.inf, 0.0)], + ids=["high dimension"], +) +def test_group(data_generator): + """Test LOCO with groups using pandas objects""" + X, y, important_features, not_important_features = data_generator - # Same with groups and a pd.DataFrame + # Create groups and convert to pandas DataFrame groups = { "group_0": [f"col_{i}" for i in important_features], - "the_group_1": [f"col_{i}" for i in non_important_features], + "the_group_1": [f"col_{i}" for i in not_important_features], } X_df = pd.DataFrame(X, columns=[f"col_{i}" for i in range(X.shape[1])]) + # Split data into training and test sets X_train_df, X_test_df, y_train, y_test = train_test_split(X_df, y, random_state=0) + + # Create and fit linear regression model on training set + regression_model = LinearRegression() regression_model.fit(X_train_df, y_train) + loco = LOCO( estimator=regression_model, method="predict", @@ -66,76 +361,314 @@ def test_loco(): y_train, groups=groups, ) - # warnings because we doesn't considere the name of columns of pandas + # Warning expected since column names in pandas are not considered with pytest.warns(UserWarning, match="X does not have valid feature names, but"): - vim = loco.importance(X_test_df, y_test) + importance = loco.importance(X_test_df, y_test) + + # Check if importance scores are computed for each feature + assert importance.shape == (2,) + # Verify that important feature group has higher score + # than non-important feature group + assert importance[0] > importance[1] - importance = vim["importance"] - assert importance[0].mean() > importance[1].mean() - # Classification case - y_clf = 
np.where(y > np.median(y), 1, 0) - _, _, y_train_clf, y_test_clf = train_test_split(X, y_clf, random_state=0) +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + zip(*(list(zip(*parameter_exact))[1:])), + ids=list(zip(*parameter_exact))[0], +) +def test_classication(data_generator): + """Test LOCO for a classification problem""" + X, y, important_features, not_important_features = data_generator + # Create categories + y_clf = deepcopy(y) + y_clf[np.where(y > 4)] = 0 + y_clf[np.where(np.logical_and(y <= 4, y > 0))] = 1 + y_clf[np.where(np.logical_and(y <= 0, y > -4))] = 2 + y_clf[np.where(y <= -4)] = 3 + y_clf = np.array(y_clf, dtype=int) + + # Split the data into training and test sets + X_train, X_test, y_train_clf, y_test_clf = train_test_split( + X, y_clf, random_state=0 + ) + + # Create and fit a logistic regression model on the training set logistic_model = LogisticRegression() logistic_model.fit(X_train, y_train_clf) - loco_clf = LOCO( + loco = LOCO( estimator=logistic_model, - method="predict_proba", n_jobs=1, + method="predict_proba", loss=log_loss, ) - loco_clf.fit( + loco.fit( X_train, y_train_clf, - groups={"group_0": important_features, "the_group_1": non_important_features}, + groups=None, + ) + importance = loco.importance(X_test, y_test_clf) + # Check that importance scores are defined for each feature + assert importance.shape == (X.shape[1],) + # Check that important features have higher mean importance scores + assert ( + importance[important_features].mean() + > importance[not_important_features].mean() ) - vim_clf = loco_clf.importance(X_test, y_test_clf) - importance_clf = vim_clf["importance"] - assert importance_clf.shape == (2,) - assert importance[0].mean() > importance[1].mean() +############################################################################## +@pytest.mark.parametrize( + "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial", + [(150, 200, 10, 0.0, 42, 1.0, 0.0, 0.0)], + ids=["default data"], +) +class TestLOCOClass: + """Test the element of the class""" -def test_raises_value_error(): - """Test for error when model does not have predict_proba or predict.""" - X, y, beta, noise = multivariate_simulation( - n_samples=150, - n_features=200, - support_size=10, - shuffle=False, - seed=42, - ) - # Not fitted estimator - with pytest.raises(NotFittedError): + def test_init(self, data_generator): + """Test LOCO initialization""" + X, y, _, _ = data_generator + fitted_model = LinearRegression().fit(X, y) loco = LOCO( - estimator=LinearRegression(), + estimator=fitted_model, method="predict", ) + assert loco.n_jobs == 1 + assert loco.n_permutations == 1 + assert loco.loss == root_mean_squared_error + assert loco.method == "predict" - # Not fitted sub-model when calling importance and predict - with pytest.raises(ValueError, match="The class is not fitted."): + def test_fit(self, data_generator): + """Test fitting LOCO""" + X, y, _, _ = data_generator + fitted_model = LinearRegression().fit(X, y) + loco = LOCO(estimator=fitted_model) + + # Test fit with specified groups + groups = {"g1": [0, 1], "g2": [2, 3, 4]} + loco.fit(X, y, groups=groups) + assert loco._n_groups == 2 + + def test_categorical( + self, + n_samples, + n_features, + support_size, + rho, + seed, + value, + signal_noise_ratio, + rho_serial, + ): + """Test LOCO with categorical variables""" + rng = np.random.default_rng(seed) + X_cont = rng.random((n_samples, 2)) + X_cat = rng.integers(low=0, 
+        X = np.hstack([X_cont, X_cat])
+        y = rng.random((n_samples, 1))
+        fitted_model = LinearRegression().fit(X, y)
+
+        loco = LOCO(estimator=fitted_model)
+
+        loco.fit(X, y)
+
+        importances = loco.importance(X, y)
+        assert len(importances) == 3
+        assert np.all(importances >= 0)
+
+
+##############################################################################
+@pytest.mark.parametrize(
+    "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial",
+    [(150, 200, 10, 0.0, 42, 1.0, 0.0, 0.0)],
+    ids=["default data"],
+)
+class TestLOCOExceptions:
+    """Test class for LOCO exceptions"""
+
+    def test_unfitted_estimator(self, data_generator):
+        """Test when using an unfitted estimator"""
+        with pytest.raises(NotFittedError):
+            LOCO(
+                estimator=LinearRegression(),
+                method="predict",
+            )
+
+    def test_unknown_predict_method(self, data_generator):
+        """Test when an unknown prediction method is provided"""
+        X, y, _, _ = data_generator
+        fitted_model = LinearRegression().fit(X, y)
+
+        with pytest.raises(ValueError):
+            LOCO(
+                estimator=fitted_model,
+                method="unknown method",
+            )
+
+    def test_unfitted_predict(self, data_generator):
+        """Test the predict method on an unfitted LOCO instance"""
+        X, y, _, _ = data_generator
         fitted_model = LinearRegression().fit(X, y)
         loco = LOCO(
             estimator=fitted_model,
             method="predict",
         )
-        loco.predict(X)
-    with pytest.raises(ValueError, match="The class is not fitted."):
+
+        with pytest.raises(ValueError, match="The class is not fitted."):
+            loco.predict(X)
+
+    def test_unfitted_importance(self, data_generator):
+        """Test the importance method on an unfitted LOCO instance"""
+        X, y, _, _ = data_generator
         fitted_model = LinearRegression().fit(X, y)
         loco = LOCO(
             estimator=fitted_model,
             method="predict",
         )
-        loco.importance(X, y)
-    with pytest.raises(
-        ValueError, match="The estimators require to be fit before to use them"
-    ):
+        with pytest.raises(ValueError, match="The class is not fitted."):
+            loco.importance(X, y)
+
+    def test_unfitted_base_perturbation(self, data_generator):
+        """Test base perturbation with unfitted estimators"""
+        X, y, _, _ = data_generator
         fitted_model = LinearRegression().fit(X, y)
         loco = LOCO(
             estimator=fitted_model,
             method="predict",
         )
         BasePerturbation.fit(loco, X, y)
-        loco.importance(X, y)
+
+        with pytest.raises(
+            ValueError, match="The estimators require to be fit before to use them"
+        ):
+            loco.importance(X, y)
+
+    def test_invalid_X_type(self, data_generator):
+        """Test when X has an invalid type"""
+        X, y, _, _ = data_generator
+        fitted_model = LinearRegression().fit(X, y)
+        loco = LOCO(
+            estimator=fitted_model,
+            method="predict",
+        )
+        loco.fit(X, y, groups=None)
+
+        with pytest.raises(
+            ValueError, match="X should be a pandas dataframe or a numpy array."
+        ):
+            loco.importance(X.tolist(), y)
+
+    def test_mismatched_features(self, data_generator):
+        """Test when the number of features doesn't match between fit and predict"""
+        X, y, _, _ = data_generator
+        fitted_model = LinearRegression().fit(X, y)
+        loco = LOCO(
+            estimator=fitted_model,
+            method="predict",
+        )
+        loco.fit(X, y, groups=None)
+
+        with pytest.raises(
+            AssertionError, match="X does not correspond to the fitting data."
+        ):
+            loco.importance(X[:, :-1], y)
+
+    def test_mismatched_features_string(self, data_generator):
+        """Test when the names of features don't match between fit and predict"""
+        X, y, _, _ = data_generator
+        X = pd.DataFrame({"col_" + str(i): X[:, i] for i in range(X.shape[1])})
+        fitted_model = LinearRegression().fit(X, y)
+        loco = LOCO(
+            estimator=fitted_model,
+            method="predict",
+        )
+        subgroups = {
+            "group1": ["col_" + str(i) for i in range(int(X.shape[1] / 2))],
+            "group2": [
+                "col_" + str(i) for i in range(int(X.shape[1] / 2), X.shape[1] - 3)
+            ],
+        }
+        loco.fit(X, y, groups=subgroups)
+
+        with pytest.raises(
+            AssertionError,
+            match=r"The array is missing at least one of the following columns \['col_100', 'col_101', 'col_102',",
+        ):
+            loco.importance(
+                X[np.concatenate([subgroups["group1"], subgroups["group2"][:-2]])], y
+            )
+
+    def test_internal_error(self, data_generator):
+        """Test that an InternalError is raised when the fitted groups are corrupted"""
+        X, y, _, _ = data_generator
+        X = pd.DataFrame({"col_" + str(i): X[:, i] for i in range(X.shape[1])})
+        fitted_model = LinearRegression().fit(X, y)
+        loco = LOCO(
+            estimator=fitted_model,
+            method="predict",
+        )
+        subgroups = {
+            "group1": ["col_" + str(i) for i in range(int(X.shape[1] / 2))],
+            "group2": [
+                "col_" + str(i) for i in range(int(X.shape[1] / 2), X.shape[1] - 3)
+            ],
+        }
+        loco.fit(X, y, groups=subgroups)
+        loco.features_groups["group1"] = [None for i in range(100)]
+
+        X = X.to_records(index=False)
+        X = np.array(X, dtype=X.dtype.descr)
+        with pytest.raises(
+            InternalError,
+            match="A problem with indexing has happened during the fit.",
+        ):
+            loco.importance(X, y)
+
+    def test_invalid_groups_format(self, data_generator):
+        """Test when groups are provided in an invalid format"""
+        X, y, _, _ = data_generator
+        fitted_model = LinearRegression().fit(X, y)
+        loco = LOCO(estimator=fitted_model, method="predict")
+
+        invalid_groups = ["group1", "group2"]  # Should be a dictionary
+        with pytest.raises(ValueError, match="groups needs to be a dictionnary"):
+            loco.fit(X, y, groups=invalid_groups)
+
+    def test_groups_warning(self, data_generator):
+        """Test that fitting on a strict subgroup of the features raises a warning"""
+        X, y, _, _ = data_generator
+        fitted_model = LinearRegression().fit(X, y)
+        loco = LOCO(
+            estimator=fitted_model,
+            method="predict",
+        )
+        subgroups = {"group1": [0, 1], "group2": [2, 3]}
+        loco.fit(X, y, groups=subgroups)
+
+        with pytest.warns(
+            UserWarning,
+            match="The number of features in X: 200 differs from the"
+            " number of features for which importance is computed: 4",
+        ):
+            loco.importance(X, y)
+
+
+@pytest.mark.parametrize(
+    "n_samples, n_features, support_size, rho, seed, value, signal_noise_ratio, rho_serial",
+    [(150, 200, 10, 0.0, 42, 1.0, np.inf, 0.0)],
+    ids=["HiDim"],
+)
+def test_function_loco(data_generator):
+    """Test the functional interface of LOCO"""
+    X, y, important_features, _ = data_generator
+    selection, importance, pvalue = loco(LinearRegression().fit(X, y), X, y)
+    # check that importance scores are defined for each feature
+    assert importance.shape == (X.shape[1],)
+    # check that important features have the highest importance scores
+    assert np.all([int(i) in important_features for i in np.argsort(importance)[-10:]])
+
+    assert pvalue is None
+    assert np.all(selection[important_features])
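[Editor's note] The hunk above ends by exercising the new functional interface `loco`, which wraps the `LOCO` class and returns a `(selection, importance, pvalue)` triple computed from an already fitted estimator. A minimal usage sketch, assuming only the call signature exercised in `test_function_loco`; the toy data and estimator below are illustrative and not part of the patch:

    import numpy as np
    from sklearn.linear_model import LinearRegression
    from hidimstat import loco

    rng = np.random.default_rng(0)
    X = rng.normal(size=(100, 5))
    y = 2 * X[:, 0] + rng.normal(size=100)

    # the estimator must be fitted before being passed to loco
    selection, importance, pvalue = loco(LinearRegression().fit(X, y), X, y)
    assert importance.shape == (X.shape[1],)  # one score per feature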
diff --git a/test/test_permutation_feature_importance.py b/test/test_permutation_feature_importance.py
index b9639f359..42b1e93fe 100644
--- a/test/test_permutation_feature_importance.py
+++ b/test/test_permutation_feature_importance.py
@@ -5,7 +5,7 @@ from sklearn.model_selection import train_test_split
 import pytest
 
-from hidimstat import PFI
+from hidimstat import PFI, pfi
 from hidimstat._utils.scenario import multivariate_simulation
@@ -39,9 +39,8 @@ def test_permutation_importance():
         y_train,
         groups=None,
     )
-    vim = pfi.importance(X_test, y_test)
+    importance = pfi.importance(X_test, y_test)
 
-    importance = vim["importance"]
     assert importance.shape == (X.shape[1],)
     assert (
         importance[important_features].mean()
@@ -70,9 +69,8 @@
     )
     # warnings because we doesn't considere the name of columns of pandas
     with pytest.warns(UserWarning, match="X does not have valid feature names, but"):
-        vim = pfi.importance(X_test_df, y_test)
+        importance = pfi.importance(X_test_df, y_test)
 
-    importance = vim["importance"]
     assert importance[0].mean() > importance[1].mean()
 
     # Classification case
@@ -95,7 +93,38 @@
     )
-    vim_clf = pfi_clf.importance(X_test, y_test_clf)
+    importance_clf = pfi_clf.importance(X_test, y_test_clf)
 
-    importance_clf = vim_clf["importance"]
     assert importance_clf.shape == (X.shape[1],)
+
+
+def test_permutation_importance_function():
+    """Test the functional interface of the Permutation Importance algorithm on a linear scenario."""
+    X, y, beta, noise = multivariate_simulation(
+        n_samples=150,
+        n_features=200,
+        support_size=10,
+        shuffle=False,
+        seed=42,
+    )
+    important_features = beta != 0
+
+    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
+
+    regression_model = LinearRegression()
+    regression_model.fit(X_train, y_train)
+
+    selection, importance, pvalue = pfi(
+        regression_model,
+        X,
+        y,
+        n_permutations=20,
+        method="predict",
+        random_state=0,
+        n_jobs=1,
+    )
+
+    assert importance.shape == (X.shape[1],)
+    assert (
+        importance[important_features].mean() > importance[~important_features].mean()
+    )
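
[Editor's note] Taken together, these test changes track the API migration made by this patch: `importance()` now returns the score array directly (previously accessed as `vim.importance(X, y)["importance"]`), and the functional wrappers `cfi`, `loco`, and `pfi` are exported at the package level. A minimal sketch of the `pfi` call, using exactly the keyword arguments exercised in `test_permutation_importance_function`; the simulated data and estimator here are illustrative only:

    import numpy as np
    from sklearn.linear_model import LinearRegression
    from hidimstat import pfi

    rng = np.random.default_rng(0)
    X = rng.normal(size=(150, 10))
    y = X[:, 0] - 2 * X[:, 1] + rng.normal(size=150)
    model = LinearRegression().fit(X, y)

    # returns a (selection, importance, pvalue) triple; importance is a
    # plain array with one permutation-importance score per feature
    selection, importance, pvalue = pfi(
        model,
        X,
        y,
        n_permutations=20,
        method="predict",
        random_state=0,
        n_jobs=1,
    )
    assert importance.shape == (X.shape[1],)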