Skip to content

Commit 9ef790d

Browse files
committed
Merge remote-tracking branch 'loisaidasam/feature/multiprocessing' into release-1.2.0
Conflicts: cluster/util.py used loisaidasams version.
2 parents ab97c9c + 7f48095 commit 9ef790d

File tree

3 files changed

+170
-68
lines changed

3 files changed

+170
-68
lines changed

cluster/matrix.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
2+
import logging
3+
from multiprocessing import Process, Queue, current_process
4+
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
class Matrix(object):
    """Object representation of the item-item matrix.

    The matrix is not computed on construction. Call ``genmatrix()`` to
    populate ``self.matrix``, a list of rows where each row is a list of
    cell values in the same order as ``self.data``.
    """

    def __init__(self, data, combinfunc, symmetric=False, diagonal=None):
        """Takes a list of data and prepares a 2D-matrix that will be
        generated using the supplied combination function to calculate the
        values.

        PARAMETERS
            data        - the list of items
            combinfunc  - the function that is used to calculate the value
                          in a cell. It has to cope with two arguments.
            symmetric   - Whether it will be a symmetric matrix along the
                          diagonal. For example, if the list contains
                          integers, and the combination function is
                          abs(x-y), then the matrix will be symmetric.
                          Default: False
            diagonal    - The value to be put into the diagonal. For some
                          functions, the diagonal will stay constant. An
                          example could be the function "x-y". Then each
                          diagonal cell will be "0". If this value is set to
                          None, then the diagonal will be calculated.
                          Default: None
        """
        self.data = data
        self.combinfunc = combinfunc
        self.symmetric = symmetric
        self.diagonal = diagonal
        # Populated by genmatrix(); kept empty until then so that str() on a
        # freshly constructed object does not fail.
        self.matrix = []

    def worker(self):
        """Multiprocessing task function run by worker processes.

        Reads ``(col_index, item, item2)`` tasks from ``self.task_queue``
        until the ``'STOP'`` sentinel is received, and writes
        ``(col_index, combinfunc(item, item2))`` results to
        ``self.done_queue``.
        """
        tasks_completed = 0
        # multiprocessing.Queue does not provide task_done()/join() (only
        # JoinableQueue does), so no task_done() calls are made here; the
        # parent tracks completion by counting the results it receives.
        for col_index, item, item2 in iter(self.task_queue.get, 'STOP'):
            self.done_queue.put((col_index, self.combinfunc(item, item2)))
            tasks_completed += 1
        logger.info("Worker %s performed %s tasks",
                    current_process().name,
                    tasks_completed)

    def genmatrix(self, num_processes=1):
        """Actually generate the matrix and store it in ``self.matrix``.

        PARAMETERS
            num_processes
                - If you want to use multiprocessing to split up the work
                  and run combinfunc() in parallel, specify num_processes
                  > 1 and this number of workers will be spun up, the work
                  split up amongst them evenly. Default: 1
        """
        use_multiprocessing = num_processes > 1
        size = len(self.data)

        self.matrix = []
        logger.info("Generating matrix for %s items - O(n^2)", size)

        processes = []
        if use_multiprocessing:
            self.task_queue = Queue()
            self.done_queue = Queue()
            logger.info("Using multiprocessing on %s processes!",
                        num_processes)
            logger.info("Spinning up %s workers", num_processes)
            processes = [Process(target=self.worker)
                         for _ in range(num_processes)]
            for process in processes:
                process.start()

        for row_index, item in enumerate(self.data):
            logger.debug("Generating row %s/%s (%0.2f%%)",
                         row_index,
                         size,
                         100.0 * row_index / size)
            # Cells are keyed by column index because worker results can
            # arrive in any order.
            row = {}
            num_tasks_queued = num_tasks_completed = 0
            for col_index, item2 in enumerate(self.data):
                if self.diagonal is not None and col_index == row_index:
                    # This is a cell on the diagonal
                    row[col_index] = self.diagonal
                elif self.symmetric and col_index < row_index:
                    # Symmetric "lower left triangle": every earlier row is
                    # already complete, so mirror the value instead of
                    # calling combinfunc() again.
                    row[col_index] = self.matrix[col_index][row_index]
                elif use_multiprocessing:
                    self.task_queue.put((col_index, item, item2))
                    num_tasks_queued += 1
                    # Start grabbing the results as we go, so as not to
                    # stuff all of the worker args into memory at once
                    # (Queue.get() blocks until a result is available).
                    if num_tasks_queued > num_processes:
                        done_index, result = self.done_queue.get()
                        row[done_index] = result
                        num_tasks_completed += 1
                else:
                    # Otherwise do it here, in line
                    row[col_index] = self.combinfunc(item, item2)

            # Grab the remaining worker task results for this row (no-op
            # when multiprocessing is not in use: both counters stay 0).
            while num_tasks_completed < num_tasks_queued:
                done_index, result = self.done_queue.get()
                row[done_index] = result
                num_tasks_completed += 1

            self.matrix.append([row[index] for index in range(size)])

        if use_multiprocessing:
            logger.info("Stopping/joining %s workers", num_processes)
            # One sentinel per worker; each worker consumes exactly one.
            for _ in processes:
                self.task_queue.put('STOP')
            for process in processes:
                process.join()

        logger.info("Matrix generated")

    def __str__(self):
        """Return the generated matrix as a cleanly aligned text table.

        This is useful for debugging. Every cell is right-aligned to the
        width of the widest cell. An empty string is returned when the
        matrix has not been generated yet (or is empty).
        """
        if not self.matrix:
            return ""
        # determine maximum length
        maxlen = max(len(str(cell)) for row in self.matrix for cell in row)
        cell_format = " %%%is |" % maxlen
        row_format = "|" + cell_format * len(self.matrix[0])
        return "\n".join(row_format % tuple(row) for row in self.matrix)

