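"""Tests for the classification-threshold tools in
`sklearn.model_selection._classification_threshold`: the public
`FixedThresholdClassifier` and `TunedThresholdClassifierCV` estimators and the
private `_CurveScorer` and `_fit_and_score_over_thresholds` helpers."""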

import numpy as np
import pytest
from sklearn.base import clone
from sklearn.datasets import (
load_breast_cancer,
load_iris,
make_classification,
make_multilabel_classification,
)
from sklearn.dummy import DummyClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.exceptions import NotFittedError
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
balanced_accuracy_score,
f1_score,
fbeta_score,
make_scorer,
recall_score,
)
from sklearn.model_selection import (
FixedThresholdClassifier,
StratifiedShuffleSplit,
TunedThresholdClassifierCV,
)
from sklearn.model_selection._classification_threshold import (
_CurveScorer,
_fit_and_score_over_thresholds,
)
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.utils._mocking import CheckingClassifier
from sklearn.utils._testing import (
_convert_container,
assert_allclose,
assert_array_equal,
)
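

# The tests below exercise the private `_CurveScorer` helper. A short sketch of the
# call pattern they rely on (names as imported above; behaviour as checked by the
# assertions that follow): the scorer wraps a metric and, when called on a fitted
# estimator, evaluates that metric at a grid of candidate decision thresholds drawn
# from the chosen response method, returning one score per threshold.
#
#   curve_scorer = _CurveScorer(
#       balanced_accuracy_score, sign=1, response_method="predict_proba",
#       thresholds=10, kwargs={},
#   )
#   scores, thresholds = curve_scorer(estimator, X, y)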
def test_curve_scorer():
"""Check the behaviour of the `_CurveScorer` class."""
X, y = make_classification(random_state=0)
estimator = LogisticRegression().fit(X, y)
curve_scorer = _CurveScorer(
balanced_accuracy_score,
sign=1,
response_method="predict_proba",
thresholds=10,
kwargs={},
)
scores, thresholds = curve_scorer(estimator, X, y)
assert thresholds.shape == scores.shape
    # check that the thresholds are probabilities with extreme values close to 0 and 1.
    # they are not exactly 0 and 1 because they are the extrema of the
    # `estimator.predict_proba(X)` values.
assert 0 <= thresholds.min() <= 0.01
assert 0.99 <= thresholds.max() <= 1
# balanced accuracy should be between 0.5 and 1 when it is not adjusted
assert 0.5 <= scores.min() <= 1
# check that passing kwargs to the scorer works
curve_scorer = _CurveScorer(
balanced_accuracy_score,
sign=1,
response_method="predict_proba",
thresholds=10,
kwargs={"adjusted": True},
)
scores, thresholds = curve_scorer(estimator, X, y)
    # the adjusted balanced accuracy rescales the chance level to 0, so the minimum
    # score (reached at the extreme thresholds) should lie between 0 and 0.5
assert 0 <= scores.min() <= 0.5
# check that we can inverse the sign of the score when dealing with `neg_*` scorer
curve_scorer = _CurveScorer(
balanced_accuracy_score,
sign=-1,
response_method="predict_proba",
thresholds=10,
kwargs={"adjusted": True},
)
scores, thresholds = curve_scorer(estimator, X, y)
assert all(scores <= 0)
def test_curve_scorer_pos_label(global_random_seed):
"""Check that we propagate properly the `pos_label` parameter to the scorer."""
n_samples = 30
X, y = make_classification(
n_samples=n_samples, weights=[0.9, 0.1], random_state=global_random_seed
)
estimator = LogisticRegression().fit(X, y)
curve_scorer = _CurveScorer(
recall_score,
sign=1,
response_method="predict_proba",
thresholds=10,
kwargs={"pos_label": 1},
)
scores_pos_label_1, thresholds_pos_label_1 = curve_scorer(estimator, X, y)
curve_scorer = _CurveScorer(
recall_score,
sign=1,
response_method="predict_proba",
thresholds=10,
kwargs={"pos_label": 0},
)
scores_pos_label_0, thresholds_pos_label_0 = curve_scorer(estimator, X, y)
# Since `pos_label` is forwarded to the curve_scorer, the thresholds are not equal.
assert not (thresholds_pos_label_1 == thresholds_pos_label_0).all()
# The min-max range for the thresholds is defined by the probabilities of the
# `pos_label` class (the column of `predict_proba`).
y_pred = estimator.predict_proba(X)
assert thresholds_pos_label_0.min() == pytest.approx(y_pred.min(axis=0)[0])
assert thresholds_pos_label_0.max() == pytest.approx(y_pred.max(axis=0)[0])
assert thresholds_pos_label_1.min() == pytest.approx(y_pred.min(axis=0)[1])
assert thresholds_pos_label_1.max() == pytest.approx(y_pred.max(axis=0)[1])
    # The recall cannot be negative and `pos_label=1` should have a higher recall
    # since there are fewer samples to be considered.
assert 0.0 < scores_pos_label_0.min() < scores_pos_label_1.min()
assert scores_pos_label_0.max() == pytest.approx(1.0)
assert scores_pos_label_1.max() == pytest.approx(1.0)
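

# `_fit_and_score_over_thresholds` is the private helper exercised by the next few
# tests. As those tests check: it fits `classifier` on the `train_idx` split
# (fitting is skipped when `train_idx is None`, i.e. a prefit classifier), scores it
# on the `val_idx` split with the given `curve_scorer`, and returns
# `(scores, thresholds)` with the thresholds in ascending order.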
def test_fit_and_score_over_thresholds_curve_scorers():
"""Check that `_fit_and_score_over_thresholds` returns thresholds in ascending order
for the different accepted curve scorers."""
X, y = make_classification(n_samples=100, random_state=0)
train_idx, val_idx = np.arange(50), np.arange(50, 100)
classifier = LogisticRegression()
curve_scorer = _CurveScorer(
score_func=balanced_accuracy_score,
sign=1,
response_method="predict_proba",
thresholds=10,
kwargs={},
)
scores, thresholds = _fit_and_score_over_thresholds(
classifier,
X,
y,
fit_params={},
train_idx=train_idx,
val_idx=val_idx,
curve_scorer=curve_scorer,
score_params={},
)
assert np.all(thresholds[:-1] <= thresholds[1:])
assert isinstance(scores, np.ndarray)
assert np.logical_and(scores >= 0, scores <= 1).all()
def test_fit_and_score_over_thresholds_prefit():
"""Check the behaviour with a prefit classifier."""
X, y = make_classification(n_samples=100, random_state=0)
# `train_idx is None` to indicate that the classifier is prefit
train_idx, val_idx = None, np.arange(50, 100)
classifier = DecisionTreeClassifier(random_state=0).fit(X, y)
# make sure that the classifier memorized the full dataset such that
# we get perfect predictions and thus match the expected score
assert classifier.score(X[val_idx], y[val_idx]) == pytest.approx(1.0)
curve_scorer = _CurveScorer(
score_func=balanced_accuracy_score,
sign=1,
response_method="predict_proba",
thresholds=2,
kwargs={},
)
scores, thresholds = _fit_and_score_over_thresholds(
classifier,
X,
y,
fit_params={},
train_idx=train_idx,
val_idx=val_idx,
curve_scorer=curve_scorer,
score_params={},
)
assert np.all(thresholds[:-1] <= thresholds[1:])
assert_allclose(scores, [0.5, 1.0])
@pytest.mark.usefixtures("enable_slep006")
def test_fit_and_score_over_thresholds_sample_weight():
"""Check that we dispatch the sample-weight to fit and score the classifier."""
X, y = load_iris(return_X_y=True)
X, y = X[:100], y[:100] # only 2 classes
    # create a dataset where each sample of class #0 appears twice
X_repeated, y_repeated = np.vstack([X, X[y == 0]]), np.hstack([y, y[y == 0]])
# create a sample weight vector that is equivalent to the repeated dataset
sample_weight = np.ones_like(y)
sample_weight[:50] *= 2
classifier = LogisticRegression()
train_repeated_idx = np.arange(X_repeated.shape[0])
val_repeated_idx = np.arange(X_repeated.shape[0])
curve_scorer = _CurveScorer(
score_func=balanced_accuracy_score,
sign=1,
response_method="predict_proba",
thresholds=10,
kwargs={},
)
scores_repeated, thresholds_repeated = _fit_and_score_over_thresholds(
classifier,
X_repeated,
y_repeated,
fit_params={},
train_idx=train_repeated_idx,
val_idx=val_repeated_idx,
curve_scorer=curve_scorer,
score_params={},
)
train_idx, val_idx = np.arange(X.shape[0]), np.arange(X.shape[0])
scores, thresholds = _fit_and_score_over_thresholds(
classifier.set_fit_request(sample_weight=True),
X,
y,
fit_params={"sample_weight": sample_weight},
train_idx=train_idx,
val_idx=val_idx,
curve_scorer=curve_scorer.set_score_request(sample_weight=True),
score_params={"sample_weight": sample_weight},
)
assert_allclose(thresholds_repeated, thresholds)
assert_allclose(scores_repeated, scores)
@pytest.mark.usefixtures("enable_slep006")
@pytest.mark.parametrize("fit_params_type", ["list", "array"])
def test_fit_and_score_over_thresholds_fit_params(fit_params_type):
"""Check that we pass `fit_params` to the classifier when calling `fit`."""
X, y = make_classification(n_samples=100, random_state=0)
fit_params = {
"a": _convert_container(y, fit_params_type),
"b": _convert_container(y, fit_params_type),
}
classifier = CheckingClassifier(expected_fit_params=["a", "b"], random_state=0)
classifier.set_fit_request(a=True, b=True)
train_idx, val_idx = np.arange(50), np.arange(50, 100)
curve_scorer = _CurveScorer(
score_func=balanced_accuracy_score,
sign=1,
response_method="predict_proba",
thresholds=10,
kwargs={},
)
_fit_and_score_over_thresholds(
classifier,
X,
y,
fit_params=fit_params,
train_idx=train_idx,
val_idx=val_idx,
curve_scorer=curve_scorer,
score_params={},
)
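

# The tests below exercise the public `TunedThresholdClassifierCV` estimator. A
# sketch of the usage pattern they rely on (parameters and attributes as used in
# these tests):
#
#   model = TunedThresholdClassifierCV(
#       LogisticRegression(), scoring="balanced_accuracy",
#       response_method="predict_proba", thresholds=100, store_cv_results=True,
#   ).fit(X, y)
#   model.best_threshold_   # tuned decision threshold
#   model.cv_results_       # {"thresholds": ..., "scores": ...}
#   model.predict(X)        # predictions using the tuned threshold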
@pytest.mark.parametrize(
"data",
[
make_classification(n_classes=3, n_clusters_per_class=1, random_state=0),
make_multilabel_classification(random_state=0),
],
)
def test_tuned_threshold_classifier_no_binary(data):
"""Check that we raise an informative error message for non-binary problem."""
err_msg = "Only binary classification is supported."
with pytest.raises(ValueError, match=err_msg):
TunedThresholdClassifierCV(LogisticRegression()).fit(*data)
@pytest.mark.parametrize(
"params, err_type, err_msg",
[
(
{"cv": "prefit", "refit": True},
ValueError,
"When cv='prefit', refit cannot be True.",
),
(
{"cv": 10, "refit": False},
ValueError,
"When cv has several folds, refit cannot be False.",
),
(
{"cv": "prefit", "refit": False},
NotFittedError,
"`estimator` must be fitted.",
),
],
)
def test_tuned_threshold_classifier_conflict_cv_refit(params, err_type, err_msg):
"""Check that we raise an informative error message when `cv` and `refit`
cannot be used together.
"""
X, y = make_classification(n_samples=100, random_state=0)
with pytest.raises(err_type, match=err_msg):
TunedThresholdClassifierCV(LogisticRegression(), **params).fit(X, y)
@pytest.mark.parametrize(
"estimator",
[LogisticRegression(), SVC(), GradientBoostingClassifier(n_estimators=4)],
)
@pytest.mark.parametrize(
"response_method", ["predict_proba", "predict_log_proba", "decision_function"]
)
@pytest.mark.parametrize(
"ThresholdClassifier", [FixedThresholdClassifier, TunedThresholdClassifierCV]
)
def test_threshold_classifier_estimator_response_methods(
ThresholdClassifier, estimator, response_method
):
"""Check that `TunedThresholdClassifierCV` exposes the same response methods as the
underlying estimator.
"""
X, y = make_classification(n_samples=100, random_state=0)
model = ThresholdClassifier(estimator=estimator)
assert hasattr(model, response_method) == hasattr(estimator, response_method)
model.fit(X, y)
assert hasattr(model, response_method) == hasattr(estimator, response_method)
if hasattr(model, response_method):
y_pred_cutoff = getattr(model, response_method)(X)
y_pred_underlying_estimator = getattr(model.estimator_, response_method)(X)
assert_allclose(y_pred_cutoff, y_pred_underlying_estimator)
@pytest.mark.parametrize(
"response_method", ["auto", "decision_function", "predict_proba"]
)
def test_tuned_threshold_classifier_without_constraint_value(response_method):
"""Check that `TunedThresholdClassifierCV` is optimizing a given objective
metric."""
X, y = load_breast_cancer(return_X_y=True)
    # remove features to degrade performance
    X = X[:, :5]
    # make the problem heavily imbalanced such that the balanced accuracy is low
indices_pos = np.flatnonzero(y == 1)
indices_pos = indices_pos[: indices_pos.size // 50]
indices_neg = np.flatnonzero(y == 0)
X = np.vstack([X[indices_neg], X[indices_pos]])
y = np.hstack([y[indices_neg], y[indices_pos]])
lr = make_pipeline(StandardScaler(), LogisticRegression()).fit(X, y)
thresholds = 100
model = TunedThresholdClassifierCV(
estimator=lr,
scoring="balanced_accuracy",
response_method=response_method,
thresholds=thresholds,
store_cv_results=True,
)
score_optimized = balanced_accuracy_score(y, model.fit(X, y).predict(X))
score_baseline = balanced_accuracy_score(y, lr.predict(X))
assert score_optimized > score_baseline
assert model.cv_results_["thresholds"].shape == (thresholds,)
assert model.cv_results_["scores"].shape == (thresholds,)
def test_tuned_threshold_classifier_metric_with_parameter():
"""Check that we can pass a metric with a parameter in addition check that
`f_beta` with `beta=1` is equivalent to `f1` and different from `f_beta` with
`beta=2`.
"""
X, y = load_breast_cancer(return_X_y=True)
lr = make_pipeline(StandardScaler(), LogisticRegression()).fit(X, y)
model_fbeta_1 = TunedThresholdClassifierCV(
estimator=lr, scoring=make_scorer(fbeta_score, beta=1)
).fit(X, y)
model_fbeta_2 = TunedThresholdClassifierCV(
estimator=lr, scoring=make_scorer(fbeta_score, beta=2)
).fit(X, y)
model_f1 = TunedThresholdClassifierCV(
estimator=lr, scoring=make_scorer(f1_score)
).fit(X, y)
assert model_fbeta_1.best_threshold_ == pytest.approx(model_f1.best_threshold_)
assert model_fbeta_1.best_threshold_ != pytest.approx(model_fbeta_2.best_threshold_)
@pytest.mark.parametrize(
"response_method", ["auto", "decision_function", "predict_proba"]
)
@pytest.mark.parametrize(
"metric",
[
make_scorer(balanced_accuracy_score),
make_scorer(f1_score, pos_label="cancer"),
],
)
def test_tuned_threshold_classifier_with_string_targets(response_method, metric):
"""Check that targets represented by str are properly managed.
Also, check with several metrics to be sure that `pos_label` is properly
dispatched.
"""
X, y = load_breast_cancer(return_X_y=True)
    # Encode the numeric targets with meaningful strings. We purposely design the
    # class names such that the `pos_label` is the first alphabetically sorted class
    # and is thus encoded as 0.
classes = np.array(["cancer", "healthy"], dtype=object)
y = classes[y]
model = TunedThresholdClassifierCV(
estimator=make_pipeline(StandardScaler(), LogisticRegression()),
scoring=metric,
response_method=response_method,
thresholds=100,
).fit(X, y)
assert_array_equal(model.classes_, np.sort(classes))
y_pred = model.predict(X)
assert_array_equal(np.unique(y_pred), np.sort(classes))
@pytest.mark.usefixtures("enable_slep006")
@pytest.mark.parametrize("with_sample_weight", [True, False])
def test_tuned_threshold_classifier_refit(with_sample_weight, global_random_seed):
"""Check the behaviour of the `refit` parameter."""
rng = np.random.RandomState(global_random_seed)
X, y = make_classification(n_samples=100, random_state=0)
if with_sample_weight:
sample_weight = rng.randn(X.shape[0])
sample_weight = np.abs(sample_weight, out=sample_weight)
else:
sample_weight = None
    # check that `estimator_` is fitted on the full dataset when `refit=True`
estimator = LogisticRegression().set_fit_request(sample_weight=True)
model = TunedThresholdClassifierCV(estimator, refit=True).fit(
X, y, sample_weight=sample_weight
)
assert model.estimator_ is not estimator
estimator.fit(X, y, sample_weight=sample_weight)
assert_allclose(model.estimator_.coef_, estimator.coef_)
assert_allclose(model.estimator_.intercept_, estimator.intercept_)
# check that `estimator_` was not altered when `refit=False` and `cv="prefit"`
estimator = LogisticRegression().set_fit_request(sample_weight=True)
estimator.fit(X, y, sample_weight=sample_weight)
coef = estimator.coef_.copy()
model = TunedThresholdClassifierCV(estimator, cv="prefit", refit=False).fit(
X, y, sample_weight=sample_weight
)
assert model.estimator_ is estimator
assert_allclose(model.estimator_.coef_, coef)
# check that we train `estimator_` on the training split of a given cross-validation
estimator = LogisticRegression().set_fit_request(sample_weight=True)
cv = [
(np.arange(50), np.arange(50, 100)),
] # single split
model = TunedThresholdClassifierCV(estimator, cv=cv, refit=False).fit(
X, y, sample_weight=sample_weight
)
assert model.estimator_ is not estimator
if with_sample_weight:
sw_train = sample_weight[cv[0][0]]
else:
sw_train = None
estimator.fit(X[cv[0][0]], y[cv[0][0]], sample_weight=sw_train)
assert_allclose(model.estimator_.coef_, estimator.coef_)
@pytest.mark.usefixtures("enable_slep006")
@pytest.mark.parametrize("fit_params_type", ["list", "array"])
def test_tuned_threshold_classifier_fit_params(fit_params_type):
"""Check that we pass `fit_params` to the classifier when calling `fit`."""
X, y = make_classification(n_samples=100, random_state=0)
fit_params = {
"a": _convert_container(y, fit_params_type),
"b": _convert_container(y, fit_params_type),
}
classifier = CheckingClassifier(expected_fit_params=["a", "b"], random_state=0)
classifier.set_fit_request(a=True, b=True)
model = TunedThresholdClassifierCV(classifier)
model.fit(X, y, **fit_params)
@pytest.mark.usefixtures("enable_slep006")
def test_tuned_threshold_classifier_cv_zeros_sample_weights_equivalence():
"""Check that passing removing some sample from the dataset `X` is
equivalent to passing a `sample_weight` with a factor 0."""
X, y = load_iris(return_X_y=True)
# Scale the data to avoid any convergence issue
X = StandardScaler().fit_transform(X)
    # Only use 2 classes and select samples such that the 2-fold cross-validation
    # split leads to an equivalence with a `sample_weight` of 0
X = np.vstack((X[:40], X[50:90]))
y = np.hstack((y[:40], y[50:90]))
sample_weight = np.zeros_like(y)
sample_weight[::2] = 1
estimator = LogisticRegression().set_fit_request(sample_weight=True)
model_without_weights = TunedThresholdClassifierCV(estimator, cv=2)
model_with_weights = clone(model_without_weights)
model_with_weights.fit(X, y, sample_weight=sample_weight)
model_without_weights.fit(X[::2], y[::2])
assert_allclose(
model_with_weights.estimator_.coef_, model_without_weights.estimator_.coef_
)
y_pred_with_weights = model_with_weights.predict_proba(X)
y_pred_without_weights = model_without_weights.predict_proba(X)
assert_allclose(y_pred_with_weights, y_pred_without_weights)
def test_tuned_threshold_classifier_thresholds_array():
"""Check that we can pass an array to `thresholds` and it is used as candidate
threshold internally."""
X, y = make_classification(random_state=0)
estimator = LogisticRegression()
thresholds = np.linspace(0, 1, 11)
tuned_model = TunedThresholdClassifierCV(
estimator,
thresholds=thresholds,
response_method="predict_proba",
store_cv_results=True,
).fit(X, y)
assert_allclose(tuned_model.cv_results_["thresholds"], thresholds)
@pytest.mark.parametrize("store_cv_results", [True, False])
def test_tuned_threshold_classifier_store_cv_results(store_cv_results):
"""Check that if `cv_results_` exists depending on `store_cv_results`."""
X, y = make_classification(random_state=0)
estimator = LogisticRegression()
tuned_model = TunedThresholdClassifierCV(
estimator, store_cv_results=store_cv_results
).fit(X, y)
if store_cv_results:
assert hasattr(tuned_model, "cv_results_")
else:
assert not hasattr(tuned_model, "cv_results_")
def test_tuned_threshold_classifier_cv_float():
"""Check the behaviour when `cv` is set to a float."""
X, y = make_classification(random_state=0)
    # case where `refit=False` and `cv` is a float: the underlying estimator will be
    # fit on the training set given by a StratifiedShuffleSplit. We check that we
    # get the same model coefficients.
test_size = 0.3
estimator = LogisticRegression()
    tuned_model = TunedThresholdClassifierCV(
        estimator, cv=test_size, refit=False, random_state=0
    )
    tuned_model.fit(X, y)
cv = StratifiedShuffleSplit(n_splits=1, test_size=test_size, random_state=0)
train_idx, val_idx = next(cv.split(X, y))
cloned_estimator = clone(estimator).fit(X[train_idx], y[train_idx])
assert_allclose(tuned_model.estimator_.coef_, cloned_estimator.coef_)
# case where `refit=True`, then the underlying estimator is fitted on the full
# dataset.
tuned_model.set_params(refit=True).fit(X, y)
cloned_estimator = clone(estimator).fit(X, y)
assert_allclose(tuned_model.estimator_.coef_, cloned_estimator.coef_)
def test_tuned_threshold_classifier_error_constant_predictor():
"""Check that we raise a ValueError if the underlying classifier returns constant
probabilities such that we cannot find any threshold.
"""
X, y = make_classification(random_state=0)
estimator = DummyClassifier(strategy="constant", constant=1)
tuned_model = TunedThresholdClassifierCV(estimator, response_method="predict_proba")
err_msg = "The provided estimator makes constant predictions"
with pytest.raises(ValueError, match=err_msg):
tuned_model.fit(X, y)
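

# The remaining tests cover `FixedThresholdClassifier`, which applies a user-chosen
# cut-off instead of tuning one. A sketch of the pattern exercised by these tests:
#
#   model = FixedThresholdClassifier(
#       estimator=LogisticRegression(), threshold=0.7,
#       response_method="predict_proba", pos_label=1,
#   ).fit(X, y)
#   model.predict(X)  # thresholds `predict_proba(X)[:, pos_label]` at 0.7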
@pytest.mark.parametrize(
"response_method", ["auto", "predict_proba", "decision_function"]
)
def test_fixed_threshold_classifier_equivalence_default(response_method):
"""Check that `FixedThresholdClassifier` has the same behaviour as the vanilla
classifier.
"""
X, y = make_classification(random_state=0)
classifier = LogisticRegression().fit(X, y)
classifier_default_threshold = FixedThresholdClassifier(
estimator=clone(classifier), response_method=response_method
)
classifier_default_threshold.fit(X, y)
# emulate the response method that should take into account the `pos_label`
if response_method in ("auto", "predict_proba"):
y_score = classifier_default_threshold.predict_proba(X)[:, 1]
threshold = 0.5
else: # response_method == "decision_function"
y_score = classifier_default_threshold.decision_function(X)
threshold = 0.0
y_pred_lr = (y_score >= threshold).astype(int)
assert_allclose(classifier_default_threshold.predict(X), y_pred_lr)
@pytest.mark.parametrize(
"response_method, threshold", [("predict_proba", 0.7), ("decision_function", 2.0)]
)
@pytest.mark.parametrize("pos_label", [0, 1])
def test_fixed_threshold_classifier(response_method, threshold, pos_label):
"""Check that applying `predict` lead to the same prediction as applying the
threshold to the output of the response method.
"""
X, y = make_classification(n_samples=50, random_state=0)
logistic_regression = LogisticRegression().fit(X, y)
model = FixedThresholdClassifier(
estimator=clone(logistic_regression),
threshold=threshold,
response_method=response_method,
pos_label=pos_label,
).fit(X, y)
# check that the underlying estimator is the same
assert_allclose(model.estimator_.coef_, logistic_regression.coef_)
# emulate the response method that should take into account the `pos_label`
if response_method == "predict_proba":
y_score = model.predict_proba(X)[:, pos_label]
else: # response_method == "decision_function"
y_score = model.decision_function(X)
y_score = y_score if pos_label == 1 else -y_score
# create a mapping from boolean values to class labels
map_to_label = np.array([0, 1]) if pos_label == 1 else np.array([1, 0])
y_pred_lr = map_to_label[(y_score >= threshold).astype(int)]
assert_allclose(model.predict(X), y_pred_lr)
for method in ("predict_proba", "predict_log_proba", "decision_function"):
assert_allclose(
getattr(model, method)(X), getattr(logistic_regression, method)(X)
)
assert_allclose(
getattr(model.estimator_, method)(X),
getattr(logistic_regression, method)(X),
)
@pytest.mark.usefixtures("enable_slep006")
def test_fixed_threshold_classifier_metadata_routing():
"""Check that everything works with metadata routing."""
X, y = make_classification(random_state=0)
sample_weight = np.ones_like(y)
sample_weight[::2] = 2
classifier = LogisticRegression().set_fit_request(sample_weight=True)
classifier.fit(X, y, sample_weight=sample_weight)
classifier_default_threshold = FixedThresholdClassifier(estimator=clone(classifier))
classifier_default_threshold.fit(X, y, sample_weight=sample_weight)
assert_allclose(classifier_default_threshold.estimator_.coef_, classifier.coef_)