Skip to content

Commit d98103d

Browse files
committed
Moved util.genmatrix() into matrix.ItemItemMatrix
more complex code deserves its own submodule
1 parent 34b4845 commit d98103d

File tree

5 files changed

+162
-146
lines changed

5 files changed

+162
-146
lines changed

CHANGELOG

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
1.3.1
2+
- Moved util.genmatrix() into matrix.ItemItemMatrix - more complex code
3+
deserves its own submodule
4+
15
1.3.0
26
- Multiprocessing for genmatrix
37

cluster/matrix.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
2+
import logging
3+
from multiprocessing import Process, Queue, current_process
4+
5+
6+
logger = logging.getLogger(__name__)


class ItemItemMatrix(object):
    """Object representation of the item-item matrix.

    The matrix is computed eagerly by the constructor and stored in
    ``self.matrix`` as a list of rows, where ``matrix[r][c]`` holds
    ``combinfunc(data[r], data[c])`` (or the mirrored / diagonal value, see
    ``__init__``).
    """

    def __init__(self, data, combinfunc, symmetric=False, diagonal=None,
                 num_processes=1):
        """Takes a list of data and generates a 2D-matrix using the supplied
        combination function to calculate the values.

        PARAMETERS
            data        - the list of items
            combinfunc  - the function that is used to calculate the value
                          in a cell. It has to cope with two arguments.
            symmetric   - Whether it will be a symmetric matrix along the
                          diagonal. For example, if the list contains
                          integers, and the combination function is
                          abs(x-y), then the matrix will be symmetric.
                          Default: False
            diagonal    - The value to be put into the diagonal. For some
                          functions, the diagonal will stay constant. An
                          example could be the function "x-y". Then each
                          diagonal cell will be "0". If this value is set to
                          None, then the diagonal will be calculated.
                          Default: None
            num_processes
                        - If you want to use multiprocessing to split up the
                          work and run combinfunc() in parallel, specify
                          num_processes > 1 and this number of workers will
                          be spun up, the work split up amongst them evenly.
                          Default: 1
        """
        self.data = data
        self.combinfunc = combinfunc
        self.symmetric = symmetric
        self.diagonal = diagonal
        self.num_processes = num_processes
        self.use_multiprocessing = num_processes > 1
        if self.use_multiprocessing:
            # The queues only exist in multiprocessing mode.
            self.task_queue = Queue()
            self.done_queue = Queue()
        self.genmatrix()

    def worker(self):
        """Multiprocessing task function run by worker processes.

        Consumes ``(col_index, item, item2)`` tasks from ``self.task_queue``
        until the 'STOP' sentinel is received, and puts
        ``(col_index, combinfunc(item, item2))`` onto ``self.done_queue``.
        """
        tasks_completed = 0
        for task in iter(self.task_queue.get, 'STOP'):
            col_index, item, item2 = task
            result = (col_index, self.combinfunc(item, item2))
            self.done_queue.put(result)
            tasks_completed += 1
        logger.info("Worker %s performed %s tasks",
                    current_process().name,
                    tasks_completed)

    def genmatrix(self):
        """(Re)builds ``self.matrix`` from ``self.data``.

        Called by the constructor. In multiprocessing mode the worker
        processes are started before the first row and stopped and joined
        after the last one.
        """
        self.matrix = []
        size = len(self.data)
        logger.info("Generating matrix for %s items - O(n^2)", size)

        processes = []
        if self.use_multiprocessing:
            logger.info("Using multiprocessing on %s processes!",
                        self.num_processes)
            logger.info("Spinning up %s workers", self.num_processes)
            processes = [Process(target=self.worker)
                         for _ in range(self.num_processes)]
            for process in processes:
                process.start()

        for row_index, item in enumerate(self.data):
            logger.debug("Generating row %s/%s (%0.2f%%)",
                         row_index,
                         size,
                         100.0 * row_index / size)
            self.matrix.append(self._genrow(row_index, item))

        if self.use_multiprocessing:
            logger.info("Stopping/joining %s workers", self.num_processes)
            # One sentinel per worker: each worker consumes exactly one.
            for _ in processes:
                self.task_queue.put('STOP')
            for process in processes:
                process.join()

        logger.info("Matrix generated")

    def _genrow(self, row_index, item):
        """Computes one matrix row for ``item`` (located at ``row_index``).

        Relies on ``self.matrix`` already holding all rows above
        ``row_index`` so that the symmetric lower-left triangle can be
        mirrored from them. Returns the row as a list ordered by column.
        """
        # Cells are collected by column index because worker results may
        # come back in any order.
        row = {}
        num_tasks_queued = num_tasks_completed = 0

        for col_index, item2 in enumerate(self.data):
            if self.diagonal is not None and col_index == row_index:
                # This is a cell on the diagonal
                row[col_index] = self.diagonal
            elif self.symmetric and col_index < row_index:
                # Symmetric "lower left triangle": mirror the value from the
                # row that has already been completed.
                row[col_index] = self.matrix[col_index][row_index]
            elif self.use_multiprocessing:
                # Add that thing to the task queue!
                self.task_queue.put((col_index, item, item2))
                num_tasks_queued += 1
                # Start grabbing the results as we go, so as not to stuff
                # all of the worker args into memory at once (Queue.get()
                # is a blocking operation).
                if num_tasks_queued > self.num_processes:
                    done_index, result = self.done_queue.get()
                    row[done_index] = result
                    num_tasks_completed += 1
            else:
                # Otherwise do it here, in line
                row[col_index] = self.combinfunc(item, item2)

        # Grab the remaining worker task results (no-op without workers,
        # as both counters are still 0).
        while num_tasks_completed < num_tasks_queued:
            done_index, result = self.done_queue.get()
            row[done_index] = result
            num_tasks_completed += 1

        return [row[index] for index in range(len(self.data))]

    def printmatrix(self):
        """Prints out the generated matrix cleanly, one row per line.

        Every cell is right-aligned to the width of the widest cell.
        This is useful for debugging. Prints nothing for an empty matrix.
        """
        if not self.matrix:
            return

        # determine maximum length
        maxlen = 0
        for row in self.matrix:
            for cell in row:
                maxlen = max(len(str(cell)), maxlen)

        # print data
        colcount = len(self.matrix[0])
        row_format = "|" + (" %%%is |" % maxlen) * colcount
        for row in self.matrix:
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print(row_format % tuple(row))

