537 lines
18 KiB
Python
537 lines
18 KiB
Python
"""Base class for mixture models."""
|
|
|
|
# Author: Wei Xue <xuewei4d@gmail.com>
|
|
# Modified by Thierry Guillemot <thierry.guillemot.work@gmail.com>
|
|
# License: BSD 3 clause
|
|
|
|
import warnings
|
|
from abc import ABCMeta, abstractmethod
|
|
from time import time
|
|
|
|
import numpy as np
|
|
from scipy.special import logsumexp
|
|
|
|
from .. import cluster
|
|
from ..base import BaseEstimator
|
|
from ..base import DensityMixin
|
|
from ..exceptions import ConvergenceWarning
|
|
from ..utils import check_array, check_random_state
|
|
from ..utils.validation import check_is_fitted
|
|
|
|
|
|
def _check_shape(param, param_shape, name):
|
|
"""Validate the shape of the input parameter 'param'.
|
|
|
|
Parameters
|
|
----------
|
|
param : array
|
|
|
|
param_shape : tuple
|
|
|
|
name : string
|
|
"""
|
|
param = np.array(param)
|
|
if param.shape != param_shape:
|
|
raise ValueError("The parameter '%s' should have the shape of %s, "
|
|
"but got %s" % (name, param_shape, param.shape))
|
|
|
|
|
|
def _check_X(X, n_components=None, n_features=None, ensure_min_samples=1):
    """Validate the input data X and its dimensions.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Data to validate.

    n_components : int, default=None
        When given, X must contain at least this many rows.

    n_features : int, default=None
        When given, X must have exactly this many columns.

    ensure_min_samples : int, default=1
        Forwarded unchanged to ``check_array``.

    Returns
    -------
    X : array, shape (n_samples, n_features)
        The value returned by ``check_array``.

    Raises
    ------
    ValueError
        If X has fewer rows than ``n_components`` or a number of columns
        different from ``n_features``.
    """
    X = check_array(X, dtype=[np.float64, np.float32],
                    ensure_min_samples=ensure_min_samples)

    too_few_samples = n_components is not None and X.shape[0] < n_components
    if too_few_samples:
        raise ValueError('Expected n_samples >= n_components '
                         'but got n_components = %d, n_samples = %d'
                         % (n_components, X.shape[0]))

    wrong_width = n_features is not None and X.shape[1] != n_features
    if wrong_width:
        raise ValueError("Expected the input data X have %d features, "
                         "but got %d features"
                         % (n_features, X.shape[1]))

    return X
|
|
|
|
|
|
class BaseMixture(DensityMixin, BaseEstimator, metaclass=ABCMeta):
    """Abstract base class shared by all mixture models.

    It defines the interface that concrete mixture classes implement and
    supplies the common fitting, scoring, prediction and sampling methods.

    Parameters
    ----------
    n_components : int
        Number of mixture components; must be at least 1.

    tol : float
        Non-negative threshold on the change of the lower bound below which
        an EM run is considered converged.

    reg_covar : float
        Non-negative regularization on covariance.

    max_iter : int
        Maximum number of EM iterations per run; must be at least 1.

    n_init : int
        Number of initializations to perform; must be at least 1.

    init_params : {'kmeans', 'random'}
        Method used to initialize the responsibilities.

    random_state : int, RandomState instance or None
        Passed to ``check_random_state``.

    warm_start : bool
        If True, a later call to ``fit`` continues from the previous
        solution with a single run.

    verbose : int
        Verbosity level of the progress messages.

    verbose_interval : int
        Number of iterations between two progress messages.
    """

    def __init__(self, n_components, tol, reg_covar,
                 max_iter, n_init, init_params, random_state, warm_start,
                 verbose, verbose_interval):
        # Model size and numerical settings.
        self.n_components = n_components
        self.reg_covar = reg_covar

        # Control of the EM iterations.
        self.tol = tol
        self.max_iter = max_iter
        self.warm_start = warm_start

        # Initialization.
        self.n_init = n_init
        self.init_params = init_params
        self.random_state = random_state

        # Progress reporting.
        self.verbose = verbose
        self.verbose_interval = verbose_interval
