-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathGMM.py
More file actions
127 lines (115 loc) · 4.6 KB
/
GMM.py
File metadata and controls
127 lines (115 loc) · 4.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import numpy as np
from matplotlib import pyplot as plt
import matplotlib as mpl
class GMM:
def __init__(self, n_clusters, data):
self.K = n_clusters
# initial means
self.mu = []
random_index = np.random.randint(0, len(data)-1, size=self.K)
for idx in random_index:
self.mu.append(data[idx])
self.mu = np.array(self.mu, float)
# initial covariance matrices
self.sigma = []
for i in range(self.K):
self.sigma.append(np.cov(data.T))
self.sigma = np.array(self.sigma, float)
# initial mixing coefficients
self.pi = np.ones(self.K)/self.K
def gaussian(self, x, k):
"""
Calculate the Gaussian distribution for x_K, mu_K, and sigma_K
:param x: ndarray
:param k: int - which is the k of the cluster currently being considered
:return: float - the Gaussian probability
"""
D = len(x)
normalization_factor = 1.0/np.sqrt((2.0*np.pi)**D * np.linalg.det(self.sigma[k]))
x_mu = np.matrix(x - self.mu[k], float)
dot_product = x_mu * np.linalg.inv(self.sigma[k]) * x_mu.T
return normalization_factor * np.exp(-0.5 * dot_product)
def likelihood(self, x):
"""
Calculate the likelihoods of x
:param x: ndarray
:return: float - the likelihood
"""
result = 0.0
for k in range(self.K):
result += self.pi[k] * self.gaussian(x, k)
return result
def e_step(self, data):
"""
Calculate posterior responsibilities of all data points at each K
:param data: ndarray
:return: responsibilities: ndarray, likelihoods: list
"""
responsibilities = np.zeros((len(data), self.K))
for idx in range(len(data)):
llh_i = self.likelihood(data[idx])
for k in range(self.K):
responsibilities[idx][k] = (self.pi[k] * self.gaussian(data[idx], k)) / llh_i
return responsibilities
def m_step(self, responsibilities, data):
"""
Update parameters: means, covariance matrices, mixing coefficients
:param responsibilities: ndarray
:param data: ndarray
"""
self.mu = np.zeros(self.mu.shape)
self.sigma = np.zeros(self.sigma.shape)
for k in range(self.K):
N_k = np.sum(responsibilities[:, k])
# update mu - mean
for idx in range(len(data)):
self.mu[k] += responsibilities[idx][k] * data[idx]
self.mu[k] /= N_k
# update sigma - covariance matrix
for idx in range(len(data)):
x_mu = np.array([data[idx]]) - np.array([self.mu[k]])
self.sigma[k] += responsibilities[idx][k] * x_mu * x_mu.T
self.sigma[k] /= N_k
# update pi - mixing coefficient
self.pi[k] = N_k/len(data)
def run(self, data):
"""
Run the EM for 20 iterations, save the plot for each iteration
:param data: ndarray
"""
labels = np.zeros(len(data))
colors = np.random.rand(self.K, 3)
for count in range(50):
responsibilities = self.e_step(data)
self.m_step(responsibilities, data)
for idx in range(len(data)):
labels[idx] = np.argmax(responsibilities[idx])
self.plot_results(data, labels, "gmm_{}".format(count), colors)
def plot_results(self, data, label, title, color_iter):
"""
Plot the result
:param data: ndarray
:param label: list
:param title: string
:param color_iter: ndarray
"""
fig, ax = plt.subplots()
for i, (mean, covar, color) in enumerate(zip(self.mu, self.sigma, color_iter)):
v, w = np.linalg.eigh(covar)
v = 2. * np.sqrt(2.) * np.sqrt(v)
u = w[0] / np.linalg.norm(w[0])
# as the DP will not use every component it has access to
# unless it needs it, we shouldn't plot the redundant
# components.
if not np.any(label == i):
continue
ax.scatter(data[label == i, 0], data[label == i, 1], s=7, color=color)
# Plot an ellipse to show the Gaussian component
angle = np.arctan(u[1] / u[0])
angle = 180. * angle / np.pi # convert to degrees
ell = mpl.patches.Ellipse(mean, v[0], v[1], 180. + angle, color=color)
ell.set_clip_box(ax.bbox)
ell.set_alpha(0.5)
ax.add_artist(ell)
ax.set_title(title)
fig.savefig("gmm_plots/{}.png".format(title))