cluster/method/hierarchical.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
import logging
1919

2020
from cluster.cluster import Cluster
21+
from cluster.matrix import ItemItemMatrix
2122
from cluster.method.base import BaseClusterMethod
22-
from cluster.util import median, mean, genmatrix
23+
from cluster.util import median, mean
2324

2425

2526
logger = logging.getLogger(__name__)
@@ -223,11 +224,13 @@ def cluster(self, matrix=None, level=None, sequence=None):
223224
# if the matrix only has two rows left, we are done
224225
while len(matrix) > 2 or matrix == []:
225226

226-
matrix = genmatrix(self._data,
227-
self.linkage,
228-
True,
229-
0,
230-
self.num_processes)
227+
# TODO: actually use the ItemItemMatrix object
228+
item_item_matrix = ItemItemMatrix(self._data,
229+
self.linkage,
230+
True,
231+
0,
232+
self.num_processes)
233+
matrix = item_item_matrix.matrix
231234

232235
smallestpair = None
233236
mindistance = None

cluster/util.py

Lines changed: 0 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,6 @@
1515
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
1616
#
1717

18-
import logging
19-
from multiprocessing import Process, Queue, current_process
20-
21-
22-
logger = logging.getLogger(__name__)
23-
2418

2519
class ClusteringError(Exception):
2620
pass
@@ -86,139 +80,6 @@ def minkowski_distance(x, y, p=2):
8680
return pow(sum, 1.0 / float(p))
8781

8882