cluster/method/hierarchical.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
import logging
1919

2020
from cluster.cluster import Cluster
21+
from cluster.matrix import Matrix
2122
from cluster.method.base import BaseClusterMethod
22-
from cluster.util import median, mean, genmatrix
23+
from cluster.util import median, mean
2324

2425

2526
logger = logging.getLogger(__name__)
@@ -41,18 +42,25 @@ class HierarchicalClustering(BaseClusterMethod):
4142
Note that all of the returned clusters are more than 90 apart
4243
"""
4344

44-
def __init__(self, data, distance_function, linkage=None):
45+
def __init__(self, data, distance_function, linkage=None, num_processes=1):
4546
"""
4647
Constructor
4748
4849
See BaseClusterMethod.__init__ for more details.
50+
51+
num_processes
52+
- If you want to use multiprocessing to split up the work
53+
and run genmatrix() in parallel, specify num_processes
54+
> 1 and this number of workers will be spun up, the work
55+
split up amongst them evenly. Default: 1
4956
"""
5057
if not linkage:
5158
linkage = 'single'
5259
logger.info("Initializing HierarchicalClustering object with linkage "
5360
"method %s", linkage)
5461
BaseClusterMethod.__init__(self, data, distance_function)
5562
self.set_linkage_method(linkage)
63+
self.num_processes = num_processes
5664
self.__cluster_created = False
5765

5866
def set_linkage_method(self, method):
@@ -216,7 +224,12 @@ def cluster(self, matrix=None, level=None, sequence=None):
216224
# if the matrix only has two rows left, we are done
217225
while len(matrix) > 2 or matrix == []:
218226

219-
matrix = genmatrix(self._data, self.linkage, True, 0)
227+
item_item_matrix = Matrix(self._data,
228+
self.linkage,
229+
True,
230+
0)
231+
item_item_matrix.genmatrix(self.num_processes)
232+
matrix = item_item_matrix.matrix
220233

221234
smallestpair = None
222235
mindistance = None

cluster/util.py

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -86,71 +86,6 @@ def minkowski_distance(x, y, p=2):
8686
return pow(sum, 1.0 / float(p))
8787

8888

89-
def genmatrix(data, combinfunc, symmetric=False, diagonal=None):
90-
"""
91-
Takes a list of data and generates a 2D-matrix using the supplied
92-
combination function to calculate the values.
93-
94-
PARAMETERS
95-
data - the list of items
96-
combinfunc - the function that is used to calculate teh value in a
97-
cell. It has to cope with two arguments.
98-
symmetric - Whether it will be a symmetric matrix along the diagonal.
99-
For example, if the list contains integers, and the
100-
combination function is abs(x-y), then the matrix will
101-
be symmetric.
102-
Default: False
103-
diagonal - The value to be put into the diagonal. For some
104-
functions, the diagonal will stay constant. An example
105-
could be the function "x-y". Then each diagonal cell
106-
will be "0". If this value is set to None, then the
107-
diagonal will be calculated. Default: None
108-
"""
109-
logger.info("Generating matrix for %s items - O(n^2)", len(data))
110-
matrix = []
111-
for row_index, item in enumerate(data):
112-
logger.debug("Generating row %s/%s (%0.2f)",
113-
row_index,
114-
len(data),
115-
100.0 * row_index / len(data))
116-
row = []
117-
for col_index, item2 in enumerate(data):
118-
if diagonal is not None and col_index == row_index:
119-
# if this is a cell on the diagonal
120-
row.append(diagonal)
121-
elif symmetric and col_index < row_index:
122-
# if the matrix is symmetric and we are "in the lower left
123-
# triangle"
124-
row.append(matrix[col_index][row_index])
125-
else:
126-
# if this cell is not on the diagonal
127-
row.append(combinfunc(item, item2))
128-
matrix.append(row)
129-
logger.info("Matrix generated")
130-
return matrix
131-
132-
133-
def printmatrix(data):
134-
"""
135-
Prints out a 2-dimensional list of data cleanly.
136-
This is useful for debugging.
137-
138-
PARAMETERS
139-
data - the 2D-list to display
140-
"""
141-
# determine maximum length
142-
maxlen = 0
143-
colcount = len(data[0])
144-
for col in data:
145-
for cell in col:
146-
maxlen = max(len(str(cell)), maxlen)
147-
# print data
148-
format = " %%%is |" % maxlen
149-
format = "|" + format * colcount
150-
for row in data:
151-
print(format % tuple(row))
152-
153-
15489
def magnitude(a):
15590
"calculates the magnitude of a vector"
15691
from math import sqrt

0 commit comments

Comments
 (0)