import numpy as np
import pytest

from sklearn.base import clone
from sklearn.datasets import (
    load_breast_cancer,
    load_iris,
    make_classification,
    make_multilabel_classification,
)
from sklearn.dummy import DummyClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.exceptions import NotFittedError
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    balanced_accuracy_score,
    f1_score,
    fbeta_score,
    make_scorer,
    recall_score,
)
from sklearn.model_selection import (
    FixedThresholdClassifier,
    StratifiedShuffleSplit,
    TunedThresholdClassifierCV,
)
from sklearn.model_selection._classification_threshold import (
    _CurveScorer,
    _fit_and_score_over_thresholds,
)
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.utils._mocking import CheckingClassifier
from sklearn.utils._testing import (
    _convert_container,
    assert_allclose,
    assert_array_equal,
)

def test_curve_scorer():
    """Check the behaviour of the `_CurveScorer` class."""
    X, y = make_classification(random_state=0)
    estimator = LogisticRegression().fit(X, y)
    curve_scorer = _CurveScorer(
        balanced_accuracy_score,
        sign=1,
        response_method="predict_proba",
        thresholds=10,
        kwargs={},
    )
    scores, thresholds = curve_scorer(estimator, X, y)

    assert thresholds.shape == scores.shape
    # check that the thresholds are probabilities with extreme values close to 0 and 1.
    # they are not exactly 0 and 1 because they are the extrema of the
    # `estimator.predict_proba(X)` values.
    assert 0 <= thresholds.min() <= 0.01
    assert 0.99 <= thresholds.max() <= 1
    # balanced accuracy should be between 0.5 and 1 when it is not adjusted
    assert 0.5 <= scores.min() <= 1

    # check that passing kwargs to the scorer works
    curve_scorer = _CurveScorer(
        balanced_accuracy_score,
        sign=1,
        response_method="predict_proba",
        thresholds=10,
        kwargs={"adjusted": True},
    )
    scores, thresholds = curve_scorer(estimator, X, y)

    # the adjusted balanced accuracy is rescaled such that chance level is 0, so its
    # minimum drops below 0.5
    assert 0 <= scores.min() <= 0.5

    # check that we can invert the sign of the score when dealing with a `neg_*` scorer
    curve_scorer = _CurveScorer(
        balanced_accuracy_score,
        sign=-1,
        response_method="predict_proba",
        thresholds=10,
        kwargs={"adjusted": True},
    )
    scores, thresholds = curve_scorer(estimator, X, y)

    assert all(scores <= 0)

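# Illustrative sketch, not part of the upstream test suite: `_curve_best_threshold` is
# a hypothetical helper showing how the `(scores, thresholds)` pair returned by a
# `_CurveScorer` can be turned into a single decision threshold with a simple argmax,
# which is roughly what `TunedThresholdClassifierCV` does after aggregating the
# per-fold score curves.
def _curve_best_threshold(scores, thresholds):
    """Return the candidate threshold with the highest score."""
    return thresholds[np.argmax(scores)]
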
def test_curve_scorer_pos_label(global_random_seed):
    """Check that we properly propagate the `pos_label` parameter to the scorer."""
    n_samples = 30
    X, y = make_classification(
        n_samples=n_samples, weights=[0.9, 0.1], random_state=global_random_seed
    )
    estimator = LogisticRegression().fit(X, y)

    curve_scorer = _CurveScorer(
        recall_score,
        sign=1,
        response_method="predict_proba",
        thresholds=10,
        kwargs={"pos_label": 1},
    )
    scores_pos_label_1, thresholds_pos_label_1 = curve_scorer(estimator, X, y)

    curve_scorer = _CurveScorer(
        recall_score,
        sign=1,
        response_method="predict_proba",
        thresholds=10,
        kwargs={"pos_label": 0},
    )
    scores_pos_label_0, thresholds_pos_label_0 = curve_scorer(estimator, X, y)

    # Since `pos_label` is forwarded to the curve_scorer, the thresholds are not equal.
    assert not (thresholds_pos_label_1 == thresholds_pos_label_0).all()
    # The min-max range of the thresholds is defined by the probabilities of the
    # `pos_label` class (the corresponding column of `predict_proba`).
    y_pred = estimator.predict_proba(X)
    assert thresholds_pos_label_0.min() == pytest.approx(y_pred.min(axis=0)[0])
    assert thresholds_pos_label_0.max() == pytest.approx(y_pred.max(axis=0)[0])
    assert thresholds_pos_label_1.min() == pytest.approx(y_pred.min(axis=0)[1])
    assert thresholds_pos_label_1.max() == pytest.approx(y_pred.max(axis=0)[1])

    # The recall cannot be negative and `pos_label=1` should have a higher recall
    # since there are fewer samples to consider.
    assert 0.0 < scores_pos_label_0.min() < scores_pos_label_1.min()
    assert scores_pos_label_0.max() == pytest.approx(1.0)
    assert scores_pos_label_1.max() == pytest.approx(1.0)

def test_fit_and_score_over_thresholds_curve_scorers():
    """Check that `_fit_and_score_over_thresholds` returns thresholds in ascending
    order for the different accepted curve scorers."""
    X, y = make_classification(n_samples=100, random_state=0)
    train_idx, val_idx = np.arange(50), np.arange(50, 100)
    classifier = LogisticRegression()

    curve_scorer = _CurveScorer(
        score_func=balanced_accuracy_score,
        sign=1,
        response_method="predict_proba",
        thresholds=10,
        kwargs={},
    )
    scores, thresholds = _fit_and_score_over_thresholds(
        classifier,
        X,
        y,
        fit_params={},
        train_idx=train_idx,
        val_idx=val_idx,
        curve_scorer=curve_scorer,
        score_params={},
    )

    assert np.all(thresholds[:-1] <= thresholds[1:])
    assert isinstance(scores, np.ndarray)
    assert np.logical_and(scores >= 0, scores <= 1).all()

def test_fit_and_score_over_thresholds_prefit():
    """Check the behaviour with a prefit classifier."""
    X, y = make_classification(n_samples=100, random_state=0)

    # `train_idx is None` to indicate that the classifier is prefit
    train_idx, val_idx = None, np.arange(50, 100)
    classifier = DecisionTreeClassifier(random_state=0).fit(X, y)
    # make sure that the classifier memorized the full dataset such that
    # we get perfect predictions and thus match the expected score
    assert classifier.score(X[val_idx], y[val_idx]) == pytest.approx(1.0)

    curve_scorer = _CurveScorer(
        score_func=balanced_accuracy_score,
        sign=1,
        response_method="predict_proba",
        thresholds=2,
        kwargs={},
    )
    scores, thresholds = _fit_and_score_over_thresholds(
        classifier,
        X,
        y,
        fit_params={},
        train_idx=train_idx,
        val_idx=val_idx,
        curve_scorer=curve_scorer,
        score_params={},
    )
    assert np.all(thresholds[:-1] <= thresholds[1:])
    assert_allclose(scores, [0.5, 1.0])

@pytest.mark.usefixtures("enable_slep006")
def test_fit_and_score_over_thresholds_sample_weight():
    """Check that we dispatch the sample-weight to fit and score the classifier."""
    X, y = load_iris(return_X_y=True)
    X, y = X[:100], y[:100]  # only 2 classes

    # create a dataset where the samples of class #0 are repeated twice
    X_repeated, y_repeated = np.vstack([X, X[y == 0]]), np.hstack([y, y[y == 0]])
    # create a sample weight vector that is equivalent to the repeated dataset
    sample_weight = np.ones_like(y)
    sample_weight[:50] *= 2

    classifier = LogisticRegression()
    train_repeated_idx = np.arange(X_repeated.shape[0])
    val_repeated_idx = np.arange(X_repeated.shape[0])
    curve_scorer = _CurveScorer(
        score_func=balanced_accuracy_score,
        sign=1,
        response_method="predict_proba",
        thresholds=10,
        kwargs={},
    )
    scores_repeated, thresholds_repeated = _fit_and_score_over_thresholds(
        classifier,
        X_repeated,
        y_repeated,
        fit_params={},
        train_idx=train_repeated_idx,
        val_idx=val_repeated_idx,
        curve_scorer=curve_scorer,
        score_params={},
    )

    train_idx, val_idx = np.arange(X.shape[0]), np.arange(X.shape[0])
    scores, thresholds = _fit_and_score_over_thresholds(
        classifier.set_fit_request(sample_weight=True),
        X,
        y,
        fit_params={"sample_weight": sample_weight},
        train_idx=train_idx,
        val_idx=val_idx,
        curve_scorer=curve_scorer.set_score_request(sample_weight=True),
        score_params={"sample_weight": sample_weight},
    )

    assert_allclose(thresholds_repeated, thresholds)
    assert_allclose(scores_repeated, scores)

@pytest.mark.usefixtures("enable_slep006")
@pytest.mark.parametrize("fit_params_type", ["list", "array"])
def test_fit_and_score_over_thresholds_fit_params(fit_params_type):
    """Check that we pass `fit_params` to the classifier when calling `fit`."""
    X, y = make_classification(n_samples=100, random_state=0)
    fit_params = {
        "a": _convert_container(y, fit_params_type),
        "b": _convert_container(y, fit_params_type),
    }

    classifier = CheckingClassifier(expected_fit_params=["a", "b"], random_state=0)
    classifier.set_fit_request(a=True, b=True)
    train_idx, val_idx = np.arange(50), np.arange(50, 100)

    curve_scorer = _CurveScorer(
        score_func=balanced_accuracy_score,
        sign=1,
        response_method="predict_proba",
        thresholds=10,
        kwargs={},
    )
    _fit_and_score_over_thresholds(
        classifier,
        X,
        y,
        fit_params=fit_params,
        train_idx=train_idx,
        val_idx=val_idx,
        curve_scorer=curve_scorer,
        score_params={},
    )

@pytest.mark.parametrize(
    "data",
    [
        make_classification(n_classes=3, n_clusters_per_class=1, random_state=0),
        make_multilabel_classification(random_state=0),
    ],
)
def test_tuned_threshold_classifier_no_binary(data):
    """Check that we raise an informative error message for a non-binary problem."""
    err_msg = "Only binary classification is supported."
    with pytest.raises(ValueError, match=err_msg):
        TunedThresholdClassifierCV(LogisticRegression()).fit(*data)

@pytest.mark.parametrize(
    "params, err_type, err_msg",
    [
        (
            {"cv": "prefit", "refit": True},
            ValueError,
            "When cv='prefit', refit cannot be True.",
        ),
        (
            {"cv": 10, "refit": False},
            ValueError,
            "When cv has several folds, refit cannot be False.",
        ),
        (
            {"cv": "prefit", "refit": False},
            NotFittedError,
            "`estimator` must be fitted.",
        ),
    ],
)
def test_tuned_threshold_classifier_conflict_cv_refit(params, err_type, err_msg):
    """Check that we raise an informative error message when `cv` and `refit`
    cannot be used together.
    """
    X, y = make_classification(n_samples=100, random_state=0)
    with pytest.raises(err_type, match=err_msg):
        TunedThresholdClassifierCV(LogisticRegression(), **params).fit(X, y)

@pytest.mark.parametrize(
    "estimator",
    [LogisticRegression(), SVC(), GradientBoostingClassifier(n_estimators=4)],
)
@pytest.mark.parametrize(
    "response_method", ["predict_proba", "predict_log_proba", "decision_function"]
)
@pytest.mark.parametrize(
    "ThresholdClassifier", [FixedThresholdClassifier, TunedThresholdClassifierCV]
)
def test_threshold_classifier_estimator_response_methods(
    ThresholdClassifier, estimator, response_method
):
    """Check that the threshold classifiers expose the same response methods as the
    underlying estimator.
    """
    X, y = make_classification(n_samples=100, random_state=0)

    model = ThresholdClassifier(estimator=estimator)
    assert hasattr(model, response_method) == hasattr(estimator, response_method)

    model.fit(X, y)
    assert hasattr(model, response_method) == hasattr(estimator, response_method)

    if hasattr(model, response_method):
        y_pred_cutoff = getattr(model, response_method)(X)
        y_pred_underlying_estimator = getattr(model.estimator_, response_method)(X)

        assert_allclose(y_pred_cutoff, y_pred_underlying_estimator)

@pytest.mark.parametrize(
    "response_method", ["auto", "decision_function", "predict_proba"]
)
def test_tuned_threshold_classifier_without_constraint_value(response_method):
    """Check that `TunedThresholdClassifierCV` is optimizing a given objective
    metric."""
    X, y = load_breast_cancer(return_X_y=True)
    # remove features to degrade performance
    X = X[:, :5]

    # make the problem heavily imbalanced such that the balanced accuracy is low
    indices_pos = np.flatnonzero(y == 1)
    indices_pos = indices_pos[: indices_pos.size // 50]
    indices_neg = np.flatnonzero(y == 0)

    X = np.vstack([X[indices_neg], X[indices_pos]])
    y = np.hstack([y[indices_neg], y[indices_pos]])

    lr = make_pipeline(StandardScaler(), LogisticRegression()).fit(X, y)
    thresholds = 100
    model = TunedThresholdClassifierCV(
        estimator=lr,
        scoring="balanced_accuracy",
        response_method=response_method,
        thresholds=thresholds,
        store_cv_results=True,
    )
    score_optimized = balanced_accuracy_score(y, model.fit(X, y).predict(X))
    score_baseline = balanced_accuracy_score(y, lr.predict(X))
    assert score_optimized > score_baseline
    assert model.cv_results_["thresholds"].shape == (thresholds,)
    assert model.cv_results_["scores"].shape == (thresholds,)

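# Illustrative sketch, not part of the upstream test suite: assuming that
# `cv_results_["scores"]` stores the aggregated score curve over the candidate
# thresholds, `best_threshold_` should coincide with the candidate maximizing that
# curve. The helper name below is hypothetical and kept private so that pytest does
# not collect it.
def _example_best_threshold_matches_cv_results():
    X, y = make_classification(n_samples=200, random_state=0)
    model = TunedThresholdClassifierCV(
        LogisticRegression(), store_cv_results=True
    ).fit(X, y)
    best_idx = np.argmax(model.cv_results_["scores"])
    assert model.best_threshold_ == pytest.approx(
        model.cv_results_["thresholds"][best_idx]
    )
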
def test_tuned_threshold_classifier_metric_with_parameter():
    """Check that we can pass a metric with a parameter. In addition, check that
    `fbeta_score` with `beta=1` is equivalent to `f1_score` and different from
    `fbeta_score` with `beta=2`.
    """
    X, y = load_breast_cancer(return_X_y=True)
    lr = make_pipeline(StandardScaler(), LogisticRegression()).fit(X, y)
    model_fbeta_1 = TunedThresholdClassifierCV(
        estimator=lr, scoring=make_scorer(fbeta_score, beta=1)
    ).fit(X, y)
    model_fbeta_2 = TunedThresholdClassifierCV(
        estimator=lr, scoring=make_scorer(fbeta_score, beta=2)
    ).fit(X, y)
    model_f1 = TunedThresholdClassifierCV(
        estimator=lr, scoring=make_scorer(f1_score)
    ).fit(X, y)

    assert model_fbeta_1.best_threshold_ == pytest.approx(model_f1.best_threshold_)
    assert model_fbeta_1.best_threshold_ != pytest.approx(model_fbeta_2.best_threshold_)

@pytest.mark.parametrize(
    "response_method", ["auto", "decision_function", "predict_proba"]
)
@pytest.mark.parametrize(
    "metric",
    [
        make_scorer(balanced_accuracy_score),
        make_scorer(f1_score, pos_label="cancer"),
    ],
)
def test_tuned_threshold_classifier_with_string_targets(response_method, metric):
    """Check that targets represented by str are properly managed.
    Also, check with several metrics to be sure that `pos_label` is properly
    dispatched.
    """
    X, y = load_breast_cancer(return_X_y=True)
    # Encode numeric targets by meaningful strings. We purposely designed the class
    # names such that the `pos_label` is the first alphabetically sorted class and
    # thus encoded as 0.
    classes = np.array(["cancer", "healthy"], dtype=object)
    y = classes[y]
    model = TunedThresholdClassifierCV(
        estimator=make_pipeline(StandardScaler(), LogisticRegression()),
        scoring=metric,
        response_method=response_method,
        thresholds=100,
    ).fit(X, y)
    assert_array_equal(model.classes_, np.sort(classes))
    y_pred = model.predict(X)
    assert_array_equal(np.unique(y_pred), np.sort(classes))

@pytest.mark.usefixtures("enable_slep006")
@pytest.mark.parametrize("with_sample_weight", [True, False])
def test_tuned_threshold_classifier_refit(with_sample_weight, global_random_seed):
    """Check the behaviour of the `refit` parameter."""
    rng = np.random.RandomState(global_random_seed)
    X, y = make_classification(n_samples=100, random_state=0)
    if with_sample_weight:
        sample_weight = rng.randn(X.shape[0])
        sample_weight = np.abs(sample_weight, out=sample_weight)
    else:
        sample_weight = None

    # check that `estimator_` is fitted on the full dataset when `refit=True`
    estimator = LogisticRegression().set_fit_request(sample_weight=True)
    model = TunedThresholdClassifierCV(estimator, refit=True).fit(
        X, y, sample_weight=sample_weight
    )

    assert model.estimator_ is not estimator
    estimator.fit(X, y, sample_weight=sample_weight)
    assert_allclose(model.estimator_.coef_, estimator.coef_)
    assert_allclose(model.estimator_.intercept_, estimator.intercept_)

    # check that `estimator_` was not altered when `refit=False` and `cv="prefit"`
    estimator = LogisticRegression().set_fit_request(sample_weight=True)
    estimator.fit(X, y, sample_weight=sample_weight)
    coef = estimator.coef_.copy()
    model = TunedThresholdClassifierCV(estimator, cv="prefit", refit=False).fit(
        X, y, sample_weight=sample_weight
    )

    assert model.estimator_ is estimator
    assert_allclose(model.estimator_.coef_, coef)

    # check that we train `estimator_` on the training split of a given
    # cross-validation
    estimator = LogisticRegression().set_fit_request(sample_weight=True)
    cv = [
        (np.arange(50), np.arange(50, 100)),
    ]  # single split
    model = TunedThresholdClassifierCV(estimator, cv=cv, refit=False).fit(
        X, y, sample_weight=sample_weight
    )

    assert model.estimator_ is not estimator
    if with_sample_weight:
        sw_train = sample_weight[cv[0][0]]
    else:
        sw_train = None
    estimator.fit(X[cv[0][0]], y[cv[0][0]], sample_weight=sw_train)
    assert_allclose(model.estimator_.coef_, estimator.coef_)

@pytest.mark.usefixtures("enable_slep006")
@pytest.mark.parametrize("fit_params_type", ["list", "array"])
def test_tuned_threshold_classifier_fit_params(fit_params_type):
    """Check that we pass `fit_params` to the classifier when calling `fit`."""
    X, y = make_classification(n_samples=100, random_state=0)
    fit_params = {
        "a": _convert_container(y, fit_params_type),
        "b": _convert_container(y, fit_params_type),
    }

    classifier = CheckingClassifier(expected_fit_params=["a", "b"], random_state=0)
    classifier.set_fit_request(a=True, b=True)
    model = TunedThresholdClassifierCV(classifier)
    model.fit(X, y, **fit_params)

@pytest.mark.usefixtures("enable_slep006")
def test_tuned_threshold_classifier_cv_zeros_sample_weights_equivalence():
    """Check that removing some samples from the dataset `X` is equivalent to
    passing them with a `sample_weight` of 0."""
    X, y = load_iris(return_X_y=True)
    # Scale the data to avoid any convergence issue
    X = StandardScaler().fit_transform(X)
    # Only use 2 classes and select samples such that 2-fold cross-validation
    # split will lead to an equivalence with a `sample_weight` of 0
    X = np.vstack((X[:40], X[50:90]))
    y = np.hstack((y[:40], y[50:90]))
    sample_weight = np.zeros_like(y)
    sample_weight[::2] = 1

    estimator = LogisticRegression().set_fit_request(sample_weight=True)
    model_without_weights = TunedThresholdClassifierCV(estimator, cv=2)
    model_with_weights = clone(model_without_weights)

    model_with_weights.fit(X, y, sample_weight=sample_weight)
    model_without_weights.fit(X[::2], y[::2])

    assert_allclose(
        model_with_weights.estimator_.coef_, model_without_weights.estimator_.coef_
    )

    y_pred_with_weights = model_with_weights.predict_proba(X)
    y_pred_without_weights = model_without_weights.predict_proba(X)
    assert_allclose(y_pred_with_weights, y_pred_without_weights)

def test_tuned_threshold_classifier_thresholds_array():
    """Check that we can pass an array to `thresholds` and that it is used as the
    candidate thresholds internally."""
    X, y = make_classification(random_state=0)
    estimator = LogisticRegression()
    thresholds = np.linspace(0, 1, 11)
    tuned_model = TunedThresholdClassifierCV(
        estimator,
        thresholds=thresholds,
        response_method="predict_proba",
        store_cv_results=True,
    ).fit(X, y)
    assert_allclose(tuned_model.cv_results_["thresholds"], thresholds)

@pytest.mark.parametrize("store_cv_results", [True, False])
def test_tuned_threshold_classifier_store_cv_results(store_cv_results):
    """Check that `cv_results_` exists depending on `store_cv_results`."""
    X, y = make_classification(random_state=0)
    estimator = LogisticRegression()
    tuned_model = TunedThresholdClassifierCV(
        estimator, store_cv_results=store_cv_results
    ).fit(X, y)
    if store_cv_results:
        assert hasattr(tuned_model, "cv_results_")
    else:
        assert not hasattr(tuned_model, "cv_results_")

def test_tuned_threshold_classifier_cv_float():
    """Check the behaviour when `cv` is set to a float."""
    X, y = make_classification(random_state=0)

    # case where `refit=False` and cv is a float: the underlying estimator will be fit
    # on the training set given by a ShuffleSplit. We check that we get the same model
    # coefficients.
    test_size = 0.3
    estimator = LogisticRegression()
    tuned_model = TunedThresholdClassifierCV(
        estimator, cv=test_size, refit=False, random_state=0
    )
    tuned_model.fit(X, y)

    cv = StratifiedShuffleSplit(n_splits=1, test_size=test_size, random_state=0)
    train_idx, val_idx = next(cv.split(X, y))
    cloned_estimator = clone(estimator).fit(X[train_idx], y[train_idx])

    assert_allclose(tuned_model.estimator_.coef_, cloned_estimator.coef_)

    # case where `refit=True`: the underlying estimator is fitted on the full dataset.
    tuned_model.set_params(refit=True).fit(X, y)
    cloned_estimator = clone(estimator).fit(X, y)

    assert_allclose(tuned_model.estimator_.coef_, cloned_estimator.coef_)

def test_tuned_threshold_classifier_error_constant_predictor():
    """Check that we raise a ValueError if the underlying classifier returns constant
    probabilities such that we cannot find any threshold.
    """
    X, y = make_classification(random_state=0)
    estimator = DummyClassifier(strategy="constant", constant=1)
    tuned_model = TunedThresholdClassifierCV(estimator, response_method="predict_proba")
    err_msg = "The provided estimator makes constant predictions"
    with pytest.raises(ValueError, match=err_msg):
        tuned_model.fit(X, y)

@pytest.mark.parametrize(
    "response_method", ["auto", "predict_proba", "decision_function"]
)
def test_fixed_threshold_classifier_equivalence_default(response_method):
    """Check that `FixedThresholdClassifier` has the same behaviour as the vanilla
    classifier.
    """
    X, y = make_classification(random_state=0)
    classifier = LogisticRegression().fit(X, y)
    classifier_default_threshold = FixedThresholdClassifier(
        estimator=clone(classifier), response_method=response_method
    )
    classifier_default_threshold.fit(X, y)

    # emulate the response method that should take into account the `pos_label`
    if response_method in ("auto", "predict_proba"):
        y_score = classifier_default_threshold.predict_proba(X)[:, 1]
        threshold = 0.5
    else:  # response_method == "decision_function"
        y_score = classifier_default_threshold.decision_function(X)
        threshold = 0.0

    y_pred_lr = (y_score >= threshold).astype(int)
    assert_allclose(classifier_default_threshold.predict(X), y_pred_lr)

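# Illustrative sketch, not part of the upstream test suite: raising the probability
# threshold above the default 0.5 can only shrink the set of samples predicted as the
# positive class. The private helper name and the 0.9 threshold are arbitrary choices
# for the sake of the example.
def _example_fixed_threshold_monotonicity():
    X, y = make_classification(random_state=0)
    default_model = FixedThresholdClassifier(estimator=LogisticRegression()).fit(X, y)
    strict_model = FixedThresholdClassifier(
        estimator=LogisticRegression(), threshold=0.9
    ).fit(X, y)
    # with labels in {0, 1}, summing the predictions counts the positive predictions
    assert strict_model.predict(X).sum() <= default_model.predict(X).sum()
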
@pytest.mark.parametrize(
    "response_method, threshold", [("predict_proba", 0.7), ("decision_function", 2.0)]
)
@pytest.mark.parametrize("pos_label", [0, 1])
def test_fixed_threshold_classifier(response_method, threshold, pos_label):
    """Check that applying `predict` leads to the same prediction as applying the
    threshold to the output of the response method.
    """
    X, y = make_classification(n_samples=50, random_state=0)
    logistic_regression = LogisticRegression().fit(X, y)
    model = FixedThresholdClassifier(
        estimator=clone(logistic_regression),
        threshold=threshold,
        response_method=response_method,
        pos_label=pos_label,
    ).fit(X, y)

    # check that the underlying estimator is the same
    assert_allclose(model.estimator_.coef_, logistic_regression.coef_)

    # emulate the response method that should take into account the `pos_label`
    if response_method == "predict_proba":
        y_score = model.predict_proba(X)[:, pos_label]
    else:  # response_method == "decision_function"
        y_score = model.decision_function(X)
        y_score = y_score if pos_label == 1 else -y_score

    # create a mapping from boolean values to class labels
    map_to_label = np.array([0, 1]) if pos_label == 1 else np.array([1, 0])
    y_pred_lr = map_to_label[(y_score >= threshold).astype(int)]
    assert_allclose(model.predict(X), y_pred_lr)

    for method in ("predict_proba", "predict_log_proba", "decision_function"):
        assert_allclose(
            getattr(model, method)(X), getattr(logistic_regression, method)(X)
        )
        assert_allclose(
            getattr(model.estimator_, method)(X),
            getattr(logistic_regression, method)(X),
        )

@pytest.mark.usefixtures("enable_slep006")
def test_fixed_threshold_classifier_metadata_routing():
    """Check that everything works with metadata routing."""
    X, y = make_classification(random_state=0)
    sample_weight = np.ones_like(y)
    sample_weight[::2] = 2
    classifier = LogisticRegression().set_fit_request(sample_weight=True)
    classifier.fit(X, y, sample_weight=sample_weight)
    classifier_default_threshold = FixedThresholdClassifier(estimator=clone(classifier))
    classifier_default_threshold.fit(X, y, sample_weight=sample_weight)
    assert_allclose(classifier_default_threshold.estimator_.coef_, classifier.coef_)