89-
def genmatrix(data, combinfunc, symmetric=False, diagonal=None, num_processes=1):
90-
"""
91-
Takes a list of data and generates a 2D-matrix using the supplied
92-
combination function to calculate the values.
93-
94-
PARAMETERS
95-
data - the list of items
96-
combinfunc - the function that is used to calculate teh value in a
97-
cell. It has to cope with two arguments.
98-
symmetric - Whether it will be a symmetric matrix along the diagonal.
99-
For example, if the list contains integers, and the
100-
combination function is abs(x-y), then the matrix will
101-
be symmetric.
102-
Default: False
103-
diagonal - The value to be put into the diagonal. For some
104-
functions, the diagonal will stay constant. An example
105-
could be the function "x-y". Then each diagonal cell
106-
will be "0". If this value is set to None, then the
107-
diagonal will be calculated. Default: None
108-
num_processes
109-
- If you want to use multiprocessing to split up the work
110-
and run combinfunc() in parallel, specify num_processes
111-
> 1 and this number of workers will be spun up, the work
112-
split up amongst them evenly. Default: 1
113-
"""
114-
logger.info("Generating matrix for %s items - O(n^2)", len(data))
115-
use_multiprocessing = num_processes > 1
116-
if use_multiprocessing:
117-
logger.info("Using multiprocessing on %s processes!", num_processes)
118-
119-
matrix = []
120-
task_queue = Queue()
121-
done_queue = Queue()
122-
123-
def worker():
124-
"""Multiprocessing task function run by worker processes
125-
"""
126-
tasks_completed = 0
127-
for task in iter(task_queue.get, 'STOP'):
128-
col_index, item, item2 = task
129-
result = (col_index, combinfunc(item, item2))
130-
done_queue.put(result)
131-
tasks_completed += 1
132-
logger.info("Worker %s performed %s tasks",
133-
current_process().name,
134-
tasks_completed)
135-
136-
if use_multiprocessing:
137-
logger.info("Spinning up %s workers", num_processes)
138-
processes = [Process(target=worker) for i in xrange(num_processes)]
139-
[process.start() for process in processes]
140-
141-
for row_index, item in enumerate(data):
142-
logger.debug("Generating row %s/%s (%0.2f)",
143-
row_index,
144-
len(data),
145-
100.0 * row_index / len(data))
146-
row = {}
147-
if use_multiprocessing:
148-
num_tasks_queued = num_tasks_completed = 0
149-
for col_index, item2 in enumerate(data):
150-
if diagonal is not None and col_index == row_index:
151-
# This is a cell on the diagonal
152-
row[col_index] = diagonal
153-
elif symmetric and col_index < row_index:
154-
# The matrix is symmetric and we are "in the lower left
155-
# triangle" - fill this in after (in case of multiprocessing)
156-
pass
157-
# Otherwise, this cell is not on the diagonal and we do indeed
158-
# need to call combinfunc()
159-
elif use_multiprocessing:
160-
# Add that thing to the task queue!
161-
task_queue.put((col_index, item, item2))
162-
num_tasks_queued += 1
163-
# Start grabbing the results as we go, so as not to stuff all of
164-
# the worker args into memory at once (as Queue.get() is a
165-
# blocking operation)
166-
if num_tasks_queued > num_processes:
167-
col_index, result = done_queue.get()
168-
row[col_index] = result
169-
num_tasks_completed += 1
170-
else:
171-
# Otherwise do it here, in line
172-
row[col_index] = combinfunc(item, item2)
173-
174-
if symmetric:
175-
# One more iteration to get symmetric lower left triangle
176-
for col_index, item2 in enumerate(data):
177-
if col_index >= row_index:
178-
break
179-
# post-process symmetric "lower left triangle"
180-
row[col_index] = matrix[col_index][row_index]
181-
182-
if use_multiprocessing:
183-
# Grab the remaining worker task results
184-
while num_tasks_completed < num_tasks_queued:
185-
col_index, result = done_queue.get()
186-
row[col_index] = result
187-
num_tasks_completed += 1
188-
189-
row_indexed = [row[index] for index in xrange(len(data))]
190-
matrix.append(row_indexed)
191-
192-
if use_multiprocessing:
193-
logger.info("Stopping/joining %s workers", num_processes)
194-
[task_queue.put('STOP') for i in xrange(num_processes)]
195-
[process.join() for process in processes]
196-
197-
logger.info("Matrix generated")
198-
return matrix
199-
200-
201-
def printmatrix(data):
202-
"""
203-
Prints out a 2-dimensional list of data cleanly.
204-
This is useful for debugging.
205-
206-
PARAMETERS
207-
data - the 2D-list to display
208-
"""
209-
# determine maximum length
210-
maxlen = 0
211-
colcount = len(data[0])
212-
for col in data:
213-
for cell in col:
214-
maxlen = max(len(str(cell)), maxlen)
215-
# print data
216-
format = " %%%is |" % maxlen
217-
format = "|" + format * colcount
218-
for row in data:
219-
print format % tuple(row)
220-
221-
22283
def magnitude(a):
22384
"calculates the magnitude of a vecor"
22485
from math import sqrt

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
setup(
1212
name='cluster',
13-
version='1.3.0',
13+
version='1.3.1',
1414
author='Michel Albert',
1515
author_email='[email protected]',
1616
url='http://python-cluster.sourceforge.net/',

0 commit comments

Comments
 (0)