542 lines
19 KiB
Python
542 lines
19 KiB
Python
from unittest.mock import Mock
|
|
|
|
import numpy as np
|
|
import pytest
|
|
from scipy import sparse
|
|
from scipy.linalg import eigh
|
|
from scipy.sparse.linalg import eigsh, lobpcg
|
|
|
|
from sklearn.cluster import KMeans
|
|
from sklearn.datasets import make_blobs
|
|
from sklearn.manifold import SpectralEmbedding, _spectral_embedding, spectral_embedding
|
|
from sklearn.manifold._spectral_embedding import (
|
|
_graph_connected_component,
|
|
_graph_is_connected,
|
|
)
|
|
from sklearn.metrics import normalized_mutual_info_score, pairwise_distances
|
|
from sklearn.metrics.pairwise import rbf_kernel
|
|
from sklearn.neighbors import NearestNeighbors
|
|
from sklearn.utils._testing import assert_array_almost_equal, assert_array_equal
|
|
from sklearn.utils.extmath import _deterministic_vector_sign_flip
|
|
from sklearn.utils.fixes import (
|
|
COO_CONTAINERS,
|
|
CSC_CONTAINERS,
|
|
CSR_CONTAINERS,
|
|
parse_version,
|
|
sp_version,
|
|
)
|
|
from sklearn.utils.fixes import laplacian as csgraph_laplacian
|
|
|
|
try:
|
|
from pyamg import smoothed_aggregation_solver # noqa
|
|
|
|
pyamg_available = True
|
|
except ImportError:
|
|
pyamg_available = False
|
|
skip_if_no_pyamg = pytest.mark.skipif(
|
|
not pyamg_available, reason="PyAMG is required for the tests in this function."
|
|
)
|
|
|
|
# non centered, sparse centers to check the
|
|
centers = np.array(
|
|
[
|
|
[0.0, 5.0, 0.0, 0.0, 0.0],
|
|
[0.0, 0.0, 4.0, 0.0, 0.0],
|
|
[1.0, 0.0, 0.0, 5.0, 1.0],
|
|
]
|
|
)
|
|
n_samples = 1000
|
|
n_clusters, n_features = centers.shape
|
|
S, true_labels = make_blobs(
|
|
n_samples=n_samples, centers=centers, cluster_std=1.0, random_state=42
|
|
)
|
|
|
|
|
|
def _assert_equal_with_sign_flipping(A, B, tol=0.0):
|
|
"""Check array A and B are equal with possible sign flipping on
|
|
each columns"""
|
|
tol_squared = tol**2
|
|
for A_col, B_col in zip(A.T, B.T):
|
|
assert (
|
|
np.max((A_col - B_col) ** 2) <= tol_squared
|
|
or np.max((A_col + B_col) ** 2) <= tol_squared
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize("coo_container", COO_CONTAINERS)
|
|
def test_sparse_graph_connected_component(coo_container):
|
|
rng = np.random.RandomState(42)
|
|
n_samples = 300
|
|
boundaries = [0, 42, 121, 200, n_samples]
|
|
p = rng.permutation(n_samples)
|
|
connections = []
|
|
|
|
for start, stop in zip(boundaries[:-1], boundaries[1:]):
|
|
group = p[start:stop]
|
|
# Connect all elements within the group at least once via an
|
|
# arbitrary path that spans the group.
|
|
for i in range(len(group) - 1):
|
|
connections.append((group[i], group[i + 1]))
|
|
|
|
# Add some more random connections within the group
|
|
min_idx, max_idx = 0, len(group) - 1
|
|
n_random_connections = 1000
|
|
source = rng.randint(min_idx, max_idx, size=n_random_connections)
|
|
target = rng.randint(min_idx, max_idx, size=n_random_connections)
|
|
connections.extend(zip(group[source], group[target]))
|
|
|
|
# Build a symmetric affinity matrix
|
|
row_idx, column_idx = tuple(np.array(connections).T)
|
|
data = rng.uniform(0.1, 42, size=len(connections))
|
|
affinity = coo_container((data, (row_idx, column_idx)))
|
|
affinity = 0.5 * (affinity + affinity.T)
|
|
|
|
for start, stop in zip(boundaries[:-1], boundaries[1:]):
|
|
component_1 = _graph_connected_component(affinity, p[start])
|
|
component_size = stop - start
|
|
assert component_1.sum() == component_size
|
|
|
|
# We should retrieve the same component mask by starting by both ends
|
|
# of the group
|
|
component_2 = _graph_connected_component(affinity, p[stop - 1])
|
|
assert component_2.sum() == component_size
|
|
assert_array_equal(component_1, component_2)
|
|
|
|
|
|
# TODO: investigate why this test is seed-sensitive on 32-bit Python
|
|
# runtimes. Is this revealing a numerical stability problem ? Or is it
|
|
# expected from the test numerical design ? In the latter case the test
|
|
# should be made less seed-sensitive instead.
|
|
@pytest.mark.parametrize(
|
|
"eigen_solver",
|
|
[
|
|
"arpack",
|
|
"lobpcg",
|
|
pytest.param("amg", marks=skip_if_no_pyamg),
|
|
],
|
|
)
|
|
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
|
|
def test_spectral_embedding_two_components(eigen_solver, dtype, seed=0):
|
|
# Test spectral embedding with two components
|
|
random_state = np.random.RandomState(seed)
|
|
n_sample = 100
|
|
affinity = np.zeros(shape=[n_sample * 2, n_sample * 2])
|
|
# first component
|
|
affinity[0:n_sample, 0:n_sample] = (
|
|
np.abs(random_state.randn(n_sample, n_sample)) + 2
|
|
)
|
|
# second component
|
|
affinity[n_sample::, n_sample::] = (
|
|
np.abs(random_state.randn(n_sample, n_sample)) + 2
|
|
)
|
|
|
|
# Test of internal _graph_connected_component before connection
|
|
component = _graph_connected_component(affinity, 0)
|
|
assert component[:n_sample].all()
|
|
assert not component[n_sample:].any()
|
|
component = _graph_connected_component(affinity, -1)
|
|
assert not component[:n_sample].any()
|
|
assert component[n_sample:].all()
|
|
|
|
# connection
|
|
affinity[0, n_sample + 1] = 1
|
|
affinity[n_sample + 1, 0] = 1
|
|
affinity.flat[:: 2 * n_sample + 1] = 0
|
|
affinity = 0.5 * (affinity + affinity.T)
|
|
|
|
true_label = np.zeros(shape=2 * n_sample)
|
|
true_label[0:n_sample] = 1
|
|
|
|
se_precomp = SpectralEmbedding(
|
|
n_components=1,
|
|
affinity="precomputed",
|
|
random_state=np.random.RandomState(seed),
|
|
eigen_solver=eigen_solver,
|
|
)
|
|
|
|
embedded_coordinate = se_precomp.fit_transform(affinity.astype(dtype))
|
|
# thresholding on the first components using 0.
|
|
label_ = np.array(embedded_coordinate.ravel() < 0, dtype=np.int64)
|
|
assert normalized_mutual_info_score(true_label, label_) == pytest.approx(1.0)
|
|
|
|
|
|
@pytest.mark.parametrize("sparse_container", [None, *CSR_CONTAINERS])
|
|
@pytest.mark.parametrize(
|
|
"eigen_solver",
|
|
[
|
|
"arpack",
|
|
"lobpcg",
|
|
pytest.param("amg", marks=skip_if_no_pyamg),
|
|
],
|
|
)
|
|
@pytest.mark.parametrize("dtype", (np.float32, np.float64))
|
|
def test_spectral_embedding_precomputed_affinity(
|
|
sparse_container, eigen_solver, dtype, seed=36
|
|
):
|
|
# Test spectral embedding with precomputed kernel
|
|
gamma = 1.0
|
|
X = S if sparse_container is None else sparse_container(S)
|
|
|
|
se_precomp = SpectralEmbedding(
|
|
n_components=2,
|
|
affinity="precomputed",
|
|
random_state=np.random.RandomState(seed),
|
|
eigen_solver=eigen_solver,
|
|
)
|
|
se_rbf = SpectralEmbedding(
|
|
n_components=2,
|
|
affinity="rbf",
|
|
gamma=gamma,
|
|
random_state=np.random.RandomState(seed),
|
|
eigen_solver=eigen_solver,
|
|
)
|
|
embed_precomp = se_precomp.fit_transform(rbf_kernel(X.astype(dtype), gamma=gamma))
|
|
embed_rbf = se_rbf.fit_transform(X.astype(dtype))
|
|
assert_array_almost_equal(se_precomp.affinity_matrix_, se_rbf.affinity_matrix_)
|
|
_assert_equal_with_sign_flipping(embed_precomp, embed_rbf, 0.05)
|
|
|
|
|
|
def test_precomputed_nearest_neighbors_filtering():
|
|
# Test precomputed graph filtering when containing too many neighbors
|
|
n_neighbors = 2
|
|
results = []
|
|
for additional_neighbors in [0, 10]:
|
|
nn = NearestNeighbors(n_neighbors=n_neighbors + additional_neighbors).fit(S)
|
|
graph = nn.kneighbors_graph(S, mode="connectivity")
|
|
embedding = (
|
|
SpectralEmbedding(
|
|
random_state=0,
|
|
n_components=2,
|
|
affinity="precomputed_nearest_neighbors",
|
|
n_neighbors=n_neighbors,
|
|
)
|
|
.fit(graph)
|
|
.embedding_
|
|
)
|
|
results.append(embedding)
|
|
|
|
assert_array_equal(results[0], results[1])
|
|
|
|
|
|
@pytest.mark.parametrize("sparse_container", [None, *CSR_CONTAINERS])
|
|
def test_spectral_embedding_callable_affinity(sparse_container, seed=36):
|
|
# Test spectral embedding with callable affinity
|
|
gamma = 0.9
|
|
kern = rbf_kernel(S, gamma=gamma)
|
|
X = S if sparse_container is None else sparse_container(S)
|
|
|
|
se_callable = SpectralEmbedding(
|
|
n_components=2,
|
|
affinity=(lambda x: rbf_kernel(x, gamma=gamma)),
|
|
gamma=gamma,
|
|
random_state=np.random.RandomState(seed),
|
|
)
|
|
se_rbf = SpectralEmbedding(
|
|
n_components=2,
|
|
affinity="rbf",
|
|
gamma=gamma,
|
|
random_state=np.random.RandomState(seed),
|
|
)
|
|
embed_rbf = se_rbf.fit_transform(X)
|
|
embed_callable = se_callable.fit_transform(X)
|
|
assert_array_almost_equal(se_callable.affinity_matrix_, se_rbf.affinity_matrix_)
|
|
assert_array_almost_equal(kern, se_rbf.affinity_matrix_)
|
|
_assert_equal_with_sign_flipping(embed_rbf, embed_callable, 0.05)
|
|
|
|
|
|
# TODO: Remove when pyamg does replaces sp.rand call with np.random.rand
|
|
# https://github.com/scikit-learn/scikit-learn/issues/15913
|
|
@pytest.mark.filterwarnings(
|
|
"ignore:scipy.rand is deprecated:DeprecationWarning:pyamg.*"
|
|
)
|
|
# TODO: Remove when pyamg removes the use of np.float
|
|
@pytest.mark.filterwarnings(
|
|
"ignore:`np.float` is a deprecated alias:DeprecationWarning:pyamg.*"
|
|
)
|
|
# TODO: Remove when pyamg removes the use of pinv2
|
|
@pytest.mark.filterwarnings(
|
|
"ignore:scipy.linalg.pinv2 is deprecated:DeprecationWarning:pyamg.*"
|
|
)
|
|
@pytest.mark.filterwarnings(
|
|
"ignore:np.find_common_type is deprecated:DeprecationWarning:pyamg.*"
|
|
)
|
|
@pytest.mark.skipif(
|
|
not pyamg_available, reason="PyAMG is required for the tests in this function."
|
|
)
|
|
@pytest.mark.parametrize("dtype", (np.float32, np.float64))
|
|
@pytest.mark.parametrize("coo_container", COO_CONTAINERS)
|
|
def test_spectral_embedding_amg_solver(dtype, coo_container, seed=36):
|
|
se_amg = SpectralEmbedding(
|
|
n_components=2,
|
|
affinity="nearest_neighbors",
|
|
eigen_solver="amg",
|
|
n_neighbors=5,
|
|
random_state=np.random.RandomState(seed),
|
|
)
|
|
se_arpack = SpectralEmbedding(
|
|
n_components=2,
|
|
affinity="nearest_neighbors",
|
|
eigen_solver="arpack",
|
|
n_neighbors=5,
|
|
random_state=np.random.RandomState(seed),
|
|
)
|
|
embed_amg = se_amg.fit_transform(S.astype(dtype))
|
|
embed_arpack = se_arpack.fit_transform(S.astype(dtype))
|
|
_assert_equal_with_sign_flipping(embed_amg, embed_arpack, 1e-5)
|
|
|
|
# same with special case in which amg is not actually used
|
|
# regression test for #10715
|
|
# affinity between nodes
|
|
row = np.array([0, 0, 1, 2, 3, 3, 4], dtype=np.int32)
|
|
col = np.array([1, 2, 2, 3, 4, 5, 5], dtype=np.int32)
|
|
val = np.array([100, 100, 100, 1, 100, 100, 100], dtype=np.int64)
|
|
|
|
affinity = coo_container(
|
|
(np.hstack([val, val]), (np.hstack([row, col]), np.hstack([col, row]))),
|
|
shape=(6, 6),
|
|
)
|
|
se_amg.affinity = "precomputed"
|
|
se_arpack.affinity = "precomputed"
|
|
embed_amg = se_amg.fit_transform(affinity.astype(dtype))
|
|
embed_arpack = se_arpack.fit_transform(affinity.astype(dtype))
|
|
_assert_equal_with_sign_flipping(embed_amg, embed_arpack, 1e-5)
|
|
|
|
# Check that passing a sparse matrix with `np.int64` indices dtype raises an error
|
|
# or is successful based on the version of SciPy which is installed.
|
|
# Use a CSR matrix to avoid any conversion during the validation
|
|
affinity = affinity.tocsr()
|
|
affinity.indptr = affinity.indptr.astype(np.int64)
|
|
affinity.indices = affinity.indices.astype(np.int64)
|
|
|
|
# PR: https://github.com/scipy/scipy/pull/18913
|
|
# First integration in 1.11.3: https://github.com/scipy/scipy/pull/19279
|
|
scipy_graph_traversal_supports_int64_index = sp_version >= parse_version("1.11.3")
|
|
if scipy_graph_traversal_supports_int64_index:
|
|
se_amg.fit_transform(affinity)
|
|
else:
|
|
err_msg = "Only sparse matrices with 32-bit integer indices are accepted"
|
|
with pytest.raises(ValueError, match=err_msg):
|
|
se_amg.fit_transform(affinity)
|
|
|
|
|
|
# TODO: Remove filterwarnings when pyamg does replaces sp.rand call with
|
|
# np.random.rand:
|
|
# https://github.com/scikit-learn/scikit-learn/issues/15913
|
|
@pytest.mark.filterwarnings(
|
|
"ignore:scipy.rand is deprecated:DeprecationWarning:pyamg.*"
|
|
)
|
|
# TODO: Remove when pyamg removes the use of np.float
|
|
@pytest.mark.filterwarnings(
|
|
"ignore:`np.float` is a deprecated alias:DeprecationWarning:pyamg.*"
|
|
)
|
|
# TODO: Remove when pyamg removes the use of pinv2
|
|
@pytest.mark.filterwarnings(
|
|
"ignore:scipy.linalg.pinv2 is deprecated:DeprecationWarning:pyamg.*"
|
|
)
|
|
@pytest.mark.skipif(
|
|
not pyamg_available, reason="PyAMG is required for the tests in this function."
|
|
)
|
|
# TODO: Remove when pyamg removes the use of np.find_common_type
|
|
@pytest.mark.filterwarnings(
|
|
"ignore:np.find_common_type is deprecated:DeprecationWarning:pyamg.*"
|
|
)
|
|
@pytest.mark.parametrize("dtype", (np.float32, np.float64))
|
|
def test_spectral_embedding_amg_solver_failure(dtype, seed=36):
|
|
# Non-regression test for amg solver failure (issue #13393 on github)
|
|
num_nodes = 100
|
|
X = sparse.rand(num_nodes, num_nodes, density=0.1, random_state=seed)
|
|
X = X.astype(dtype)
|
|
upper = sparse.triu(X) - sparse.diags(X.diagonal())
|
|
sym_matrix = upper + upper.T
|
|
embedding = spectral_embedding(
|
|
sym_matrix, n_components=10, eigen_solver="amg", random_state=0
|
|
)
|
|
|
|
# Check that the learned embedding is stable w.r.t. random solver init:
|
|
for i in range(3):
|
|
new_embedding = spectral_embedding(
|
|
sym_matrix, n_components=10, eigen_solver="amg", random_state=i + 1
|
|
)
|
|
_assert_equal_with_sign_flipping(embedding, new_embedding, tol=0.05)
|
|
|
|
|
|
@pytest.mark.filterwarnings("ignore:the behavior of nmi will change in version 0.22")
|
|
def test_pipeline_spectral_clustering(seed=36):
|
|
# Test using pipeline to do spectral clustering
|
|
random_state = np.random.RandomState(seed)
|
|
se_rbf = SpectralEmbedding(
|
|
n_components=n_clusters, affinity="rbf", random_state=random_state
|
|
)
|
|
se_knn = SpectralEmbedding(
|
|
n_components=n_clusters,
|
|
affinity="nearest_neighbors",
|
|
n_neighbors=5,
|
|
random_state=random_state,
|
|
)
|
|
for se in [se_rbf, se_knn]:
|
|
km = KMeans(n_clusters=n_clusters, random_state=random_state, n_init=10)
|
|
km.fit(se.fit_transform(S))
|
|
assert_array_almost_equal(
|
|
normalized_mutual_info_score(km.labels_, true_labels), 1.0, 2
|
|
)
|
|
|
|
|
|
def test_connectivity(seed=36):
|
|
# Test that graph connectivity test works as expected
|
|
graph = np.array(
|
|
[
|
|
[1, 0, 0, 0, 0],
|
|
[0, 1, 1, 0, 0],
|
|
[0, 1, 1, 1, 0],
|
|
[0, 0, 1, 1, 1],
|
|
[0, 0, 0, 1, 1],
|
|
]
|
|
)
|
|
assert not _graph_is_connected(graph)
|
|
for csr_container in CSR_CONTAINERS:
|
|
assert not _graph_is_connected(csr_container(graph))
|
|
for csc_container in CSC_CONTAINERS:
|
|
assert not _graph_is_connected(csc_container(graph))
|
|
|
|
graph = np.array(
|
|
[
|
|
[1, 1, 0, 0, 0],
|
|
[1, 1, 1, 0, 0],
|
|
[0, 1, 1, 1, 0],
|
|
[0, 0, 1, 1, 1],
|
|
[0, 0, 0, 1, 1],
|
|
]
|
|
)
|
|
assert _graph_is_connected(graph)
|
|
for csr_container in CSR_CONTAINERS:
|
|
assert _graph_is_connected(csr_container(graph))
|
|
for csc_container in CSC_CONTAINERS:
|
|
assert _graph_is_connected(csc_container(graph))
|
|
|
|
|
|
def test_spectral_embedding_deterministic():
|
|
# Test that Spectral Embedding is deterministic
|
|
random_state = np.random.RandomState(36)
|
|
data = random_state.randn(10, 30)
|
|
sims = rbf_kernel(data)
|
|
embedding_1 = spectral_embedding(sims)
|
|
embedding_2 = spectral_embedding(sims)
|
|
assert_array_almost_equal(embedding_1, embedding_2)
|
|
|
|
|
|
def test_spectral_embedding_unnormalized():
|
|
# Test that spectral_embedding is also processing unnormalized laplacian
|
|
# correctly
|
|
random_state = np.random.RandomState(36)
|
|
data = random_state.randn(10, 30)
|
|
sims = rbf_kernel(data)
|
|
n_components = 8
|
|
embedding_1 = spectral_embedding(
|
|
sims, norm_laplacian=False, n_components=n_components, drop_first=False
|
|
)
|
|
|
|
# Verify using manual computation with dense eigh
|
|
laplacian, dd = csgraph_laplacian(sims, normed=False, return_diag=True)
|
|
_, diffusion_map = eigh(laplacian)
|
|
embedding_2 = diffusion_map.T[:n_components]
|
|
embedding_2 = _deterministic_vector_sign_flip(embedding_2).T
|
|
|
|
assert_array_almost_equal(embedding_1, embedding_2)
|
|
|
|
|
|
def test_spectral_embedding_first_eigen_vector():
|
|
# Test that the first eigenvector of spectral_embedding
|
|
# is constant and that the second is not (for a connected graph)
|
|
random_state = np.random.RandomState(36)
|
|
data = random_state.randn(10, 30)
|
|
sims = rbf_kernel(data)
|
|
n_components = 2
|
|
|
|
for seed in range(10):
|
|
embedding = spectral_embedding(
|
|
sims,
|
|
norm_laplacian=False,
|
|
n_components=n_components,
|
|
drop_first=False,
|
|
random_state=seed,
|
|
)
|
|
|
|
assert np.std(embedding[:, 0]) == pytest.approx(0)
|
|
assert np.std(embedding[:, 1]) > 1e-3
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"eigen_solver",
|
|
[
|
|
"arpack",
|
|
"lobpcg",
|
|
pytest.param("amg", marks=skip_if_no_pyamg),
|
|
],
|
|
)
|
|
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
|
|
def test_spectral_embedding_preserves_dtype(eigen_solver, dtype):
|
|
"""Check that `SpectralEmbedding is preserving the dtype of the fitted
|
|
attribute and transformed data.
|
|
|
|
Ideally, this test should be covered by the common test
|
|
`check_transformer_preserve_dtypes`. However, this test only run
|
|
with transformers implementing `transform` while `SpectralEmbedding`
|
|
implements only `fit_transform`.
|
|
"""
|
|
X = S.astype(dtype)
|
|
se = SpectralEmbedding(
|
|
n_components=2, affinity="rbf", eigen_solver=eigen_solver, random_state=0
|
|
)
|
|
X_trans = se.fit_transform(X)
|
|
|
|
assert X_trans.dtype == dtype
|
|
assert se.embedding_.dtype == dtype
|
|
assert se.affinity_matrix_.dtype == dtype
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
pyamg_available,
|
|
reason="PyAMG is installed and we should not test for an error.",
|
|
)
|
|
def test_error_pyamg_not_available():
|
|
se_precomp = SpectralEmbedding(
|
|
n_components=2,
|
|
affinity="rbf",
|
|
eigen_solver="amg",
|
|
)
|
|
err_msg = "The eigen_solver was set to 'amg', but pyamg is not available."
|
|
with pytest.raises(ValueError, match=err_msg):
|
|
se_precomp.fit_transform(S)
|
|
|
|
|
|
# TODO: Remove when pyamg removes the use of np.find_common_type
|
|
@pytest.mark.filterwarnings(
|
|
"ignore:np.find_common_type is deprecated:DeprecationWarning:pyamg.*"
|
|
)
|
|
@pytest.mark.parametrize("solver", ["arpack", "amg", "lobpcg"])
|
|
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
|
|
def test_spectral_eigen_tol_auto(monkeypatch, solver, csr_container):
|
|
"""Test that `eigen_tol="auto"` is resolved correctly"""
|
|
if solver == "amg" and not pyamg_available:
|
|
pytest.skip("PyAMG is not available.")
|
|
X, _ = make_blobs(
|
|
n_samples=200, random_state=0, centers=[[1, 1], [-1, -1]], cluster_std=0.01
|
|
)
|
|
D = pairwise_distances(X) # Distance matrix
|
|
S = np.max(D) - D # Similarity matrix
|
|
|
|
solver_func = eigsh if solver == "arpack" else lobpcg
|
|
default_value = 0 if solver == "arpack" else None
|
|
if solver == "amg":
|
|
S = csr_container(S)
|
|
|
|
mocked_solver = Mock(side_effect=solver_func)
|
|
|
|
monkeypatch.setattr(_spectral_embedding, solver_func.__qualname__, mocked_solver)
|
|
|
|
spectral_embedding(S, random_state=42, eigen_solver=solver, eigen_tol="auto")
|
|
mocked_solver.assert_called()
|
|
|
|
_, kwargs = mocked_solver.call_args
|
|
assert kwargs["tol"] == default_value
|