|
|
|
|
def _check_initial_parameters(self, X):
|
|
"""Check values of the basic parameters.
|
|
|
|
Parameters
|
|
----------
|
|
X : array-like of shape (n_samples, n_features)
|
|
"""
|
|
if self.n_components < 1:
|
|
raise ValueError("Invalid value for 'n_components': %d "
|
|
"Estimation requires at least one component"
|
|
% self.n_components)
|
|
|
|
if self.tol < 0.:
|
|
raise ValueError("Invalid value for 'tol': %.5f "
|
|
"Tolerance used by the EM must be non-negative"
|
|
% self.tol)
|
|
|
|
if self.n_init < 1:
|
|
raise ValueError("Invalid value for 'n_init': %d "
|
|
"Estimation requires at least one run"
|
|
% self.n_init)
|
|
|
|
if self.max_iter < 1:
|
|
raise ValueError("Invalid value for 'max_iter': %d "
|
|
"Estimation requires at least one iteration"
|
|
% self.max_iter)
|
|
|
|
if self.reg_covar < 0.:
|
|
raise ValueError("Invalid value for 'reg_covar': %.5f "
|
|
"regularization on covariance must be "
|
|
"non-negative"
|
|
% self.reg_covar)
|
|
|
|
# Check all the parameters values of the derived class
|
|
self._check_parameters(X)
|
|
|
|
@abstractmethod
|
|
def _check_parameters(self, X):
|
|
"""Check initial parameters of the derived class.
|
|
|
|
Parameters
|
|
----------
|
|
X : array-like of shape (n_samples, n_features)
|
|
"""
|
|
pass
|
|
|
|
def _initialize_parameters(self, X, random_state):
|
|
"""Initialize the model parameters.
|
|
|
|
Parameters
|
|
----------
|
|
X : array-like of shape (n_samples, n_features)
|
|
|
|
random_state : RandomState
|
|
A random number generator instance that controls the random seed
|
|
used for the method chosen to initialize the parameters.
|
|
"""
|
|
n_samples, _ = X.shape
|
|
|
|
if self.init_params == 'kmeans':
|
|
resp = np.zeros((n_samples, self.n_components))
|
|
label = cluster.KMeans(n_clusters=self.n_components, n_init=1,
|
|
random_state=random_state).fit(X).labels_
|
|
resp[np.arange(n_samples), label] = 1
|
|
elif self.init_params == 'random':
|
|
resp = random_state.rand(n_samples, self.n_components)
|
|
resp /= resp.sum(axis=1)[:, np.newaxis]
|
|
else:
|
|
raise ValueError("Unimplemented initialization method '%s'"
|
|
% self.init_params)
|
|
|
|
self._initialize(X, resp)
|
|
|
|
@abstractmethod
|
|
def _initialize(self, X, resp):
|
|
"""Initialize the model parameters of the derived class.
|
|
|
|
Parameters
|
|
----------
|
|
X : array-like of shape (n_samples, n_features)
|
|
|
|
resp : array-like of shape (n_samples, n_components)
|
|
"""
|
|
pass
|
|
|
|
def fit(self, X, y=None):
    """Estimate model parameters with the EM algorithm.

    All the work is delegated to ``fit_predict``; the predicted labels are
    discarded and the fitted estimator is returned instead.

    The model is fitted ``n_init`` times and the parameters giving the
    largest likelihood or lower bound are kept. Each trial alternates
    E-step and M-step for at most ``max_iter`` iterations, stopping once
    the change of likelihood or lower bound is less than ``tol``;
    otherwise a ``ConvergenceWarning`` is raised.
    If ``warm_start`` is ``True``, ``n_init`` is ignored and a single
    initialization is performed upon the first call. Upon consecutive
    calls, training starts where it left off.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        List of n_features-dimensional data points. Each row
        corresponds to a single data point.

    y : Ignored
        Passed through to ``fit_predict``.

    Returns
    -------
    self
    """
    self.fit_predict(X, y)
    return self
