-
Notifications
You must be signed in to change notification settings - Fork 25
Add TOPKAT and PROB-STD methods with tests #480
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
965da97
7c27fd3
7b423ba
ffa6656
714fec2
37a743f
2f4d79e
a2430de
ace45e6
333323e
240150e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,139 @@ | ||
| from numbers import Real | ||
|
|
||
| import numpy as np | ||
| from scipy.stats import norm | ||
| from sklearn.utils._param_validation import Interval | ||
| from sklearn.utils.validation import validate_data | ||
|
|
||
| from skfp.bases.base_ad_checker import BaseADChecker | ||
|
|
||
|
|
||
class ProbStdADChecker(BaseADChecker):
    """
    Probabilistic standard deviation method (PROB-STD).

    Defines applicability domain based on the probabilistic interpretation of prediction
    uncertainty from individual estimators in an ensemble model [1]_. For each sample,
    the mean and standard deviation of the ensemble predictions are used to construct
    a normal distribution. The score is defined as the probability mass under this
    distribution that lies on the wrong side of the classification threshold (0.5).

    This approach requires a fitted ensemble model exposing the ``estimators_``
    attribute (e.g. ``RandomForestRegressor``), where each sub-model implements
    either a ``.predict(X)`` method returning continuous outputs, or a
    ``.predict_proba(X)`` method returning class probabilities (the positive class
    probability is then used). At prediction time, each sample is passed to all
    estimators, and their predictions are used to compute a normal distribution.
    The sample is considered in-domain if the resulting probability of
    misclassification (PROB-STD) is lower than or equal to the specified threshold.

    This method is specifically designed for binary classification with continuous
    predictions around the 0.5 decision threshold, typically from regressors trained
    on binary targets (e.g., 0.0 and 1.0).

    Parameters
    ----------
    model : object
        Fitted ensemble model with accessible ``estimators_`` attribute, where each
        sub-estimator implements ``.predict(X)`` or ``.predict_proba(X)``.

    threshold : float, default=0.2
        Maximum allowed probability of incorrect class assignment.
        Lower values yield a stricter applicability domain.

    n_jobs : int, default=None
        The number of jobs to run in parallel. :meth:`transform_x_y` and
        :meth:`transform` are parallelized over the input molecules. ``None`` means 1
        unless in a :obj:`joblib.parallel_backend` context. ``-1`` means using all
        processors. See scikit-learn documentation on ``n_jobs`` for more details.

    verbose : int or dict, default=0
        Controls the verbosity when filtering molecules.
        If a dictionary is passed, it is treated as kwargs for ``tqdm()``,
        and can be used to control the progress bar.

    References
    ----------
    .. [1] `Klingspohn, W., Mathea, M., ter Laak, A. et al.
        Efficiency of different measures for defining the applicability
        domain of classification models. J Cheminform 9, 44 (2017).
        <https://doi.org/10.1186/s13321-017-0230-2>`_

    Examples
    --------
    >>> import numpy as np
    >>> from sklearn.ensemble import RandomForestRegressor
    >>> from skfp.applicability_domain import ProbStdADChecker
    >>> X_train = np.random.uniform(0, 1, size=(1000, 5))
    >>> y_train = (X_train[:, 0] + X_train[:, 1] > 1).astype(float)
    >>> model = RandomForestRegressor(n_estimators=10, random_state=0)
    >>> model.fit(X_train, y_train)
    RandomForestRegressor(n_estimators=10, random_state=0)
    >>> probstd_ad_checker = ProbStdADChecker(model=model, threshold=0.1)
    >>> probstd_ad_checker
    ProbStdADChecker(model=RandomForestRegressor(...), threshold=0.1)

    >>> X_test = np.random.uniform(0, 1, size=(100, 5))
    >>> probstd_ad_checker.predict(X_test).shape
    (100,)
    """

    _parameter_constraints: dict = {
        **BaseADChecker._parameter_constraints,
        "model": [object],
        "threshold": [Interval(Real, 0, None, closed="left")],
    }

    def __init__(
        self,
        model,
        threshold: float = 0.2,
        n_jobs: int | None = None,
        verbose: int | dict = 0,
    ):
        super().__init__(
            n_jobs=n_jobs,
            verbose=verbose,
        )
        self.model = model
        self.threshold = threshold

    def fit(  # noqa: D102
        self,
        X: np.ndarray,  # noqa: ARG002
        y: np.ndarray | None = None,  # noqa: ARG002
    ):
        # the wrapped ensemble model is expected to be fitted already,
        # so there is nothing to do here
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:  # noqa: D102
        prob_std = self._compute_prob_std(X)
        return prob_std <= self.threshold

    def score_samples(self, X: np.ndarray) -> np.ndarray:
        """
        Calculate the applicability domain score of samples.

        It is defined as the minimum probabilistic mass under the normal distribution
        that lies on either side of the classification threshold (0.5).
        Lower values indicate higher confidence in class assignment.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            The data matrix.

        Returns
        -------
        scores : ndarray of shape (n_samples,)
            Probabilistic scores reflecting the uncertainty of class assignment.
        """
        return self._compute_prob_std(X)

    def _compute_prob_std(self, X: np.ndarray) -> np.ndarray:
        X = validate_data(self, X=X, reset=False)

        # classifier sub-estimators expose .predict_proba(); use the positive class
        # probability for those, and the continuous .predict() output for regressors
        preds = np.array(
            [
                est.predict_proba(X)[:, 1]
                if hasattr(est, "predict_proba")
                else est.predict(X)
                for est in self.model.estimators_
            ]
        ).T

        y_mean = preds.mean(axis=1)
        y_std = preds.std(axis=1)
        # guard against zero std when all estimators agree exactly,
        # which would make norm.cdf degenerate
        y_std = np.maximum(y_std, 1e-8)

        # probability mass on the "wrong" side of the 0.5 decision boundary;
        # by construction this is at most 0.5
        left_tail = norm.cdf(0.5, loc=y_mean, scale=y_std)
        prob_std = np.minimum(left_tail, 1 - left_tail)
        return prob_std
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,152 @@ | ||
| from numbers import Real | ||
|
|
||
| import numpy as np | ||
| from sklearn.utils._param_validation import Interval | ||
| from sklearn.utils.validation import validate_data | ||
|
|
||
| from skfp.bases.base_ad_checker import BaseADChecker | ||
|
|
||
|
|
||
class TopKatADChecker(BaseADChecker):
    """
    TOPKAT (Optimal Prediction Space) method.

    Defines applicability domain using the Optimal Prediction Space (OPS) approach
    introduced in the TOPKAT system [1]_. The method transforms the input feature space
    into a normalized, centered and rotated space using PCA on a scaled [-1, 1] version
    of the training data (S-space). Each new sample is projected into the same OPS
    space, and a weighted distance (dOPS) from the center is computed.

    Samples are considered in-domain if their dOPS is below a threshold. By default,
    this threshold is computed as ``5 * D / (2 * N)``, where:

    - ``D`` is the number of input features,
    - ``N`` is the number of training samples.

    This method captures both the variance and correlation structure of the
    descriptors, and performs well for detecting global outliers in descriptor space.

    Parameters
    ----------
    threshold : float, default=None
        Optional user-defined threshold for dOPS. If provided, overrides the default
        analytical threshold ``5 * D / (2 * N)``. Lower values produce stricter domains.

    n_jobs : int, default=None
        The number of jobs to run in parallel. :meth:`transform_x_y` and
        :meth:`transform` are parallelized over the input molecules. ``None`` means 1
        unless in a :obj:`joblib.parallel_backend` context. ``-1`` means using all
        processors. See scikit-learn documentation on ``n_jobs`` for more details.

    verbose : int or dict, default=0
        Controls the verbosity when filtering molecules.
        If a dictionary is passed, it is treated as kwargs for ``tqdm()``,
        and can be used to control the progress bar.

    References
    ----------
    .. [1] Gombar, V. K. (1996).
        Method and apparatus for validation of model-based predictions.
        U.S. Patent No. 6,036,349. Washington, DC: U.S. Patent and Trademark Office.

    Examples
    --------
    >>> import numpy as np
    >>> from skfp.applicability_domain import TopKatADChecker
    >>> from sklearn.datasets import make_blobs
    >>> X_train, _ = make_blobs(n_samples=100, centers=2, n_features=5, random_state=0)
    >>> X_test = X_train[:5]

    >>> topkat_ad_checker = TopKatADChecker()
    >>> topkat_ad_checker.fit(X_train)
    TopKatADChecker()

    >>> topkat_ad_checker.predict(X_test)
    array([ True,  True,  True,  True,  True])
    """

    _parameter_constraints: dict = {
        **BaseADChecker._parameter_constraints,
        # None (the default) selects the analytical threshold 5 * D / (2 * N),
        # so it must be explicitly allowed by the constraint
        "threshold": [Interval(Real, 0, None, closed="left"), None],
    }

    def __init__(
        self,
        threshold: float | None = None,
        n_jobs: int | None = None,
        verbose: int | dict = 0,
    ):
        super().__init__(
            n_jobs=n_jobs,
            verbose=verbose,
        )
        self.threshold = threshold

    def fit(  # noqa: D102
        self,
        X: np.ndarray,
        y: np.ndarray | None = None,  # noqa: ARG002
    ):
        X = validate_data(self, X=X)

        self.X_min_ = X.min(axis=0)
        self.X_max_ = X.max(axis=0)
        self.num_points = X.shape[0]
        self.num_dims = X.shape[1]

        # scale features to [-1, 1] (S-space); constant features (zero range)
        # are divided by 1.0 instead to avoid division by zero
        feature_range = self.X_max_ - self.X_min_
        self.S_ = (2 * X - self.X_max_ - self.X_min_) / np.where(
            feature_range != 0, feature_range, 1.0
        )

        # prepend a bias (constant 1) column, as in the TOPKAT formulation
        S_bias = np.c_[np.ones(self.S_.shape[0]), self.S_]

        # eigendecomposition of the (symmetric) scatter matrix defines the
        # rotation into the Optimal Prediction Space
        cov_matrix = S_bias.T @ S_bias
        eigvals, eigvecs = np.linalg.eigh(cov_matrix)
        self.eigen_val = np.real(eigvals)
        self.eigen_vec = np.real(eigvecs)

        return self

    def predict(self, X: np.ndarray) -> np.ndarray:  # noqa: D102
        dOPS = self._compute_dops(X)

        threshold = self.threshold
        if threshold is None:
            # analytical default threshold: 5 * D / (2 * N)
            threshold = (5 * self.num_dims) / (2 * self.num_points)

        return dOPS < threshold

    def score_samples(self, X: np.ndarray) -> np.ndarray:
        """
        Calculate the applicability domain score of samples.

        It is defined as the weighted distance (dOPS) from
        the center of the training data in the OPS-transformed space.
        Lower values indicate higher similarity to the training data.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            The data matrix.

        Returns
        -------
        scores : ndarray of shape (n_samples,)
            Distance scores from the center of the Optimal Prediction Space (dOPS).
        """
        return self._compute_dops(X)

    def _compute_dops(self, X: np.ndarray) -> np.ndarray:
        X = validate_data(self, X=X, reset=False)

        # project samples into the S-space fitted on the training data,
        # with the same zero-range guard as in fit()
        feature_range = self.X_max_ - self.X_min_
        Ssample = (2 * X - self.X_max_ - self.X_min_) / np.where(
            feature_range != 0, feature_range, 1.0
        )
        Ssample_bias = np.c_[np.ones(Ssample.shape[0]), Ssample]

        # rotate into the Optimal Prediction Space
        OPS_sample = Ssample_bias @ self.eigen_vec

        # weight each OPS coordinate by the inverse eigenvalue; degenerate
        # directions (zero eigenvalue) contribute nothing to the distance
        inv_eigval = np.divide(
            1.0,
            self.eigen_val,
            out=np.zeros_like(self.eigen_val),
            where=self.eigen_val != 0,
        )
        dOPS = np.sum((OPS_sample**2) * inv_eigval, axis=1)
        return dOPS
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| import numpy as np | ||
| from sklearn.datasets import make_blobs, make_regression | ||
| from sklearn.ensemble import RandomForestRegressor | ||
| from sklearn.model_selection import train_test_split | ||
|
|
||
| from skfp.applicability_domain import ProbStdADChecker | ||
|
|
||
|
|
||
def test_inside_probstd_ad():
    # two well-separated clusters: ensemble members agree closely, so every
    # test sample should be flagged as in-domain
    X, y = make_blobs(
        n_samples=1000, centers=2, n_features=2, cluster_std=1.0, random_state=42
    )
    X_train, X_test, y_train, y_test = train_test_split(
        X, y.astype(float), test_size=0.1, random_state=42
    )

    rf = RandomForestRegressor(n_estimators=10, random_state=42)
    rf.fit(X_train, y_train)

    checker = ProbStdADChecker(model=rf, threshold=0.2)

    # PROB-STD scores are probability masses, capped at 0.5 by construction
    scores = checker.score_samples(X_test)
    assert np.all((scores >= 0) & (scores <= 0.5))

    in_domain = checker.predict(X_test)
    assert isinstance(in_domain, np.ndarray)
    assert np.issubdtype(in_domain.dtype, np.bool_)
    assert in_domain.shape == (len(X_test),)
    assert np.all(in_domain == 1)
