Parallel weight generation
J.L Stevens edited this page Jul 6, 2014
- `mp.Queue` is hopeless: pickling time for array transfer dominates to the point where weight generation is *slower* using four cores! The only approach that can give any speed-up is to use shared-memory `Array`s. In addition, the only way to quickly convert from `Array` objects to numpy arrays is to use the buffer interface.
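For reference, the zero-copy direction (`Array` to numpy) can be sketched as follows; this snippet is illustrative and not from the original notes:

```python
import numpy as np
from multiprocessing import Array

# Minimal sketch of the buffer-interface conversion: wrap a shared
# Array with a numpy view, so writes through the view land directly
# in the shared memory without any copying or pickling.
shared = Array('d', [0.0] * 6, lock=False)
view = np.frombuffer(shared)   # no data is copied here
view += 1.0                    # in-place write through the view
print(shared[:])               # → [1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
```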
- Simplest example of a speed-up using shared-memory arrays with numpy:

```python
import numpy as np
from timeit import timeit
from multiprocessing import Pool, Array

size = 1700  # Number of 'CFs'
arrays = []
# Note: these Arrays must be globally accessible!
for i in range(size):
    arrays.append(
        Array('d', [np.pi * i for i in range(10000)], lock=False))

def parallel_section(i):
    arr = np.frombuffer(arrays[i])  # Select ith 'CF'
    for _ in range(10):
        arr *= np.random.rand(*arr.shape)

if __name__ == '__main__':
    print "%d 'CFs' allocated in memory." % len(arrays)
    parallel = False
    if parallel:
        pool = Pool(4)
        print "%s seconds" % timeit(lambda: pool.map(parallel_section, range(size)), number=20)
    else:
        print "%s seconds" % timeit(lambda: map(parallel_section, range(size)), number=20)
    print np.frombuffer(arrays[0])[:10]
```

  Single-threaded: 50.6 seconds per loop.
  Parallel: 25 seconds per loop.
- Can we now make it work with imagen patterns? Yup. Here is how:
```python
import external
from imagen.random import UniformRandom
import numpy as np
from timeit import timeit
from multiprocessing import Pool, Array

size = 1700  # Number of 'CFs'
arrays = []
# Note: these Arrays must be globally accessible!
for i in range(size):
    arrays.append(
        Array('d', [1.0 for i in range(10000)], lock=False))

def parallel_pattern(i):
    arr = np.frombuffer(arrays[i])  # Select ith 'CF'
    reshaped = arr.reshape((100, 100))
    reshaped *= UniformRandom(xdensity=100, ydensity=100)()

if __name__ == '__main__':
    print "%d 'CFs' allocated in memory." % len(arrays)
    parallel_fn = parallel_pattern
    parallel = False  # True
    if parallel:
        pool = Pool(4)
        print "%s seconds" % timeit(lambda: pool.map(parallel_fn, range(size)), number=20)
    else:
        print "%s seconds" % timeit(lambda: map(parallel_fn, range(size)), number=20)
    print np.frombuffer(arrays[0])[:10]
```

  Single-threaded: 27.4 seconds per loop.
  Parallel: 13.7 seconds per loop.
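The reshape trick above works because `reshape` on a `frombuffer` view returns another view of the same shared buffer, so the in-place multiply writes straight through to shared memory. A quick illustrative check (names invented, Python 3 syntax):

```python
import numpy as np
from multiprocessing import Array

shared = Array('d', [1.0] * 6, lock=False)

# frombuffer gives a flat zero-copy view; reshape returns another view
# of the same memory, so in-place ops on the 2-D array hit the buffer.
flat = np.frombuffer(shared)
grid = flat.reshape((2, 3))
grid *= 2.0

print(shared[:])  # → [2.0, 2.0, 2.0, 2.0, 2.0, 2.0]
```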
- Now here is the problem: we want to avoid pickling and use shared memory. This means we want only the weights to be computed in parallel, while creating the `ConnectionField` objects normally. Unfortunately, I can't figure out how to get everything to share the same buffer as follows:
```python
import external
from imagen.random import UniformRandom
import numpy as np
from timeit import timeit
from multiprocessing import Pool, Array

size = 1700  # Number of 'CFs'
arrays = []
cf_arrays = []
# Test by switching to np.zeros and checking the sum.
create_array = np.ones
# Note: these Arrays must be globally accessible!
for i in range(size):
    arr = create_array((100, 100))
    cf_arrays.append(arr)
    arrays.append(
        Array('B', bytearray(arr.data), lock=False))

def parallel_pattern(i):
    arr = np.frombuffer(arrays[i])  # Select ith 'CF'
    reshaped = arr.reshape((100, 100))
    reshaped *= UniformRandom(xdensity=100, ydensity=100)()

if __name__ == '__main__':
    print "%d 'CFs' allocated in memory." % len(arrays)
    parallel_fn = parallel_pattern
    parallel = True
    if parallel:
        pool = Pool(4)
        print "%s seconds" % timeit(lambda: pool.map(parallel_fn, range(size)), number=20)
    else:
        print "%s seconds" % timeit(lambda: map(parallel_fn, range(size)), number=20)
    print cf_arrays[0].sum(), np.frombuffer(arrays[0]).sum()  # 10000.0, 0.00... WRONG!
```

  This fails because `bytearray(arr.data)` copies the data: the shared `Array`s are updated by the workers (hence the tiny sum), but the numpy arrays in `cf_arrays` keep their own separate buffers and are never touched (their sum stays at 10000).
- The best solution is probably outlined as follows:
```python
import external
from imagen.random import UniformRandom
import numpy as np
from timeit import timeit
from multiprocessing import Pool, Array

size = 1700  # Number of 'CFs'
arrays = []
# Test by switching to np.zeros and checking the sum.
create_array = np.ones
# Note: these Arrays must be globally accessible!
for i in range(size):
    # Shape computed as per normal inside a CF, i.e.
    # bounds = input_sheet_slice.compute_bounds(input_sheet)
    # shape = SheetCoordinateSystem(bounds, xdensity, ydensity).shape
    arrays.append(
        Array('B', bytearray(create_array((100, 100))), lock=False))

def parallel_pattern(i):  # Essentially the __init__ of a CF for computing the weights.
    arr = np.frombuffer(arrays[i])  # Select ith 'CF'
    reshaped = arr.reshape((100, 100))
    reshaped *= UniformRandom(xdensity=100, ydensity=100)()

if __name__ == '__main__':
    print "%d 'CFs' allocated in memory." % len(arrays)
    parallel_fn = parallel_pattern
    parallel = True
    if parallel:
        pool = Pool(4)
        print "%s seconds" % timeit(lambda: pool.map(parallel_fn, range(size)), number=20)
    else:
        print "%s seconds" % timeit(lambda: map(parallel_fn, range(size)), number=20)
    print np.frombuffer(arrays[0]).sum()  # This is used to set the weights inside the CF quickly...
```

- One issue is that `ConnectionField`s have `self.mask`, and the same mask is used to generate the weight pattern. The simplest solution is probably to pre-compute the masks before the patterns are generated so that they are available during generation (they can then be set as `self.mask` using `np.frombuffer`).
- The problem is that although `Array` to `np.array` is trivial via the buffer interface, I haven't figured out the inverse yet (`np.array` to `Array`). The only thing that seems to work is to create a vanilla Python array first and get the numpy array to share its buffer:
```python
>>> import array
>>> import numpy as np
>>> a = array.array('d', [1, 2, 3, 4, 5, 6])
>>> b = np.frombuffer(a)
>>> b
array([ 1.,  2.,  3.,  4.,  5.,  6.])
>>> c = b.reshape(2, 3)
>>> c
array([[ 1.,  2.,  3.],
       [ 4.,  5.,  6.]])
>>> c *= 3
>>> a
array('d', [3.0, 6.0, 9.0, 12.0, 15.0, 18.0])
>>> a[0] = 42.0
>>> c
array([[ 42.,   6.,   9.],
       [ 12.,  15.,  18.]])
```

- Pull out the code that computes the shape of the CF (e.g. make `get_bounds` into a classmethod), i.e. `SheetCoordinateSystem(ConnectionField.get_bounds(input_sheet), xdensity, ydensity).shape`.
- Use these shapes to initialize `multiprocessing.Array` instances in two lists: one for the mask arrays and one for the weight arrays themselves.
- Pull out weight and mask generation into a function to be parallelized, writing to the shared arrays as appropriate.
- (Additional wrinkle) `ConnectionField` output functions need to be transferred to the pattern.
- Change the `ConnectionField` constructor to accept buffers for the weight and mask arrays.