Skip to content

Commit f151cec

Browse files
committed
Merge remote-tracking branch 'loisaidasam/feature/multiprocessing' into develop
2 parents f7ca4ba + 7f48095 commit f151cec

File tree

4 files changed

+172
-75
lines changed

4 files changed

+172
-75
lines changed

cluster/matrix.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
2+
import logging
3+
from multiprocessing import Process, Queue, current_process
4+
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
class Matrix(object):
10+
"""Object representation of the item-item matrix
11+
"""
12+
13+
def __init__(self, data, combinfunc, symmetric=False, diagonal=None):
14+
"""Takes a list of data and generates a 2D-matrix using the supplied
15+
combination function to calculate the values.
16+
17+
PARAMETERS
18+
data - the list of items
19+
combinfunc - the function that is used to calculate teh value in a
20+
cell. It has to cope with two arguments.
21+
symmetric - Whether it will be a symmetric matrix along the diagonal.
22+
For example, if the list contains integers, and the
23+
combination function is abs(x-y), then the matrix will
24+
be symmetric.
25+
Default: False
26+
diagonal - The value to be put into the diagonal. For some
27+
functions, the diagonal will stay constant. An example
28+
could be the function "x-y". Then each diagonal cell
29+
will be "0". If this value is set to None, then the
30+
diagonal will be calculated. Default: None
31+
"""
32+
self.data = data
33+
self.combinfunc = combinfunc
34+
self.symmetric = symmetric
35+
self.diagonal = diagonal
36+
37+
def worker(self):
38+
"""Multiprocessing task function run by worker processes
39+
"""
40+
tasks_completed = 0
41+
for task in iter(self.task_queue.get, 'STOP'):
42+
col_index, item, item2 = task
43+
result = (col_index, self.combinfunc(item, item2))
44+
self.task_queue.task_done()
45+
self.done_queue.put(result)
46+
tasks_completed += 1
47+
self.task_queue.task_done()
48+
logger.info("Worker %s performed %s tasks",
49+
current_process().name,
50+
tasks_completed)
51+
52+
def genmatrix(self, num_processes=1):
53+
"""Actually generate the matrix
54+
55+
PARAMETERS
56+
num_processes
57+
- If you want to use multiprocessing to split up the work
58+
and run combinfunc() in parallel, specify num_processes
59+
> 1 and this number of workers will be spun up, the work
60+
split up amongst them evenly. Default: 1
61+
"""
62+
use_multiprocessing = num_processes > 1
63+
if use_multiprocessing:
64+
self.task_queue = Queue()
65+
self.done_queue = Queue()
66+
67+
self.matrix = []
68+
logger.info("Generating matrix for %s items - O(n^2)", len(self.data))
69+
if use_multiprocessing:
70+
logger.info("Using multiprocessing on %s processes!", num_processes)
71+
72+
if use_multiprocessing:
73+
logger.info("Spinning up %s workers", num_processes)
74+
processes = [Process(target=self.worker) for i in range(num_processes)]
75+
[process.start() for process in processes]
76+
77+
for row_index, item in enumerate(self.data):
78+
logger.debug("Generating row %s/%s (%0.2f%%)",
79+
row_index,
80+
len(self.data),
81+
100.0 * row_index / len(self.data))
82+
row = {}
83+
if use_multiprocessing:
84+
num_tasks_queued = num_tasks_completed = 0
85+
for col_index, item2 in enumerate(self.data):
86+
if self.diagonal is not None and col_index == row_index:
87+
# This is a cell on the diagonal
88+
row[col_index] = self.diagonal
89+
elif self.symmetric and col_index < row_index:
90+
# The matrix is symmetric and we are "in the lower left
91+
# triangle" - fill this in after (in case of multiprocessing)
92+
pass
93+
# Otherwise, this cell is not on the diagonal and we do indeed
94+
# need to call combinfunc()
95+
elif use_multiprocessing:
96+
# Add that thing to the task queue!
97+
self.task_queue.put((col_index, item, item2))
98+
num_tasks_queued += 1
99+
# Start grabbing the results as we go, so as not to stuff all of
100+
# the worker args into memory at once (as Queue.get() is a
101+
# blocking operation)
102+
if num_tasks_queued > num_processes:
103+
col_index, result = self.done_queue.get()
104+
self.done_queue.task_done()
105+
row[col_index] = result
106+
num_tasks_completed += 1
107+
else:
108+
# Otherwise do it here, in line
109+
row[col_index] = self.combinfunc(item, item2)
110+
111+
if self.symmetric:
112+
# One more iteration to get symmetric lower left triangle
113+
for col_index, item2 in enumerate(self.data):
114+
if col_index >= row_index:
115+
break
116+
# post-process symmetric "lower left triangle"
117+
row[col_index] = self.matrix[col_index][row_index]
118+
119+
if use_multiprocessing:
120+
# Grab the remaining worker task results
121+
while num_tasks_completed < num_tasks_queued:
122+
col_index, result = self.done_queue.get()
123+
self.done_queue.task_done()
124+
row[col_index] = result
125+
num_tasks_completed += 1
126+
127+
row_indexed = [row[index] for index in range(len(self.data))]
128+
self.matrix.append(row_indexed)
129+
130+
if use_multiprocessing:
131+
logger.info("Stopping/joining %s workers", num_processes)
132+
[self.task_queue.put('STOP') for i in range(num_processes)]
133+
[process.join() for process in processes]
134+
135+
logger.info("Matrix generated")
136+
137+
def __str__(self):
138+
"""
139+
Prints out a 2-dimensional list of data cleanly.
140+
This is useful for debugging.
141+
142+
PARAMETERS
143+
data - the 2D-list to display
144+
"""
145+
# determine maximum length
146+
maxlen = 0
147+
colcount = len(self.data[0])
148+
for col in self.data:
149+
for cell in col:
150+
maxlen = max(len(str(cell)), maxlen)
151+
format = " %%%is |" % maxlen
152+
format = "|" + format * colcount
153+
rows = [format % tuple(row) for row in self.data]
154+
return "\n".join(rows)