|
|
|
|
def fit_predict(self, X, y=None):
    """Estimate model parameters using X and predict the labels for X.

    The method fits the model ``n_init`` times and keeps the parameters
    with which the model has the largest likelihood or lower bound. Within
    each trial, the method iterates between E-step and M-step for at most
    ``max_iter`` times until the change of likelihood or lower bound is
    less than ``tol``. If the retained trial did not reach that tolerance,
    a :class:`~sklearn.exceptions.ConvergenceWarning` is raised. After
    fitting, it predicts the most probable label for the input data
    points.

    .. versionadded:: 0.20

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        List of n_features-dimensional data points. Each row
        corresponds to a single data point.

    y : Ignored
        Not used by this method.

    Returns
    -------
    labels : array, shape (n_samples,)
        Component labels.
    """
    X = _check_X(X, self.n_components, ensure_min_samples=2)
    self._check_n_features(X, reset=True)
    self._check_initial_parameters(X)

    # With warm_start, an estimator that was already fitted is not
    # re-initialized: a single run continues from its current parameters.
    do_init = not (self.warm_start and hasattr(self, 'converged_'))
    n_init = self.n_init if do_init else 1

    max_lower_bound = -np.inf
    self.converged_ = False

    random_state = check_random_state(self.random_state)

    for init in range(n_init):
        self._print_verbose_msg_init_beg(init)

        if do_init:
            self._initialize_parameters(X, random_state)

        lower_bound = -np.inf if do_init else self.lower_bound_
        converged = False

        for n_iter in range(1, self.max_iter + 1):
            prev_lower_bound = lower_bound

            log_prob_norm, log_resp = self._e_step(X)
            self._m_step(X, log_resp)
            lower_bound = self._compute_lower_bound(
                log_resp, log_prob_norm)

            change = lower_bound - prev_lower_bound
            self._print_verbose_msg_iter_end(n_iter, change)

            if abs(change) < self.tol:
                converged = True
                break

        # The end-of-run message reads self.converged_, so expose the
        # status of this run (not of an earlier one) before printing.
        self.converged_ = converged
        self._print_verbose_msg_init_end(lower_bound)

        # The second condition guarantees that the first run is always
        # recorded, even when its lower bound is NaN or -inf; otherwise
        # best_params would be unbound after the loop.
        if lower_bound > max_lower_bound or max_lower_bound == -np.inf:
            max_lower_bound = lower_bound
            best_params = self._get_parameters()
            best_n_iter = n_iter
            best_init = init
            best_converged = converged

    # Report convergence of the run whose parameters are actually kept.
    self.converged_ = best_converged
    if not best_converged:
        warnings.warn('Initialization %d did not converge. '
                      'Try different init parameters, '
                      'or increase max_iter, tol '
                      'or check for degenerate data.'
                      % (best_init + 1), ConvergenceWarning)

    self._set_parameters(best_params)
    self.n_iter_ = best_n_iter
    self.lower_bound_ = max_lower_bound

    # Always do a final e-step to guarantee that the labels returned by
    # fit_predict(X) are always consistent with fit(X).predict(X)
    # for any value of max_iter and tol (and any random_state).
    _, log_resp = self._e_step(X)

    return log_resp.argmax(axis=1)
