2744 lines
91 KiB
2744 lines
91 KiB
"""Test the ColumnTransformer."""
import pickle
import re
import warnings
from unittest.mock import Mock
import joblib
import numpy as np
import pytest
from numpy.testing import assert_allclose
from scipy import sparse
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import (
from sklearn.compose._column_transformer import _RemainderColsList
from sklearn.exceptions import NotFittedError
from sklearn.feature_selection import VarianceThreshold
from sklearn.preprocessing import (
from sklearn.tests.metadata_routing_common import (
from sklearn.utils._testing import (
from sklearn.utils.fixes import CSR_CONTAINERS, parse_version
class Trans(TransformerMixin, BaseEstimator):
    """Pass-through transformer that promotes 1D input to a single column."""

    def fit(self, X, y=None):
        """Do nothing and return the estimator itself."""
        return self

    def transform(self, X, y=None):
        """Return ``X`` unchanged, except that 1D input is made 2D."""
        # Objects exposing ``to_frame`` (1D Series) become a 2D DataFrame.
        series_like = hasattr(X, "to_frame")
        if series_like:
            return X.to_frame()
        # A 1D array becomes a 2D column; anything else is returned as is.
        n_dims = getattr(X, "ndim", 2)
        return np.atleast_2d(X).T if n_dims == 1 else X
class DoubleTrans(BaseEstimator):
    """Transformer whose output is the input multiplied by two."""

    def fit(self, X, y=None):
        """Do nothing and return the estimator itself."""
        return self

    def transform(self, X):
        """Return ``2 * X``."""
        doubled = 2 * X
        return doubled
class SparseMatrixTrans(BaseEstimator):
    """Transformer returning a sparse identity with one row per sample.

    Parameters
    ----------
    csr_container : callable
        Called on the ``scipy.sparse`` identity matrix to build the output.
    """

    def __init__(self, csr_container):
        self.csr_container = csr_container

    def fit(self, X, y=None):
        """Do nothing and return the estimator itself."""
        return self

    def transform(self, X, y=None):
        """Return an ``(n, n)`` sparse identity where ``n = len(X)``."""
        size = len(X)
        identity = sparse.eye(size, size)
        return self.csr_container(identity)
class TransNo2D(BaseEstimator):
    """Transformer that returns its input untouched (no 2D promotion)."""

    def fit(self, X, y=None):
        """Do nothing and return the estimator itself."""
        return self

    def transform(self, X, y=None):
        """Return ``X`` exactly as received."""
        passthrough = X
        return passthrough
class TransRaise(BaseEstimator):
    """Transformer whose ``fit`` and ``transform`` always fail.

    Both methods raise ``ValueError("specific message")`` so tests can
    detect whether the transformer was actually invoked.
    """

    def fit(self, X, y=None):
        """Always raise ``ValueError``."""
        raise ValueError("specific message")

    def transform(self, X, y=None):
        """Always raise ``ValueError``."""
        raise ValueError("specific message")
def test_column_transformer():
# Checks ColumnTransformer on a small integer array for several kinds of
# column selectors (scalar, list, array, slice, boolean mask, and a callable
# returning any of those), then with `transformer_weights`.
# NOTE(review): this view of the test looks truncated -- the `cases` list and
# several calls below are never closed and some call arguments appear to be
# missing; confirm against the upstream file before relying on it.
X_array = np.array([[0, 1, 2], [2, 4, 6]]).T
X_res_first1D = np.array([0, 1, 2])
X_res_second1D = np.array([2, 4, 6])
X_res_first = X_res_first1D.reshape(-1, 1)
X_res_both = X_array
cases = [
# single column 1D / 2D
(0, X_res_first),
([0], X_res_first),
# list-like
([0, 1], X_res_both),
(np.array([0, 1]), X_res_both),
# slice
(slice(0, 1), X_res_first),
(slice(0, 2), X_res_both),
# boolean mask
(np.array([True, False]), X_res_first),
([True, False], X_res_first),
(np.array([True, True]), X_res_both),
([True, True], X_res_both),
for selection, res in cases:
ct = ColumnTransformer([("trans", Trans(), selection)], remainder="drop")
assert_array_equal(ct.fit_transform(X_array), res)
assert_array_equal(ct.fit(X_array).transform(X_array), res)
# callable that returns any of the allowed specifiers
ct = ColumnTransformer(
[("trans", Trans(), lambda x: selection)], remainder="drop"
assert_array_equal(ct.fit_transform(X_array), res)
assert_array_equal(ct.fit(X_array).transform(X_array), res)
ct = ColumnTransformer([("trans1", Trans(), [0]), ("trans2", Trans(), [1])])
assert_array_equal(ct.fit_transform(X_array), X_res_both)
assert_array_equal(ct.fit(X_array).transform(X_array), X_res_both)
assert len(ct.transformers_) == 2
# test with transformer_weights
transformer_weights = {"trans1": 0.1, "trans2": 10}
both = ColumnTransformer(
[("trans1", Trans(), [0]), ("trans2", Trans(), [1])],
res = np.vstack(
transformer_weights["trans1"] * X_res_first1D,
transformer_weights["trans2"] * X_res_second1D,
assert_array_equal(both.fit_transform(X_array), res)
assert_array_equal(both.fit(X_array).transform(X_array), res)
assert len(both.transformers_) == 2
both = ColumnTransformer(
[("trans", Trans(), [0, 1])], transformer_weights={"trans": 0.1}
assert_array_equal(both.fit_transform(X_array), 0.1 * X_res_both)
assert_array_equal(both.fit(X_array).transform(X_array), 0.1 * X_res_both)
assert len(both.transformers_) == 1
def test_column_transformer_tuple_transformers_parameter():
    """Check that ``transformers`` given as a tuple behaves like a list.

    Two ColumnTransformers are built from the same steps, once passed as a
    list and once as a tuple, and their outputs are compared.
    """
    X_array = np.array([[0, 1, 2], [2, 4, 6]]).T

    transformers = [("trans1", Trans(), [0]), ("trans2", Trans(), [1])]

    ct_with_list = ColumnTransformer(transformers)
    ct_with_tuple = ColumnTransformer(tuple(transformers))

    # Compare the outputs explicitly: merely evaluating both calls would only
    # show that they do not raise.
    assert_array_equal(
        ct_with_list.fit_transform(X_array), ct_with_tuple.fit_transform(X_array)
    )
    assert_array_equal(
        ct_with_list.fit(X_array).transform(X_array),
        ct_with_tuple.fit(X_array).transform(X_array),
    )
@pytest.mark.parametrize("constructor_name", ["dataframe", "polars"])
def test_column_transformer_dataframe(constructor_name):
# Checks ColumnTransformer on a dataframe built with `_convert_container`
# (parametrized over "dataframe" and "polars") using label-based,
# positional and boolean-mask column selectors, then `transformer_weights`.
# NOTE(review): this view of the test looks truncated -- e.g. there is no
# `else:` between the two `importorskip` assignments, the `cases` list and
# several calls are never closed, and the `TransAssert` usages are
# incomplete; confirm against the upstream file before relying on it.
if constructor_name == "dataframe":
dataframe_lib = pytest.importorskip("pandas")
dataframe_lib = pytest.importorskip(constructor_name)
X_array = np.array([[0, 1, 2], [2, 4, 6]]).T
X_df = _convert_container(
X_array, constructor_name, columns_name=["first", "second"]
X_res_first = np.array([0, 1, 2]).reshape(-1, 1)
X_res_both = X_array
cases = [
# String keys: label based
# list
(["first"], X_res_first),
(["first", "second"], X_res_both),
# slice
(slice("first", "second"), X_res_both),
# int keys: positional
# list
([0], X_res_first),
([0, 1], X_res_both),
(np.array([0, 1]), X_res_both),
# slice
(slice(0, 1), X_res_first),
(slice(0, 2), X_res_both),
# boolean mask
(np.array([True, False]), X_res_first),
([True, False], X_res_first),
if constructor_name == "dataframe":
# Scalars are only supported for pandas dataframes.
# scalar
(0, X_res_first),
("first", X_res_first),
dataframe_lib.Series([True, False], index=["first", "second"]),
for selection, res in cases:
ct = ColumnTransformer([("trans", Trans(), selection)], remainder="drop")
assert_array_equal(ct.fit_transform(X_df), res)
assert_array_equal(ct.fit(X_df).transform(X_df), res)
# callable that returns any of the allowed specifiers
ct = ColumnTransformer(
[("trans", Trans(), lambda X: selection)], remainder="drop"
assert_array_equal(ct.fit_transform(X_df), res)
assert_array_equal(ct.fit(X_df).transform(X_df), res)
ct = ColumnTransformer(
[("trans1", Trans(), ["first"]), ("trans2", Trans(), ["second"])]
assert_array_equal(ct.fit_transform(X_df), X_res_both)
assert_array_equal(ct.fit(X_df).transform(X_df), X_res_both)
assert len(ct.transformers_) == 2
assert ct.transformers_[-1][0] != "remainder"
ct = ColumnTransformer([("trans1", Trans(), [0]), ("trans2", Trans(), [1])])
assert_array_equal(ct.fit_transform(X_df), X_res_both)
assert_array_equal(ct.fit(X_df).transform(X_df), X_res_both)
assert len(ct.transformers_) == 2
assert ct.transformers_[-1][0] != "remainder"
# test with transformer_weights
transformer_weights = {"trans1": 0.1, "trans2": 10}
both = ColumnTransformer(
[("trans1", Trans(), ["first"]), ("trans2", Trans(), ["second"])],
res = np.vstack(
transformer_weights["trans1"] * X_df["first"],
transformer_weights["trans2"] * X_df["second"],
assert_array_equal(both.fit_transform(X_df), res)
assert_array_equal(both.fit(X_df).transform(X_df), res)
assert len(both.transformers_) == 2
assert both.transformers_[-1][0] != "remainder"
# test multiple columns
both = ColumnTransformer(
[("trans", Trans(), ["first", "second"])], transformer_weights={"trans": 0.1}
assert_array_equal(both.fit_transform(X_df), 0.1 * X_res_both)
assert_array_equal(both.fit(X_df).transform(X_df), 0.1 * X_res_both)
assert len(both.transformers_) == 1
assert both.transformers_[-1][0] != "remainder"
both = ColumnTransformer(
[("trans", Trans(), [0, 1])], transformer_weights={"trans": 0.1}
assert_array_equal(both.fit_transform(X_df), 0.1 * X_res_both)
assert_array_equal(both.fit(X_df).transform(X_df), 0.1 * X_res_both)
assert len(both.transformers_) == 1
assert both.transformers_[-1][0] != "remainder"
# ensure pandas object is passed through
# TransAssert asserts in `transform` that X is an instance of the configured
# type and converts a Series to a frame before returning it.
class TransAssert(BaseEstimator):
def __init__(self, expected_type_transform):
self.expected_type_transform = expected_type_transform
def fit(self, X, y=None):
return self
def transform(self, X, y=None):
assert isinstance(X, self.expected_type_transform)
if isinstance(X, dataframe_lib.Series):
X = X.to_frame()
return X
ct = ColumnTransformer(
["first", "second"],
if constructor_name == "dataframe":
# DataFrame protocol does not have 1d columns, so we only test on Pandas
# dataframes.
ct = ColumnTransformer(
# Only test on pandas because the dataframe protocol requires string column
# names
# integer column spec + integer column names -> still use positional
X_df2 = X_df.copy()
X_df2.columns = [1, 0]
ct = ColumnTransformer([("trans", Trans(), 0)], remainder="drop")
assert_array_equal(ct.fit_transform(X_df2), X_res_first)
assert_array_equal(ct.fit(X_df2).transform(X_df2), X_res_first)
assert len(ct.transformers_) == 2
assert ct.transformers_[-1][0] == "remainder"
assert ct.transformers_[-1][1] == "drop"
assert_array_equal(ct.transformers_[-1][2], [1])
@pytest.mark.parametrize("pandas", [True, False], ids=["pandas", "numpy"])
[[], np.array([False, False]), [False, False]],
ids=["list", "bool", "bool_int"],
@pytest.mark.parametrize("callable_column", [False, True])
def test_column_transformer_empty_columns(pandas, column_selection, callable_column):
# NOTE(review): this view looks truncated -- the opening line of the
# `column_selection` parametrization is absent, the `else:` branches for
# `X` and `column` are missing, and the ColumnTransformer calls are not
# closed; confirm against the upstream file.
# test case that ensures that the column transformer does also work when
# a given transformer doesn't have any columns to work on
X_array = np.array([[0, 1, 2], [2, 4, 6]]).T
X_res_both = X_array
if pandas:
pd = pytest.importorskip("pandas")
X = pd.DataFrame(X_array, columns=["first", "second"])
X = X_array
if callable_column:
column = lambda X: column_selection # noqa
column = column_selection
# TransRaise raises in fit/transform, so these checks only pass if it is
# never invoked for the empty selection.
ct = ColumnTransformer(
[("trans1", Trans(), [0, 1]), ("trans2", TransRaise(), column)]
assert_array_equal(ct.fit_transform(X), X_res_both)
assert_array_equal(ct.fit(X).transform(X), X_res_both)
assert len(ct.transformers_) == 2
assert isinstance(ct.transformers_[1][1], TransRaise)
ct = ColumnTransformer(
[("trans1", TransRaise(), column), ("trans2", Trans(), [0, 1])]
assert_array_equal(ct.fit_transform(X), X_res_both)
assert_array_equal(ct.fit(X).transform(X), X_res_both)
assert len(ct.transformers_) == 2
assert isinstance(ct.transformers_[0][1], TransRaise)
ct = ColumnTransformer([("trans", TransRaise(), column)], remainder="passthrough")
assert_array_equal(ct.fit_transform(X), X_res_both)
assert_array_equal(ct.fit(X).transform(X), X_res_both)
assert len(ct.transformers_) == 2 # including remainder
assert isinstance(ct.transformers_[0][1], TransRaise)
# Expected output when everything is dropped: 3 rows, 0 columns.
fixture = np.array([[], [], []])
ct = ColumnTransformer([("trans", TransRaise(), column)], remainder="drop")
assert_array_equal(ct.fit_transform(X), fixture)
assert_array_equal(ct.fit(X).transform(X), fixture)
assert len(ct.transformers_) == 2 # including remainder
assert isinstance(ct.transformers_[0][1], TransRaise)
def test_column_transformer_output_indices():
    """Check the ``output_indices_`` attribute on array input.

    For each configuration the mapping from transformer name to output slice
    is compared with the expected dict, and the slices are used to index the
    transformed array.
    """
    X_array = np.arange(6).reshape(3, 2)

    ct = ColumnTransformer([("trans1", Trans(), [0]), ("trans2", Trans(), [1])])
    X_trans = ct.fit_transform(X_array)
    assert ct.output_indices_ == {
        "trans1": slice(0, 1),
        "trans2": slice(1, 2),
        "remainder": slice(0, 0),
    }
    assert_array_equal(X_trans[:, [0]], X_trans[:, ct.output_indices_["trans1"]])
    assert_array_equal(X_trans[:, [1]], X_trans[:, ct.output_indices_["trans2"]])

    # test with transformer_weights and multiple columns
    ct = ColumnTransformer(
        [("trans", Trans(), [0, 1])], transformer_weights={"trans": 0.1}
    )
    X_trans = ct.fit_transform(X_array)
    assert ct.output_indices_ == {"trans": slice(0, 2), "remainder": slice(0, 0)}
    assert_array_equal(X_trans[:, [0, 1]], X_trans[:, ct.output_indices_["trans"]])
    assert_array_equal(X_trans[:, []], X_trans[:, ct.output_indices_["remainder"]])

    # test case that ensures that the attribute does also work when
    # a given transformer doesn't have any columns to work on
    ct = ColumnTransformer([("trans1", Trans(), [0, 1]), ("trans2", TransRaise(), [])])
    X_trans = ct.fit_transform(X_array)
    assert ct.output_indices_ == {
        "trans1": slice(0, 2),
        "trans2": slice(0, 0),
        "remainder": slice(0, 0),
    }
    assert_array_equal(X_trans[:, [0, 1]], X_trans[:, ct.output_indices_["trans1"]])
    assert_array_equal(X_trans[:, []], X_trans[:, ct.output_indices_["trans2"]])
    assert_array_equal(X_trans[:, []], X_trans[:, ct.output_indices_["remainder"]])

    ct = ColumnTransformer([("trans", TransRaise(), [])], remainder="passthrough")
    X_trans = ct.fit_transform(X_array)
    assert ct.output_indices_ == {"trans": slice(0, 0), "remainder": slice(0, 2)}
    assert_array_equal(X_trans[:, []], X_trans[:, ct.output_indices_["trans"]])
    assert_array_equal(X_trans[:, [0, 1]], X_trans[:, ct.output_indices_["remainder"]])
def test_column_transformer_output_indices_df():
    """Check the ``output_indices_`` attribute with pandas data frames.

    The same expectations are verified for label-based and for positional
    column selection.
    """
    pd = pytest.importorskip("pandas")

    X_df = pd.DataFrame(np.arange(6).reshape(3, 2), columns=["first", "second"])

    # Label-based selection.
    ct = ColumnTransformer(
        [("trans1", Trans(), ["first"]), ("trans2", Trans(), ["second"])]
    )
    X_trans = ct.fit_transform(X_df)
    assert ct.output_indices_ == {
        "trans1": slice(0, 1),
        "trans2": slice(1, 2),
        "remainder": slice(0, 0),
    }
    assert_array_equal(X_trans[:, [0]], X_trans[:, ct.output_indices_["trans1"]])
    assert_array_equal(X_trans[:, [1]], X_trans[:, ct.output_indices_["trans2"]])
    assert_array_equal(X_trans[:, []], X_trans[:, ct.output_indices_["remainder"]])

    # Positional selection.
    ct = ColumnTransformer([("trans1", Trans(), [0]), ("trans2", Trans(), [1])])
    X_trans = ct.fit_transform(X_df)
    assert ct.output_indices_ == {
        "trans1": slice(0, 1),
        "trans2": slice(1, 2),
        "remainder": slice(0, 0),
    }
    assert_array_equal(X_trans[:, [0]], X_trans[:, ct.output_indices_["trans1"]])
    assert_array_equal(X_trans[:, [1]], X_trans[:, ct.output_indices_["trans2"]])
    assert_array_equal(X_trans[:, []], X_trans[:, ct.output_indices_["remainder"]])
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_column_transformer_sparse_array(csr_container):
    """Check ColumnTransformer on sparse input for several column selectors.

    With ``sparse_threshold=0.8`` the output is asserted to be sparse and to
    match the expected columns for both ``fit_transform`` and
    ``fit(...).transform``.
    """
    X_sparse = csr_container(sparse.eye(3, 2))

    # no distinction between 1D and 2D
    X_res_first = X_sparse[:, [0]]
    X_res_both = X_sparse

    for col in [(0,), [0], slice(0, 1)]:
        for remainder, res in [("drop", X_res_first), ("passthrough", X_res_both)]:
            ct = ColumnTransformer(
                [("trans", Trans(), col)], remainder=remainder, sparse_threshold=0.8
            )
            assert sparse.issparse(ct.fit_transform(X_sparse))
            assert_allclose_dense_sparse(ct.fit_transform(X_sparse), res)
            assert_allclose_dense_sparse(ct.fit(X_sparse).transform(X_sparse), res)

    for col in [[0, 1], slice(0, 2)]:
        ct = ColumnTransformer([("trans", Trans(), col)], sparse_threshold=0.8)
        assert sparse.issparse(ct.fit_transform(X_sparse))
        assert_allclose_dense_sparse(ct.fit_transform(X_sparse), X_res_both)
        assert_allclose_dense_sparse(ct.fit(X_sparse).transform(X_sparse), X_res_both)
def test_column_transformer_list():
# Fits a ColumnTransformer (StandardScaler on columns 0-1, OneHotEncoder on
# column 2) on a plain nested list and compares the result with a
# hard-coded array.
# NOTE(review): the list brackets around the rows / steps and the closing
# parentheses are absent from this view; confirm against the upstream file.
X_list = [[1, float("nan"), "a"], [0, 0, "b"]]
expected_result = np.array(
[1, float("nan"), 1, 0],
[-1, 0, 0, 1],
ct = ColumnTransformer(
("numerical", StandardScaler(), [0, 1]),
("categorical", OneHotEncoder(), [2]),
assert_array_equal(ct.fit_transform(X_list), expected_result)
assert_array_equal(ct.fit(X_list).transform(X_list), expected_result)
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_column_transformer_sparse_stacking(csr_container):
# Combines a dense step (Trans) with a sparse one (SparseMatrixTrans) and
# checks the stacked output once as sparse and once as dense.
# NOTE(review): this view looks truncated -- both ColumnTransformer calls are
# unclosed, no fit call is visible before `transform`, and the argument that
# makes the two cases differ (presumably `sparse_threshold`) is missing;
# confirm against the upstream file.
X_array = np.array([[0, 1, 2], [2, 4, 6]]).T
col_trans = ColumnTransformer(
[("trans1", Trans(), [0]), ("trans2", SparseMatrixTrans(csr_container), 1)],
X_trans = col_trans.transform(X_array)
assert sparse.issparse(X_trans)
# One column from trans1 plus an n_samples-wide identity from trans2.
assert X_trans.shape == (X_trans.shape[0], X_trans.shape[0] + 1)
assert_array_equal(X_trans.toarray()[:, 1:], np.eye(X_trans.shape[0]))
assert len(col_trans.transformers_) == 2
assert col_trans.transformers_[-1][0] != "remainder"
col_trans = ColumnTransformer(
[("trans1", Trans(), [0]), ("trans2", SparseMatrixTrans(csr_container), 1)],
X_trans = col_trans.transform(X_array)
assert not sparse.issparse(X_trans)
assert X_trans.shape == (X_trans.shape[0], X_trans.shape[0] + 1)
assert_array_equal(X_trans[:, 1:], np.eye(X_trans.shape[0]))
def test_column_transformer_mixed_cols_sparse():
# Checks sparse output when one-hot encoded columns are stacked with
# passed-through object-dtype columns: numeric/boolean values are expected
# to work, string values are expected to raise a ValueError.
# NOTE(review): both `make_column_transformer` calls are unclosed and the
# body of the `pytest.raises` block is absent from this view; confirm
# against the upstream file.
df = np.array([["a", 1, True], ["b", 2, False]], dtype="O")
ct = make_column_transformer(
(OneHotEncoder(), [0]), ("passthrough", [1, 2]), sparse_threshold=1.0
# this shouldn't fail, since boolean can be coerced into a numeric
# See: https://github.com/scikit-learn/scikit-learn/issues/11912
X_trans = ct.fit_transform(df)
assert X_trans.getformat() == "csr"
assert_array_equal(X_trans.toarray(), np.array([[1, 0, 1, 1], [0, 1, 2, 0]]))
ct = make_column_transformer(
(OneHotEncoder(), [0]), ("passthrough", [0]), sparse_threshold=1.0
with pytest.raises(ValueError, match="For a sparse output, all columns should"):
# this fails since strings `a` and `b` cannot be
# coerced into a numeric.
def test_column_transformer_sparse_threshold():
# Checks whether the stacked output (and `sparse_output_`) is sparse or
# dense for different mixes of sparse/dense OneHotEncoder steps.
# NOTE(review): this view looks truncated -- every ColumnTransformer call is
# unclosed and the loop variable `thres` is never used, so the
# `sparse_threshold` argument is presumably among the missing lines;
# confirm against the upstream file.
X_array = np.array([["a", "b"], ["A", "B"]], dtype=object).T
# above data has sparsity of 4 / 8 = 0.5
# apply threshold even if all sparse
col_trans = ColumnTransformer(
[("trans1", OneHotEncoder(), [0]), ("trans2", OneHotEncoder(), [1])],
res = col_trans.fit_transform(X_array)
assert not sparse.issparse(res)
assert not col_trans.sparse_output_
# mixed -> sparsity of (4 + 2) / 8 = 0.75
for thres in [0.75001, 1]:
col_trans = ColumnTransformer(
("trans1", OneHotEncoder(sparse_output=True), [0]),
("trans2", OneHotEncoder(sparse_output=False), [1]),
res = col_trans.fit_transform(X_array)
assert sparse.issparse(res)
assert col_trans.sparse_output_
for thres in [0.75, 0]:
col_trans = ColumnTransformer(
("trans1", OneHotEncoder(sparse_output=True), [0]),
("trans2", OneHotEncoder(sparse_output=False), [1]),
res = col_trans.fit_transform(X_array)
assert not sparse.issparse(res)
assert not col_trans.sparse_output_
# if nothing is sparse -> no sparse
for thres in [0.33, 0, 1]:
col_trans = ColumnTransformer(
("trans1", OneHotEncoder(sparse_output=False), [0]),
("trans2", OneHotEncoder(sparse_output=False), [1]),
res = col_trans.fit_transform(X_array)
assert not sparse.issparse(res)
assert not col_trans.sparse_output_
def test_column_transformer_error_msg_1D():
# Expects a ValueError mentioning "1D data passed to a transformer" when a
# scalar column selector feeds StandardScaler, and expects the message
# raised by TransRaise ("specific message") to be reported as is.
# NOTE(review): the bodies of all three `pytest.raises` blocks are absent
# from this view (`func` is never called); confirm against the upstream file.
X_array = np.array([[0.0, 1.0, 2.0], [2.0, 4.0, 6.0]]).T
col_trans = ColumnTransformer([("trans", StandardScaler(), 0)])
msg = "1D data passed to a transformer"
with pytest.raises(ValueError, match=msg):
with pytest.raises(ValueError, match=msg):
col_trans = ColumnTransformer([("trans", TransRaise(), 0)])
for func in [col_trans.fit, col_trans.fit_transform]:
with pytest.raises(ValueError, match="specific message"):
def test_2D_transformer_output():
# Expects a ValueError naming 'trans2' when TransNo2D (which returns its
# input unchanged) is given a scalar column selector.
# NOTE(review): the bodies of both `pytest.raises` blocks are absent from
# this view; confirm against the upstream file.
X_array = np.array([[0, 1, 2], [2, 4, 6]]).T
# if one transformer is dropped, test that name is still correct
ct = ColumnTransformer([("trans1", "drop", 0), ("trans2", TransNo2D(), 1)])
msg = "the 'trans2' transformer should be 2D"
with pytest.raises(ValueError, match=msg):
# because fit is also doing transform, this raises already on fit
with pytest.raises(ValueError, match=msg):
def test_2D_transformer_output_pandas():
# Pandas variant: expects a ValueError naming 'trans1' when TransNo2D is
# given a single column label of a DataFrame.
# NOTE(review): the bodies of both `pytest.raises` blocks are absent from
# this view; confirm against the upstream file.
pd = pytest.importorskip("pandas")
X_array = np.array([[0, 1, 2], [2, 4, 6]]).T
X_df = pd.DataFrame(X_array, columns=["col1", "col2"])
# if one transformer is dropped, test that name is still correct
ct = ColumnTransformer([("trans1", TransNo2D(), "col1")])
msg = "the 'trans1' transformer should be 2D"
with pytest.raises(ValueError, match=msg):
# because fit is also doing transform, this raises already on fit
with pytest.raises(ValueError, match=msg):
@pytest.mark.parametrize("remainder", ["drop", "passthrough"])
def test_column_transformer_invalid_columns(remainder):
# Expects ValueError for invalid column specifications, for string-based
# selectors on array input, and for a feature-count mismatch (3 or 1
# features where 2 are expected, per the messages below).
# NOTE(review): the bodies of the `pytest.raises` blocks are absent from this
# view and the `X_array_fewer` / `err_msg` expressions are unclosed; confirm
# against the upstream file.
X_array = np.array([[0, 1, 2], [2, 4, 6]]).T
# general invalid
for col in [1.5, ["string", 1], slice(1, "s"), np.array([1.0])]:
ct = ColumnTransformer([("trans", Trans(), col)], remainder=remainder)
with pytest.raises(ValueError, match="No valid specification"):
# invalid for arrays
for col in ["string", ["string", "other"], slice("a", "b")]:
ct = ColumnTransformer([("trans", Trans(), col)], remainder=remainder)
with pytest.raises(ValueError, match="Specifying the columns"):
# transformed n_features does not match fitted n_features
col = [0, 1]
ct = ColumnTransformer([("trans", Trans(), col)], remainder=remainder)
X_array_more = np.array([[0, 1, 2], [2, 4, 6], [3, 6, 9]]).T
msg = "X has 3 features, but ColumnTransformer is expecting 2 features as input."
with pytest.raises(ValueError, match=msg):
X_array_fewer = np.array(
[0, 1, 2],
err_msg = (
"X has 1 features, but ColumnTransformer is expecting 2 features as input."
with pytest.raises(ValueError, match=err_msg):
def test_column_transformer_invalid_transformer():
    """Check that a step without ``transform`` is rejected with ``TypeError``."""

    class NoTrans(BaseEstimator):
        """Estimator exposing ``fit`` and ``predict`` but no ``transform``."""

        def fit(self, X, y=None):
            return self

        def predict(self, X):
            return X

    X_array = np.array([[0, 1, 2], [2, 4, 6]]).T
    ct = ColumnTransformer([("trans", NoTrans(), [0])])
    msg = "All estimators should implement fit and transform"
    with pytest.raises(TypeError, match=msg):
        # Fitting is assumed to be the call that validates the steps and
        # raises -- TODO confirm against the upstream test.
        ct.fit(X_array)
def test_make_column_transformer():
    """Check the names, estimators and columns built by the helper."""
    scaler = StandardScaler()
    norm = Normalizer()

    ct = make_column_transformer((scaler, "first"), (norm, ["second"]))

    # Transpose the list of (name, estimator, columns) triples.
    unzipped = tuple(zip(*ct.transformers))
    step_names = unzipped[0]
    step_estimators = unzipped[1]
    step_columns = unzipped[2]
    assert len(unzipped) == 3

    assert step_names == ("standardscaler", "normalizer")
    assert step_estimators == (scaler, norm)
    assert step_columns == ("first", ["second"])
def test_make_column_transformer_pandas():
    """Check that a pandas Index is accepted as the column specification.

    The explicit constructor and the helper must produce (almost) equal
    output when both select ``X_df.columns``.
    """
    pd = pytest.importorskip("pandas")

    X_array = np.array([[0, 1, 2], [2, 4, 6]]).T
    X_df = pd.DataFrame(X_array, columns=["first", "second"])
    norm = Normalizer()

    explicit = ColumnTransformer([("norm", Normalizer(), X_df.columns)])
    via_helper = make_column_transformer((norm, X_df.columns))

    out_explicit = explicit.fit_transform(X_df)
    out_helper = via_helper.fit_transform(X_df)
    assert_almost_equal(out_explicit, out_helper)
def test_make_column_transformer_kwargs():
# Checks that keyword arguments given to make_column_transformer end up on
# the estimator (n_jobs, remainder, sparse_threshold) and that
# `transformer_weights` is rejected with a TypeError.
# NOTE(review): this view looks truncated -- the keyword arguments of the
# first call, the left-hand side of the `assert (...)` comparison and the
# call inside `pytest.raises` are missing; confirm against the upstream file.
scaler = StandardScaler()
norm = Normalizer()
ct = make_column_transformer(
(scaler, "first"),
(norm, ["second"]),
assert (
== make_column_transformer((scaler, "first"), (norm, ["second"])).transformers
assert ct.n_jobs == 3
assert ct.remainder == "drop"
assert ct.sparse_threshold == 0.5
# invalid keyword parameters should raise an error message
msg = re.escape(
"make_column_transformer() got an unexpected "
"keyword argument 'transformer_weights'"
with pytest.raises(TypeError, match=msg):
(scaler, "first"),
(norm, ["second"]),
transformer_weights={"pca": 10, "Transf": 1},
def test_make_column_transformer_remainder_transformer():
    """Check that an estimator passed as ``remainder`` is stored unchanged."""
    scaler = StandardScaler()
    norm = Normalizer()
    remainder = StandardScaler()
    ct = make_column_transformer(
        (scaler, "first"), (norm, ["second"]), remainder=remainder
    )
    assert ct.remainder == remainder
def test_column_transformer_get_set_params():
# Compares `get_params()` of a two-step ColumnTransformer with hand-written
# dicts, including the nested `<name>__<param>` entries of each step.
# NOTE(review): this view looks truncated -- the constructor call and both
# dict literals are unclosed, and no statement changing the parameters is
# visible between the assertions (presumably `set_params` calls are among
# the missing lines); confirm against the upstream file.
ct = ColumnTransformer(
[("trans1", StandardScaler(), [0]), ("trans2", StandardScaler(), [1])]
exp = {
"n_jobs": None,
"remainder": "drop",
"sparse_threshold": 0.3,
"trans1": ct.transformers[0][1],
"trans1__copy": True,
"trans1__with_mean": True,
"trans1__with_std": True,
"trans2": ct.transformers[1][1],
"trans2__copy": True,
"trans2__with_mean": True,
"trans2__with_std": True,
"transformers": ct.transformers,
"transformer_weights": None,
"verbose_feature_names_out": True,
"verbose": False,
"force_int_remainder_cols": True,
assert ct.get_params() == exp
assert not ct.get_params()["trans1__with_mean"]
# Expected parameters once "trans1" is the string "passthrough": its nested
# `trans1__*` entries are no longer listed.
exp = {
"n_jobs": None,
"remainder": "drop",
"sparse_threshold": 0.3,
"trans1": "passthrough",
"trans2": ct.transformers[1][1],
"trans2__copy": True,
"trans2__with_mean": True,
"trans2__with_std": True,
"transformers": ct.transformers,
"transformer_weights": None,
"verbose_feature_names_out": True,
"verbose": False,
"force_int_remainder_cols": True,
assert ct.get_params() == exp
def test_column_transformer_named_estimators():
    """Check access to fitted steps through ``named_transformers_``.

    The fitted attributes must not exist before fitting; afterwards each step
    is reachable both by key and by attribute.
    """
    X_array = np.array([[0.0, 1.0, 2.0], [2.0, 4.0, 6.0]]).T
    ct = ColumnTransformer(
        [
            ("trans1", StandardScaler(), [0]),
            ("trans2", StandardScaler(with_std=False), [1]),
        ]
    )
    assert not hasattr(ct, "transformers_")
    # Fitting is what creates ``transformers_`` / ``named_transformers_``.
    ct.fit(X_array)
    assert hasattr(ct, "transformers_")
    assert isinstance(ct.named_transformers_["trans1"], StandardScaler)
    assert isinstance(ct.named_transformers_.trans1, StandardScaler)
    assert isinstance(ct.named_transformers_["trans2"], StandardScaler)
    assert isinstance(ct.named_transformers_.trans2, StandardScaler)
    assert not ct.named_transformers_.trans2.with_std
    # check it are fitted transformers
    assert ct.named_transformers_.trans1.mean_ == 1.0
def test_column_transformer_cloning():
    """Check that fitting works on clones of the user-supplied steps.

    After ``fit`` and after ``fit_transform`` the estimator given in
    ``transformers`` must stay unfitted (no ``mean_``) while the one stored in
    ``transformers_`` is fitted.
    """
    X_array = np.array([[0.0, 1.0, 2.0], [2.0, 4.0, 6.0]]).T

    ct = ColumnTransformer([("trans", StandardScaler(), [0])])
    # ``transformers_`` only exists on a fitted estimator.
    ct.fit(X_array)
    assert not hasattr(ct.transformers[0][1], "mean_")
    assert hasattr(ct.transformers_[0][1], "mean_")

    ct = ColumnTransformer([("trans", StandardScaler(), [0])])
    # Same expectation through the ``fit_transform`` code path.
    ct.fit_transform(X_array)
    assert not hasattr(ct.transformers[0][1], "mean_")
    assert hasattr(ct.transformers_[0][1], "mean_")
def test_column_transformer_get_feature_names():
# Expects NotFittedError on an unfitted ColumnTransformer and an
# AttributeError naming the step when it lacks `get_feature_names_out`.
# NOTE(review): the bodies of both `pytest.raises` blocks, the closing
# parenthesis of `re.escape(` and any fit call in between are absent from
# this view; confirm against the upstream file.
X_array = np.array([[0.0, 1.0, 2.0], [2.0, 4.0, 6.0]]).T
ct = ColumnTransformer([("trans", Trans(), [0, 1])])
# raise correct error when not fitted
with pytest.raises(NotFittedError):
# raise correct error when no feature names are available
msg = re.escape(
"Transformer trans (type Trans) does not provide get_feature_names_out"
with pytest.raises(AttributeError, match=msg):
def test_column_transformer_special_strings():
    """Check the special step values ``"drop"`` and ``"passthrough"``."""

    def assert_two_steps_without_remainder(fitted):
        # Both declared steps are kept and no remainder entry is appended.
        assert len(fitted.transformers_) == 2
        assert fitted.transformers_[-1][0] != "remainder"

    # one 'drop' -> ignore
    X_array = np.array([[0.0, 1.0, 2.0], [2.0, 4.0, 6.0]]).T
    ct = ColumnTransformer([("trans1", Trans(), [0]), ("trans2", "drop", [1])])
    expected = np.array([[0.0], [1.0], [2.0]])
    assert_array_equal(ct.fit_transform(X_array), expected)
    assert_array_equal(ct.fit(X_array).transform(X_array), expected)
    assert_two_steps_without_remainder(ct)

    # all 'drop' -> return shape 0 array
    ct = ColumnTransformer([("trans1", "drop", [0]), ("trans2", "drop", [1])])
    empty_shape = (3, 0)
    assert_array_equal(ct.fit(X_array).transform(X_array).shape, empty_shape)
    assert_array_equal(ct.fit_transform(X_array).shape, empty_shape)
    assert_two_steps_without_remainder(ct)

    # 'passthrough'
    X_array = np.array([[0.0, 1.0, 2.0], [2.0, 4.0, 6.0]]).T
    ct = ColumnTransformer([("trans1", Trans(), [0]), ("trans2", "passthrough", [1])])
    expected = X_array
    assert_array_equal(ct.fit_transform(X_array), expected)
    assert_array_equal(ct.fit(X_array).transform(X_array), expected)
    assert_two_steps_without_remainder(ct)
def test_column_transformer_remainder():
    """Check the handling of columns not selected by any step.

    Covers the default (drop), explicit passthrough, the column order of
    passed-through columns, passthrough when the only step is dropped, and
    the default of ``make_column_transformer``.
    """
    X_array = np.array([[0, 1, 2], [2, 4, 6]]).T

    first_col = np.array([0, 1, 2]).reshape(-1, 1)
    second_col = np.array([2, 4, 6]).reshape(-1, 1)
    all_cols = X_array

    def check_passthrough(step, expected, leftover):
        # Fit with remainder="passthrough" and inspect the remainder entry.
        est = ColumnTransformer([step], remainder="passthrough")
        assert_array_equal(est.fit_transform(X_array), expected)
        assert_array_equal(est.fit(X_array).transform(X_array), expected)
        assert len(est.transformers_) == 2
        assert est.transformers_[-1][0] == "remainder"
        assert isinstance(est.transformers_[-1][1], FunctionTransformer)
        assert_array_equal(est.transformers_[-1][2], leftover)

    # default drop
    ct = ColumnTransformer([("trans1", Trans(), [0])])
    assert_array_equal(ct.fit_transform(X_array), first_col)
    assert_array_equal(ct.fit(X_array).transform(X_array), first_col)
    assert len(ct.transformers_) == 2
    last_entry = ct.transformers_[-1]
    assert last_entry[0] == "remainder"
    assert last_entry[1] == "drop"
    assert_array_equal(last_entry[2], [1])

    # specify passthrough
    check_passthrough(("trans", Trans(), [0]), all_cols, [1])

    # column order is not preserved (passed through added to end)
    check_passthrough(("trans1", Trans(), [1]), all_cols[:, ::-1], [0])

    # passthrough when all actual transformers are skipped
    check_passthrough(("trans1", "drop", [0]), second_col, [1])

    # check default for make_column_transformer
    ct = make_column_transformer((Trans(), [0]))
    assert ct.remainder == "drop"
# TODO(1.7): check for deprecated force_int_remainder_cols
# TODO(1.9): remove force_int but keep the test
# The test below asserts that the first remainder column is the integer 2
# after the two steps take the other columns of a 3-column array.
# NOTE(review): this view looks truncated -- the opening line of the
# "cols1, cols2" parametrization is absent, the docstring is never closed,
# the `make_column_transformer` call is unclosed, the body of
# `warnings.catch_warnings()` is missing and `force_int` is never used;
# confirm against the upstream file.
"cols1, cols2",
([0], [False, True, False]), # mix types
([0], [1]), # ints
(lambda x: [0], lambda x: [1]), # callables
@pytest.mark.parametrize("force_int", [False, True])
def test_column_transformer_remainder_dtypes_ints(force_int, cols1, cols2):
"""Check that the remainder columns are always stored as indices when
other columns are not all specified as column names or masks, regardless of
X = np.ones((1, 3))
ct = make_column_transformer(
(Trans(), cols1),
(Trans(), cols2),
with warnings.catch_warnings():
assert ct.transformers_[-1][-1][0] == 2
# TODO(1.7): check for deprecated force_int_remainder_cols
# TODO(1.9): remove force_int but keep the test
# The test below compares the stored remainder columns with `expected_cols`
# for label and boolean-mask selectors, with and without `force_int`.
# NOTE(review): this view looks truncated -- the opening line of the
# parametrization is absent, the docstring is never closed, the
# `make_column_transformer` call is unclosed, and the `else:` branch and
# the bodies of the warning context managers are missing; confirm against
# the upstream file.
"force_int, cols1, cols2, expected_cols",
(True, ["A"], ["B"], [2]),
(False, ["A"], ["B"], ["C"]),
(True, [True, False, False], [False, True, False], [2]),
(False, [True, False, False], [False, True, False], [False, False, True]),
def test_column_transformer_remainder_dtypes(force_int, cols1, cols2, expected_cols):
"""Check that the remainder columns format matches the format of the other
columns when they're all strings or masks, unless `force_int = True`.
X = np.ones((1, 3))
if isinstance(cols1[0], str):
pd = pytest.importorskip("pandas")
X = pd.DataFrame(X, columns=["A", "B", "C"])
# if inputs are column names store remainder columns as column names unless
# force_int_remainder_cols is True
ct = make_column_transformer(
(Trans(), cols1),
(Trans(), cols2),
with warnings.catch_warnings():
if force_int:
# If we forced using ints and we access the remainder columns a warning is shown
match = "The format of the columns of the 'remainder' transformer"
cols = ct.transformers_[-1][-1]
with pytest.warns(FutureWarning, match=match):
with warnings.catch_warnings():
cols = ct.transformers_[-1][-1]
assert cols == expected_cols
def test_remainder_list_repr():
    """Check the string forms of ``_RemainderColsList``.

    ``str``, ``repr`` and the ``_repr_pretty_`` hook must all render the
    wrapped list as ``"[0, 1]"``.
    """
    remainder_cols = _RemainderColsList([0, 1], warning_enabled=False)

    for render in (str, repr):
        assert render(remainder_cols) == "[0, 1]"

    # The pretty-printer hook writes the text to the printer it is given.
    printer = Mock()
    remainder_cols._repr_pretty_(printer, False)
    printer.text.assert_called_once_with("[0, 1]")
"key, expected_cols",
([0], [1]),
(np.array([0]), [1]),
(slice(0, 1), [1]),
(np.array([True, False]), [False, True]),
def test_column_transformer_remainder_numpy(key, expected_cols):
# NOTE(review): this view looks truncated -- the opening line of the
# parametrization is absent and the ColumnTransformer call is unclosed; a
# `remainder` argument is presumably among the missing lines, since the
# assertions below expect a FunctionTransformer remainder. Confirm against
# the upstream file.
# test different ways that columns are specified with passthrough
X_array = np.array([[0, 1, 2], [2, 4, 6]]).T
X_res_both = X_array
ct = ColumnTransformer(
[("trans1", Trans(), key)],
assert_array_equal(ct.fit_transform(X_array), X_res_both)
assert_array_equal(ct.fit(X_array).transform(X_array), X_res_both)
assert len(ct.transformers_) == 2
assert ct.transformers_[-1][0] == "remainder"
assert isinstance(ct.transformers_[-1][1], FunctionTransformer)
assert ct.transformers_[-1][2] == expected_cols
"key, expected_cols",
([0], [1]),
(slice(0, 1), [1]),
(np.array([True, False]), [False, True]),
(["first"], ["second"]),
("pd-index", ["second"]),
(np.array(["first"]), ["second"]),
(np.array(["first"], dtype=object), ["second"]),
(slice(None, "first"), ["second"]),
(slice("first", "first"), ["second"]),
def test_column_transformer_remainder_pandas(key, expected_cols):
# NOTE(review): this view looks truncated -- the opening line of the
# parametrization is absent and the ColumnTransformer call is unclosed; a
# `remainder` argument is presumably among the missing lines. Confirm
# against the upstream file.
# test different ways that columns are specified with passthrough
pd = pytest.importorskip("pandas")
# "pd-index" is a placeholder replaced by a real pandas Index here, after
# pandas has been imported.
if isinstance(key, str) and key == "pd-index":
key = pd.Index(["first"])
X_array = np.array([[0, 1, 2], [2, 4, 6]]).T
X_df = pd.DataFrame(X_array, columns=["first", "second"])
X_res_both = X_array
ct = ColumnTransformer(
[("trans1", Trans(), key)],
assert_array_equal(ct.fit_transform(X_df), X_res_both)
assert_array_equal(ct.fit(X_df).transform(X_df), X_res_both)
assert len(ct.transformers_) == 2
assert ct.transformers_[-1][0] == "remainder"
assert isinstance(ct.transformers_[-1][1], FunctionTransformer)
assert ct.transformers_[-1][2] == expected_cols
"key, expected_cols",
([0], [1, 2]),
(np.array([0]), [1, 2]),
(slice(0, 1), [1, 2]),
(np.array([True, False, False]), [False, True, True]),
def test_column_transformer_remainder_transformer(key, expected_cols):
# Checks an estimator used as remainder: the columns not selected by `key`
# are expected to come out doubled and the remainder entry is expected to be
# a DoubleTrans.
# NOTE(review): this view looks truncated -- the opening line of the
# parametrization is absent and the ColumnTransformer call is unclosed; the
# `remainder=` argument is presumably among the missing lines. Confirm
# against the upstream file.
X_array = np.array([[0, 1, 2], [2, 4, 6], [8, 6, 4]]).T
X_res_both = X_array.copy()
# second and third columns are doubled when remainder = DoubleTrans
X_res_both[:, 1:3] *= 2
ct = ColumnTransformer(
[("trans1", Trans(), key)],
assert_array_equal(ct.fit_transform(X_array), X_res_both)
assert_array_equal(ct.fit(X_array).transform(X_array), X_res_both)
assert len(ct.transformers_) == 2
assert ct.transformers_[-1][0] == "remainder"
assert isinstance(ct.transformers_[-1][1], DoubleTrans)
assert ct.transformers_[-1][2] == expected_cols
def test_column_transformer_no_remaining_remainder_transformer():
    """No "remainder" entry is recorded when every column is already claimed."""
    X_array = np.array([[0, 1, 2], [2, 4, 6], [8, 6, 4]]).T

    # "trans1" selects all three columns, leaving nothing for DoubleTrans.
    ct = ColumnTransformer(
        [("trans1", Trans(), [0, 1, 2])],
        remainder=DoubleTrans(),
    )

    # The data must come out unchanged through both code paths.
    one_step = ct.fit_transform(X_array)
    assert_array_equal(one_step, X_array)
    two_step = ct.fit(X_array).transform(X_array)
    assert_array_equal(two_step, X_array)

    fitted = ct.transformers_
    assert len(fitted) == 1
    last_name = fitted[-1][0]
    assert last_name != "remainder"
def test_column_transformer_drops_all_remainder_transformer():
    """Only the remainder estimator contributes when the sole transformer drops."""
    X_array = np.array([[0, 1, 2], [2, 4, 6], [8, 6, 4]]).T

    # Column 0 is dropped; columns 1 and 2 go to DoubleTrans and are doubled.
    remaining = X_array.copy()[:, 1:3]
    X_res_both = 2 * remaining

    ct = ColumnTransformer(
        [("trans1", "drop", [0])],
        remainder=DoubleTrans(),
    )

    one_step = ct.fit_transform(X_array)
    assert_array_equal(one_step, X_res_both)
    two_step = ct.fit(X_array).transform(X_array)
    assert_array_equal(two_step, X_res_both)

    # The fitted list holds the dropped entry plus the remainder entry.
    assert len(ct.transformers_) == 2
    remainder_entry = ct.transformers_[-1]
    assert remainder_entry[0] == "remainder"
    assert isinstance(remainder_entry[1], DoubleTrans)
    assert_array_equal(remainder_entry[2], [1, 2])
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_column_transformer_sparse_remainder_transformer(csr_container):
X_array = np.array([[0, 1, 2], [2, 4, 6], [8, 6, 4]]).T
ct = ColumnTransformer(
[("trans1", Trans(), [0])],
X_trans = ct.fit_transform(X_array)
assert sparse.issparse(X_trans)
# SparseMatrixTrans creates 3 features for each column. There is
# one column in ``transformers``, thus:
assert X_trans.shape == (3, 3 + 1)
exp_array = np.hstack((X_array[:, 0].reshape(-1, 1), np.eye(3)))
assert_array_equal(X_trans.toarray(), exp_array)
assert len(ct.transformers_) == 2
assert ct.transformers_[-1][0] == "remainder"
assert isinstance(ct.transformers_[-1][1], SparseMatrixTrans)
assert_array_equal(ct.transformers_[-1][2], [1, 2])
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_column_transformer_drop_all_sparse_remainder_transformer(csr_container):
X_array = np.array([[0, 1, 2], [2, 4, 6], [8, 6, 4]]).T
ct = ColumnTransformer(
[("trans1", "drop", [0])],
X_trans = ct.fit_transform(X_array)
assert sparse.issparse(X_trans)
# SparseMatrixTrans creates 3 features for each column, thus:
assert X_trans.shape == (3, 3)
assert_array_equal(X_trans.toarray(), np.eye(3))
assert len(ct.transformers_) == 2
assert ct.transformers_[-1][0] == "remainder"
assert isinstance(ct.transformers_[-1][1], SparseMatrixTrans)
assert_array_equal(ct.transformers_[-1][2], [1, 2])
def test_column_transformer_get_set_params_with_remainder():
ct = ColumnTransformer(
[("trans1", StandardScaler(), [0])], remainder=StandardScaler()
exp = {
"n_jobs": None,
"remainder": ct.remainder,
"remainder__copy": True,
"remainder__with_mean": True,
"remainder__with_std": True,
"sparse_threshold": 0.3,
"trans1": ct.transformers[0][1],
"trans1__copy": True,
"trans1__with_mean": True,
"trans1__with_std": True,
"transformers": ct.transformers,
"transformer_weights": None,
"verbose_feature_names_out": True,
"verbose": False,
"force_int_remainder_cols": True,
assert ct.get_params() == exp
assert not ct.get_params()["remainder__with_std"]
exp = {
"n_jobs": None,
"remainder": ct.remainder,
"remainder__copy": True,
"remainder__with_mean": True,
"remainder__with_std": False,
"sparse_threshold": 0.3,
"trans1": "passthrough",
"transformers": ct.transformers,
"transformer_weights": None,
"verbose_feature_names_out": True,
"verbose": False,
"force_int_remainder_cols": True,
assert ct.get_params() == exp
def test_column_transformer_no_estimators():
    """An empty transformer list leaves every column to the remainder estimator."""
    raw = np.array([[0, 1, 2], [2, 4, 6], [8, 6, 4]])
    X_array = raw.astype("float").T

    ct = ColumnTransformer([], remainder=StandardScaler())

    # Parameters of the remainder estimator are exposed with a "remainder__" prefix.
    assert ct.get_params()["remainder__with_mean"]

    X_trans = ct.fit_transform(X_array)
    assert X_trans.shape == X_array.shape

    # The remainder entry is the only fitted transformer and covers all columns.
    fitted = ct.transformers_
    assert len(fitted) == 1
    remainder_entry = fitted[-1]
    assert remainder_entry[0] == "remainder"
    assert remainder_entry[2] == [0, 1, 2]
["est", "pattern"],
[("trans1", Trans(), [0]), ("trans2", Trans(), [1])],
r"\[ColumnTransformer\].*\(1 of 3\) Processing trans1.* total=.*\n"
r"\[ColumnTransformer\].*\(2 of 3\) Processing trans2.* total=.*\n"
r"\[ColumnTransformer\].*\(3 of 3\) Processing remainder.* total=.*\n$"
[("trans1", Trans(), [0]), ("trans2", Trans(), [1])],
r"\[ColumnTransformer\].*\(1 of 3\) Processing trans1.* total=.*\n"
r"\[ColumnTransformer\].*\(2 of 3\) Processing trans2.* total=.*\n"
r"\[ColumnTransformer\].*\(3 of 3\) Processing remainder.* total=.*\n$"
[("trans1", Trans(), [0]), ("trans2", "drop", [1])],
r"\[ColumnTransformer\].*\(1 of 2\) Processing trans1.* total=.*\n"
r"\[ColumnTransformer\].*\(2 of 2\) Processing remainder.* total=.*\n$"
[("trans1", Trans(), [0]), ("trans2", "passthrough", [1])],
r"\[ColumnTransformer\].*\(1 of 3\) Processing trans1.* total=.*\n"
r"\[ColumnTransformer\].*\(2 of 3\) Processing trans2.* total=.*\n"
r"\[ColumnTransformer\].*\(3 of 3\) Processing remainder.* total=.*\n$"
ColumnTransformer([("trans1", Trans(), [0])], remainder="passthrough"),
r"\[ColumnTransformer\].*\(1 of 2\) Processing trans1.* total=.*\n"
r"\[ColumnTransformer\].*\(2 of 2\) Processing remainder.* total=.*\n$"
[("trans1", Trans(), [0]), ("trans2", Trans(), [1])], remainder="drop"
r"\[ColumnTransformer\].*\(1 of 2\) Processing trans1.* total=.*\n"
r"\[ColumnTransformer\].*\(2 of 2\) Processing trans2.* total=.*\n$"
ColumnTransformer([("trans1", Trans(), [0])], remainder="drop"),
r"\[ColumnTransformer\].*\(1 of 1\) Processing trans1.* total=.*\n$",
@pytest.mark.parametrize("method", ["fit", "fit_transform"])
def test_column_transformer_verbose(est, pattern, method, capsys):
X_array = np.array([[0, 1, 2], [2, 4, 6], [8, 6, 4]]).T
func = getattr(est, method)
assert not capsys.readouterr().out, "Got output for verbose=False"
assert re.match(pattern, capsys.readouterr()[0])
def test_column_transformer_no_estimators_set_params():
    """`set_params` works on a ColumnTransformer built with no transformers."""
    empty_ct = ColumnTransformer([])
    ct = empty_ct.set_params(n_jobs=2)
    assert ct.n_jobs == 2
def test_column_transformer_callable_specifier():
    """A callable column specifier is called with the full input array."""
    X_array = np.array([[0, 1, 2], [2, 4, 6]]).T
    X_res_first = np.array([[0, 1, 2]]).T

    def pick_first_column(X):
        # The callable must see the complete array, not a column subset.
        assert_array_equal(X, X_array)
        return [0]

    ct = ColumnTransformer(
        [("trans", Trans(), pick_first_column)],
        remainder="drop",
    )

    one_step = ct.fit_transform(X_array)
    assert_array_equal(one_step, X_res_first)
    two_step = ct.fit(X_array).transform(X_array)
    assert_array_equal(two_step, X_res_first)

    # The constructor argument keeps the callable; the fitted list holds
    # the columns it returned.
    assert callable(ct.transformers[0][2])
    assert ct.transformers_[0][2] == [0]
def test_column_transformer_callable_specifier_dataframe():
    """A callable column specifier is called with the full pandas DataFrame."""
    pd = pytest.importorskip("pandas")

    X_array = np.array([[0, 1, 2], [2, 4, 6]]).T
    X_res_first = np.array([[0, 1, 2]]).T
    X_df = pd.DataFrame(X_array, columns=["first", "second"])

    def pick_first_column(X):
        # Both the column labels and the values must match the fitted frame.
        assert_array_equal(X.columns, X_df.columns)
        assert_array_equal(X.values, X_df.values)
        return ["first"]

    ct = ColumnTransformer(
        [("trans", Trans(), pick_first_column)],
        remainder="drop",
    )

    one_step = ct.fit_transform(X_df)
    assert_array_equal(one_step, X_res_first)
    two_step = ct.fit(X_df).transform(X_df)
    assert_array_equal(two_step, X_res_first)

    # The constructor argument keeps the callable; the fitted list holds
    # the column names it returned.
    assert callable(ct.transformers[0][2])
    assert ct.transformers_[0][2] == ["first"]
def test_column_transformer_negative_column_indexes():
    """A negative column index selects the same column as its positive twin."""
    # Use a locally seeded generator so the test data is reproducible and the
    # global NumPy random state is left untouched.
    rng = np.random.RandomState(0)
    X = rng.randn(2, 2)
    X_categories = np.array([[1], [2]])
    # The categorical column is appended last, i.e. at index 2 (or -1).
    X = np.concatenate([X, X_categories], axis=1)

    ohe = OneHotEncoder()

    tf_1 = ColumnTransformer([("ohe", ohe, [-1])], remainder="passthrough")
    tf_2 = ColumnTransformer([("ohe", ohe, [2])], remainder="passthrough")
    assert_array_equal(tf_1.fit_transform(X), tf_2.fit_transform(X))
@pytest.mark.parametrize("array_type", [np.asarray, *CSR_CONTAINERS])
def test_column_transformer_mask_indexing(array_type):
# Regression test for #14510
# Boolean array-like does not behave as boolean array with sparse matrices.
X = np.transpose([[1, 2, 3], [4, 5, 6], [5, 6, 7], [8, 9, 10]])
X = array_type(X)
column_transformer = ColumnTransformer(
[("identity", FunctionTransformer(), [False, True, False, True])]
X_trans = column_transformer.fit_transform(X)
assert X_trans.shape == (3, 2)
def test_n_features_in():
# make sure n_features_in is what is passed as input to the column
# transformer.
X = [[1, 2], [3, 4], [5, 6]]
ct = ColumnTransformer([("a", DoubleTrans(), [0]), ("b", DoubleTrans(), [1])])
assert not hasattr(ct, "n_features_in_")
assert ct.n_features_in_ == 2
"cols, pattern, include, exclude",
(["col_int", "col_float"], None, np.number, None),
(["col_int", "col_float"], None, None, object),
(["col_int", "col_float"], None, [int, float], None),
(["col_str"], None, [object], None),
(["col_str"], None, object, None),
(["col_float"], None, float, None),
(["col_float"], "at$", [np.number], None),
(["col_int"], None, [int], None),
(["col_int"], "^col_int", [np.number], None),
(["col_float", "col_str"], "float|str", None, None),
(["col_str"], "^col_s", None, [int]),
([], "str$", float, None),
(["col_int", "col_float", "col_str"], None, [np.number, object], None),
def test_make_column_selector_with_select_dtypes(cols, pattern, include, exclude):
pd = pytest.importorskip("pandas")
X_df = pd.DataFrame(
"col_int": np.array([0, 1, 2], dtype=int),
"col_float": np.array([0.0, 1.0, 2.0], dtype=float),
"col_str": ["one", "two", "three"],
columns=["col_int", "col_float", "col_str"],
selector = make_column_selector(
dtype_include=include, dtype_exclude=exclude, pattern=pattern
assert_array_equal(selector(X_df), cols)
def test_column_transformer_with_make_column_selector():
# Functional test for column transformer + column selector
pd = pytest.importorskip("pandas")
X_df = pd.DataFrame(
"col_int": np.array([0, 1, 2], dtype=int),
"col_float": np.array([0.0, 1.0, 2.0], dtype=float),
"col_cat": ["one", "two", "one"],
"col_str": ["low", "middle", "high"],
columns=["col_int", "col_float", "col_cat", "col_str"],
X_df["col_str"] = X_df["col_str"].astype("category")
cat_selector = make_column_selector(dtype_include=["category", object])
num_selector = make_column_selector(dtype_include=np.number)
ohe = OneHotEncoder()
scaler = StandardScaler()
ct_selector = make_column_transformer((ohe, cat_selector), (scaler, num_selector))
ct_direct = make_column_transformer(
(ohe, ["col_cat", "col_str"]), (scaler, ["col_float", "col_int"])
X_selector = ct_selector.fit_transform(X_df)
X_direct = ct_direct.fit_transform(X_df)
assert_allclose(X_selector, X_direct)
def test_make_column_selector_error():
selector = make_column_selector(dtype_include=np.number)
X = np.array([[0.1, 0.2]])
msg = "make_column_selector can only be applied to pandas dataframes"
with pytest.raises(ValueError, match=msg):
def test_make_column_selector_pickle():
pd = pytest.importorskip("pandas")
X_df = pd.DataFrame(
"col_int": np.array([0, 1, 2], dtype=int),
"col_float": np.array([0.0, 1.0, 2.0], dtype=float),
"col_str": ["one", "two", "three"],
columns=["col_int", "col_float", "col_str"],
selector = make_column_selector(dtype_include=[object])
selector_picked = pickle.loads(pickle.dumps(selector))
assert_array_equal(selector(X_df), selector_picked(X_df))
[[], np.array([], dtype=int), lambda x: []],
ids=["list", "array", "callable"],
def test_feature_names_empty_columns(empty_col):
pd = pytest.importorskip("pandas")
df = pd.DataFrame({"col1": ["a", "a", "b"], "col2": ["z", "z", "z"]})
ct = ColumnTransformer(
("ohe", OneHotEncoder(), ["col1", "col2"]),
("empty_features", OneHotEncoder(), empty_col),
ct.get_feature_names_out(), ["ohe__col1_a", "ohe__col1_b", "ohe__col2_z"]
lambda x: [1],
lambda x: ["col2"],
[False, True],
lambda x: [False, True],
def test_feature_names_out_pandas(selector):
"""Checks name when selecting only the second column"""
pd = pytest.importorskip("pandas")
df = pd.DataFrame({"col1": ["a", "a", "b"], "col2": ["z", "z", "z"]})
ct = ColumnTransformer([("ohe", OneHotEncoder(), selector)])
assert_array_equal(ct.get_feature_names_out(), ["ohe__col2_z"])
"selector", [[1], lambda x: [1], [False, True], lambda x: [False, True]]
def test_feature_names_out_non_pandas(selector):
"""Checks name when selecting the second column with numpy array"""
X = [["a", "z"], ["a", "z"], ["b", "z"]]
ct = ColumnTransformer([("ohe", OneHotEncoder(), selector)])
assert_array_equal(ct.get_feature_names_out(), ["ohe__x1_z"])
@pytest.mark.parametrize("remainder", ["passthrough", StandardScaler()])
def test_sk_visual_block_remainder(remainder):
# remainder='passthrough' or an estimator will be shown in repr_html
ohe = OneHotEncoder()
ct = ColumnTransformer(
transformers=[("ohe", ohe, ["col1", "col2"])], remainder=remainder
visual_block = ct._sk_visual_block_()
assert visual_block.names == ("ohe", "remainder")
assert visual_block.name_details == (["col1", "col2"], "")
assert visual_block.estimators == (ohe, remainder)
def test_sk_visual_block_remainder_drop():
    """The visual block lists only the explicit transformer for remainder='drop'."""
    # remainder='drop' is not shown in repr_html
    ohe = OneHotEncoder()
    selected_columns = ["col1", "col2"]
    ct = ColumnTransformer(transformers=[("ohe", ohe, selected_columns)])

    block = ct._sk_visual_block_()

    # Each attribute is a one-element tuple: no entry for the remainder.
    assert block.names == ("ohe",)
    assert block.name_details == (["col1", "col2"],)
    assert block.estimators == (ohe,)
@pytest.mark.parametrize("remainder", ["passthrough", StandardScaler()])
def test_sk_visual_block_remainder_fitted_pandas(remainder):
# Remainder shows the columns after fitting
pd = pytest.importorskip("pandas")
ohe = OneHotEncoder()
ct = ColumnTransformer(
transformers=[("ohe", ohe, ["col1", "col2"])],
df = pd.DataFrame(
"col1": ["a", "b", "c"],
"col2": ["z", "z", "z"],
"col3": [1, 2, 3],
"col4": [3, 4, 5],
visual_block = ct._sk_visual_block_()
assert visual_block.names == ("ohe", "remainder")
assert visual_block.name_details == (["col1", "col2"], ["col3", "col4"])
assert visual_block.estimators == (ohe, remainder)
@pytest.mark.parametrize("remainder", ["passthrough", StandardScaler()])
def test_sk_visual_block_remainder_fitted_numpy(remainder):
# Remainder shows the indices after fitting
X = np.array([[1, 2, 3], [4, 5, 6]], dtype=float)
scaler = StandardScaler()
ct = ColumnTransformer(
transformers=[("scale", scaler, [0, 2])], remainder=remainder
visual_block = ct._sk_visual_block_()
assert visual_block.names == ("scale", "remainder")
assert visual_block.name_details == ([0, 2], [1])
assert visual_block.estimators == (scaler, remainder)
@pytest.mark.parametrize("explicit_colname", ["first", "second", 0, 1])
@pytest.mark.parametrize("remainder", [Trans(), "passthrough", "drop"])
def test_column_transformer_reordered_column_names_remainder(
explicit_colname, remainder
"""Test the interaction between remainder and column transformer"""
pd = pytest.importorskip("pandas")
X_fit_array = np.array([[0, 1, 2], [2, 4, 6]]).T
X_fit_df = pd.DataFrame(X_fit_array, columns=["first", "second"])
X_trans_array = np.array([[2, 4, 6], [0, 1, 2]]).T
X_trans_df = pd.DataFrame(X_trans_array, columns=["second", "first"])
tf = ColumnTransformer([("bycol", Trans(), explicit_colname)], remainder=remainder)
X_fit_trans = tf.transform(X_fit_df)
# Changing the order still works
X_trans = tf.transform(X_trans_df)
assert_allclose(X_trans, X_fit_trans)
# extra columns are ignored
X_extended_df = X_fit_df.copy()
X_extended_df["third"] = [3, 6, 9]
X_trans = tf.transform(X_extended_df)
assert_allclose(X_trans, X_fit_trans)
if isinstance(explicit_colname, str):
# Raise error if columns are specified by names but input only allows
# to specify by position, e.g. numpy array instead of a pandas df.
X_array = X_fit_array.copy()
err_msg = "Specifying the columns"
with pytest.raises(ValueError, match=err_msg):
def test_feature_name_validation_missing_columns_drop_passthough():
"""Test the interaction between {'drop', 'passthrough'} and
missing column names."""
pd = pytest.importorskip("pandas")
X = np.ones(shape=(3, 4))
df = pd.DataFrame(X, columns=["a", "b", "c", "d"])
df_dropped = df.drop("c", axis=1)
# with remainder='passthrough', all columns seen during `fit` must be
# present
tf = ColumnTransformer([("bycol", Trans(), [1])], remainder="passthrough")
msg = r"columns are missing: {'c'}"
with pytest.raises(ValueError, match=msg):
# with remainder='drop', it is allowed to have column 'c' missing
tf = ColumnTransformer([("bycol", Trans(), [1])], remainder="drop")
df_dropped_trans = tf.transform(df_dropped)
df_fit_trans = tf.transform(df)
assert_allclose(df_dropped_trans, df_fit_trans)
# bycol drops 'c', thus it is allowed for 'c' to be missing
tf = ColumnTransformer([("bycol", "drop", ["c"])], remainder="passthrough")
df_dropped_trans = tf.transform(df_dropped)
df_fit_trans = tf.transform(df)
assert_allclose(df_dropped_trans, df_fit_trans)
def test_feature_names_in_():
"""Feature names are stored in column transformer.
Column transformer deliberately does not check for column name consistency.
It only checks that the non-dropped names seen in `fit` are seen
in `transform`. This behavior is already tested in
pd = pytest.importorskip("pandas")
feature_names = ["a", "c", "d"]
df = pd.DataFrame([[1, 2, 3]], columns=feature_names)
ct = ColumnTransformer([("bycol", Trans(), ["a", "d"])], remainder="passthrough")
assert_array_equal(ct.feature_names_in_, feature_names)
assert isinstance(ct.feature_names_in_, np.ndarray)
assert ct.feature_names_in_.dtype == object
class TransWithNames(Trans):
    """`Trans` variant that also implements `get_feature_names_out`.

    Parameters
    ----------
    feature_names_out : sequence of str or None, default=None
        Names to report as output features. When None, the input feature
        names are returned unchanged.
    """

    def __init__(self, feature_names_out=None):
        self.feature_names_out = feature_names_out

    def get_feature_names_out(self, input_features=None):
        """Return the configured names, or `input_features` if none were set."""
        configured = self.feature_names_out
        if configured is None:
            return input_features
        # Explicit names are always returned as an object-dtype array.
        return np.asarray(configured, dtype=object)
"transformers, remainder, expected_names",
("bycol1", TransWithNames(), ["d", "c"]),
("bycol2", "passthrough", ["d"]),
["bycol1__d", "bycol1__c", "bycol2__d", "remainder__a", "remainder__b"],
("bycol1", TransWithNames(), ["d", "c"]),
("bycol2", "passthrough", ["d"]),
["bycol1__d", "bycol1__c", "bycol2__d"],
("bycol1", TransWithNames(), ["b"]),
("bycol2", "drop", ["d"]),
["bycol1__b", "remainder__a", "remainder__c"],
("bycol1", TransWithNames(["pca1", "pca2"]), ["a", "b", "d"]),
["bycol1__pca1", "bycol1__pca2", "remainder__c"],
("bycol1", TransWithNames(["a", "b"]), ["d"]),
("bycol2", "passthrough", ["b"]),
["bycol1__a", "bycol1__b", "bycol2__b"],
("bycol1", TransWithNames([f"pca{i}" for i in range(2)]), ["b"]),
("bycol2", TransWithNames([f"pca{i}" for i in range(2)]), ["b"]),
("bycol1", "drop", ["d"]),
("bycol1", TransWithNames(), slice(1, 3)),
["bycol1__b", "bycol1__c"],
("bycol1", TransWithNames(), ["b"]),
("bycol2", "drop", slice(3, 4)),
["bycol1__b", "remainder__a", "remainder__c"],
("bycol1", TransWithNames(), ["d", "c"]),
("bycol2", "passthrough", slice(3, 4)),
["bycol1__d", "bycol1__c", "bycol2__d", "remainder__a", "remainder__b"],
("bycol1", TransWithNames(), slice("b", "c")),
["bycol1__b", "bycol1__c"],
("bycol1", TransWithNames(), ["b"]),
("bycol2", "drop", slice("c", "d")),
["bycol1__b", "remainder__a"],
("bycol1", TransWithNames(), ["d", "c"]),
("bycol2", "passthrough", slice("c", "d")),
def test_verbose_feature_names_out_true(transformers, remainder, expected_names):
"""Check feature_names_out for verbose_feature_names_out=True (default)"""
pd = pytest.importorskip("pandas")
df = pd.DataFrame([[1, 2, 3, 4]], columns=["a", "b", "c", "d"])
ct = ColumnTransformer(
names = ct.get_feature_names_out()
assert isinstance(names, np.ndarray)
assert names.dtype == object
assert_array_equal(names, expected_names)
"transformers, remainder, expected_names",
("bycol1", TransWithNames(), ["d", "c"]),
("bycol2", "passthrough", ["a"]),
["d", "c", "a", "b"],
("bycol1", TransWithNames(["a"]), ["d", "c"]),
("bycol2", "passthrough", ["d"]),
["a", "d"],
("bycol1", TransWithNames(), ["b"]),
("bycol2", "drop", ["d"]),
["b", "a", "c"],
("bycol1", TransWithNames(["pca1", "pca2"]), ["a", "b", "d"]),
["pca1", "pca2", "c"],
("bycol1", TransWithNames(["a", "c"]), ["d"]),
("bycol2", "passthrough", ["d"]),
["a", "c", "d"],
("bycol1", TransWithNames([f"pca{i}" for i in range(2)]), ["b"]),
("bycol2", TransWithNames([f"kpca{i}" for i in range(2)]), ["b"]),
["pca0", "pca1", "kpca0", "kpca1", "a", "c", "d"],
("bycol1", "drop", ["d"]),
("bycol1", TransWithNames(), slice(1, 2)),
("bycol2", "drop", ["d"]),
["b", "a", "c"],
("bycol1", TransWithNames(), ["b"]),
("bycol2", "drop", slice(3, 4)),
["b", "a", "c"],
("bycol1", TransWithNames(), ["d", "c"]),
("bycol2", "passthrough", slice(0, 2)),
["d", "c", "a", "b"],
("bycol1", TransWithNames(), slice("a", "b")),
("bycol2", "drop", ["d"]),
["a", "b", "c"],
("bycol1", TransWithNames(), ["b"]),
("bycol2", "drop", slice("c", "d")),
["b", "a"],
("bycol1", TransWithNames(), ["d", "c"]),
("bycol2", "passthrough", slice("a", "b")),
["d", "c", "a", "b"],
("bycol1", TransWithNames(), ["d", "c"]),
("bycol2", "passthrough", slice("b", "b")),
["d", "c", "b"],
def test_verbose_feature_names_out_false(transformers, remainder, expected_names):
"""Check feature_names_out for verbose_feature_names_out=False"""
pd = pytest.importorskip("pandas")
df = pd.DataFrame([[1, 2, 3, 4]], columns=["a", "b", "c", "d"])
ct = ColumnTransformer(
names = ct.get_feature_names_out()
assert isinstance(names, np.ndarray)
assert names.dtype == object
assert_array_equal(names, expected_names)
"transformers, remainder, colliding_columns",
("bycol1", TransWithNames(), ["b"]),
("bycol2", "passthrough", ["b"]),
("bycol1", TransWithNames(["c", "d"]), ["c"]),
("bycol2", "passthrough", ["c"]),
("bycol1", TransWithNames(["a"]), ["b"]),
("bycol2", "passthrough", ["b"]),
("bycol1", TransWithNames(["a"]), ["b"]),
("bycol2", "drop", ["b"]),
("bycol1", TransWithNames(["c", "b"]), ["b"]),
("bycol2", "passthrough", ["c", "b"]),
"['b', 'c']",
("bycol1", TransWithNames(["a"]), ["b"]),
("bycol2", "passthrough", ["a"]),
("bycol3", TransWithNames(["a"]), ["b"]),
("bycol1", TransWithNames(["a", "b"]), ["b"]),
("bycol2", "passthrough", ["a"]),
("bycol3", TransWithNames(["b"]), ["c"]),
"['a', 'b']",
("bycol1", TransWithNames([f"pca{i}" for i in range(6)]), ["b"]),
("bycol2", TransWithNames([f"pca{i}" for i in range(6)]), ["b"]),
"['pca0', 'pca1', 'pca2', 'pca3', 'pca4', ...]",
("bycol1", TransWithNames(["a", "b"]), slice(1, 2)),
("bycol2", "passthrough", ["a"]),
("bycol3", TransWithNames(["b"]), ["c"]),
"['a', 'b']",
("bycol1", TransWithNames(["a", "b"]), ["b"]),
("bycol2", "passthrough", slice(0, 1)),
("bycol3", TransWithNames(["b"]), ["c"]),
"['a', 'b']",
("bycol1", TransWithNames(["a", "b"]), slice("b", "c")),
("bycol2", "passthrough", ["a"]),
("bycol3", TransWithNames(["b"]), ["c"]),
"['a', 'b']",
("bycol1", TransWithNames(["a", "b"]), ["b"]),
("bycol2", "passthrough", slice("a", "a")),
("bycol3", TransWithNames(["b"]), ["c"]),
"['a', 'b']",
def test_verbose_feature_names_out_false_errors(
transformers, remainder, colliding_columns
"""Check feature_names_out for verbose_feature_names_out=False"""
pd = pytest.importorskip("pandas")
df = pd.DataFrame([[1, 2, 3, 4]], columns=["a", "b", "c", "d"])
ct = ColumnTransformer(
msg = re.escape(
f"Output feature names: {colliding_columns} are not unique. Please set "
"verbose_feature_names_out=True to add prefixes to feature names"
with pytest.raises(ValueError, match=msg):
@pytest.mark.parametrize("verbose_feature_names_out", [True, False])
@pytest.mark.parametrize("remainder", ["drop", "passthrough"])
def test_column_transformer_set_output(verbose_feature_names_out, remainder):
"""Check column transformer behavior with set_output."""
pd = pytest.importorskip("pandas")
df = pd.DataFrame([[1, 2, 3, 4]], columns=["a", "b", "c", "d"], index=[10])
ct = ColumnTransformer(
[("first", TransWithNames(), ["a", "c"]), ("second", TransWithNames(), ["d"])],
X_trans = ct.fit_transform(df)
assert isinstance(X_trans, np.ndarray)
df_test = pd.DataFrame([[1, 2, 3, 4]], columns=df.columns, index=[20])
X_trans = ct.transform(df_test)
assert isinstance(X_trans, pd.DataFrame)
feature_names_out = ct.get_feature_names_out()
assert_array_equal(X_trans.columns, feature_names_out)
assert_array_equal(X_trans.index, df_test.index)
@pytest.mark.parametrize("remainder", ["drop", "passthrough"])
@pytest.mark.parametrize("fit_transform", [True, False])
def test_column_transform_set_output_mixed(remainder, fit_transform):
"""Check ColumnTransformer outputs mixed types correctly."""
pd = pytest.importorskip("pandas")
df = pd.DataFrame(
"pet": pd.Series(["dog", "cat", "snake"], dtype="category"),
"color": pd.Series(["green", "blue", "red"], dtype="object"),
"age": [1.4, 2.1, 4.4],
"height": [20, 40, 10],
"distance": pd.Series([20, pd.NA, 100], dtype="Int32"),
ct = ColumnTransformer(
OneHotEncoder(sparse_output=False, dtype="int8"),
("age", StandardScaler(), ["age"]),
if fit_transform:
X_trans = ct.fit_transform(df)
X_trans = ct.fit(df).transform(df)
assert isinstance(X_trans, pd.DataFrame)
assert_array_equal(X_trans.columns, ct.get_feature_names_out())
expected_dtypes = {
"color_blue": "int8",
"color_green": "int8",
"color_red": "int8",
"age": "float64",
"pet": "category",
"height": "int64",
"distance": "Int32",
for col, dtype in X_trans.dtypes.items():
assert dtype == expected_dtypes[col]
@pytest.mark.parametrize("remainder", ["drop", "passthrough"])
def test_column_transform_set_output_after_fitting(remainder):
pd = pytest.importorskip("pandas")
df = pd.DataFrame(
"pet": pd.Series(["dog", "cat", "snake"], dtype="category"),
"age": [1.4, 2.1, 4.4],
"height": [20, 40, 10],
ct = ColumnTransformer(
OneHotEncoder(sparse_output=False, dtype="int16"),
("age", StandardScaler(), ["age"]),
# fit without calling set_output
X_trans = ct.fit_transform(df)
assert isinstance(X_trans, np.ndarray)
assert X_trans.dtype == "float64"
X_trans_df = ct.transform(df)
expected_dtypes = {
"pet_cat": "int16",
"pet_dog": "int16",
"pet_snake": "int16",
"height": "int64",
"age": "float64",
for col, dtype in X_trans_df.dtypes.items():
assert dtype == expected_dtypes[col]
# PandasOutTransformer that does not define get_feature_names_out and always expects
# the input to be a DataFrame.
class PandasOutTransformer(BaseEstimator):
    """Test transformer that subtracts `offset` from a pandas DataFrame.

    Parameters
    ----------
    offset : float, default=1.0
        Value subtracted from the input in `transform`.
    """

    def __init__(self, offset=1.0):
        self.offset = offset

    @staticmethod
    def _check_is_dataframe(X):
        # pandas is imported lazily so the test is skipped when it is missing.
        pd = pytest.importorskip("pandas")
        assert isinstance(X, pd.DataFrame)

    def fit(self, X, y=None):
        """Check that `X` is a DataFrame and return the estimator unchanged."""
        self._check_is_dataframe(X)
        return self

    def transform(self, X, y=None):
        """Check that `X` is a DataFrame and return `X - offset`."""
        self._check_is_dataframe(X)
        shifted = X - self.offset
        return shifted

    def set_output(self, transform=None):
        """Ignore the requested output configuration and return self."""
        # This transformer will always output a DataFrame regardless of the
        # configuration.
        return self
"trans_1, expected_verbose_names, expected_non_verbose_names",
["trans_0__feat1", "trans_1__feat0"],
["feat1", "feat0"],
["trans_0__feat1", "trans_1__feat0"],
["feat1", "feat0"],
def test_transformers_with_pandas_out_but_not_feature_names_out(
trans_1, expected_verbose_names, expected_non_verbose_names
"""Check that set_config(transform="pandas") is compatible with more transformers.
Specifically, if transformers returns a DataFrame, but does not define
pd = pytest.importorskip("pandas")
X_df = pd.DataFrame({"feat0": [1.0, 2.0, 3.0], "feat1": [2.0, 3.0, 4.0]})
ct = ColumnTransformer(
("trans_0", PandasOutTransformer(offset=3.0), ["feat1"]),
("trans_1", trans_1, ["feat0"]),
X_trans_np = ct.fit_transform(X_df)
assert isinstance(X_trans_np, np.ndarray)
# `ct` does not have `get_feature_names_out` because `PandasOutTransformer` does
# not define the method.
with pytest.raises(AttributeError, match="not provide get_feature_names_out"):
# The feature names are prefixed because verbose_feature_names_out=True is default
X_trans_df0 = ct.fit_transform(X_df)
assert_array_equal(X_trans_df0.columns, expected_verbose_names)
X_trans_df1 = ct.fit_transform(X_df)
assert_array_equal(X_trans_df1.columns, expected_non_verbose_names)
[[], np.array([False, False]), [False, False]],
ids=["list", "bool", "bool_int"],
def test_empty_selection_pandas_output(empty_selection):
"""Check that pandas output works when there is an empty selection.
Non-regression test for gh-25487
pd = pytest.importorskip("pandas")
X = pd.DataFrame([[1.0, 2.2], [3.0, 1.0]], columns=["a", "b"])
ct = ColumnTransformer(
("categorical", "passthrough", empty_selection),
("numerical", StandardScaler(), ["a", "b"]),
X_out = ct.fit_transform(X)
assert_array_equal(X_out.columns, ["numerical__a", "numerical__b"])
X_out = ct.fit_transform(X)
assert_array_equal(X_out.columns, ["a", "b"])
def test_raise_error_if_index_not_aligned():
"""Check column transformer raises error if indices are not aligned.
Non-regression test for gh-26210.
pd = pytest.importorskip("pandas")
X = pd.DataFrame([[1.0, 2.2], [3.0, 1.0]], columns=["a", "b"], index=[8, 3])
reset_index_transformer = FunctionTransformer(
lambda x: x.reset_index(drop=True), feature_names_out="one-to-one"
ct = ColumnTransformer(
("num1", "passthrough", ["a"]),
("num2", reset_index_transformer, ["b"]),
msg = (
"Concatenating DataFrames from the transformer's output lead to"
" an inconsistent number of samples. The output may have Pandas"
" Indexes that do not match."
with pytest.raises(ValueError, match=msg):
def test_remainder_set_output():
"""Check that the output is set for the remainder.
Non-regression test for #26306.
pd = pytest.importorskip("pandas")
df = pd.DataFrame({"a": [True, False, True], "b": [1, 2, 3]})
ct = make_column_transformer(
(VarianceThreshold(), make_column_selector(dtype_include=bool)),
out = ct.fit_transform(df)
pd.testing.assert_frame_equal(out, df)
out = ct.fit_transform(df)
assert isinstance(out, np.ndarray)
# TODO(1.6): replace the warning by a ValueError exception
def test_transform_pd_na():
"""Check behavior when a tranformer's output contains pandas.NA
It should emit a warning unless the output config is set to 'pandas'.
pd = pytest.importorskip("pandas")
if not hasattr(pd, "Float64Dtype"):
"The issue with pd.NA tested here does not happen in old versions that do"
" not have the extension dtypes"
df = pd.DataFrame({"a": [1.5, None]})
ct = make_column_transformer(("passthrough", ["a"]))
# No warning with non-extension dtypes and np.nan
with warnings.catch_warnings():
df = df.convert_dtypes()
# Error with extension dtype and pd.NA
with pytest.warns(FutureWarning, match=r"set_output\(transform='pandas'\)"):
# No warning when output is set to pandas
with warnings.catch_warnings():
# No warning when there are no pd.NA
with warnings.catch_warnings():
def test_dataframe_different_dataframe_libraries():
"""Check fitting and transforming on pandas and polars dataframes."""
pd = pytest.importorskip("pandas")
pl = pytest.importorskip("polars")
X_train_np = np.array([[0, 1], [2, 4], [4, 5]])
X_test_np = np.array([[1, 2], [1, 3], [2, 3]])
# Fit on pandas and transform on polars
X_train_pd = pd.DataFrame(X_train_np, columns=["a", "b"])
X_test_pl = pl.DataFrame(X_test_np, schema=["a", "b"])
ct = make_column_transformer((Trans(), [0, 1]))
out_pl_in = ct.transform(X_test_pl)
assert_array_equal(out_pl_in, X_test_np)
# Fit on polars and transform on pandas
X_train_pl = pl.DataFrame(X_train_np, schema=["a", "b"])
X_test_pd = pd.DataFrame(X_test_np, columns=["a", "b"])
out_pd_in = ct.transform(X_test_pd)
assert_array_equal(out_pd_in, X_test_np)
def test_column_transformer__getitem__():
"""Check __getitem__ for ColumnTransformer."""
X = np.array([[0, 1, 2], [3, 4, 5]])
ct = ColumnTransformer([("t1", Trans(), [0, 1]), ("t2", Trans(), [1, 2])])
msg = "ColumnTransformer is subscriptable after it is fitted"
with pytest.raises(TypeError, match=msg):
assert ct["t1"] is ct.named_transformers_["t1"]
assert ct["t2"] is ct.named_transformers_["t2"]
msg = "'does_not_exist' is not a valid transformer name"
with pytest.raises(KeyError, match=msg):
@pytest.mark.parametrize("transform_output", ["default", "pandas"])
def test_column_transformer_remainder_passthrough_naming_consistency(transform_output):
"""Check that when `remainder="passthrough"`, inconsistent naming is handled
correctly by the underlying `FunctionTransformer`.
Non-regression test for:
pd = pytest.importorskip("pandas")
X = pd.DataFrame(np.random.randn(10, 4))
preprocessor = ColumnTransformer(
transformers=[("scaler", StandardScaler(), [0, 1])],
X_trans = preprocessor.fit_transform(X)
assert X_trans.shape == X.shape
expected_column_names = [
if hasattr(X_trans, "columns"):
assert X_trans.columns.tolist() == expected_column_names
assert preprocessor.get_feature_names_out().tolist() == expected_column_names
@pytest.mark.parametrize("dataframe_lib", ["pandas", "polars"])
def test_column_transformer_column_renaming(dataframe_lib):
    """Check that we properly rename columns when using `ColumnTransformer` and
    selected columns are redundant between transformers.

    With verbose feature names, every output column is prefixed with the name
    of the transformer that produced it, so selecting the same input column in
    several transformers does not create duplicated output names.

    Non-regression test.
    """
    lib = pytest.importorskip(dataframe_lib)

    df = lib.DataFrame({"x1": [1, 2, 3], "x2": [10, 20, 30], "x3": [100, 200, 300]})

    transformer = ColumnTransformer(
        transformers=[
            ("A", "passthrough", ["x1", "x2", "x3"]),
            ("B", FunctionTransformer(), ["x1", "x2"]),
            ("C", StandardScaler(), ["x1", "x3"]),
            # special case of empty transformer
            ("D", FunctionTransformer(lambda x: x[[]]), ["x1", "x2", "x3"]),
        ],
        verbose_feature_names_out=True,
    ).set_output(transform=dataframe_lib)
    df_trans = transformer.fit_transform(df)
    # Transformer "D" outputs no column and therefore contributes no name.
    assert list(df_trans.columns) == [
        "A__x1",
        "A__x2",
        "A__x3",
        "B__x1",
        "B__x2",
        "C__x1",
        "C__x3",
    ]
@pytest.mark.parametrize("dataframe_lib", ["pandas", "polars"])
def test_column_transformer_error_with_duplicated_columns(dataframe_lib):
    """Check that we raise an error when using `ColumnTransformer` and
    the columns names are duplicated between transformers."""
    lib = pytest.importorskip(dataframe_lib)

    df = lib.DataFrame({"x1": [1, 2, 3], "x2": [10, 20, 30], "x3": [100, 200, 300]})

    # Without the transformer-name prefix, the output columns of "A", "B" and
    # "C" collide when stacked into a single dataframe.
    transformer = ColumnTransformer(
        transformers=[
            ("A", "passthrough", ["x1", "x2", "x3"]),
            ("B", FunctionTransformer(), ["x1", "x2"]),
            ("C", StandardScaler(), ["x1", "x3"]),
            # special case of empty transformer
            ("D", FunctionTransformer(lambda x: x[[]]), ["x1", "x2", "x3"]),
        ],
        verbose_feature_names_out=False,
    ).set_output(transform=dataframe_lib)
    err_msg = re.escape(
        "Duplicated feature names found before concatenating the outputs of the "
        "transformers: ['x1', 'x2', 'x3'].\n"
        "Transformer A has conflicting columns names: ['x1', 'x2', 'x3'].\n"
        "Transformer B has conflicting columns names: ['x1', 'x2'].\n"
        "Transformer C has conflicting columns names: ['x1', 'x3'].\n"
    )
    with pytest.raises(ValueError, match=err_msg):
        transformer.fit_transform(df)
@pytest.mark.skipif(
    parse_version(joblib.__version__) < parse_version("1.3"),
    reason="requires joblib >= 1.3",
)
def test_column_transformer_auto_memmap():
    """Check that ColumnTransformer works in parallel with joblib's auto-memmapping.

    non-regression test for issue #28781
    """
    X = np.random.RandomState(0).uniform(size=(3, 4))

    # `copy=False` asks the scaler to work in place on the data it receives.
    scaler = StandardScaler(copy=False)

    transformer = ColumnTransformer(
        transformers=[("scaler", scaler, [0])],
        # More than one job is needed to go through the parallel backend.
        n_jobs=2,
    )

    # `max_nbytes=1` lowers joblib's memmapping threshold to a single byte.
    with joblib.parallel_backend("loky", max_nbytes=1):
        Xt = transformer.fit_transform(X)

    assert_allclose(Xt, StandardScaler().fit_transform(X[:, [0]]))
# Metadata Routing Tests
# ======================
@pytest.mark.parametrize("method", ["transform", "fit_transform", "fit"])
def test_routing_passed_metadata_not_supported(method):
    """Test that the right error message is raised when metadata is passed while
    not supported when `enable_metadata_routing=False`."""
    X = np.array([[0, 1, 2], [2, 4, 6]]).T
    y = [1, 2, 3]
    trs = ColumnTransformer([("trans", Trans(), [0])]).fit(X, y)

    # Any extra keyword argument must be rejected for each parametrized method.
    with pytest.raises(
        ValueError, match="is only supported if enable_metadata_routing=True"
    ):
        getattr(trs, method)([[1]], sample_weight=[1], prop="a")
@pytest.mark.parametrize("method", ["transform", "fit_transform", "fit"])
def test_metadata_routing_for_column_transformer(method):
    """Test that metadata is routed correctly for column transformer.

    The sub-transformer requests `sample_weight` and `metadata` for both `fit`
    and `transform` and records what it receives in `registry`; the recorded
    values are then checked for the parametrized `method`.
    """
    # Local import keeps this test independent of the module-level imports.
    from sklearn import config_context

    X = np.array([[0, 1, 2], [2, 4, 6]]).T
    y = [1, 2, 3]
    registry = _Registry()
    sample_weight, metadata = [1], "a"

    # The `set_*_request` methods and the routing itself are only available
    # when metadata routing is enabled.
    with config_context(enable_metadata_routing=True):
        trs = ColumnTransformer(
            [
                (
                    "trans",
                    ConsumingTransformer(registry=registry)
                    .set_fit_request(sample_weight=True, metadata=True)
                    .set_transform_request(sample_weight=True, metadata=True),
                    [0],
                )
            ]
        )

        if method == "transform":
            # `transform` needs a fitted transformer and does not accept `y`.
            trs.fit(X, y)
            trs.transform(X, sample_weight=sample_weight, metadata=metadata)
        else:
            getattr(trs, method)(X, y, sample_weight=sample_weight, metadata=metadata)

        assert len(registry)
        for _trs in registry:
            check_recorded_metadata(
                obj=_trs, method=method, sample_weight=sample_weight, metadata=metadata
            )
def test_metadata_routing_no_fit_transform():
    """Test metadata routing when the sub-estimator doesn't implement
    ``fit_transform``.

    The sub-estimator asserts that it receives truthy `sample_weight` and
    `metadata` in both `fit` and `transform`.
    """
    # Local import keeps this test independent of the module-level imports.
    from sklearn import config_context

    class NoFitTransform(BaseEstimator):
        def fit(self, X, y=None, sample_weight=None, metadata=None):
            assert sample_weight
            assert metadata
            return self

        def transform(self, X, sample_weight=None, metadata=None):
            assert sample_weight
            assert metadata
            return X

    X = np.array([[0, 1, 2], [2, 4, 6]]).T
    y = [1, 2, 3]
    sample_weight, metadata = [1], "a"

    # The `set_*_request` methods and the routing itself are only available
    # when metadata routing is enabled.
    with config_context(enable_metadata_routing=True):
        trs = ColumnTransformer(
            [
                (
                    "trans",
                    NoFitTransform()
                    .set_fit_request(sample_weight=True, metadata=True)
                    .set_transform_request(sample_weight=True, metadata=True),
                    [0],
                )
            ]
        )

        trs.fit(X, y, sample_weight=sample_weight, metadata=metadata)
        trs.fit_transform(X, y, sample_weight=sample_weight, metadata=metadata)
@pytest.mark.parametrize("method", ["transform", "fit_transform", "fit"])
def test_metadata_routing_error_for_column_transformer(method):
    """Test that the right error is raised when metadata is not requested."""
    # Local import keeps this test independent of the module-level imports.
    from sklearn import config_context

    X = np.array([[0, 1, 2], [2, 4, 6]]).T
    y = [1, 2, 3]
    sample_weight, metadata = [1], "a"
    # No `set_*_request` call: the sub-transformer leaves its requests unset.
    trs = ColumnTransformer([("trans", ConsumingTransformer(), [0])])

    error_message = (
        "[sample_weight, metadata] are passed but are not explicitly set as requested"
        f" or not requested for ConsumingTransformer.{method}"
    )
    # The unrequested-metadata check is part of metadata routing, which has to
    # be enabled explicitly.
    with config_context(enable_metadata_routing=True):
        with pytest.raises(ValueError, match=re.escape(error_message)):
            if method == "transform":
                # `transform` needs a fitted transformer and does not accept `y`.
                trs.fit(X, y)
                trs.transform(X, sample_weight=sample_weight, metadata=metadata)
            else:
                getattr(trs, method)(
                    X, y, sample_weight=sample_weight, metadata=metadata
                )
def test_get_metadata_routing_works_without_fit():
    """Check that `get_metadata_routing` can be called on an unfitted instance."""
    # Regression test for https://github.com/scikit-learn/scikit-learn/issues/28186
    # Make sure ct.get_metadata_routing() works w/o having called fit.
    # Local import keeps this test independent of the module-level imports.
    from sklearn import config_context

    with config_context(enable_metadata_routing=True):
        ct = ColumnTransformer([("trans", ConsumingTransformer(), [0])])
        # The test passes as long as this call does not raise.
        ct.get_metadata_routing()
def test_remainder_request_always_present():
    """Check that the metadata request of the remainder estimator is reported
    by the router of an unfitted `ColumnTransformer`."""
    # Test that remainder request is always present.
    # Local import keeps this test independent of the module-level imports.
    from sklearn import config_context

    # `set_fit_request` is only available when metadata routing is enabled.
    with config_context(enable_metadata_routing=True):
        ct = ColumnTransformer(
            [("trans", StandardScaler(), [0])],
            remainder=ConsumingTransformer().set_fit_request(metadata=True),
        )
        router = ct.get_metadata_routing()
        assert router.consumes("fit", ["metadata"]) == {"metadata"}
def test_unused_transformer_request_present():
    """Check that the metadata request of a transformer is reported by the
    router even though its column selector selects no column."""
    # Test that the request of a transformer is always present even when not
    # used due to no selected columns.
    # Local import keeps this test independent of the module-level imports.
    from sklearn import config_context

    # `set_fit_request` is only available when metadata routing is enabled.
    with config_context(enable_metadata_routing=True):
        ct = ColumnTransformer(
            [
                (
                    "trans",
                    ConsumingTransformer().set_fit_request(metadata=True),
                    # Callable column selector returning an empty selection.
                    lambda X: [],
                )
            ]
        )
        router = ct.get_metadata_routing()
        assert router.consumes("fit", ["metadata"]) == {"metadata"}
# End of Metadata Routing Tests
# =============================