|
|
||
|
|
||
def test_outside_probstd_ad():
    # labels are pure noise, so ensemble members disagree strongly and no
    # sample should pass the strict threshold
    np.random.seed(42)
    X = np.random.uniform(low=-5, high=5, size=(1000, 5))
    y = np.random.choice([0.0, 1.0], size=1000)

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.1, random_state=42
    )

    rf = RandomForestRegressor(n_estimators=50, random_state=42)
    rf.fit(X_train, y_train)

    checker = ProbStdADChecker(model=rf, threshold=0.05)

    # PROB-STD scores are probability masses, capped at 0.5 by construction
    scores = checker.score_samples(X_test)
    assert np.all((scores >= 0) & (scores <= 0.5))

    in_domain = checker.predict(X_test)
    assert isinstance(in_domain, np.ndarray)
    assert np.issubdtype(in_domain.dtype, np.bool_)
    assert in_domain.shape == (len(X_test),)
    assert np.all(in_domain == 0)
|
|
||
|
|
||
def test_probstd_fit():
    # smoke test, should not throw errors
    X_train, y_train = make_regression(
        n_samples=100, n_features=2, noise=0.1, random_state=42
    )

    model = RandomForestRegressor(n_estimators=5, random_state=42)
    model.fit(X_train, y_train)

    ProbStdADChecker(model=model).fit(X_train, y_train)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add backticks `` for model names. Also, just RandomForestRegressor is enough as an example
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, we should support classifiers with a
`.predict_proba()` method here