3RNN/Lib/site-packages/sklearn/decomposition/tests/test_dict_learning.py

984 lines
30 KiB
Python
Raw Permalink Normal View History

2024-05-26 19:49:15 +02:00
import itertools
import warnings
from functools import partial
import numpy as np
import pytest
import sklearn
from sklearn.base import clone
from sklearn.decomposition import (
DictionaryLearning,
MiniBatchDictionaryLearning,
SparseCoder,
dict_learning,
dict_learning_online,
sparse_encode,
)
from sklearn.decomposition._dict_learning import _update_dict
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils import check_array
from sklearn.utils._testing import (
TempMemmap,
assert_allclose,
assert_array_almost_equal,
assert_array_equal,
ignore_warnings,
)
from sklearn.utils.estimator_checks import (
check_transformer_data_not_an_array,
check_transformer_general,
check_transformers_unfitted,
)
from sklearn.utils.parallel import Parallel
rng_global = np.random.RandomState(0)
n_samples, n_features = 10, 8
X = rng_global.randn(n_samples, n_features)
def test_sparse_encode_shapes_omp():
rng = np.random.RandomState(0)
algorithms = ["omp", "lasso_lars", "lasso_cd", "lars", "threshold"]
for n_components, n_samples in itertools.product([1, 5], [1, 9]):
X_ = rng.randn(n_samples, n_features)
dictionary = rng.randn(n_components, n_features)
for algorithm, n_jobs in itertools.product(algorithms, [1, 2]):
code = sparse_encode(X_, dictionary, algorithm=algorithm, n_jobs=n_jobs)
assert code.shape == (n_samples, n_components)
def test_dict_learning_shapes():
n_components = 5
dico = DictionaryLearning(n_components, random_state=0).fit(X)
assert dico.components_.shape == (n_components, n_features)
n_components = 1
dico = DictionaryLearning(n_components, random_state=0).fit(X)
assert dico.components_.shape == (n_components, n_features)
assert dico.transform(X).shape == (X.shape[0], n_components)
def test_dict_learning_overcomplete():
n_components = 12
dico = DictionaryLearning(n_components, random_state=0).fit(X)
assert dico.components_.shape == (n_components, n_features)
def test_max_iter():
def ricker_function(resolution, center, width):
"""Discrete sub-sampled Ricker (Mexican hat) wavelet"""
x = np.linspace(0, resolution - 1, resolution)
x = (
(2 / (np.sqrt(3 * width) * np.pi**0.25))
* (1 - (x - center) ** 2 / width**2)
* np.exp(-((x - center) ** 2) / (2 * width**2))
)
return x
def ricker_matrix(width, resolution, n_components):
"""Dictionary of Ricker (Mexican hat) wavelets"""
centers = np.linspace(0, resolution - 1, n_components)
D = np.empty((n_components, resolution))
for i, center in enumerate(centers):
D[i] = ricker_function(resolution, center, width)
D /= np.sqrt(np.sum(D**2, axis=1))[:, np.newaxis]
return D
transform_algorithm = "lasso_cd"
resolution = 1024
subsampling = 3 # subsampling factor
n_components = resolution // subsampling
# Compute a wavelet dictionary
D_multi = np.r_[
tuple(
ricker_matrix(
width=w, resolution=resolution, n_components=n_components // 5
)
for w in (10, 50, 100, 500, 1000)
)
]
X = np.linspace(0, resolution - 1, resolution)
first_quarter = X < resolution / 4
X[first_quarter] = 3.0
X[np.logical_not(first_quarter)] = -1.0
X = X.reshape(1, -1)
# check that the underlying model fails to converge
with pytest.warns(ConvergenceWarning):
model = SparseCoder(
D_multi, transform_algorithm=transform_algorithm, transform_max_iter=1
)
model.fit_transform(X)
# check that the underlying model converges w/o warnings
with warnings.catch_warnings():
warnings.simplefilter("error", ConvergenceWarning)
model = SparseCoder(
D_multi, transform_algorithm=transform_algorithm, transform_max_iter=2000
)
model.fit_transform(X)
def test_dict_learning_lars_positive_parameter():
n_components = 5
alpha = 1
err_msg = "Positive constraint not supported for 'lars' coding method."
with pytest.raises(ValueError, match=err_msg):
dict_learning(X, n_components, alpha=alpha, positive_code=True)
@pytest.mark.parametrize(
"transform_algorithm",
[
"lasso_lars",
"lasso_cd",
"threshold",
],
)
@pytest.mark.parametrize("positive_code", [False, True])
@pytest.mark.parametrize("positive_dict", [False, True])
def test_dict_learning_positivity(transform_algorithm, positive_code, positive_dict):
n_components = 5
dico = DictionaryLearning(
n_components,
transform_algorithm=transform_algorithm,
random_state=0,
positive_code=positive_code,
positive_dict=positive_dict,
fit_algorithm="cd",
).fit(X)
code = dico.transform(X)
if positive_dict:
assert (dico.components_ >= 0).all()
else:
assert (dico.components_ < 0).any()
if positive_code:
assert (code >= 0).all()
else:
assert (code < 0).any()
@pytest.mark.parametrize("positive_dict", [False, True])
def test_dict_learning_lars_dict_positivity(positive_dict):
n_components = 5
dico = DictionaryLearning(
n_components,
transform_algorithm="lars",
random_state=0,
positive_dict=positive_dict,
fit_algorithm="cd",
).fit(X)
if positive_dict:
assert (dico.components_ >= 0).all()
else:
assert (dico.components_ < 0).any()
def test_dict_learning_lars_code_positivity():
n_components = 5
dico = DictionaryLearning(
n_components,
transform_algorithm="lars",
random_state=0,
positive_code=True,
fit_algorithm="cd",
).fit(X)
err_msg = "Positive constraint not supported for '{}' coding method."
err_msg = err_msg.format("lars")
with pytest.raises(ValueError, match=err_msg):
dico.transform(X)
def test_dict_learning_reconstruction():
n_components = 12
dico = DictionaryLearning(
n_components, transform_algorithm="omp", transform_alpha=0.001, random_state=0
)
code = dico.fit(X).transform(X)
assert_array_almost_equal(np.dot(code, dico.components_), X)
dico.set_params(transform_algorithm="lasso_lars")
code = dico.transform(X)
assert_array_almost_equal(np.dot(code, dico.components_), X, decimal=2)
# used to test lars here too, but there's no guarantee the number of
# nonzero atoms is right.
def test_dict_learning_reconstruction_parallel():
# regression test that parallel reconstruction works with n_jobs>1
n_components = 12
dico = DictionaryLearning(
n_components,
transform_algorithm="omp",
transform_alpha=0.001,
random_state=0,
n_jobs=4,
)
code = dico.fit(X).transform(X)
assert_array_almost_equal(np.dot(code, dico.components_), X)
dico.set_params(transform_algorithm="lasso_lars")
code = dico.transform(X)
assert_array_almost_equal(np.dot(code, dico.components_), X, decimal=2)
def test_dict_learning_lassocd_readonly_data():
n_components = 12
with TempMemmap(X) as X_read_only:
dico = DictionaryLearning(
n_components,
transform_algorithm="lasso_cd",
transform_alpha=0.001,
random_state=0,
n_jobs=4,
)
with ignore_warnings(category=ConvergenceWarning):
code = dico.fit(X_read_only).transform(X_read_only)
assert_array_almost_equal(
np.dot(code, dico.components_), X_read_only, decimal=2
)
def test_dict_learning_nonzero_coefs():
n_components = 4
dico = DictionaryLearning(
n_components,
transform_algorithm="lars",
transform_n_nonzero_coefs=3,
random_state=0,
)
code = dico.fit(X).transform(X[np.newaxis, 1])
assert len(np.flatnonzero(code)) == 3
dico.set_params(transform_algorithm="omp")
code = dico.transform(X[np.newaxis, 1])
assert len(np.flatnonzero(code)) == 3
def test_dict_learning_split():
n_components = 5
dico = DictionaryLearning(
n_components, transform_algorithm="threshold", random_state=0
)
code = dico.fit(X).transform(X)
dico.split_sign = True
split_code = dico.transform(X)
assert_array_almost_equal(
split_code[:, :n_components] - split_code[:, n_components:], code
)
def test_dict_learning_online_shapes():
rng = np.random.RandomState(0)
n_components = 8
code, dictionary = dict_learning_online(
X,
n_components=n_components,
batch_size=4,
max_iter=10,
method="cd",
random_state=rng,
return_code=True,
)
assert code.shape == (n_samples, n_components)
assert dictionary.shape == (n_components, n_features)
assert np.dot(code, dictionary).shape == X.shape
dictionary = dict_learning_online(
X,
n_components=n_components,
batch_size=4,
max_iter=10,
method="cd",
random_state=rng,
return_code=False,
)
assert dictionary.shape == (n_components, n_features)
def test_dict_learning_online_lars_positive_parameter():
err_msg = "Positive constraint not supported for 'lars' coding method."
with pytest.raises(ValueError, match=err_msg):
dict_learning_online(X, batch_size=4, max_iter=10, positive_code=True)
@pytest.mark.parametrize(
"transform_algorithm",
[
"lasso_lars",
"lasso_cd",
"threshold",
],
)
@pytest.mark.parametrize("positive_code", [False, True])
@pytest.mark.parametrize("positive_dict", [False, True])
def test_minibatch_dictionary_learning_positivity(
transform_algorithm, positive_code, positive_dict
):
n_components = 8
dico = MiniBatchDictionaryLearning(
n_components,
batch_size=4,
max_iter=10,
transform_algorithm=transform_algorithm,
random_state=0,
positive_code=positive_code,
positive_dict=positive_dict,
fit_algorithm="cd",
).fit(X)
code = dico.transform(X)
if positive_dict:
assert (dico.components_ >= 0).all()
else:
assert (dico.components_ < 0).any()
if positive_code:
assert (code >= 0).all()
else:
assert (code < 0).any()
@pytest.mark.parametrize("positive_dict", [False, True])
def test_minibatch_dictionary_learning_lars(positive_dict):
n_components = 8
dico = MiniBatchDictionaryLearning(
n_components,
batch_size=4,
max_iter=10,
transform_algorithm="lars",
random_state=0,
positive_dict=positive_dict,
fit_algorithm="cd",
).fit(X)
if positive_dict:
assert (dico.components_ >= 0).all()
else:
assert (dico.components_ < 0).any()
@pytest.mark.parametrize("positive_code", [False, True])
@pytest.mark.parametrize("positive_dict", [False, True])
def test_dict_learning_online_positivity(positive_code, positive_dict):
rng = np.random.RandomState(0)
n_components = 8
code, dictionary = dict_learning_online(
X,
n_components=n_components,
batch_size=4,
method="cd",
alpha=1,
random_state=rng,
positive_dict=positive_dict,
positive_code=positive_code,
)
if positive_dict:
assert (dictionary >= 0).all()
else:
assert (dictionary < 0).any()
if positive_code:
assert (code >= 0).all()
else:
assert (code < 0).any()
def test_dict_learning_online_verbosity():
# test verbosity for better coverage
n_components = 5
import sys
from io import StringIO
old_stdout = sys.stdout
try:
sys.stdout = StringIO()
# convergence monitoring verbosity
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=5, verbose=1, tol=0.1, random_state=0
)
dico.fit(X)
dico = MiniBatchDictionaryLearning(
n_components,
batch_size=4,
max_iter=5,
verbose=1,
max_no_improvement=2,
random_state=0,
)
dico.fit(X)
# higher verbosity level
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=5, verbose=2, random_state=0
)
dico.fit(X)
# function API verbosity
dict_learning_online(
X,
n_components=n_components,
batch_size=4,
alpha=1,
verbose=1,
random_state=0,
)
dict_learning_online(
X,
n_components=n_components,
batch_size=4,
alpha=1,
verbose=2,
random_state=0,
)
finally:
sys.stdout = old_stdout
assert dico.components_.shape == (n_components, n_features)
def test_dict_learning_online_estimator_shapes():
n_components = 5
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=5, random_state=0
)
dico.fit(X)
assert dico.components_.shape == (n_components, n_features)
def test_dict_learning_online_overcomplete():
n_components = 12
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=5, random_state=0
).fit(X)
assert dico.components_.shape == (n_components, n_features)
def test_dict_learning_online_initialization():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features)
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=0, dict_init=V, random_state=0
).fit(X)
assert_array_equal(dico.components_, V)
def test_dict_learning_online_readonly_initialization():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features)
V.setflags(write=False)
MiniBatchDictionaryLearning(
n_components,
batch_size=4,
max_iter=1,
dict_init=V,
random_state=0,
shuffle=False,
).fit(X)
def test_dict_learning_online_partial_fit():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
dict1 = MiniBatchDictionaryLearning(
n_components,
max_iter=10,
batch_size=1,
alpha=1,
shuffle=False,
dict_init=V,
max_no_improvement=None,
tol=0.0,
random_state=0,
).fit(X)
dict2 = MiniBatchDictionaryLearning(
n_components, alpha=1, dict_init=V, random_state=0
)
for i in range(10):
for sample in X:
dict2.partial_fit(sample[np.newaxis, :])
assert not np.all(sparse_encode(X, dict1.components_, alpha=1) == 0)
assert_array_almost_equal(dict1.components_, dict2.components_, decimal=2)
# partial_fit should ignore max_iter (#17433)
assert dict1.n_steps_ == dict2.n_steps_ == 100
def test_sparse_encode_shapes():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
for algo in ("lasso_lars", "lasso_cd", "lars", "omp", "threshold"):
code = sparse_encode(X, V, algorithm=algo)
assert code.shape == (n_samples, n_components)
@pytest.mark.parametrize("algo", ["lasso_lars", "lasso_cd", "threshold"])
@pytest.mark.parametrize("positive", [False, True])
def test_sparse_encode_positivity(algo, positive):
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
code = sparse_encode(X, V, algorithm=algo, positive=positive)
if positive:
assert (code >= 0).all()
else:
assert (code < 0).any()
@pytest.mark.parametrize("algo", ["lars", "omp"])
def test_sparse_encode_unavailable_positivity(algo):
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
err_msg = "Positive constraint not supported for '{}' coding method."
err_msg = err_msg.format(algo)
with pytest.raises(ValueError, match=err_msg):
sparse_encode(X, V, algorithm=algo, positive=True)
def test_sparse_encode_input():
n_components = 100
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
Xf = check_array(X, order="F")
for algo in ("lasso_lars", "lasso_cd", "lars", "omp", "threshold"):
a = sparse_encode(X, V, algorithm=algo)
b = sparse_encode(Xf, V, algorithm=algo)
assert_array_almost_equal(a, b)
def test_sparse_encode_error():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
code = sparse_encode(X, V, alpha=0.001)
assert not np.all(code == 0)
assert np.sqrt(np.sum((np.dot(code, V) - X) ** 2)) < 0.1
def test_sparse_encode_error_default_sparsity():
rng = np.random.RandomState(0)
X = rng.randn(100, 64)
D = rng.randn(2, 64)
code = ignore_warnings(sparse_encode)(X, D, algorithm="omp", n_nonzero_coefs=None)
assert code.shape == (100, 2)
def test_sparse_coder_estimator():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
coder = SparseCoder(
dictionary=V, transform_algorithm="lasso_lars", transform_alpha=0.001
).transform(X)
assert not np.all(coder == 0)
assert np.sqrt(np.sum((np.dot(coder, V) - X) ** 2)) < 0.1
def test_sparse_coder_estimator_clone():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
coder = SparseCoder(
dictionary=V, transform_algorithm="lasso_lars", transform_alpha=0.001
)
cloned = clone(coder)
assert id(cloned) != id(coder)
np.testing.assert_allclose(cloned.dictionary, coder.dictionary)
assert id(cloned.dictionary) != id(coder.dictionary)
assert cloned.n_components_ == coder.n_components_
assert cloned.n_features_in_ == coder.n_features_in_
data = np.random.rand(n_samples, n_features).astype(np.float32)
np.testing.assert_allclose(cloned.transform(data), coder.transform(data))
def test_sparse_coder_parallel_mmap():
# Non-regression test for:
# https://github.com/scikit-learn/scikit-learn/issues/5956
# Test that SparseCoder does not error by passing reading only
# arrays to child processes
rng = np.random.RandomState(777)
n_components, n_features = 40, 64
init_dict = rng.rand(n_components, n_features)
# Ensure that `data` is >2M. Joblib memory maps arrays
# if they are larger than 1MB. The 4 accounts for float32
# data type
n_samples = int(2e6) // (4 * n_features)
data = np.random.rand(n_samples, n_features).astype(np.float32)
sc = SparseCoder(init_dict, transform_algorithm="omp", n_jobs=2)
sc.fit_transform(data)
def test_sparse_coder_common_transformer():
rng = np.random.RandomState(777)
n_components, n_features = 40, 3
init_dict = rng.rand(n_components, n_features)
sc = SparseCoder(init_dict)
check_transformer_data_not_an_array(sc.__class__.__name__, sc)
check_transformer_general(sc.__class__.__name__, sc)
check_transformer_general_memmap = partial(
check_transformer_general, readonly_memmap=True
)
check_transformer_general_memmap(sc.__class__.__name__, sc)
check_transformers_unfitted(sc.__class__.__name__, sc)
def test_sparse_coder_n_features_in():
d = np.array([[1, 2, 3], [1, 2, 3]])
sc = SparseCoder(d)
assert sc.n_features_in_ == d.shape[1]
def test_update_dict():
# Check the dict update in batch mode vs online mode
# Non-regression test for #4866
rng = np.random.RandomState(0)
code = np.array([[0.5, -0.5], [0.1, 0.9]])
dictionary = np.array([[1.0, 0.0], [0.6, 0.8]])
X = np.dot(code, dictionary) + rng.randn(2, 2)
# full batch update
newd_batch = dictionary.copy()
_update_dict(newd_batch, X, code)
# online update
A = np.dot(code.T, code)
B = np.dot(X.T, code)
newd_online = dictionary.copy()
_update_dict(newd_online, X, code, A, B)
assert_allclose(newd_batch, newd_online)
@pytest.mark.parametrize(
"algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
@pytest.mark.parametrize("data_type", (np.float32, np.float64))
# Note: do not check integer input because `lasso_lars` and `lars` fail with
# `ValueError` in `_lars_path_solver`
def test_sparse_encode_dtype_match(data_type, algorithm):
n_components = 6
rng = np.random.RandomState(0)
dictionary = rng.randn(n_components, n_features)
code = sparse_encode(
X.astype(data_type), dictionary.astype(data_type), algorithm=algorithm
)
assert code.dtype == data_type
@pytest.mark.parametrize(
"algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
def test_sparse_encode_numerical_consistency(algorithm):
# verify numerical consistency among np.float32 and np.float64
rtol = 1e-4
n_components = 6
rng = np.random.RandomState(0)
dictionary = rng.randn(n_components, n_features)
code_32 = sparse_encode(
X.astype(np.float32), dictionary.astype(np.float32), algorithm=algorithm
)
code_64 = sparse_encode(
X.astype(np.float64), dictionary.astype(np.float64), algorithm=algorithm
)
assert_allclose(code_32, code_64, rtol=rtol)
@pytest.mark.parametrize(
"transform_algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
@pytest.mark.parametrize("data_type", (np.float32, np.float64))
# Note: do not check integer input because `lasso_lars` and `lars` fail with
# `ValueError` in `_lars_path_solver`
def test_sparse_coder_dtype_match(data_type, transform_algorithm):
# Verify preserving dtype for transform in sparse coder
n_components = 6
rng = np.random.RandomState(0)
dictionary = rng.randn(n_components, n_features)
coder = SparseCoder(
dictionary.astype(data_type), transform_algorithm=transform_algorithm
)
code = coder.transform(X.astype(data_type))
assert code.dtype == data_type
@pytest.mark.parametrize("fit_algorithm", ("lars", "cd"))
@pytest.mark.parametrize(
"transform_algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
@pytest.mark.parametrize(
"data_type, expected_type",
(
(np.float32, np.float32),
(np.float64, np.float64),
(np.int32, np.float64),
(np.int64, np.float64),
),
)
def test_dictionary_learning_dtype_match(
data_type,
expected_type,
fit_algorithm,
transform_algorithm,
):
# Verify preserving dtype for fit and transform in dictionary learning class
dict_learner = DictionaryLearning(
n_components=8,
fit_algorithm=fit_algorithm,
transform_algorithm=transform_algorithm,
random_state=0,
)
dict_learner.fit(X.astype(data_type))
assert dict_learner.components_.dtype == expected_type
assert dict_learner.transform(X.astype(data_type)).dtype == expected_type
@pytest.mark.parametrize("fit_algorithm", ("lars", "cd"))
@pytest.mark.parametrize(
"transform_algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
@pytest.mark.parametrize(
"data_type, expected_type",
(
(np.float32, np.float32),
(np.float64, np.float64),
(np.int32, np.float64),
(np.int64, np.float64),
),
)
def test_minibatch_dictionary_learning_dtype_match(
data_type,
expected_type,
fit_algorithm,
transform_algorithm,
):
# Verify preserving dtype for fit and transform in minibatch dictionary learning
dict_learner = MiniBatchDictionaryLearning(
n_components=8,
batch_size=10,
fit_algorithm=fit_algorithm,
transform_algorithm=transform_algorithm,
max_iter=100,
tol=1e-1,
random_state=0,
)
dict_learner.fit(X.astype(data_type))
assert dict_learner.components_.dtype == expected_type
assert dict_learner.transform(X.astype(data_type)).dtype == expected_type
assert dict_learner._A.dtype == expected_type
assert dict_learner._B.dtype == expected_type
@pytest.mark.parametrize("method", ("lars", "cd"))
@pytest.mark.parametrize(
"data_type, expected_type",
(
(np.float32, np.float32),
(np.float64, np.float64),
(np.int32, np.float64),
(np.int64, np.float64),
),
)
def test_dict_learning_dtype_match(data_type, expected_type, method):
# Verify output matrix dtype
rng = np.random.RandomState(0)
n_components = 8
code, dictionary, _ = dict_learning(
X.astype(data_type),
n_components=n_components,
alpha=1,
random_state=rng,
method=method,
)
assert code.dtype == expected_type
assert dictionary.dtype == expected_type
@pytest.mark.parametrize("method", ("lars", "cd"))
def test_dict_learning_numerical_consistency(method):
# verify numerically consistent among np.float32 and np.float64
rtol = 1e-6
n_components = 4
alpha = 2
U_64, V_64, _ = dict_learning(
X.astype(np.float64),
n_components=n_components,
alpha=alpha,
random_state=0,
method=method,
)
U_32, V_32, _ = dict_learning(
X.astype(np.float32),
n_components=n_components,
alpha=alpha,
random_state=0,
method=method,
)
# Optimal solution (U*, V*) is not unique.
# If (U*, V*) is optimal solution, (-U*,-V*) is also optimal,
# and (column permutated U*, row permutated V*) are also optional
# as long as holding UV.
# So here UV, ||U||_1,1 and sum(||V_k||_2^2) are verified
# instead of comparing directly U and V.
assert_allclose(np.matmul(U_64, V_64), np.matmul(U_32, V_32), rtol=rtol)
assert_allclose(np.sum(np.abs(U_64)), np.sum(np.abs(U_32)), rtol=rtol)
assert_allclose(np.sum(V_64**2), np.sum(V_32**2), rtol=rtol)
# verify an obtained solution is not degenerate
assert np.mean(U_64 != 0.0) > 0.05
assert np.count_nonzero(U_64 != 0.0) == np.count_nonzero(U_32 != 0.0)
@pytest.mark.parametrize("method", ("lars", "cd"))
@pytest.mark.parametrize(
"data_type, expected_type",
(
(np.float32, np.float32),
(np.float64, np.float64),
(np.int32, np.float64),
(np.int64, np.float64),
),
)
def test_dict_learning_online_dtype_match(data_type, expected_type, method):
# Verify output matrix dtype
rng = np.random.RandomState(0)
n_components = 8
code, dictionary = dict_learning_online(
X.astype(data_type),
n_components=n_components,
alpha=1,
batch_size=10,
random_state=rng,
method=method,
)
assert code.dtype == expected_type
assert dictionary.dtype == expected_type
@pytest.mark.parametrize("method", ("lars", "cd"))
def test_dict_learning_online_numerical_consistency(method):
# verify numerically consistent among np.float32 and np.float64
rtol = 1e-4
n_components = 4
alpha = 1
U_64, V_64 = dict_learning_online(
X.astype(np.float64),
n_components=n_components,
max_iter=1_000,
alpha=alpha,
batch_size=10,
random_state=0,
method=method,
tol=0.0,
max_no_improvement=None,
)
U_32, V_32 = dict_learning_online(
X.astype(np.float32),
n_components=n_components,
max_iter=1_000,
alpha=alpha,
batch_size=10,
random_state=0,
method=method,
tol=0.0,
max_no_improvement=None,
)
# Optimal solution (U*, V*) is not unique.
# If (U*, V*) is optimal solution, (-U*,-V*) is also optimal,
# and (column permutated U*, row permutated V*) are also optional
# as long as holding UV.
# So here UV, ||U||_1,1 and sum(||V_k||_2) are verified
# instead of comparing directly U and V.
assert_allclose(np.matmul(U_64, V_64), np.matmul(U_32, V_32), rtol=rtol)
assert_allclose(np.sum(np.abs(U_64)), np.sum(np.abs(U_32)), rtol=rtol)
assert_allclose(np.sum(V_64**2), np.sum(V_32**2), rtol=rtol)
# verify an obtained solution is not degenerate
assert np.mean(U_64 != 0.0) > 0.05
assert np.count_nonzero(U_64 != 0.0) == np.count_nonzero(U_32 != 0.0)
@pytest.mark.parametrize(
"estimator",
[
SparseCoder(X.T),
DictionaryLearning(),
MiniBatchDictionaryLearning(batch_size=4, max_iter=10),
],
ids=lambda x: x.__class__.__name__,
)
def test_get_feature_names_out(estimator):
"""Check feature names for dict learning estimators."""
estimator.fit(X)
n_components = X.shape[1]
feature_names_out = estimator.get_feature_names_out()
estimator_name = estimator.__class__.__name__.lower()
assert_array_equal(
feature_names_out,
[f"{estimator_name}{i}" for i in range(n_components)],
)
def test_cd_work_on_joblib_memmapped_data(monkeypatch):
monkeypatch.setattr(
sklearn.decomposition._dict_learning,
"Parallel",
partial(Parallel, max_nbytes=100),
)
rng = np.random.RandomState(0)
X_train = rng.randn(10, 10)
dict_learner = DictionaryLearning(
n_components=5,
random_state=0,
n_jobs=2,
fit_algorithm="cd",
max_iter=50,
verbose=True,
)
# This must run and complete without error.
dict_learner.fit(X_train)
# TODO(1.6): remove in 1.6
def test_xxx():
warn_msg = "`max_iter=None` is deprecated in version 1.4 and will be removed"
with pytest.warns(FutureWarning, match=warn_msg):
MiniBatchDictionaryLearning(max_iter=None, random_state=0).fit(X)
with pytest.warns(FutureWarning, match=warn_msg):
dict_learning_online(X, max_iter=None, random_state=0)