cluster/method/hierarchical.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
import logging
1919

2020
from cluster.cluster import Cluster
21+
from cluster.matrix import Matrix
2122
from cluster.method.base import BaseClusterMethod
22-
from cluster.util import median, mean, genmatrix
23+
from cluster.util import median, mean
2324

2425

2526
logger = logging.getLogger(__name__)
@@ -41,18 +42,25 @@ class HierarchicalClustering(BaseClusterMethod):
4142
Note that all of the returned clusters are more that 90 apart
4243
"""
4344

44-
def __init__(self, data, distance_function, linkage=None):
45+
def __init__(self, data, distance_function, linkage=None, num_processes=1):
4546
"""
4647
Constructor
4748
4849
See BaseClusterMethod.__init__ for more details.
50+
51+
num_processes
52+
- If you want to use multiprocessing to split up the work
53+
and run genmatrix() in parallel, specify num_processes
54+
> 1 and this number of workers will be spun up, the work
55+
split up amongst them evenly. Default: 1
4956
"""
5057
if not linkage:
5158
linkage = 'single'
5259
logger.info("Initializing HierarchicalClustering object with linkage "
5360
"method %s", linkage)
5461
BaseClusterMethod.__init__(self, data, distance_function)
5562
self.set_linkage_method(linkage)
63+
self.num_processes = num_processes
5664
self.__cluster_created = False
5765

5866
def set_linkage_method(self, method):
@@ -216,7 +224,12 @@ def cluster(self, matrix=None, level=None, sequence=None):
216224
# if the matrix only has two rows left, we are done
217225
while len(matrix) > 2 or matrix == []:
218226

219-
matrix = genmatrix(self._data, self.linkage, True, 0)
227+
item_item_matrix = Matrix(self._data,
228+
self.linkage,
229+
True,
230+
0)
231+
item_item_matrix.genmatrix(self.num_processes)
232+
matrix = item_item_matrix.matrix
220233

221234
smallestpair = None
222235
mindistance = None

cluster/util.py

Lines changed: 0 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,6 @@
1515
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
1616
#
1717

18-
import logging
19-
20-
21-
logger = logging.getLogger(__name__)
22-
2318

2419
class ClusteringError(Exception):
2520
pass
@@ -85,71 +80,6 @@ def minkowski_distance(x, y, p=2):
8580
return pow(sum, 1.0 / float(p))
8681

8782

88-
def genmatrix(data, combinfunc, symmetric=False, diagonal=None):
89-
"""
90-
Takes a list of data and generates a 2D-matrix using the supplied
91-
combination function to calculate the values.
92-
93-
PARAMETERS
94-
data - the list of items
95-
combinfunc - the function that is used to calculate teh value in a
96-
cell. It has to cope with two arguments.
97-
symmetric - Whether it will be a symmetric matrix along the diagonal.
98-
For example, if the list contains integers, and the
99-
combination function is abs(x-y), then the matrix will
100-
be symmetric.
101-
Default: False
102-
diagonal - The value to be put into the diagonal. For some
103-
functions, the diagonal will stay constant. An example
104-
could be the function "x-y". Then each diagonal cell
105-
will be "0". If this value is set to None, then the
106-
diagonal will be calculated. Default: None
107-
"""
108-
logger.info("Generating matrix for %s items - O(n^2)", len(data))
109-
matrix = []
110-
for row_index, item in enumerate(data):
111-
logger.debug("Generating row %s/%s (%0.2f)",
112-
row_index,
113-
len(data),
114-
100.0 * row_index / len(data))
115-
row = []
116-
for col_index, item2 in enumerate(data):
117-
if diagonal is not None and col_index == row_index:
118-
# if this is a cell on the diagonal
119-
row.append(diagonal)
120-
elif symmetric and col_index < row_index:
121-
# if the matrix is symmetric and we are "in the lower left
122-
# triangle"
123-
row.append(matrix[col_index][row_index])
124-
else:
125-
# if this cell is not on the diagonal
126-
row.append(combinfunc(item, item2))
127-
matrix.append(row)
128-
logger.info("Matrix generated")
129-
return matrix
130-
131-
132-
def printmatrix(data):
133-
"""
134-
Prints out a 2-dimensional list of data cleanly.
135-
This is useful for debugging.
136-
137-
PARAMETERS
138-
data - the 2D-list to display
139-
"""
140-
# determine maximum length
141-
maxlen = 0
142-
colcount = len(data[0])
143-
for col in data:
144-
for cell in col:
145-
maxlen = max(len(str(cell)), maxlen)
146-
# print data
147-
format = " %%%is |" % maxlen
148-
format = "|" + format * colcount
149-
for row in data:
150-
print format % tuple(row)
151-
152-
15383
def magnitude(a):
15484
"calculates the magnitude of a vecor"
15585
from math import sqrt

test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,9 +234,9 @@ def testNumpyRandom(self):
234234
unittest.makeSuite(HClusterSmallListTestCase),
235235
unittest.makeSuite(HClusterStringTestCase),
236236
unittest.makeSuite(KCluster2DTestCase),
237-
unittest.makeSuite(KClusterSFBugs)),
237+
unittest.makeSuite(KClusterSFBugs),
238238
unittest.makeSuite(KClusterSmallListTestCase),
239239
unittest.makeSuite(NumpyTests),
240-
)
240+
))
241241

242242
unittest.TextTestRunner(verbosity=2).run(suite)

0 commit comments

Comments
 (0)