diff --git a/AUTHORS.rst b/AUTHORS.rst
index df975608f..ed0fc73fe 100644
--- a/AUTHORS.rst
+++ b/AUTHORS.rst
@@ -50,4 +50,5 @@ Contributors
 * Brijesh Thummar
 * Faustin Pulvéric
 * Chaoqi Zhang
+* Omid Gheibi
 To be continued ...
diff --git a/HISTORY.rst b/HISTORY.rst
index 1ef748509..3c3985683 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -9,6 +9,7 @@ History
 * Add FAQ entry in the documentation about ongoing works to extend MAPIE for LLM control
 * MAPIE now supports Python versions up to the latest release (currently 3.13)
 * Change `prefit` default value to `True` in split methods' docstrings to remain consistent with the implementation
+* Add Venn-ABERS calibration method for binary classification
 
 1.0.1 (2025-05-22)
 ------------------
diff --git a/mapie/calibration.py b/mapie/calibration.py
index fb2f8ed5a..db77c8a7c 100644
--- a/mapie/calibration.py
+++ b/mapie/calibration.py
@@ -547,3 +547,615 @@ def predict(
         """
         check_is_fitted(self, self.fit_attributes)
         return self.single_estimator_.predict(X)
+
+
+class VennABERSCalibrator(BaseEstimator, ClassifierMixin):
+    """
+    Venn-ABERS calibration for binary classification problems.
+    Implements the Inductive Venn-ABERS Predictors (IVAP) algorithm described in:
+    "Large-scale probabilistic prediction with and without validity guarantees"
+    by Vovk et al. (https://arxiv.org/pdf/1511.00213.pdf).
+    This is a MAPIE wrapper for
+    the implementation in https://github.com/ptocca/VennABERS/.
+    Note that VennABERSCalibrator uses its own specific calibration algorithm.
+
+    Parameters
+    ----------
+    estimator : Optional[ClassifierMixin]
+        Any classifier with scikit-learn API
+        (i.e. with fit, predict, and predict_proba methods), by default
+        ``None``.
+        If ``None``, estimator defaults to a ``LogisticRegression`` instance.
+
+    cv: Optional[str]
+        The cross-validation strategy to compute scores:
+
+        - "split", performs a standard splitting into a training and a
+          calibration set.
+
+        - "prefit", assumes that ``estimator`` has been fitted already.
+          All the data that are provided in the ``fit`` method are then used
+          to calibrate the predictions through the score computation.
+
+        By default "split".
+
+    Attributes
+    ----------
+    classes_: NDArray
+        Array with the name of each class.
+
+    n_classes_: int
+        Number of classes that are in the training dataset.
+
+    single_estimator_: ClassifierMixin
+        Classifier fitted on the training data.
+
+    calibration_points_: List
+        List of calibration points used for Venn-ABERS calibration.
+
+    References
+    ----------
+    [1] Vovk, V., Petej, I., & Fedorova, V. (2015). Large-scale probabilistic
+    predictors with and without validity guarantees. Advances in Neural
+    Information Processing Systems, 28.
+
+    Examples
+    --------
+    >>> import numpy as np
+    >>> from mapie.calibration import VennABERSCalibrator
+    >>> from sklearn.datasets import make_classification
+    >>> from sklearn.linear_model import LogisticRegression
+    >>> X, y = make_classification(n_samples=100, n_features=4, random_state=42)
+    >>> model = LogisticRegression().fit(X[:80], y[:80])
+    >>> calibrator = VennABERSCalibrator(model, cv="prefit")
+    >>> _ = calibrator.fit(X[80:], y[80:])
+    >>> probs = calibrator.predict_proba(X[:5])
+    >>> print(probs)
+    [[0.14285714 0.85714286]
+     [0.8        0.2       ]
+     [0.1        0.9       ]
+     [0.91666667 0.08333333]
+     [0.91666667 0.08333333]]
+    """
+
+    fit_attributes = [
+        "single_estimator_",
+        "calibration_points_",
+    ]
+
+    valid_cv = ["prefit", "split"]
+
+    valid_inputs = ["binary"]
+
+    calibration_points_: list[Tuple[float, Union[int, float]]]
+
+    def __init__(
+        self,
+        estimator: Optional[ClassifierMixin] = None,
+        cv: Optional[str] = "split",
+    ) -> None:
+        self.estimator = estimator
+        self.cv = cv
+
+    def _check_cv(
+        self,
+        cv: Optional[str],
+    ) -> str:
+        """
+        Check if cross-validator is ``"prefit"`` or ``"split"``.
+        Else raise error.
+
+        Parameters
+        ----------
+        cv : str
+            Cross-validator to check.
+
+        Returns
+        -------
+        str
+            'prefit' or 'split'.
+
+        Raises
+        ------
+        ValueError
+            If the cross-validator is not valid.
+        """
+        if cv in self.valid_cv:
+            return cv
+        raise ValueError(
+            "Invalid cv argument. "
+            f"Allowed values are {self.valid_cv}."
+        )
+
+    def _check_type_of_target(self, y: ArrayLike) -> None:
+        """
+        Check type of target for calibration class.
+
+        Parameters
+        ----------
+        y : ArrayLike of shape (n_samples,)
+            Training labels.
+        """
+        if type_of_target(y) not in self.valid_inputs:
+            raise ValueError(
+                "VennABERSCalibrator only supports binary classification. "
+                "Make sure to have one of the allowed targets: "
+                + (", ").join(self.valid_inputs) + "."
+            )
+
+    def _prepare_data(self, calibr_points: list[Tuple[float, float]]) -> Tuple:
+        """
+        Prepare data for Venn-ABERS calibration.
+
+        Parameters
+        ----------
+        calibr_points : List[Tuple[float, float]]
+            List of calibration points (score, label).
+
+        Returns
+        -------
+        Tuple
+            Prepared data for Venn-ABERS calibration.
+        """
+        pts_sorted = sorted(calibr_points)
+
+        xs = np.array([p[0] for p in pts_sorted])
+        ys = np.array([p[1] for p in pts_sorted])
+        pts_unique, pts_inverse, pts_counts = np.unique(
+            xs,
+            return_counts=True,
+            return_inverse=True
+        )
+
+        a = np.zeros(pts_unique.shape)
+        np.add.at(a, pts_inverse, ys)
+
+        w = pts_counts
+        y_prime = a / w
+        y_csd = np.cumsum(a)  # Equivalent to np.cumsum(w * y_prime)
+        x_prime = np.cumsum(w)
+        k_prime = len(x_prime)
+
+        return y_prime, y_csd, x_prime, pts_unique, k_prime
+
+    def _algorithm1(self, P: Dict, k_prime: int) -> list:
+        """
+        Algorithm 1 from Venn-ABERS paper.
+
+        Parameters
+        ----------
+        P : Dict
+            Dictionary of points.
+        k_prime : int
+            Number of unique calibration points.
+
+        Returns
+        -------
+        List
+            Stack of points.
+        """
+        S = []
+        P[-1] = np.array((-1, -1))
+        S.append(P[-1])
+        S.append(P[0])
+
+        for i in range(1, k_prime + 1):
+            while len(S) > 1 and self._non_left_turn(S[-2], S[-1], P[i]):
+                S.pop()
+            S.append(P[i])
+        return S
+
+    def _algorithm2(self, P: Dict, S: list, k_prime: int) -> NDArray:
+        """
+        Algorithm 2 from Venn-ABERS paper.
+
+        Parameters
+        ----------
+        P : Dict
+            Dictionary of points.
+        S : List
+            Stack of points from Algorithm 1.
+        k_prime : int
+            Number of unique calibration points.
+
+        Returns
+        -------
+        NDArray
+            F1 function values.
+        """
+        S_prime = S[::-1]  # reverse the stack
+
+        F1 = np.zeros((k_prime + 1,))
+        for i in range(1, k_prime + 1):
+            F1[i] = self._slope(S_prime[-1], S_prime[-2])
+            P[i-1] = P[i-2] + P[i] - P[i-1]
+
+            if self._not_below(P[i-1], S_prime[-1], S_prime[-2]):
+                continue
+
+            S_prime.pop()
+            while len(S_prime) > 1 and \
+                    self._non_left_turn(P[i-1], S_prime[-1], S_prime[-2]):
+                S_prime.pop()
+            S_prime.append(P[i-1])
+
+        return F1
+
+    def _algorithm3(self, P: Dict, k_prime: int) -> list:
+        """
+        Algorithm 3 from Venn-ABERS paper.
+
+        Parameters
+        ----------
+        P : Dict
+            Dictionary of points.
+        k_prime : int
+            Number of unique calibration points.
+
+        Returns
+        -------
+        List
+            Stack of points.
+        """
+        S = []
+        S.append(P[k_prime + 1])
+        S.append(P[k_prime])
+
+        for i in range(k_prime - 1, -1, -1):  # k'-1, k'-2, ..., 0
+            while len(S) > 1 and self._non_right_turn(S[-2], S[-1], P[i]):
+                S.pop()
+            S.append(P[i])
+
+        return S
+
+    def _algorithm4(self, P: Dict, S: list, k_prime: int) -> NDArray:
+        """
+        Algorithm 4 from Venn-ABERS paper.
+
+        Parameters
+        ----------
+        P : Dict
+            Dictionary of points.
+        S : List
+            Stack of points from Algorithm 3.
+        k_prime : int
+            Number of unique calibration points.
+
+        Returns
+        -------
+        NDArray
+            F0 function values.
+        """
+        S_prime = S[::-1]  # reverse the stack
+
+        F0 = np.zeros((k_prime + 1,))
+        for i in range(k_prime, 0, -1):  # k', k'-1, ..., 1
+            F0[i] = self._slope(S_prime[-1], S_prime[-2])
+            P[i] = P[i-1] + P[i+1] - P[i]
+
+            if self._not_below(P[i], S_prime[-1], S_prime[-2]):
+                continue
+
+            S_prime.pop()
+            while len(S_prime) > 1 and \
+                    self._non_right_turn(P[i], S_prime[-1], S_prime[-2]):
+                S_prime.pop()
+            S_prime.append(P[i])
+
+        return F0
+
+    def _compute_F(self, x_prime: NDArray, y_csd: NDArray,
+                   k_prime: int) -> Tuple[NDArray, NDArray]:
+        """
+        Compute F0 and F1 functions for Venn-ABERS calibration.
+
+        Parameters
+        ----------
+        x_prime : NDArray
+            Cumulative sum of weights.
+        y_csd : NDArray
+            Cumulative sum of weighted labels.
+        k_prime : int
+            Number of unique calibration points.
+
+        Returns
+        -------
+        Tuple[NDArray, NDArray]
+            F0 and F1 function values.
+        """
+        # Compute F1
+        P = {0: np.array((0, 0))}
+        P.update({i+1: np.array((k, v)) for i, (k, v)
+                  in enumerate(zip(x_prime, y_csd))})
+
+        S = self._algorithm1(P, k_prime)
+        F1 = self._algorithm2(P, S, k_prime)
+
+        # Compute F0
+        P = {0: np.array((0, 0))}
+        P.update({i+1: np.array((k, v)) for i, (k, v)
+                  in enumerate(zip(x_prime, y_csd))})
+        P[k_prime + 1] = P[k_prime] + np.array((1.0, 0.0))
+
+        S = self._algorithm3(P, k_prime)
+        F0 = self._algorithm4(P, S, k_prime)
+
+        return F0, F1
+
+    def _get_F_val(self, F0: NDArray, F1: NDArray,
+                   pts_unique: NDArray,
+                   test_objects: NDArray) -> Tuple[NDArray, NDArray]:
+        """
+        Get F0 and F1 values for test objects.
+
+        Parameters
+        ----------
+        F0 : NDArray
+            F0 function values.
+        F1 : NDArray
+            F1 function values.
+        pts_unique : NDArray
+            Unique calibration points.
+        test_objects : NDArray
+            Test objects to calibrate.
+
+        Returns
+        -------
+        Tuple[NDArray, NDArray]
+            p0 and p1 probabilities.
+        """
+        pos0 = np.searchsorted(pts_unique, test_objects, side='left')
+        pos1 = np.searchsorted(pts_unique[:-1], test_objects, side='right') + 1
+        return F0[pos0], F1[pos1]
+
+    def _scores_to_multi_probs(self, calibr_points: list[Tuple[float, float]],
+                               test_objects: NDArray) -> Tuple[NDArray, NDArray]:
+        """
+        Convert scores to multi-probabilities using Venn-ABERS calibration.
+
+        Parameters
+        ----------
+        calibr_points : List[Tuple[float, float]]
+            List of calibration points (score, label).
+        test_objects : NDArray
+            Test objects to calibrate.
+
+        Returns
+        -------
+        Tuple[NDArray, NDArray]
+            p0 and p1 probabilities.
+        """
+        # Prepare data
+        _, y_csd, x_prime, pts_unique, k_prime = self._prepare_data(calibr_points)
+
+        # Compute F0 and F1 functions
+        F0, F1 = self._compute_F(x_prime, y_csd, k_prime)
+
+        # Get values for test objects
+        p0, p1 = self._get_F_val(F0, F1, pts_unique, test_objects)
+
+        return p0, p1
+
+    def _non_left_turn(self, a: NDArray, b: NDArray, c: NDArray) -> bool:
+        """
+        Check if three points make a non-left turn.
+
+        Parameters
+        ----------
+        a : NDArray
+            First point.
+        b : NDArray
+            Second point.
+        c : NDArray
+            Third point.
+
+        Returns
+        -------
+        bool
+            True if non-left turn, False otherwise.
+        """
+        d1 = b - a
+        d2 = c - b
+        # z-component of the 2-D cross product, written out because
+        # ``np.cross`` on 2-D vectors is deprecated since NumPy 2.0.
+        return bool(d1[0] * d2[1] - d1[1] * d2[0] <= 0)
+
+    def _non_right_turn(self, a: NDArray, b: NDArray, c: NDArray) -> bool:
+        """
+        Check if three points make a non-right turn.
+
+        Parameters
+        ----------
+        a : NDArray
+            First point.
+        b : NDArray
+            Second point.
+        c : NDArray
+            Third point.
+
+        Returns
+        -------
+        bool
+            True if non-right turn, False otherwise.
+        """
+        d1 = b - a
+        d2 = c - b
+        # z-component of the 2-D cross product, written out because
+        # ``np.cross`` on 2-D vectors is deprecated since NumPy 2.0.
+        return bool(d1[0] * d2[1] - d1[1] * d2[0] >= 0)
+
+    def _slope(self, a: NDArray, b: NDArray) -> float:
+        """
+        Calculate slope between two points.
+
+        Parameters
+        ----------
+        a : NDArray
+            First point.
+        b : NDArray
+            Second point.
+
+        Returns
+        -------
+        float
+            Slope between points.
+        """
+        ax, ay = a
+        bx, by = b
+        return (by - ay) / (bx - ax)
+
+    def _not_below(self, t: NDArray, p1: NDArray, p2: NDArray) -> bool:
+        """
+        Check if point t is not below the line defined by p1 and p2.
+
+        Parameters
+        ----------
+        t : NDArray
+            Point to check.
+        p1 : NDArray
+            First point of the line.
+        p2 : NDArray
+            Second point of the line.
+
+        Returns
+        -------
+        bool
+            True if t is not below the line, False otherwise.
+        """
+        p1x, p1y = p1
+        p2x, p2y = p2
+        tx, ty = t
+        m = (p2y - p1y) / (p2x - p1x)
+        b = (p2x * p1y - p1x * p2y) / (p2x - p1x)
+        return (ty >= tx * m + b)
+
+    def fit(
+        self,
+        X: ArrayLike,
+        y: ArrayLike,
+        sample_weight: Optional[ArrayLike] = None,
+        calib_size: Optional[float] = 0.33,
+        random_state: Optional[Union[int, np.random.RandomState]] = None,
+        shuffle: Optional[bool] = True,
+        stratify: Optional[ArrayLike] = None,
+    ) -> VennABERSCalibrator:
+        """
+        Fit the calibrator.
+
+        Parameters
+        ----------
+        X : ArrayLike of shape (n_samples, n_features)
+            Training data.
+        y : ArrayLike of shape (n_samples,)
+            Training labels.
+        sample_weight : Optional[ArrayLike] of shape (n_samples,)
+            Sample weights. If ``None``, then samples are equally weighted.
+        calib_size : Optional[float], default=0.33
+            If ``cv == "split"``, the proportion of samples to use for calibration.
+        random_state : Optional[Union[int, np.random.RandomState]]
+            Random state for reproducibility.
+        shuffle : bool, default=True
+            Whether to shuffle the data before splitting.
+        stratify : Optional[ArrayLike]
+            If not None, data is split in a stratified fashion.
+
+        Returns
+        -------
+        VennABERSCalibrator
+            Fitted calibrator.
+        """
+        X, y = indexable(X, y)
+        y = _check_y(y)
+        self._check_type_of_target(y)
+
+        cv = self._check_cv(self.cv)
+        estimator = _check_estimator_classification(X, y, cv, self.estimator)
+        sample_weight, X, y = _check_null_weight(sample_weight, X, y)
+
+        if cv == "prefit":
+            # The estimator is already fitted: all provided samples are
+            # used for calibration.
+            X_calib, y_calib = X, y
+        else:  # cv == "split"
+            results = _get_calib_set(
+                X,
+                y,
+                sample_weight=sample_weight,
+                calib_size=calib_size,
+                random_state=random_state,
+                shuffle=shuffle,
+                stratify=stratify,
+            )
+            X_train, y_train, X_calib, y_calib, sw_train, sw_calib = results
+
+            # Fit estimator on training data
+            estimator = _fit_estimator(
+                clone(estimator), X_train, y_train, sw_train
+            )
+
+        self.single_estimator_ = estimator
+        self.classes_ = self.single_estimator_.classes_
+        self.n_classes_ = len(self.classes_)
+
+        # Get scores for calibration set
+        scores = self.single_estimator_.predict_proba(X_calib)[:, 1]
+
+        # Encode labels as 1.0 for the positive class (``classes_[1]``) and
+        # 0.0 otherwise, so that label sums are valid for any binary target.
+        labels = (np.asarray(y_calib) == self.classes_[1]).astype(float)
+
+        # Create calibration points
+        self.calibration_points_ = list(zip(scores, labels))
+
+        return self
+
+    def predict_proba(self, X: ArrayLike) -> NDArray:
+        """
+        Predict probabilities for test data using Venn-ABERS calibration.
+
+        Parameters
+        ----------
+        X : ArrayLike of shape (n_samples, n_features)
+            Test data.
+
+        Returns
+        -------
+        NDArray of shape (n_samples, 2)
+            Calibrated probabilities.
+        """
+        check_is_fitted(self, self.fit_attributes)
+
+        # Get scores for test data
+        scores = self.single_estimator_.predict_proba(X)[:, 1]
+
+        # Apply Venn-ABERS calibration
+        p0, p1 = self._scores_to_multi_probs(self.calibration_points_, scores)
+
+        # Normalize probabilities
+        p1_normalized = p1 / (p1 + (1 - p0))
+
+        # Return probabilities for both classes
+        result = np.zeros((len(scores), 2))
+        result[:, 0] = 1 - p1_normalized
+        result[:, 1] = p1_normalized
+
+        return result
+
+    def predict(self, X: ArrayLike) -> NDArray:
+        """
+        Predict class labels for test data.
+
+        Parameters
+        ----------
+        X : ArrayLike of shape (n_samples, n_features)
+            Test data.
+
+        Returns
+        -------
+        NDArray of shape (n_samples,)
+            Predicted class labels.
+        """
+        check_is_fitted(self, self.fit_attributes)
+
+        proba = self.predict_proba(X)
+        return self.classes_[np.argmax(proba, axis=1)]
diff --git a/mapie/tests/test_calibration.py b/mapie/tests/test_calibration.py
index f423324b1..ed45e03d9 100644
--- a/mapie/tests/test_calibration.py
+++ b/mapie/tests/test_calibration.py
@@ -19,6 +19,8 @@
 from mapie.metrics.calibration import top_label_ece
 from mapie.metrics.calibration import expected_calibration_error
 
+from mapie.calibration import VennABERSCalibrator
+
 random_state = 20
 
 CALIBRATORS = [
@@ -477,3 +479,198 @@ def early_stopping_monitor(i, est, locals):
     mapie.fit(X, y, monitor=early_stopping_monitor)
 
     assert mapie.single_estimator_.estimators_.shape[0] == 3
+
+
+def test_venn_abers_initialized() -> None:
+    """Test that VennABERSCalibrator initialization does not crash."""
+    VennABERSCalibrator()
+
+
+def test_venn_abers_default_parameters() -> None:
+    """Test default values of VennABERSCalibrator input parameters."""
+    calibrator = VennABERSCalibrator()
+    assert calibrator.estimator is None
+    assert calibrator.cv == "split"
+
+
+def test_venn_abers_prefit_cv_argument() -> None:
+    """Test that prefit method works with VennABERSCalibrator"""
+    # Create binary classification data
+    X_binary, y_binary = make_classification(
+        n_samples=100, n_features=4, n_classes=2, random_state=random_state
+    )
+    est = LogisticRegression().fit(X_binary, y_binary)
+    calibrator = VennABERSCalibrator(estimator=est, cv="prefit")
+    calibrator.fit(X_binary, y_binary)
+
+
+def test_venn_abers_split_cv_argument() -> None:
+    """Test that split method works with VennABERSCalibrator"""
+    # Create binary classification data
+    X_binary, y_binary = make_classification(
+        n_samples=100, n_features=4, n_classes=2, random_state=random_state
+    )
+    calibrator = VennABERSCalibrator(cv="split")
+    calibrator.fit(X_binary, y_binary)
+
+
+@pytest.mark.parametrize("cv", ["noprefit", "nosplit"])
+def test_venn_abers_invalid_cv_argument(cv: str) -> None:
+    """Test that other cv method does not work with VennABERSCalibrator"""
+    # Create binary classification data
+    X_binary, y_binary = make_classification(
+        n_samples=100, n_features=4, n_classes=2, random_state=random_state
+    )
+    with pytest.raises(
+        ValueError,
+        match=r".*Invalid cv argument.*",
+    ):
+        calibrator = VennABERSCalibrator(cv=cv)
+        calibrator.fit(X_binary, y_binary)
+
+
+def test_venn_abers_binary_classification() -> None:
+    """Test VennABERSCalibrator on binary classification problem"""
+    X_binary, y_binary = make_classification(
+        n_samples=100, n_features=4, n_classes=2, random_state=random_state
+    )
+    X_train, X_test, y_train, y_test = train_test_split(
+        X_binary, y_binary, test_size=0.33, random_state=random_state
+    )
+
+    model = LogisticRegression().fit(X_train, y_train)
+    calibrator = VennABERSCalibrator(estimator=model, cv="prefit")
+    calibrator.fit(X_test, y_test)
+
+    # Check that predict_proba returns probabilities
+    probs = calibrator.predict_proba(X_test)
+    assert probs.shape == (len(X_test), 2)
+    assert np.all((0 <= probs) & (probs <= 1))
+    assert np.allclose(np.sum(probs, axis=1), 1.0)
+
+    # Check that predict returns class labels
+    preds = calibrator.predict(X_test)
+    assert preds.shape == (len(X_test),)
+    assert set(np.unique(preds)).issubset(set(np.unique(y_binary)))
+
+
+def test_venn_abers_prefit_split_same_results() -> None:
+    """Test that prefit and split method
+    return similar results for VennABERSCalibrator"""
+    X_binary, y_binary = make_classification(
+        n_samples=100, n_features=4, n_classes=2, random_state=random_state
+    )
+    X_train, X_test, y_train, y_test = train_test_split(
+        X_binary, y_binary, test_size=0.33, random_state=random_state
+    )
+    X_calib, X_test_final, y_calib, y_test_final = train_test_split(
+        X_test, y_test, test_size=0.5, random_state=random_state
+    )
+
+    est = LogisticRegression(random_state=random_state).fit(X_train, y_train)
+
+    # Prefit method
+    calibrator_prefit = VennABERSCalibrator(estimator=est, cv="prefit")
+    calibrator_prefit.fit(X_calib, y_calib)
+
+    # Split method
+    X_combined = np.vstack([X_train, X_calib])
+    y_combined = np.hstack([y_train, y_calib])
+    calibrator_split = VennABERSCalibrator(
+        estimator=LogisticRegression(random_state=random_state)
+    )
+    calibrator_split.fit(X_combined, y_combined, random_state=random_state)
+
+    # Compare results - note that we don't expect exact equality due to
+    # the different calibration points used in each method
+    y_prefit = calibrator_prefit.predict_proba(X_test_final)
+    y_split = calibrator_split.predict_proba(X_test_final)
+
+    # Check that predictions are at least correlated
+    assert np.corrcoef(y_prefit[:, 1], y_split[:, 1])[0, 1] > 0.5
+
+
+def test_venn_abers_calibration_effect() -> None:
+    """Test that VennABERSCalibrator changes the probability estimates"""
+    X_binary, y_binary = make_classification(
+        n_samples=200, n_features=4, n_classes=2, random_state=random_state
+    )
+    X_train, X_test, y_train, y_test = train_test_split(
+        X_binary, y_binary, test_size=0.5, random_state=random_state
+    )
+
+    # Train a model
+    model = LogisticRegression(random_state=random_state).fit(X_train, y_train)
+
+    # Get uncalibrated probabilities
+    uncalibrated_probs = model.predict_proba(X_test)
+
+    # Apply Venn-ABERS calibration
+    calibrator = VennABERSCalibrator(estimator=model, cv="prefit")
+    calibrator.fit(X_train, y_train)  # Use training data for calibration
+
+    # Get calibrated probabilities
+    calibrated_probs = calibrator.predict_proba(X_test)
+
+    # Check that calibration changes the probabilities
+    assert not np.allclose(uncalibrated_probs, calibrated_probs)
+
+
+def test_venn_abers_with_pipeline() -> None:
+    """Check that VennABERSCalibrator works with sklearn pipeline"""
+    X = pd.DataFrame({
+        "x_cat": ["A", "A", "B", "A", "A", "B"] * 10,
+        "x_num": [0, 1, 1, 4, 2, 5] * 10,
+    })
+    y = pd.Series([0, 1, 0, 1, 0, 1] * 10)
+
+    numeric_preprocessor = Pipeline([
+        ("imputer", SimpleImputer(strategy="mean")),
+    ])
+    categorical_preprocessor = Pipeline(steps=[
+        ("encoding", OneHotEncoder(handle_unknown="ignore"))
+    ])
+    preprocessor = ColumnTransformer([
+        ("cat", categorical_preprocessor, ["x_cat"]),
+        ("num", numeric_preprocessor, ["x_num"])
+    ])
+    pipe = make_pipeline(preprocessor, LogisticRegression())
+    pipe.fit(X, y)
+
+    calibrator = VennABERSCalibrator(estimator=pipe)
+    calibrator.fit(X, y)
+
+    # Check predictions
+    probs = calibrator.predict_proba(X)
+    assert probs.shape == (len(X), 2)
+    assert np.all((0 <= probs) & (probs <= 1))
+
+    preds = calibrator.predict(X)
+    assert preds.shape == (len(X),)
+    assert set(np.unique(preds)).issubset(set(np.unique(y)))
+
+
+def test_venn_abers_multiclass_error() -> None:
+    """Test that VennABERSCalibrator raises error for multiclass problems"""
+    calibrator = VennABERSCalibrator()
+
+    with pytest.raises(
+        ValueError,
+        match=r".*Make sure to have one of the allowed targets:.*"
+    ):
+        calibrator.fit(X, y)
+
+
+def test_venn_abers_non_zero_one_labels() -> None:
+    """Test that binary labels other than {0, 1} are handled correctly"""
+    X_binary, y_binary = make_classification(
+        n_samples=100, n_features=4, n_classes=2, random_state=random_state
+    )
+    calibrator = VennABERSCalibrator(cv="split")
+    calibrator.fit(X_binary, y_binary, random_state=random_state)
+    calibrator_shifted = VennABERSCalibrator(cv="split")
+    calibrator_shifted.fit(X_binary, y_binary + 1, random_state=random_state)
+    np.testing.assert_allclose(
+        calibrator.predict_proba(X_binary),
+        calibrator_shifted.predict_proba(X_binary),
+    )