diff --git a/bananas/bananas.py b/bananas/bananas.py index d657ee4..d7ab168 100644 --- a/bananas/bananas.py +++ b/bananas/bananas.py @@ -217,7 +217,7 @@ def compile(self, features, nc=1, nb=20, cov="full", ): data = numpy.concatenate(data, axis=0) - from sklearn import mixture + from . import gmm as mixture # XXX: Do not use DPGMM because the normalization is buggy # https://github.com/scikit-learn/scikit-learn/issues/7371 diff --git a/bananas/gmm.py b/bananas/gmm.py new file mode 100644 index 0000000..2d2d827 --- /dev/null +++ b/bananas/gmm.py @@ -0,0 +1,827 @@ +""" +Gaussian Mixture Models. + +This implementation corresponds to frequentist (non-Bayesian) formulation +of Gaussian Mixture Models. +""" + +# Author: Ron Weiss +# Fabian Pedregosa +# Bertrand Thirion + +import warnings +import numpy as np +from scipy import linalg +from time import time + +from sklearn.base import BaseEstimator +from sklearn.utils import check_random_state, check_array +from sklearn.utils.extmath import logsumexp +from sklearn.utils.validation import check_is_fitted +from sklearn import cluster + +from sklearn.externals.six.moves import zip + +EPS = np.finfo(float).eps + + +def log_multivariate_normal_density(X, means, covars, covariance_type='diag'): + """Compute the log probability under a multivariate Gaussian distribution. + + Parameters + ---------- + X : array_like, shape (n_samples, n_features) + List of n_features-dimensional data points. Each row corresponds to a + single data point. + + means : array_like, shape (n_components, n_features) + List of n_features-dimensional mean vectors for n_components Gaussians. + Each row corresponds to a single mean vector. + + covars : array_like + List of n_components covariance parameters for each Gaussian. The shape + depends on `covariance_type`: + (n_components, n_features) if 'spherical', + (n_features, n_features) if 'tied', + (n_components, n_features) if 'diag', + (n_components, n_features, n_features) if 'full' + + covariance_type : string + Type of the covariance parameters. Must be one of + 'spherical', 'tied', 'diag', 'full'. Defaults to 'diag'. + + Returns + ------- + lpr : array_like, shape (n_samples, n_components) + Array containing the log probabilities of each data point in + X under each of the n_components multivariate Gaussian distributions. + """ + log_multivariate_normal_density_dict = { + 'spherical': _log_multivariate_normal_density_spherical, + 'tied': _log_multivariate_normal_density_tied, + 'diag': _log_multivariate_normal_density_diag, + 'full': _log_multivariate_normal_density_full} + return log_multivariate_normal_density_dict[covariance_type]( + X, means, covars) + + +def sample_gaussian(mean, covar, covariance_type='diag', n_samples=1, + random_state=None): + """Generate random samples from a Gaussian distribution. + + Parameters + ---------- + mean : array_like, shape (n_features,) + Mean of the distribution. + + covar : array_like, optional + Covariance of the distribution. The shape depends on `covariance_type`: + scalar if 'spherical', + (n_features) if 'diag', + (n_features, n_features) if 'tied', or 'full' + + covariance_type : string, optional + Type of the covariance parameters. Must be one of + 'spherical', 'tied', 'diag', 'full'. Defaults to 'diag'. + + n_samples : int, optional + Number of samples to generate. Defaults to 1. + + Returns + ------- + X : array, shape (n_features, n_samples) + Randomly generated sample + """ + rng = check_random_state(random_state) + n_dim = len(mean) + rand = rng.randn(n_dim, n_samples) + if n_samples == 1: + rand.shape = (n_dim,) + + if covariance_type == 'spherical': + rand *= np.sqrt(covar) + elif covariance_type == 'diag': + rand = np.dot(np.diag(np.sqrt(covar)), rand) + else: + s, U = linalg.eigh(covar) + s.clip(0, out=s) # get rid of tiny negatives + np.sqrt(s, out=s) + U *= s + rand = np.dot(U, rand) + + return (rand.T + mean).T + + +class GMM(BaseEstimator): + """Gaussian Mixture Model + + Representation of a Gaussian mixture model probability distribution. + This class allows for easy evaluation of, sampling from, and + maximum-likelihood estimation of the parameters of a GMM distribution. + + Initializes parameters such that every mixture component has zero + mean and identity covariance. + + Read more in the :ref:`User Guide `. + + Parameters + ---------- + n_components : int, optional + Number of mixture components. Defaults to 1. + + covariance_type : string, optional + String describing the type of covariance parameters to + use. Must be one of 'spherical', 'tied', 'diag', 'full'. + Defaults to 'diag'. + + random_state: RandomState or an int seed (None by default) + A random number generator instance + + min_covar : float, optional + Floor on the diagonal of the covariance matrix to prevent + overfitting. Defaults to 1e-3. + + tol : float, optional + Convergence threshold. EM iterations will stop when average + gain in log-likelihood is below this threshold. Defaults to 1e-3. + + n_iter : int, optional + Number of EM iterations to perform. + + n_init : int, optional + Number of initializations to perform. the best results is kept + + params : string, optional + Controls which parameters are updated in the training + process. Can contain any combination of 'w' for weights, + 'm' for means, and 'c' for covars. Defaults to 'wmc'. + + init_params : string, optional + Controls which parameters are updated in the initialization + process. Can contain any combination of 'w' for weights, + 'm' for means, and 'c' for covars. Defaults to 'wmc'. + + verbose : int, default: 0 + Enable verbose output. If 1 then it always prints the current + initialization and iteration step. If greater than 1 then + it prints additionally the change and time needed for each step. + + Attributes + ---------- + weights_ : array, shape (`n_components`,) + This attribute stores the mixing weights for each mixture component. + + means_ : array, shape (`n_components`, `n_features`) + Mean parameters for each mixture component. + + covars_ : array + Covariance parameters for each mixture component. The shape + depends on `covariance_type`:: + + (n_components, n_features) if 'spherical', + (n_features, n_features) if 'tied', + (n_components, n_features) if 'diag', + (n_components, n_features, n_features) if 'full' + + converged_ : bool + True when convergence was reached in fit(), False otherwise. + + See Also + -------- + + DPGMM : Infinite gaussian mixture model, using the dirichlet + process, fit with a variational algorithm + + + VBGMM : Finite gaussian mixture model fit with a variational + algorithm, better for situations where there might be too little + data to get a good estimate of the covariance matrix. + + Examples + -------- + + >>> import numpy as np + >>> from sklearn import mixture + >>> np.random.seed(1) + >>> g = mixture.GMM(n_components=2) + >>> # Generate random observations with two modes centered on 0 + >>> # and 10 to use for training. + >>> obs = np.concatenate((np.random.randn(100, 1), + ... 10 + np.random.randn(300, 1))) + >>> g.fit(obs) # doctest: +NORMALIZE_WHITESPACE + GMM(covariance_type='diag', init_params='wmc', min_covar=0.001, + n_components=2, n_init=1, n_iter=100, params='wmc', + random_state=None, thresh=None, tol=0.001, verbose=0) + >>> np.round(g.weights_, 2) + array([ 0.75, 0.25]) + >>> np.round(g.means_, 2) + array([[ 10.05], + [ 0.06]]) + >>> np.round(g.covars_, 2) #doctest: +SKIP + array([[[ 1.02]], + [[ 0.96]]]) + >>> g.predict([[0], [2], [9], [10]]) #doctest: +ELLIPSIS + array([1, 1, 0, 0]...) + >>> np.round(g.score([[0], [2], [9], [10]]), 2) + array([-2.19, -4.58, -1.75, -1.21]) + >>> # Refit the model on new data (initial parameters remain the + >>> # same), this time with an even split between the two modes. + >>> g.fit(20 * [[0]] + 20 * [[10]]) # doctest: +NORMALIZE_WHITESPACE + GMM(covariance_type='diag', init_params='wmc', min_covar=0.001, + n_components=2, n_init=1, n_iter=100, params='wmc', + random_state=None, thresh=None, tol=0.001, verbose=0) + >>> np.round(g.weights_, 2) + array([ 0.5, 0.5]) + + """ + + def __init__(self, n_components=1, covariance_type='diag', + random_state=None, thresh=None, tol=1e-3, min_covar=1e-3, + n_iter=100, n_init=1, params='wmc', init_params='wmc', + verbose=0): + if thresh is not None: + warnings.warn("'thresh' has been replaced by 'tol' in 0.16 " + " and will be removed in 0.18.", + DeprecationWarning) + self.n_components = n_components + self.covariance_type = covariance_type + self.thresh = thresh + self.tol = tol + self.min_covar = min_covar + self.random_state = random_state + self.n_iter = n_iter + self.n_init = n_init + self.params = params + self.init_params = init_params + self.verbose = verbose + + if covariance_type not in ['spherical', 'tied', 'diag', 'full']: + raise ValueError('Invalid value for covariance_type: %s' % + covariance_type) + + if n_init < 1: + raise ValueError('GMM estimation requires at least one run') + + self.weights_ = np.ones(self.n_components) / self.n_components + + # flag to indicate exit status of fit() method: converged (True) or + # n_iter reached (False) + self.converged_ = False + + def _get_covars(self): + """Covariance parameters for each mixture component. + + The shape depends on ``cvtype``:: + + (n_states, n_features) if 'spherical', + (n_features, n_features) if 'tied', + (n_states, n_features) if 'diag', + (n_states, n_features, n_features) if 'full' + + """ + if self.covariance_type == 'full': + return self.covars_ + elif self.covariance_type == 'diag': + return [np.diag(cov) for cov in self.covars_] + elif self.covariance_type == 'tied': + return [self.covars_] * self.n_components + elif self.covariance_type == 'spherical': + return [np.diag(cov) for cov in self.covars_] + + def _set_covars(self, covars): + """Provide values for covariance""" + covars = np.asarray(covars) + _validate_covars(covars, self.covariance_type, self.n_components) + self.covars_ = covars + + def score_samples(self, X): + """Return the per-sample likelihood of the data under the model. + + Compute the log probability of X under the model and + return the posterior distribution (responsibilities) of each + mixture component for each element of X. + + Parameters + ---------- + X: array_like, shape (n_samples, n_features) + List of n_features-dimensional data points. Each row + corresponds to a single data point. + + Returns + ------- + logprob : array_like, shape (n_samples,) + Log probabilities of each data point in X. + + responsibilities : array_like, shape (n_samples, n_components) + Posterior probabilities of each mixture component for each + observation + """ + check_is_fitted(self, 'means_') + + X = check_array(X) + if X.ndim == 1: + X = X[:, np.newaxis] + if X.size == 0: + return np.array([]), np.empty((0, self.n_components)) + if X.shape[1] != self.means_.shape[1]: + raise ValueError('The shape of X is not compatible with self') + + lpr = (log_multivariate_normal_density(X, self.means_, self.covars_, + self.covariance_type) + + np.log(self.weights_)) + logprob = logsumexp(lpr, axis=1) + responsibilities = np.exp(lpr - logprob[:, np.newaxis]) + return logprob, responsibilities + + def score(self, X, y=None): + """Compute the log probability under the model. + + Parameters + ---------- + X : array_like, shape (n_samples, n_features) + List of n_features-dimensional data points. Each row + corresponds to a single data point. + + Returns + ------- + logprob : array_like, shape (n_samples,) + Log probabilities of each data point in X + """ + logprob, _ = self.score_samples(X) + return logprob + + def predict(self, X): + """Predict label for data. + + Parameters + ---------- + X : array-like, shape = [n_samples, n_features] + + Returns + ------- + C : array, shape = (n_samples,) component memberships + """ + logprob, responsibilities = self.score_samples(X) + return responsibilities.argmax(axis=1) + + def predict_proba(self, X): + """Predict posterior probability of data under each Gaussian + in the model. + + Parameters + ---------- + X : array-like, shape = [n_samples, n_features] + + Returns + ------- + responsibilities : array-like, shape = (n_samples, n_components) + Returns the probability of the sample for each Gaussian + (state) in the model. + """ + logprob, responsibilities = self.score_samples(X) + return responsibilities + + def sample(self, n_samples=1, random_state=None): + """Generate random samples from the model. + + Parameters + ---------- + n_samples : int, optional + Number of samples to generate. Defaults to 1. + + Returns + ------- + X : array_like, shape (n_samples, n_features) + List of samples + """ + check_is_fitted(self, 'means_') + + if random_state is None: + random_state = self.random_state + random_state = check_random_state(random_state) + weight_cdf = np.cumsum(self.weights_) + + X = np.empty((n_samples, self.means_.shape[1])) + rand = random_state.rand(n_samples) + # decide which component to use for each sample + comps = weight_cdf.searchsorted(rand) + # for each component, generate all needed samples + for comp in range(self.n_components): + # occurrences of current component in X + comp_in_X = (comp == comps) + # number of those occurrences + num_comp_in_X = comp_in_X.sum() + if num_comp_in_X > 0: + if self.covariance_type == 'tied': + cv = self.covars_ + elif self.covariance_type == 'spherical': + cv = self.covars_[comp][0] + else: + cv = self.covars_[comp] + X[comp_in_X] = sample_gaussian( + self.means_[comp], cv, self.covariance_type, + num_comp_in_X, random_state=random_state).T + return X + + def fit_predict(self, X, y=None): + """Fit and then predict labels for data. + + Warning: due to the final maximization step in the EM algorithm, + with low iterations the prediction may not be 100% accurate + + .. versionadded:: 0.17 + *fit_predict* method in Gaussian Mixture Model. + + Parameters + ---------- + X : array-like, shape = [n_samples, n_features] + + Returns + ------- + C : array, shape = (n_samples,) component memberships + """ + return self._fit(X, y).argmax(axis=1) + + def _fit(self, X, y=None, do_prediction=False): + """Estimate model parameters with the EM algorithm. + + A initialization step is performed before entering the + expectation-maximization (EM) algorithm. If you want to avoid + this step, set the keyword argument init_params to the empty + string '' when creating the GMM object. Likewise, if you would + like just to do an initialization, set n_iter=0. + + Parameters + ---------- + X : array_like, shape (n, n_features) + List of n_features-dimensional data points. Each row + corresponds to a single data point. + + Returns + ------- + responsibilities : array, shape (n_samples, n_components) + Posterior probabilities of each mixture component for each + observation. + """ + + # initialization step + X = check_array(X, dtype=np.float64, ensure_min_samples=2, + estimator=self) + if X.shape[0] < self.n_components: + raise ValueError( + 'GMM estimation with %s components, but got only %s samples' % + (self.n_components, X.shape[0])) + + max_log_prob = -np.infty + + if self.verbose > 0: + print('Expectation-maximization algorithm started.') + + for init in range(self.n_init): + if self.verbose > 0: + print('Initialization ' + str(init + 1)) + start_init_time = time() + + if 'm' in self.init_params or not hasattr(self, 'means_'): + self.means_ = cluster.KMeans( + n_clusters=self.n_components, + random_state=self.random_state).fit(X).cluster_centers_ + if self.verbose > 1: + print('\tMeans have been initialized.') + + if 'w' in self.init_params or not hasattr(self, 'weights_'): + self.weights_ = np.tile(1.0 / self.n_components, + self.n_components) + if self.verbose > 1: + print('\tWeights have been initialized.') + + if 'c' in self.init_params or not hasattr(self, 'covars_'): + cv = np.cov(X.T) + self.min_covar * np.eye(X.shape[1]) + if not cv.shape: + cv.shape = (1, 1) + self.covars_ = \ + distribute_covar_matrix_to_match_covariance_type( + cv, self.covariance_type, self.n_components) + if self.verbose > 1: + print('\tCovariance matrices have been initialized.') + + # EM algorithms + current_log_likelihood = None + # reset self.converged_ to False + self.converged_ = False + + # this line should be removed when 'thresh' is removed in v0.18 + tol = (self.tol if self.thresh is None + else self.thresh / float(X.shape[0])) + + for i in range(self.n_iter): + if self.verbose > 0: + print('\tEM iteration ' + str(i + 1)) + start_iter_time = time() + prev_log_likelihood = current_log_likelihood + # Expectation step + log_likelihoods, responsibilities = self.score_samples(X) + current_log_likelihood = log_likelihoods.mean() + + # Check for convergence. + # (should compare to self.tol when deprecated 'thresh' is + # removed in v0.18) + if prev_log_likelihood is not None: + change = abs(current_log_likelihood - prev_log_likelihood) + if self.verbose > 1: + print('\t\tChange: ' + str(change)) + if change < tol: + self.converged_ = True + if self.verbose > 0: + print('\t\tEM algorithm converged.') + break + + # Maximization step + self._do_mstep(X, responsibilities, self.params, + self.min_covar) + if self.verbose > 1: + print('\t\tEM iteration ' + str(i + 1) + ' took {0:.5f}s'.format( + time() - start_iter_time)) + + # if the results are better, keep it + if self.n_iter: + if current_log_likelihood > max_log_prob: + max_log_prob = current_log_likelihood + best_params = {'weights': self.weights_, + 'means': self.means_, + 'covars': self.covars_} + if self.verbose > 1: + print('\tBetter parameters were found.') + + if self.verbose > 1: + print('\tInitialization ' + str(init + 1) + ' took {0:.5f}s'.format( + time() - start_init_time)) + + # check the existence of an init param that was not subject to + # likelihood computation issue. + if np.isneginf(max_log_prob) and self.n_iter: + raise RuntimeError( + "EM algorithm was never able to compute a valid likelihood " + + "given initial parameters. Try different init parameters " + + "(or increasing n_init) or check for degenerate data.") + + if self.n_iter: + self.covars_ = best_params['covars'] + self.means_ = best_params['means'] + self.weights_ = best_params['weights'] + else: # self.n_iter == 0 occurs when using GMM within HMM + # Need to make sure that there are responsibilities to output + # Output zeros because it was just a quick initialization + responsibilities = np.zeros((X.shape[0], self.n_components)) + + return responsibilities + + def fit(self, X, y=None): + """Estimate model parameters with the EM algorithm. + + A initialization step is performed before entering the + expectation-maximization (EM) algorithm. If you want to avoid + this step, set the keyword argument init_params to the empty + string '' when creating the GMM object. Likewise, if you would + like just to do an initialization, set n_iter=0. + + Parameters + ---------- + X : array_like, shape (n, n_features) + List of n_features-dimensional data points. Each row + corresponds to a single data point. + + Returns + ------- + self + """ + self._fit(X, y) + return self + + def _do_mstep(self, X, responsibilities, params, min_covar=0): + """ Perform the Mstep of the EM algorithm and return the class weights + """ + weights = responsibilities.sum(axis=0) + weighted_X_sum = np.dot(responsibilities.T, X) + inverse_weights = 1.0 / (weights[:, np.newaxis] + 10 * EPS) + + if 'w' in params: + self.weights_ = (weights / (weights.sum() + 10 * EPS) + EPS) + if 'm' in params: + self.means_ = weighted_X_sum * inverse_weights + if 'c' in params: + covar_mstep_func = _covar_mstep_funcs[self.covariance_type] + self.covars_ = covar_mstep_func( + self, X, responsibilities, weighted_X_sum, inverse_weights, + min_covar) + return weights + + def _n_parameters(self): + """Return the number of free parameters in the model.""" + ndim = self.means_.shape[1] + if self.covariance_type == 'full': + cov_params = self.n_components * ndim * (ndim + 1) / 2. + elif self.covariance_type == 'diag': + cov_params = self.n_components * ndim + elif self.covariance_type == 'tied': + cov_params = ndim * (ndim + 1) / 2. + elif self.covariance_type == 'spherical': + cov_params = self.n_components + mean_params = ndim * self.n_components + return int(cov_params + mean_params + self.n_components - 1) + + def bic(self, X): + """Bayesian information criterion for the current model fit + and the proposed data + + Parameters + ---------- + X : array of shape(n_samples, n_dimensions) + + Returns + ------- + bic: float (the lower the better) + """ + return (-2 * self.score(X).sum() + + self._n_parameters() * np.log(X.shape[0])) + + def aic(self, X): + """Akaike information criterion for the current model fit + and the proposed data + + Parameters + ---------- + X : array of shape(n_samples, n_dimensions) + + Returns + ------- + aic: float (the lower the better) + """ + return - 2 * self.score(X).sum() + 2 * self._n_parameters() + + +######################################################################### +# some helper routines +######################################################################### + + +def _log_multivariate_normal_density_diag(X, means, covars): + """Compute Gaussian log-density at X for a diagonal model""" + n_samples, n_dim = X.shape + lpr = -0.5 * (n_dim * np.log(2 * np.pi) + np.sum(np.log(covars), 1) + + np.sum((means ** 2) / covars, 1) + - 2 * np.dot(X, (means / covars).T) + + np.dot(X ** 2, (1.0 / covars).T)) + return lpr + + +def _log_multivariate_normal_density_spherical(X, means, covars): + """Compute Gaussian log-density at X for a spherical model""" + cv = covars.copy() + if covars.ndim == 1: + cv = cv[:, np.newaxis] + if covars.shape[1] == 1: + cv = np.tile(cv, (1, X.shape[-1])) + return _log_multivariate_normal_density_diag(X, means, cv) + + +def _log_multivariate_normal_density_tied(X, means, covars): + """Compute Gaussian log-density at X for a tied model""" + cv = np.tile(covars, (means.shape[0], 1, 1)) + return _log_multivariate_normal_density_full(X, means, cv) + + +def _log_multivariate_normal_density_full(X, means, covars, min_covar=1.e-7): + """Log probability for full covariance matrices.""" + n_samples, n_dim = X.shape + nmix = len(means) + log_prob = np.empty((n_samples, nmix)) + for c, (mu, cv) in enumerate(zip(means, covars)): + try: + cv_chol = linalg.cholesky(cv, lower=True) + except linalg.LinAlgError: + # The model is most probably stuck in a component with too + # few observations, we need to reinitialize this components + try: + cv_chol = linalg.cholesky(cv + min_covar * np.eye(n_dim), + lower=True) + except linalg.LinAlgError: + raise ValueError("'covars' must be symmetric, " + "positive-definite") + + cv_log_det = 2 * np.sum(np.log(np.diagonal(cv_chol))) + cv_sol = linalg.solve_triangular(cv_chol, (X - mu).T, lower=True).T + log_prob[:, c] = - .5 * (np.sum(cv_sol ** 2, axis=1) + + n_dim * np.log(2 * np.pi) + cv_log_det) + + return log_prob + + +def _validate_covars(covars, covariance_type, n_components): + """Do basic checks on matrix covariance sizes and values + """ + from scipy import linalg + if covariance_type == 'spherical': + if len(covars) != n_components: + raise ValueError("'spherical' covars have length n_components") + elif np.any(covars <= 0): + raise ValueError("'spherical' covars must be non-negative") + elif covariance_type == 'tied': + if covars.shape[0] != covars.shape[1]: + raise ValueError("'tied' covars must have shape (n_dim, n_dim)") + elif (not np.allclose(covars, covars.T) + or np.any(linalg.eigvalsh(covars) <= 0)): + raise ValueError("'tied' covars must be symmetric, " + "positive-definite") + elif covariance_type == 'diag': + if len(covars.shape) != 2: + raise ValueError("'diag' covars must have shape " + "(n_components, n_dim)") + elif np.any(covars <= 0): + raise ValueError("'diag' covars must be non-negative") + elif covariance_type == 'full': + if len(covars.shape) != 3: + raise ValueError("'full' covars must have shape " + "(n_components, n_dim, n_dim)") + elif covars.shape[1] != covars.shape[2]: + raise ValueError("'full' covars must have shape " + "(n_components, n_dim, n_dim)") + for n, cv in enumerate(covars): + if (not np.allclose(cv, cv.T) + or np.any(linalg.eigvalsh(cv) <= 0)): + raise ValueError("component %d of 'full' covars must be " + "symmetric, positive-definite" % n) + else: + raise ValueError("covariance_type must be one of " + + "'spherical', 'tied', 'diag', 'full'") + + +def distribute_covar_matrix_to_match_covariance_type( + tied_cv, covariance_type, n_components): + """Create all the covariance matrices from a given template""" + if covariance_type == 'spherical': + cv = np.tile(tied_cv.mean() * np.ones(tied_cv.shape[1]), + (n_components, 1)) + elif covariance_type == 'tied': + cv = tied_cv + elif covariance_type == 'diag': + cv = np.tile(np.diag(tied_cv), (n_components, 1)) + elif covariance_type == 'full': + cv = np.tile(tied_cv, (n_components, 1, 1)) + else: + raise ValueError("covariance_type must be one of " + + "'spherical', 'tied', 'diag', 'full'") + return cv + + +def _covar_mstep_diag(gmm, X, responsibilities, weighted_X_sum, norm, + min_covar): + """Performing the covariance M step for diagonal cases""" + avg_X2 = np.dot(responsibilities.T, X * X) * norm + avg_means2 = gmm.means_ ** 2 + avg_X_means = gmm.means_ * weighted_X_sum * norm + return avg_X2 - 2 * avg_X_means + avg_means2 + min_covar + + +def _covar_mstep_spherical(*args): + """Performing the covariance M step for spherical cases""" + cv = _covar_mstep_diag(*args) + return np.tile(cv.mean(axis=1)[:, np.newaxis], (1, cv.shape[1])) + + +def _covar_mstep_full(gmm, X, responsibilities, weighted_X_sum, norm, + min_covar): + """Performing the covariance M step for full cases""" + # Eq. 12 from K. Murphy, "Fitting a Conditional Linear Gaussian + # Distribution" + n_features = X.shape[1] + cv = np.empty((gmm.n_components, n_features, n_features)) + for c in range(gmm.n_components): + post = responsibilities[:, c] + mu = gmm.means_[c] + diff = X - mu + with np.errstate(under='ignore'): + # Underflow Errors in doing post * X.T are not important + avg_cv = np.dot(post * diff.T, diff) / (post.sum() + 10 * EPS) + cv[c] = avg_cv + min_covar * np.eye(n_features) + return cv + + +def _covar_mstep_tied(gmm, X, responsibilities, weighted_X_sum, norm, + min_covar): + # Eq. 15 from K. Murphy, "Fitting a Conditional Linear Gaussian + # Distribution" + avg_X2 = np.dot(X.T, X) + avg_means2 = np.dot(gmm.means_.T, weighted_X_sum) + out = avg_X2 - avg_means2 + out *= 1. / X.shape[0] + out.flat[::len(out) + 1] += min_covar + return out + +_covar_mstep_funcs = {'spherical': _covar_mstep_spherical, + 'diag': _covar_mstep_diag, + 'tied': _covar_mstep_tied, + 'full': _covar_mstep_full, + } + +