 Python:
  - 3.10+
 Inputs:
- - X: a 2D numpy array of features.
+ - data: a 2D numpy array of features.
  - n_components: number of Gaussian distributions (clusters) to fit.
  - max_iter: maximum number of EM iterations.
  - tol: convergence tolerance.
 Usage:
- 1. define the 'n_components' value and the 'X' features array
+ 1. define the 'n_components' value and the 'data' features array
  2. initialize model:
     gmm = GaussianMixture(n_components=3, max_iter=100)
  3. fit model to data:
-    gmm.fit(X)
+    gmm.fit(data)
  4. get cluster predictions:
-    labels = gmm.predict(X)
+    labels = gmm.predict(data)
  5. visualize results:
-    gmm.plot_results(X)
+    gmm.plot_results(data)
 """

 import warnings
-import numpy as np
+
 import matplotlib.pyplot as plt
+import numpy as np
+from numpy.typing import NDArray
 from scipy.stats import multivariate_normal

 warnings.filterwarnings("ignore")

 TAG = "GAUSSIAN-MIXTURE/ "
+FloatArray = NDArray[np.float64]


 class GaussianMixture:
     """
     Gaussian Mixture Model implemented using the Expectation-Maximization algorithm.
     """

-    def __init__(self, n_components=2, max_iter=100, tol=1e-4, seed=None):
-        self.n_components = n_components
-        self.max_iter = max_iter
-        self.tol = tol
-        self.seed = seed
+    def __init__(
+        self,
+        n_components: int = 2,
+        max_iter: int = 100,
+        tol: float = 1e-4,
+        seed: int | None = None,
+    ) -> None:
+        self.n_components: int = n_components
+        self.max_iter: int = max_iter
+        self.tol: float = tol
+        self.seed: int | None = seed

         # parameters
-        self.weights_ = None
-        self.means_ = None
-        self.covariances_ = None
-        self.log_likelihoods_ = []
-
-    def _initialize_parameters(self, X):
-        """Randomly initialize means, covariances, and mixture weights"""
+        self.weights_: FloatArray | None = None
+        self.means_: FloatArray | None = None
+        self.covariances_: FloatArray | None = None
+        self.log_likelihoods_: list[float] = []
+
+    def _initialize_parameters(self, data: FloatArray) -> None:
+        """Randomly initialize means, covariances, and mixture weights.
+
+        Examples
+        --------
+        >>> sample = np.array(
+        ...     [[0.0, 0.5], [1.0, 1.5], [2.0, 2.5], [3.0, 3.5]]
+        ... )
+        >>> model = GaussianMixture(n_components=2, seed=0)
+        >>> model._initialize_parameters(sample)
+        >>> model.means_.shape
+        (2, 2)
+        >>> bool(np.isclose(model.weights_.sum(), 1.0))
+        True
+        """
         rng = np.random.default_rng(self.seed)
-        n_samples, n_features = X.shape
+        n_samples, _ = data.shape

         indices = rng.choice(n_samples, self.n_components, replace=False)
-        self.means_ = X[indices]
+        self.means_ = data[indices]

+        identity = np.eye(data.shape[1]) * 1e-6
         self.covariances_ = np.array(
-            [np.cov(X, rowvar=False) for _ in range(self.n_components)]
+            [np.cov(data, rowvar=False) + identity for _ in range(self.n_components)]
         )
         self.weights_ = np.ones(self.n_components) / self.n_components

-    def _e_step(self, X):
-        """Compute responsibilities (posterior probabilities)"""
-        n_samples = X.shape[0]
+    def _e_step(self, data: FloatArray) -> FloatArray:
+        """Compute responsibilities (posterior probabilities).
+
+        Examples
+        --------
+        >>> sample = np.array(
+        ...     [[0.0, 0.5], [1.0, 1.5], [2.0, 2.5], [3.0, 3.5]]
+        ... )
+        >>> model = GaussianMixture(n_components=2, seed=0)
+        >>> model._initialize_parameters(sample)
+        >>> resp = model._e_step(sample)
+        >>> resp.shape
+        (4, 2)
+        >>> bool(np.allclose(resp.sum(axis=1), 1.0))
+        True
+        """
+        if self.weights_ is None or self.means_ is None or self.covariances_ is None:
+            raise ValueError(
+                "Model parameters must be initialized before running the E-step."
+            )
+
+        n_samples = data.shape[0]
         responsibilities = np.zeros((n_samples, self.n_components))
+        weights = self.weights_
+        means = self.means_
+        covariances = self.covariances_

         for k in range(self.n_components):
-            rv = multivariate_normal(mean=self.means_[k], cov=self.covariances_[k])
-            responsibilities[:, k] = self.weights_[k] * rv.pdf(X)
+            rv = multivariate_normal(
+                mean=means[k], cov=covariances[k], allow_singular=True
+            )
+            responsibilities[:, k] = weights[k] * rv.pdf(data)

         # Normalize to get probabilities
         responsibilities /= responsibilities.sum(axis=1, keepdims=True)
         return responsibilities

-    def _m_step(self, X, responsibilities):
-        """Update weights, means, and covariances"""
-        n_samples, n_features = X.shape
-        Nk = responsibilities.sum(axis=0)
-
-        self.weights_ = Nk / n_samples
-        self.means_ = (responsibilities.T @ X) / Nk[:, np.newaxis]
+    def _m_step(self, data: FloatArray, responsibilities: FloatArray) -> None:
+        """Update weights, means, and covariances.
+
+        Note: assumes the model parameters are already initialized.
+
+        Examples
+        --------
+        >>> sample = np.array(
+        ...     [[0.0, 0.5], [1.0, 1.5], [2.0, 2.5], [3.0, 3.5]]
+        ... )
+        >>> model = GaussianMixture(n_components=2, seed=0)
+        >>> model._initialize_parameters(sample)
+        >>> resp = model._e_step(sample)
+        >>> model._m_step(sample, resp)
+        >>> bool(np.isclose(model.weights_.sum(), 1.0))
+        True
+        """
+        n_samples, n_features = data.shape
+        component_counts = responsibilities.sum(axis=0)
+
+        self.weights_ = component_counts / n_samples
+        self.means_ = (responsibilities.T @ data) / component_counts[:, np.newaxis]
+
+        if self.covariances_ is None or self.means_ is None:
+            raise ValueError(
+                "Model parameters must be initialized before running the M-step."
+            )
+
+        covariances = self.covariances_
+        means = self.means_

         for k in range(self.n_components):
-            diff = X - self.means_[k]
-            self.covariances_[k] = (
-                responsibilities[:, k][:, np.newaxis] * diff
-            ).T @ diff
-            self.covariances_[k] /= Nk[k]
+            diff = data - means[k]
+            covariances[k] = (responsibilities[:, k][:, np.newaxis] * diff).T @ diff
+            covariances[k] /= component_counts[k]
             # Add small regularization term for numerical stability
-            self.covariances_[k] += np.eye(n_features) * 1e-6
-
-    def _compute_log_likelihood(self, X):
-        """Compute total log-likelihood of the model"""
-        n_samples = X.shape[0]
+            covariances[k] += np.eye(n_features) * 1e-6
+
+    def _compute_log_likelihood(self, data: FloatArray) -> float:
+        """Compute total log-likelihood of the model.
+
+        Note: assumes the model parameters are already initialized.
+
+        Examples
+        --------
+        >>> sample = np.array(
+        ...     [[0.0, 0.5], [1.0, 1.5], [2.0, 2.5], [3.0, 3.5]]
+        ... )
+        >>> model = GaussianMixture(n_components=2, seed=0)
+        >>> model._initialize_parameters(sample)
+        >>> bool(np.isfinite(model._compute_log_likelihood(sample)))
+        True
+        """
+        if self.weights_ is None or self.means_ is None or self.covariances_ is None:
+            raise ValueError(
+                "Model parameters must be initialized before computing likelihood."
+            )
+
+        n_samples = data.shape[0]
         total_pdf = np.zeros((n_samples, self.n_components))
+        weights = self.weights_
+        means = self.means_
+        covariances = self.covariances_

         for k in range(self.n_components):
-            rv = multivariate_normal(mean=self.means_[k], cov=self.covariances_[k])
-            total_pdf[:, k] = self.weights_[k] * rv.pdf(X)
+            rv = multivariate_normal(
+                mean=means[k], cov=covariances[k], allow_singular=True
+            )
+            total_pdf[:, k] = weights[k] * rv.pdf(data)

         log_likelihood = np.sum(np.log(np.sum(total_pdf, axis=1) + 1e-12))
         return log_likelihood

-    def fit(self, X):
-        """Fit the Gaussian Mixture Model to data using the EM algorithm"""
-        self._initialize_parameters(X)
+    def fit(self, data: FloatArray) -> None:
+        """Fit the Gaussian Mixture Model to data using the EM algorithm.
+
+        Examples
+        --------
+        >>> sample = np.array(
+        ...     [[0.0, 0.5], [1.0, 1.5], [2.0, 2.5], [3.0, 3.5]]
+        ... )
+        >>> model = GaussianMixture(n_components=2, max_iter=5, tol=1e-3, seed=0)
+        >>> model.fit(sample)  # doctest: +ELLIPSIS
+        GAUSSIAN-MIXTURE/ ...
+        >>> len(model.log_likelihoods_) > 0
+        True
+        """
+        self._initialize_parameters(data)

         prev_log_likelihood = None

         for i in range(self.max_iter):
             # E-step
-            responsibilities = self._e_step(X)
+            responsibilities = self._e_step(data)

             # M-step
-            self._m_step(X, responsibilities)
+            self._m_step(data, responsibilities)

             # Log-likelihood
-            log_likelihood = self._compute_log_likelihood(X)
+            log_likelihood = self._compute_log_likelihood(data)
             self.log_likelihoods_.append(log_likelihood)

-            if prev_log_likelihood is not None:
-                if abs(log_likelihood - prev_log_likelihood) < self.tol:
-                    print(f"{TAG}Converged at iteration {i}.")
-                    break
+            if (
+                prev_log_likelihood is not None
+                and abs(log_likelihood - prev_log_likelihood) < self.tol
+            ):
+                print(f"{TAG}Converged at iteration {i}.")
+                break
             prev_log_likelihood = log_likelihood

         print(f"{TAG}Training complete. Final log-likelihood: {log_likelihood:.4f}")

-    def predict(self, X):
-        """Predict cluster assignment for each data point"""
-        responsibilities = self._e_step(X)
+    def predict(self, data: FloatArray) -> NDArray[np.int_]:
+        """Predict cluster assignment for each data point.
+
+        Note: assumes the model parameters are already initialized.
+
+        Examples
+        --------
+        >>> sample = np.array(
+        ...     [[0.0, 0.5], [1.0, 1.5], [2.0, 2.5], [3.0, 3.5]]
+        ... )
+        >>> model = GaussianMixture(n_components=2, max_iter=5, tol=1e-3, seed=0)
+        >>> model.fit(sample)  # doctest: +ELLIPSIS
+        GAUSSIAN-MIXTURE/ ...
+        >>> labels = model.predict(sample)
+        >>> labels.shape
+        (4,)
+        """
+        responsibilities = self._e_step(data)
         return np.argmax(responsibilities, axis=1)

-    def plot_results(self, X):
-        """Visualize GMM clustering results (2D only)"""
-        if X.shape[1] != 2:
+    def plot_results(self, data: FloatArray) -> None:
+        """Visualize GMM clustering results (2D only).
+
+        Note: This method assumes self.means_ is initialized.
+
+        Examples
+        --------
+        >>> sample = np.ones((3, 3))
+        >>> model = GaussianMixture()
+        >>> model.plot_results(sample)
+        GAUSSIAN-MIXTURE/ Plotting only supported for 2D data.
+        """
+        if data.shape[1] != 2:
             print(f"{TAG}Plotting only supported for 2D data.")
             return

-        labels = self.predict(X)
-        plt.scatter(X[:, 0], X[:, 1], c=labels, cmap="viridis", s=30)
+        labels = self.predict(data)
+        if self.means_ is None:
+            raise ValueError("Model means must be initialized before plotting.")
+        plt.scatter(data[:, 0], data[:, 1], c=labels, cmap="viridis", s=30)
         plt.scatter(self.means_[:, 0], self.means_[:, 1], c="red", s=100, marker="x")
         plt.title("Gaussian Mixture Model Clustering")
         plt.xlabel("Feature 1")
@@ -153,8 +285,10 @@ def plot_results(self, X):
 if __name__ == "__main__":
     from sklearn.datasets import make_blobs

-    X, _ = make_blobs(n_samples=300, centers=3, cluster_std=1.2, random_state=42)
+    sample_data, _ = make_blobs(
+        n_samples=300, centers=3, cluster_std=1.2, random_state=42
+    )
     gmm = GaussianMixture(n_components=3, max_iter=100, seed=42)
-    gmm.fit(X)
-    labels = gmm.predict(X)
-    gmm.plot_results(X)
+    gmm.fit(sample_data)
+    labels = gmm.predict(sample_data)
+    gmm.plot_results(sample_data)
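For readers tracing the changes in _e_step, _m_step, and _compute_log_likelihood above, these are the standard EM updates for a Gaussian mixture (textbook notation, not text from the commit). The E-step computes responsibilities

$$\gamma_{ik} = \frac{\pi_k \, \mathcal{N}(x_i \mid \mu_k, \Sigma_k)}{\sum_{j=1}^{K} \pi_j \, \mathcal{N}(x_i \mid \mu_j, \Sigma_j)}, \qquad N_k = \sum_{i=1}^{n} \gamma_{ik},$$

and the M-step re-estimates the parameters

$$\pi_k = \frac{N_k}{n}, \qquad \mu_k = \frac{1}{N_k} \sum_{i=1}^{n} \gamma_{ik}\, x_i, \qquad \Sigma_k = \frac{1}{N_k} \sum_{i=1}^{n} \gamma_{ik} (x_i - \mu_k)(x_i - \mu_k)^\top,$$

with convergence monitored through the log-likelihood

$$\log p(X) = \sum_{i=1}^{n} \log \sum_{k=1}^{K} \pi_k \, \mathcal{N}(x_i \mid \mu_k, \Sigma_k).$$

In the code, responsibilities[i, k] is $\gamma_{ik}$, component_counts is $N_k$, the 1e-6 identity term added to each $\Sigma_k$ is a small ridge for numerical stability, and the 1e-12 inside the log guards against taking log of zero.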
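If scikit-learn is not available for the demo block, an equivalent smoke test can be built from NumPy alone. This is a minimal sketch against the post-change API; the two-blob data and the seed are illustrative assumptions, not part of the commit:

    import numpy as np

    # Illustrative stand-in for sklearn's make_blobs: two well-separated 2D blobs.
    rng = np.random.default_rng(0)
    data = np.vstack(
        [
            rng.normal(loc=(0.0, 0.0), scale=0.5, size=(50, 2)),
            rng.normal(loc=(4.0, 4.0), scale=0.5, size=(50, 2)),
        ]
    )

    model = GaussianMixture(n_components=2, max_iter=100, tol=1e-4, seed=0)
    model.fit(data)               # prints GAUSSIAN-MIXTURE/ progress messages
    labels = model.predict(data)  # one component index (0 or 1) per sample
    assert labels.shape == (100,)

The doctests added in this commit can likewise be exercised with `python -m doctest <path-to-this-module> -v` once the file is saved.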