diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index d1163d10..a1f31f0c 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -33,7 +33,7 @@ jobs: name: Cache venv with: path: ./.venv - key: ${{ matrix.os }}-venv-${{ hashFiles('**/uv.lock') }} + key: ${{ matrix.os }}-venv-${{ matrix.python-version }}-${{ hashFiles('**/uv.lock') }} - uses: actions/cache@v4 name: Cache datasets @@ -42,7 +42,8 @@ jobs: key: datasets - name: Install the project dependencies - run: uv sync --group test + run: uv sync --group test --python "$(python -c 'import sys; print(sys.executable)')" + shell: bash - name: Check pre-commit run: uv run pre-commit run --all-files diff --git a/docs/modules/applicability_domain.rst b/docs/modules/applicability_domain.rst index 9ab064b4..139d7370 100644 --- a/docs/modules/applicability_domain.rst +++ b/docs/modules/applicability_domain.rst @@ -21,5 +21,7 @@ Classes for checking applicability domain. KNNADChecker LeverageADChecker PCABoundingBoxADChecker + ProbStdADChecker ResponseVariableRangeADChecker StandardDeviationADChecker + TOPKATADChecker diff --git a/skfp/applicability_domain/__init__.py b/skfp/applicability_domain/__init__.py index 6deef192..e01788d6 100644 --- a/skfp/applicability_domain/__init__.py +++ b/skfp/applicability_domain/__init__.py @@ -5,5 +5,7 @@ from .knn import KNNADChecker from .leverage import LeverageADChecker from .pca_bounding_box import PCABoundingBoxADChecker +from .prob_std import ProbStdADChecker from .response_variable_range import ResponseVariableRangeADChecker from .standard_deviation import StandardDeviationADChecker +from .topkat import TOPKATADChecker diff --git a/skfp/applicability_domain/prob_std.py b/skfp/applicability_domain/prob_std.py new file mode 100644 index 00000000..5e32ae73 --- /dev/null +++ b/skfp/applicability_domain/prob_std.py @@ -0,0 +1,182 @@ +from numbers import Real + +import numpy as np +from scipy.stats import norm +from sklearn.base import 
is_classifier +from sklearn.ensemble import RandomForestRegressor +from sklearn.utils._param_validation import Interval, InvalidParameterError +from sklearn.utils.validation import check_is_fitted, validate_data + +from skfp.bases.base_ad_checker import BaseADChecker + + +class ProbStdADChecker(BaseADChecker): + """ + Probabilistic standard deviation method (PROB-STD). + + Defines applicability domain based on the probabilistic interpretation of prediction + uncertainty from individual estimators in an ensemble model [1]_. For each sample, + the mean and standard deviation of the ensemble predictions are used to construct + a normal distribution. The score is defined as the probability mass under this + distribution that lies on the wrong side of the classification threshold (0.5). + + This approach supports both regression models (using ``.predict(X)``) and binary classifiers + (using ``.predict_proba(X)`` and the probability of the positive class). For regression models, + the outputs should be interpretable as positive-class probabilities in [0, 1], e.g. when the + regressor is trained on binary targets. The ensemble model must expose the ``estimators_`` + attribute. If no model is provided, a default :class:`~sklearn.ensemble.RandomForestRegressor` + is created and trained during :meth:`fit`. + + At prediction time, each sample is passed to all estimators, and their predictions + (or predicted probabilities for classifiers) are used to construct the distribution. + The sample is considered in-domain if the resulting probability of misclassification + (PROB-STD) is lower than or equal to the specified threshold. + + Parameters + ---------- + model : object, default=None + Fitted ensemble model with accessible ``estimators_`` attribute and + either ``.predict(X)`` or ``.predict_proba(X)`` method on each sub-estimator. + If not provided, a default :class:`~sklearn.ensemble.RandomForestRegressor` will + be created. 
Note that if you pass a fitted model here, call to :meth:`fit` is + not necessary, but it will perform model validation. + + threshold : float, default=0.1 + Maximum allowed probability of incorrect class assignment. + Lower values yield a stricter applicability domain. + + n_jobs : int, default=None + The number of jobs to run in parallel. :meth:`transform_x_y` and + :meth:`transform` are parallelized over the input molecules. ``None`` means 1 + unless in a :obj:`joblib.parallel_backend` context. ``-1`` means using all + processors. See scikit-learn documentation on ``n_jobs`` for more details. + + verbose : int or dict, default=0 + Controls the verbosity when filtering molecules. + If a dictionary is passed, it is treated as kwargs for ``tqdm()``, + and can be used to control the progress bar. + + References + ---------- + .. [1] `Klingspohn, W., Mathea, M., ter Laak, A. et al. + "Efficiency of different measures for defining the applicability + domain of classification models." + Journal of Cheminformatics 9, 44 (2017). 
+ `_ + + Examples + -------- + >>> import numpy as np + >>> from sklearn.ensemble import RandomForestRegressor + >>> from skfp.applicability_domain import ProbStdADChecker + >>> X_train = np.random.uniform(0, 1, size=(1000, 5)) + >>> y_train = (X_train[:, 0] + X_train[:, 1] > 1).astype(float) + >>> model = RandomForestRegressor(n_estimators=10, random_state=0) + >>> model.fit(X_train, y_train) + >>> probstd_ad_checker = ProbStdADChecker(model=model, threshold=0.1) + >>> probstd_ad_checker.fit() + >>> probstd_ad_checker + ProbStdADChecker(model=RandomForestRegressor(...), threshold=0.1) + + >>> X_test = np.random.uniform(0, 1, size=(100, 5)) + >>> probstd_ad_checker.predict(X_test).shape + (100,) + """ + + _parameter_constraints: dict = { + **BaseADChecker._parameter_constraints, + "model": [object, None], + "threshold": [Interval(Real, 0, 0.5, closed="left")], + } + + def __init__( + self, + model: object | None = None, + threshold: float = 0.1, + n_jobs: int | None = None, + verbose: int | dict = 0, + ): + super().__init__( + n_jobs=n_jobs, + verbose=verbose, + ) + self.model = model + self.threshold = threshold + + def _validate_params(self): + super()._validate_params() + + if self.model is not None and is_classifier(self.model): + check_is_fitted(self.model, "classes_") + + if not hasattr(self.model, "predict_proba"): + raise InvalidParameterError( + f"{self.__class__.__name__} requires classifiers with .predict_proba() method" + ) + + if len(getattr(self.model, "classes_", [])) != 2: + raise InvalidParameterError( + f"{self.__class__.__name__} only supports binary classifiers" + ) + + def fit( # noqa: D102 + self, + X: np.ndarray | None = None, + y: np.ndarray | None = None, + ): + self._validate_params() + + if self.model is None: + X, y = validate_data(self, X, y, ensure_2d=False) + self.model_ = RandomForestRegressor(random_state=0) + self.model_.fit(X, y) # type: ignore[union-attr] + else: + self.model_ = self.model + + return self + + def predict(self, X: 
np.ndarray) -> np.ndarray: # noqa: D102 + prob_std = self._compute_prob_std(X) + return prob_std <= self.threshold + + def score_samples(self, X: np.ndarray) -> np.ndarray: + """ + Calculate the applicability domain score of samples. + It is defined as the minimum probabilistic mass under the normal distribution + that lies on either side of the classification threshold (0.5). + Lower values indicate higher confidence in class assignment. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The data matrix. + + Returns + ------- + scores : ndarray of shape (n_samples,) + Probabilistic scores reflecting the uncertainty of class assignment. + """ + return self._compute_prob_std(X) + + def _compute_prob_std(self, X: np.ndarray) -> np.ndarray: + X = validate_data(self, X=X, reset=False) + if self.model is not None and not hasattr(self, "model_"): + self.model_ = self.model + check_is_fitted(self.model_, "estimators_") + self._validate_params() + + if is_classifier(self.model_): + preds = np.array([est.predict_proba(X) for est in self.model_.estimators_]) + preds = preds[:, :, 1] # shape: (n_estimators, n_samples) + else: + preds = np.array([est.predict(X) for est in self.model_.estimators_]) + + preds = preds.T # shape: (n_samples, n_estimators) + + y_mean = preds.mean(axis=1) + y_std = preds.std(axis=1) + y_std = np.maximum(y_std, 1e-8) + + left_tail = norm.cdf(0.5, loc=y_mean, scale=y_std) + prob_std = np.minimum(left_tail, 1 - left_tail) + return prob_std diff --git a/skfp/applicability_domain/topkat.py b/skfp/applicability_domain/topkat.py new file mode 100644 index 00000000..88c409a9 --- /dev/null +++ b/skfp/applicability_domain/topkat.py @@ -0,0 +1,160 @@ +from numbers import Real + +import numpy as np +from sklearn.utils._param_validation import Interval +from sklearn.utils.validation import validate_data + +from skfp.bases.base_ad_checker import BaseADChecker + + +class TOPKATADChecker(BaseADChecker): + """ + TOPKAT (Optimal 
Prediction Space) method. + + Defines applicability domain using the Optimal Prediction Space (OPS) approach + introduced in the TOPKAT system [1]_. The method transforms the input feature space + into a normalized, centered and rotated space using PCA on a scaled [-1, 1] version + of the training data (S-space). Each new sample is projected into the same OPS space, + and a weighted distance (dOPS) from the center is computed. + + Samples are considered in-domain if their dOPS is below a threshold. By default, + this threshold is computed as :math:`5 * D / (2 * N)`, where: + - :math:``D`` is the number of input features, + - :math:``N`` is the number of training samples. + + This method captures both the variance and correlation structure of the descriptors, + and performs well for detecting global outliers in descriptor space. + + Parameters + ---------- + threshold : float, default=None + Optional user-defined threshold for dOPS. If provided, overrides the default + analytical threshold :math:`5 * D / (2 * N)`. Lower values produce stricter domains. + + n_jobs : int, default=None + The number of jobs to run in parallel. :meth:`transform_x_y` and + :meth:`transform` are parallelized over the input molecules. ``None`` means 1 + unless in a :obj:`joblib.parallel_backend` context. ``-1`` means using all + processors. See scikit-learn documentation on ``n_jobs`` for more details. + + verbose : int or dict, default=0 + Controls the verbosity when filtering molecules. + If a dictionary is passed, it is treated as kwargs for ``tqdm()``, + and can be used to control the progress bar. + + References + ---------- + .. [1] Gombar, V. K. + `"Method and apparatus for validation of model-based predictions." + U.S. Patent No. 6,036,349. Washington, DC: U.S. Patent and Trademark Office. 
+ `_ + + Examples + -------- + >>> import numpy as np + >>> from skfp.applicability_domain import TOPKATADChecker + >>> from sklearn.datasets import make_blobs + >>> X_train, _ = make_blobs(n_samples=100, centers=2, n_features=5, random_state=0) + >>> X_test = X_train[:5] + + >>> topkat_ad_checker = TOPKATADChecker() + >>> topkat_ad_checker.fit(X_train) + TOPKATADChecker() + + >>> topkat_ad_checker.predict(X_test) + array([ True, True, True, True, True]) + """ + + _parameter_constraints: dict = { + **BaseADChecker._parameter_constraints, + "threshold": [Interval(Real, 0, None, closed="left"), None], + } + + def __init__( + self, + threshold: float | None = None, + n_jobs: int | None = None, + verbose: int | dict = 0, + ): + super().__init__( + n_jobs=n_jobs, + verbose=verbose, + ) + self.threshold = threshold + + def fit( # noqa: D102 + self, + X: np.ndarray, + y: np.ndarray | None = None, # noqa: ARG002 + ): + X = validate_data(self, X=X) + + self.X_min_ = X.min(axis=0) + self.X_max_ = X.max(axis=0) + self.range_ = self.X_max_ - self.X_min_ + self.num_points = X.shape[0] + self.num_dims = X.shape[1] + + # TOPKAT S-space: feature-wise scaling of X to [-1, 1]. + # Avoid division by zero: where range==0, denom=1 => scaled value will be 0. + self.denom_ = np.where((self.range_) != 0, (self.range_), 1.0) + S = (2 * X - self.X_max_ - self.X_min_) / self.denom_ + + # Augment with bias (1) so the subsequent rotation (PCA) includes the intercept term. 
+ S_bias = np.c_[np.ones(S.shape[0]), S] + + cov_matrix = S_bias.T @ S_bias + eigvals, eigvecs = np.linalg.eigh(cov_matrix) + self.eigen_val = np.real(eigvals) + self.eigen_vec = np.real(eigvecs) + + return self + + def predict(self, X: np.ndarray) -> np.ndarray: # noqa: D102 + dOPS = self._compute_dops(X) + + threshold = self.threshold + if threshold is None: + threshold = (5 * self.num_dims) / (2 * self.num_points) + + return dOPS < threshold + + def score_samples(self, X: np.ndarray) -> np.ndarray: + """ + Calculate the applicability domain score of samples. + It is defined as the weighted distance (dOPS) from + the center of the training data in the OPS-transformed space. + Lower values indicate higher similarity to the training data. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The data matrix. + + Returns + ------- + scores : ndarray of shape (n_samples,) + Distance scores from the center of the Optimal Prediction Space (dOPS). + """ + return self._compute_dops(X) + + def _compute_dops(self, X: np.ndarray) -> np.ndarray: + X = validate_data(self, X=X, reset=False) + + # Apply the same S-space transform as in fit(). + Ssample = (2 * X - self.X_max_ - self.X_min_) / self.denom_ + + # Add bias term to match the augmented space used to compute eigenvectors. 
+ Ssample_bias = np.c_[np.ones(Ssample.shape[0]), Ssample] + + # Project to OPS space + OPS_sample = Ssample_bias @ self.eigen_vec + + inv_eigval = np.divide( + 1.0, + self.eigen_val, + out=np.zeros_like(self.eigen_val), + where=self.eigen_val != 0, + ) + dOPS = np.sum((OPS_sample**2) * inv_eigval, axis=1) + return dOPS diff --git a/tests/applicability_domain/prob_std.py b/tests/applicability_domain/prob_std.py new file mode 100644 index 00000000..5745a2f2 --- /dev/null +++ b/tests/applicability_domain/prob_std.py @@ -0,0 +1,164 @@ +import numpy as np +import pytest +from sklearn.datasets import ( + make_blobs, + make_classification, + make_multilabel_classification, + make_regression, +) +from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor +from sklearn.model_selection import train_test_split +from sklearn.svm import LinearSVC +from sklearn.utils._param_validation import InvalidParameterError + +from skfp.applicability_domain import ProbStdADChecker + + +def test_inside_probstd_ad(): + X, y = make_blobs( + n_samples=1000, centers=2, n_features=2, cluster_std=1.0, random_state=42 + ) + y = y.astype(float) + + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.1, random_state=42 + ) + + model = RandomForestRegressor(n_estimators=10, random_state=42) + model.fit(X_train, y_train) + + ad_checker = ProbStdADChecker(model=model, threshold=0.2) + ad_checker.fit() + + scores = ad_checker.score_samples(X_test) + assert np.all(scores >= 0) + assert np.all(scores <= 0.5) + + preds = ad_checker.predict(X_test) + assert isinstance(preds, np.ndarray) + assert np.issubdtype(preds.dtype, np.bool_) + assert preds.shape == (len(X_test),) + + assert np.all(preds == 1) + + +def test_outside_probstd_ad(): + np.random.seed(42) + X = np.random.uniform(low=-5, high=5, size=(1000, 5)) + y = np.random.choice([0.0, 1.0], size=1000) + + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.1, random_state=42 + ) + + model = 
RandomForestRegressor(n_estimators=50, random_state=42) + model.fit(X_train, y_train) + + ad_checker = ProbStdADChecker(model=model, threshold=0.05) + ad_checker.fit() + + scores = ad_checker.score_samples(X_test) + assert np.all(scores >= 0) + assert np.all(scores <= 0.5) + + preds = ad_checker.predict(X_test) + assert isinstance(preds, np.ndarray) + assert np.issubdtype(preds.dtype, np.bool_) + assert preds.shape == (len(X_test),) + + assert np.all(preds == 0) + + +def test_probstd_ad_with_default_model(): + X = np.random.uniform(size=(100, 5)) + y = X.sum(axis=1) + + ad_checker = ProbStdADChecker(threshold=0.2) + ad_checker.fit(X, y) + + scores = ad_checker.score_samples(X) + assert np.all(scores >= 0) + assert np.all(scores <= 0.5) + + preds = ad_checker.predict(X) + assert isinstance(preds, np.ndarray) + assert np.isdtype(preds.dtype, np.bool) + assert preds.shape == (len(X),) + + +def test_ptobstd_ad_checker_with_classifier(): + X, y = make_classification( + n_samples=100, + n_features=5, + n_classes=2, + n_clusters_per_class=1, + random_state=42, + ) + + model = RandomForestClassifier(n_estimators=5, random_state=42) + model.fit(X, y) + + ad_checker = ProbStdADChecker(model=model, threshold=0.2) + ad_checker.fit() + + scores = ad_checker.score_samples(X) + assert scores.shape == (len(X),) + assert np.all(scores >= 0) + assert np.all(scores <= 0.5) + + preds = ad_checker.predict(X) + assert isinstance(preds, np.ndarray) + assert np.isdtype(preds.dtype, np.bool) + assert preds.shape == (len(X),) + + +def test_probstd_fit(): + # smoke test, should not throw errors + X_train, y_train = make_regression( + n_samples=100, n_features=2, noise=0.1, random_state=42 + ) + + rf = RandomForestRegressor(n_estimators=5, random_state=42) + rf.fit(X_train, y_train) + + ad_checker = ProbStdADChecker(model=rf) + ad_checker.fit(None, None) + + +def test_probstd_ad_checker_raise_error_on_multilabel(): + X, y = make_multilabel_classification( + n_samples=50, n_features=5, 
n_classes=3, random_state=42 + ) + model = RandomForestClassifier(n_estimators=5, random_state=42) + model.fit(X, y) + + ad_checker = ProbStdADChecker(model=model) + + with pytest.raises(InvalidParameterError, match="only supports binary classifiers"): + ad_checker.fit() + + +def test_probstd_ad_checker_raise_error_on_multiclass(): + X, y = make_classification( + n_samples=50, n_features=5, n_classes=3, n_clusters_per_class=1, random_state=42 + ) + model = RandomForestClassifier(n_estimators=5, random_state=42) + model.fit(X, y) + + ad_checker = ProbStdADChecker(model=model) + + with pytest.raises(InvalidParameterError, match="only supports binary classifiers"): + ad_checker.fit() + + +def test_probstd_ad_checker_raise_error_no_predict_proba(): + X, y = make_classification(n_samples=50, n_features=5, n_classes=2, random_state=42) + model = LinearSVC() + model.fit(X, y) + + ad_checker = ProbStdADChecker(model=model) + + with pytest.raises(InvalidParameterError) as exc_info: + ad_checker.fit() + + assert "requires classifiers with .predict_proba() method" in str(exc_info.value) diff --git a/tests/applicability_domain/topkat.py b/tests/applicability_domain/topkat.py new file mode 100644 index 00000000..55dc3976 --- /dev/null +++ b/tests/applicability_domain/topkat.py @@ -0,0 +1,44 @@ +import numpy as np + +from skfp.applicability_domain import TOPKATADChecker +from tests.applicability_domain.utils import get_data_inside_ad, get_data_outside_ad + + +def test_inside_topkat_ad(): + X_train, X_test = get_data_inside_ad() + + ad_checker = TOPKATADChecker() + ad_checker.fit(X_train) + + scores = ad_checker.score_samples(X_test) + assert np.all(scores >= 0) + + preds = ad_checker.predict(X_test) + assert isinstance(preds, np.ndarray) + assert np.issubdtype(preds.dtype, np.bool_) + assert preds.shape == (len(X_test),) + assert np.all(preds == 1) + + +def test_outside_topkat_ad(): + X_train, X_test = get_data_outside_ad() + + ad_checker = TOPKATADChecker() + ad_checker.fit(X_train) + 
scores = ad_checker.score_samples(X_test) + assert np.all(scores >= 0) + + preds = ad_checker.predict(X_test) + assert isinstance(preds, np.ndarray) + assert np.issubdtype(preds.dtype, np.bool_) + assert preds.shape == (len(X_test),) + assert np.all(preds == 0) + + +def test_topkat_pass_y_train(): + # smoke test, should not throw errors + X_train = np.vstack((np.zeros((10, 5)), np.ones((10, 5)))) + y_train = np.zeros(20) + ad_checker = TOPKATADChecker() + ad_checker.fit(X_train, y_train)