Skip to content

Commit f77cec6

Browse files
committed
Add examples of Python SharedMemory
1 parent fddb0b7 commit f77cec6

File tree

2 files changed

+142
-0
lines changed

2 files changed

+142
-0
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
#!/usr/bin/env python
2+
3+
from argparse import ArgumentParser
4+
from collections import Counter
5+
import math
6+
import multiprocessing as mp
7+
from multiprocessing.managers import SharedMemoryManager
8+
import numpy as np
9+
import os
10+
import sys
11+
12+
13+
def init_z(z_buf):
14+
nr_points = math.isqrt(z_buf.size)
15+
x = np.linspace(-1.8, 1.8, nr_points)
16+
y = np.linspace(-1.8j, 1.8j, nr_points)
17+
X, Y = np.meshgrid(x, np.flip(y))
18+
z = X + Y
19+
for i in range(z.shape[0]):
20+
for j in range(z.shape[1]):
21+
z_buf[i*z.shape[1] + j] = z[i, j]
22+
23+
24+
def init_n(n_buf):
25+
for i in range(n_buf.size):
26+
n_buf[i] = 0
27+
28+
29+
def compute_partial_julia(args):
30+
z_shmem, n_shmem, idx_begin, idx_end, max_iters, max_norm = args
31+
z_sizeof = np.dtype(np.complex).itemsize
32+
z = np.ndarray((idx_end - idx_begin, ), dtype=np.complex,
33+
buffer=z_shmem.buf[z_sizeof*idx_begin:z_sizeof*idx_end])
34+
n_sizeof = np.dtype(np.int32).itemsize
35+
n = np.ndarray((idx_end - idx_begin, ), dtype=np.int32,
36+
buffer=n_shmem.buf[n_sizeof*idx_begin:n_sizeof*idx_end])
37+
for i, z_value in enumerate(z):
38+
while (n[i] <= max_iters and np.abs(z_value) <= max_norm):
39+
z_value = z_value**2 - 0.622772 + 0.42193j
40+
n[i] += 1
41+
return os.getpid()
42+
43+
44+
def compute_julia(nr_points=100, pool_size=2, work_size=15, verbose=False,
45+
max_iters=255, max_norm=2.0):
46+
size = nr_points**2
47+
with SharedMemoryManager() as shmem_mgr:
48+
with mp.Pool(pool_size) as pool:
49+
z_shmem = shmem_mgr.SharedMemory(size=16*size)
50+
z_buf = np.ndarray((size, ), dtype=np.complex, buffer=z_shmem.buf)
51+
init_z(z_buf)
52+
n_shmem = shmem_mgr.SharedMemory(size=4*size)
53+
n_buf = np.ndarray((size, ), dtype=np.int32, buffer=n_shmem.buf)
54+
init_n(n_buf)
55+
args = [(z_shmem, n_shmem, i*work_size, min(z_buf.size, (i + 1)*work_size),
56+
max_iters, max_norm)
57+
for i in range(int(np.ceil(z_buf.size/work_size)))]
58+
if verbose:
59+
print(args, file=sys.stderr)
60+
pid_counter = Counter()
61+
for pid in pool.imap_unordered(compute_partial_julia, args):
62+
pid_counter[pid] += 1
63+
if verbose:
64+
print(pid_counter, file=sys.stderr)
65+
return n_buf.copy().reshape(nr_points, nr_points)
66+
67+
68+
def main():
69+
arg_parser = ArgumentParser(description='compute pi')
70+
arg_parser.add_argument('--pool_size', type=int, default=2, help='pool size')
71+
arg_parser.add_argument('--work_size', type=int, default=10,
72+
help='number of points per work item')
73+
arg_parser.add_argument('--nr_points', type=int, default=10,
74+
help='size of the image n x n')
75+
arg_parser.add_argument('--verbose', action='store_true', help='verbose output')
76+
options = arg_parser.parse_args()
77+
constructor = None
78+
n = compute_julia(nr_points=options.nr_points, pool_size=options.pool_size,
79+
work_size=options.work_size, verbose=options.verbose)
80+
np.savetxt(sys.stdout, n, fmt='%3d')
81+
return 0
82+
83+
if __name__ == '__main__':
84+
status = main()
85+
sys.exit(status)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#!/usr/bin/env python
2+
3+
from argparse import ArgumentParser
4+
from multiprocessing import Pool
5+
from multiprocessing.managers import SharedMemoryManager
6+
import numpy as np
7+
import os
8+
import sys
9+
10+
def increment(args):
11+
shmem, dtype, incr, i_min, i_max = args
12+
t_size = np.dtype(dtype).itemsize
13+
data = np.ndarray((i_max - i_min, ), dtype=dtype,
14+
buffer=shmem.buf[t_size*i_min:t_size*i_max])
15+
for i in range(data.size):
16+
data[i] *= incr
17+
return os.getpid(), incr
18+
19+
20+
def compute(array_size, pool_size, chunk_size, verbose=False):
21+
with SharedMemoryManager() as shmem_manager:
22+
with Pool(pool_size) as pool:
23+
dtype = np.int32
24+
t_size = np.dtype(dtype).itemsize
25+
shmem_data = shmem_manager.SharedMemory(size=t_size*array_size**2)
26+
data = np.ndarray((array_size, array_size), dtype=dtype,
27+
buffer=shmem_data.buf)
28+
for i in range(data.shape[0]):
29+
for j in range(data.shape[1]):
30+
data[i, j] = i*array_size + j
31+
args = [(shmem_data, np.int32, i + 1, i*chunk_size,
32+
min((i + 1)*chunk_size, data.size))
33+
for i in range(int(np.ceil(data.size/chunk_size)))]
34+
for result in pool.imap_unordered(increment, args):
35+
if verbose:
36+
print(result)
37+
return data.copy()
38+
39+
if __name__ == '__main__':
40+
arg_parser = ArgumentParser(description='illustrating shared memory')
41+
arg_parser.add_argument('--pool_size', type=int, default=2,
42+
help='pool size')
43+
arg_parser.add_argument('--array_size', type=int, default=10,
44+
help='array size')
45+
arg_parser.add_argument('--chunk_size', type=int, default=10,
46+
help='chunk size')
47+
arg_parser.add_argument('--sum_only', action='store_true',
48+
help='only desplay sum of array elements')
49+
arg_parser.add_argument('--verbose', action='store_true',
50+
help='verbose output')
51+
options = arg_parser.parse_args()
52+
data = compute(array_size=options.array_size, pool_size=options.pool_size,
53+
chunk_size=options.chunk_size, verbose=options.verbose)
54+
if options.sum_only:
55+
print(data.sum())
56+
else:
57+
np.savetxt(sys.stdout,data, fmt='%5d')

0 commit comments

Comments
 (0)