Skip to content

Commit b14fed0

Browse files
committed
Multiprocessing for genmatrix()
1 parent f7ca4ba commit b14fed0

File tree

4 files changed

+90
-13
lines changed

4 files changed

+90
-13
lines changed

CHANGELOG

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
1.3.0
2+
- Multiprocessing for genmatrix
3+
14
1.2.0
25
- Split up one big file into smaller more logical sub-modules
36

cluster/method/hierarchical.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ def single_linkage_distance(self, x, y):
193193

194194
return mindist
195195

196-
def cluster(self, matrix=None, level=None, sequence=None):
196+
def cluster(self, matrix=None, level=None, sequence=None, num_processes=None):
197197
"""
198198
Perform hierarchical clustering. This method is automatically called
199199
by the constructor so you should not need to call it explicitly.
@@ -204,6 +204,11 @@ def cluster(self, matrix=None, level=None, sequence=None):
204204
other
205205
level - The current level of clustering
206206
sequence - The sequence number of the clustering
207+
num_processes
208+
- If you want to use multiprocessing to split up the work
209+
and run genmatrix() in parallel, specify num_processes
210+
> 1 and this number of workers will be spun up, the work
211+
split up amongst them evenly. Default: 1
207212
"""
208213
logger.info("Performing cluster()")
209214

@@ -216,7 +221,7 @@ def cluster(self, matrix=None, level=None, sequence=None):
216221
# if the matrix only has two rows left, we are done
217222
while len(matrix) > 2 or matrix == []:
218223

219-
matrix = genmatrix(self._data, self.linkage, True, 0)
224+
matrix = genmatrix(self._data, self.linkage, True, 0, num_processes)
220225

221226
smallestpair = None
222227
mindistance = None

cluster/util.py

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#
1717

1818
import logging
19+
from multiprocessing import Process, Queue, current_process
1920

2021

2122
logger = logging.getLogger(__name__)
@@ -85,7 +86,7 @@ def minkowski_distance(x, y, p=2):
8586
return pow(sum, 1.0 / float(p))
8687

8788

88-
def genmatrix(data, combinfunc, symmetric=False, diagonal=None):
89+
def genmatrix(data, combinfunc, symmetric=False, diagonal=None, num_processes=1):
8990
"""
9091
Takes a list of data and generates a 2D-matrix using the supplied
9192
combination function to calculate the values.
@@ -104,27 +105,95 @@ def genmatrix(data, combinfunc, symmetric=False, diagonal=None):
104105
could be the function "x-y". Then each diagonal cell
105106
will be "0". If this value is set to None, then the
106107
diagonal will be calculated. Default: None
108+
num_processes
109+
- If you want to use multiprocessing to split up the work
110+
and run combinfunc() in parallel, specify num_processes
111+
> 1 and this number of workers will be spun up, the work
112+
split up amongst them evenly. Default: 1
107113
"""
108114
logger.info("Generating matrix for %s items - O(n^2)", len(data))
115+
use_multiprocessing = num_processes > 1
116+
if use_multiprocessing:
117+
logger.info("Using multiprocessing on %s processes!", num_processes)
118+
109119
matrix = []
120+
task_queue = Queue()
121+
done_queue = Queue()
122+
123+
def worker():
124+
"""Multiprocessing task function run by worker processes
125+
"""
126+
tasks_completed = 0
127+
for task in iter(task_queue.get, 'STOP'):
128+
col_index, item, item2 = task
129+
result = (col_index, combinfunc(item, item2))
130+
done_queue.put(result)
131+
tasks_completed += 1
132+
logger.info("Worker %s performed %s tasks",
133+
current_process().name,
134+
tasks_completed)
135+
136+
if use_multiprocessing:
137+
logger.info("Spinning up %s workers", num_processes)
138+
processes = [Process(target=worker) for i in xrange(num_processes)]
139+
[process.start() for process in processes]
140+
110141
for row_index, item in enumerate(data):
111142
logger.debug("Generating row %s/%s (%0.2f)",
112143
row_index,
113144
len(data),
114145
100.0 * row_index / len(data))
115-
row = []
146+
row = {}
147+
if use_multiprocessing:
148+
num_tasks_queued = num_tasks_completed = 0
116149
for col_index, item2 in enumerate(data):
117150
if diagonal is not None and col_index == row_index:
118-
# if this is a cell on the diagonal
119-
row.append(diagonal)
151+
# This is a cell on the diagonal
152+
row[col_index] = diagonal
120153
elif symmetric and col_index < row_index:
121-
# if the matrix is symmetric and we are "in the lower left
122-
# triangle"
123-
row.append(matrix[col_index][row_index])
154+
# The matrix is symmetric and we are "in the lower left
155+
# triangle" - fill this in after (in case of multiprocessing)
156+
pass
157+
# Otherwise, this cell is not on the diagonal and we do indeed
158+
# need to call combinfunc()
159+
elif use_multiprocessing:
160+
# Add that thing to the task queue!
161+
task_queue.put((col_index, item, item2))
162+
num_tasks_queued += 1
163+
# Start grabbing the results as we go, so as not to stuff all of
164+
# the worker args into memory at once (as Queue.get() is a
165+
# blocking operation)
166+
if num_tasks_queued > num_processes:
167+
col_index, result = done_queue.get()
168+
row[col_index] = result
169+
num_tasks_completed += 1
124170
else:
125-
# if this cell is not on the diagonal
126-
row.append(combinfunc(item, item2))
127-
matrix.append(row)
171+
# Otherwise do it here, in line
172+
row[col_index] = combinfunc(item, item2)
173+
174+
if symmetric:
175+
# One more iteration to get symmetric lower left triangle
176+
for col_index, item2 in enumerate(data):
177+
if col_index >= row_index:
178+
break
179+
# post-process symmetric "lower left triangle"
180+
row[col_index] = matrix[col_index][row_index]
181+
182+
if use_multiprocessing:
183+
# Grab the remaining worker task results
184+
while num_tasks_completed < num_tasks_queued:
185+
col_index, result = done_queue.get()
186+
row[col_index] = result
187+
num_tasks_completed += 1
188+
189+
row_indexed = [row[index] for index in xrange(len(data))]
190+
matrix.append(row_indexed)
191+
192+
if use_multiprocessing:
193+
logger.info("Stopping/joining %s workers", num_processes)
194+
[task_queue.put('STOP') for i in xrange(num_processes)]
195+
[process.join() for process in processes]
196+
128197
logger.info("Matrix generated")
129198
return matrix
130199

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
setup(
1212
name='cluster',
13-
version='1.2.0',
13+
version='1.3.0',
1414
author='Michel Albert',
1515
author_email='[email protected]',
1616
url='http://python-cluster.sourceforge.net/',

0 commit comments

Comments
 (0)