|
|
|
|
def _e_step(self, X):
|
|
"""E step.
|
|
|
|
Parameters
|
|
----------
|
|
X : array-like of shape (n_samples, n_features)
|
|
|
|
Returns
|
|
-------
|
|
log_prob_norm : float
|
|
Mean of the logarithms of the probabilities of each sample in X
|
|
|
|
log_responsibility : array, shape (n_samples, n_components)
|
|
Logarithm of the posterior probabilities (or responsibilities) of
|
|
the point of each sample in X.
|
|
"""
|
|
log_prob_norm, log_resp = self._estimate_log_prob_resp(X)
|
|
return np.mean(log_prob_norm), log_resp
|
|
|
|
@abstractmethod
|
|
def _m_step(self, X, log_resp):
|
|
"""M step.
|
|
|
|
Parameters
|
|
----------
|
|
X : array-like of shape (n_samples, n_features)
|
|
|
|
log_resp : array-like of shape (n_samples, n_components)
|
|
Logarithm of the posterior probabilities (or responsibilities) of
|
|
the point of each sample in X.
|
|
"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def _get_parameters(self):
|
|
pass
|
|
|
|
@abstractmethod
|
|
def _set_parameters(self, params):
|
|
pass
|
|
|
|
def score_samples(self, X):
    """Compute the log probability of each sample under the model.

    For every row of X, the weighted log probabilities of all components
    are combined with ``logsumexp``.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        List of n_features-dimensional data points. Each row
        corresponds to a single data point. The number of columns must
        equal ``self.means_.shape[1]``.

    Returns
    -------
    log_prob : array, shape (n_samples,)
        Log probabilities of each data point in X.
    """
    check_is_fitted(self)
    expected_width = self.means_.shape[1]
    X = _check_X(X, n_components=None, n_features=expected_width)

    weighted_log_prob = self._estimate_weighted_log_prob(X)
    return logsumexp(weighted_log_prob, axis=1)
|
|
|
|
def score(self, X, y=None):
    """Compute the per-sample average log-likelihood of the given data X.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        List of n_features-dimensional data points. Each row
        corresponds to a single data point.

    y : Ignored
        Not used by this method.

    Returns
    -------
    log_likelihood : float
        Mean of the values returned by ``score_samples(X)``.
    """
    per_sample_log_prob = self.score_samples(X)
    return per_sample_log_prob.mean()
|
|
|
|
def predict(self, X):
    """Predict the labels for the data samples in X using trained model.

    Each sample receives the index of the component whose weighted log
    probability is the largest.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        List of n_features-dimensional data points. Each row
        corresponds to a single data point. The number of columns must
        equal ``self.means_.shape[1]``.

    Returns
    -------
    labels : array, shape (n_samples,)
        Component labels.
    """
    check_is_fitted(self)
    expected_width = self.means_.shape[1]
    X = _check_X(X, n_components=None, n_features=expected_width)
    weighted_log_prob = self._estimate_weighted_log_prob(X)
    return weighted_log_prob.argmax(axis=1)
|
|
|
|
def predict_proba(self, X):
    """Predict posterior probability of each component given the data.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        List of n_features-dimensional data points. Each row
        corresponds to a single data point. The number of columns must
        equal ``self.means_.shape[1]``.

    Returns
    -------
    resp : array, shape (n_samples, n_components)
        Exponential of the log responsibilities, i.e. the probability of
        each component in the model given each sample.
    """
    check_is_fitted(self)
    expected_width = self.means_.shape[1]
    X = _check_X(X, n_components=None, n_features=expected_width)
    _, log_responsibilities = self._estimate_log_prob_resp(X)
    return np.exp(log_responsibilities)
|
|
|
|
def sample(self, n_samples=1):
    """Generate random samples from the fitted Gaussian distribution.

    The number of samples drawn from each component follows a multinomial
    distribution over ``self.weights_``; the samples are returned grouped
    by component, in component order.

    Parameters
    ----------
    n_samples : int, default=1
        Number of samples to generate.

    Returns
    -------
    X : array, shape (n_samples, n_features)
        Randomly generated sample

    y : array, shape (n_samples,)
        Component labels

    Raises
    ------
    ValueError
        If ``n_samples`` is smaller than 1.
    """
    check_is_fitted(self)

    if n_samples < 1:
        # Report the offending argument itself (the message used to format
        # self.n_components by mistake).
        raise ValueError(
            "Invalid value for 'n_samples': %d . The sampling requires at "
            "least one sample." % n_samples)

    _, n_features = self.means_.shape
    rng = check_random_state(self.random_state)
    n_samples_comp = rng.multinomial(n_samples, self.weights_)

    if self.covariance_type == 'full':
        # One covariance matrix per component.
        X = np.vstack([
            rng.multivariate_normal(mean, covariance, int(count))
            for (mean, covariance, count) in zip(
                self.means_, self.covariances_, n_samples_comp)])
    elif self.covariance_type == "tied":
        # A single covariance matrix shared by every component.
        X = np.vstack([
            rng.multivariate_normal(mean, self.covariances_, int(count))
            for (mean, count) in zip(
                self.means_, n_samples_comp)])
    else:
        # Any other covariance type: standard normal draws are scaled
        # elementwise by the square root of the component's covariance.
        X = np.vstack([
            mean + rng.randn(count, n_features) * np.sqrt(covariance)
            for (mean, covariance, count) in zip(
                self.means_, self.covariances_, n_samples_comp)])

    y = np.concatenate([np.full(count, j, dtype=int)
                        for j, count in enumerate(n_samples_comp)])

    return (X, y)
|
|
|
|
def _estimate_weighted_log_prob(self, X):
|
|
"""Estimate the weighted log-probabilities, log P(X | Z) + log weights.
|
|
|
|
Parameters
|
|
----------
|
|
X : array-like of shape (n_samples, n_features)
|
|
|
|
Returns
|
|
-------
|
|
weighted_log_prob : array, shape (n_samples, n_component)
|
|
"""
|
|
return self._estimate_log_prob(X) + self._estimate_log_weights()
|
|
|
|
@abstractmethod
|
|
def _estimate_log_weights(self):
|
|
"""Estimate log-weights in EM algorithm, E[ log pi ] in VB algorithm.
|
|
|
|
Returns
|
|
-------
|
|
log_weight : array, shape (n_components, )
|
|
"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def _estimate_log_prob(self, X):
|
|
"""Estimate the log-probabilities log P(X | Z).
|
|
|
|
Compute the log-probabilities per each component for each sample.
|
|
|
|
Parameters
|
|
----------
|
|
X : array-like of shape (n_samples, n_features)
|
|
|
|
Returns
|
|
-------
|
|
log_prob : array, shape (n_samples, n_component)
|
|
"""
|
|
pass
|
|
|
|
def _estimate_log_prob_resp(self, X):
|
|
"""Estimate log probabilities and responsibilities for each sample.
|
|
|
|
Compute the log probabilities, weighted log probabilities per
|
|
component and responsibilities for each sample in X with respect to
|
|
the current state of the model.
|
|
|
|
Parameters
|
|
----------
|
|
X : array-like of shape (n_samples, n_features)
|
|
|
|
Returns
|
|
-------
|
|
log_prob_norm : array, shape (n_samples,)
|
|
log p(X)
|
|
|
|
log_responsibilities : array, shape (n_samples, n_components)
|
|
logarithm of the responsibilities
|
|
"""
|
|
weighted_log_prob = self._estimate_weighted_log_prob(X)
|
|
log_prob_norm = logsumexp(weighted_log_prob, axis=1)
|
|
with np.errstate(under='ignore'):
|
|
# ignore underflow
|
|
log_resp = weighted_log_prob - log_prob_norm[:, np.newaxis]
|
|
return log_prob_norm, log_resp
|
|
|
|
def _print_verbose_msg_init_beg(self, n_init):
|
|
"""Print verbose message on initialization."""
|
|
if self.verbose == 1:
|
|
print("Initialization %d" % n_init)
|
|
elif self.verbose >= 2:
|
|
print("Initialization %d" % n_init)
|
|
self._init_prev_time = time()
|
|
self._iter_prev_time = self._init_prev_time
|
|
|
|
def _print_verbose_msg_iter_end(self, n_iter, diff_ll):
|
|
"""Print verbose message on initialization."""
|
|
if n_iter % self.verbose_interval == 0:
|
|
if self.verbose == 1:
|
|
print(" Iteration %d" % n_iter)
|
|
elif self.verbose >= 2:
|
|
cur_time = time()
|
|
print(" Iteration %d\t time lapse %.5fs\t ll change %.5f" % (
|
|
n_iter, cur_time - self._iter_prev_time, diff_ll))
|
|
self._iter_prev_time = cur_time
|
|
|
|
def _print_verbose_msg_init_end(self, ll):
|
|
"""Print verbose message on the end of iteration."""
|
|
if self.verbose == 1:
|
|
print("Initialization converged: %s" % self.converged_)
|
|
elif self.verbose >= 2:
|
|
print("Initialization converged: %s\t time lapse %.5fs\t ll %.5f" %
|
|
(self.converged_, time() - self._init_prev_time, ll))
|