Skip to content

Commit 1054489

Browse files
authored
Merge pull request #339 from realpython/python-split-list
Materials for How to Split a Python List
2 parents 51ae8a1 + d3b9bf5 commit 1054489

File tree

5 files changed

+419
-0
lines changed

5 files changed

+419
-0
lines changed

python-split-list/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# How to Split a Python List or Iterable Into Chunks
2+
3+
This folder holds sample code that supplements the Real Python tutorial on [How to Split a Python List or Iterable Into Chunks](https://realpython.com/how-to-split-a-python-list-into-chunks/).

python-split-list/parallel_demo.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
"""
2+
Synthesize an image in chunks using parallel workers.
3+
4+
Usage:
5+
$ python parallel_demo.py
6+
"""
7+
8+
import functools
9+
import multiprocessing
10+
import time
11+
from dataclasses import dataclass
12+
from math import log
13+
from os import cpu_count
14+
from typing import Callable, Iterable, Iterator
15+
16+
import numpy as np
17+
from PIL import Image
18+
19+
from spatial_splitting import Bounds, split_multi
20+
21+
# Resolution of the synthesized image, in pixels.
IMAGE_WIDTH = 1920
IMAGE_HEIGHT = 1080

# Complex number added to every transformed pixel coordinate, i.e. the point
# of the complex plane that ends up in the middle of the image.
CENTER = complex(-0.7435, 0.1314)

# Multiplier applied to pixel offsets when mapping them to the complex plane.
SCALE = 1.5e-6

# Parameters handed to MandelbrotSet when rendering a chunk.
MAX_ITERATIONS = 256
ESCAPE_RADIUS = 1000

# One chunk per CPU; fall back to 4 when the CPU count cannot be determined.
NUM_CHUNKS = cpu_count() or 4
27+
28+
29+
class Chunk:
    """A portion of the image to be computed and rendered.

    Pixel values are stored in a NumPy array of unsigned 8-bit integers
    shaped ``(height, width)``, where both numbers come from the first two
    entries of ``bounds.size``. Item access translates the given
    coordinates through ``bounds.offset()`` before indexing that array.
    """

    def __init__(self, bounds: Bounds) -> None:
        extent = bounds.size
        self.bounds = bounds
        self.height = extent[0]
        self.width = extent[1]
        self.pixels = np.zeros((self.height, self.width), dtype=np.uint8)

    def __getitem__(self, coordinates: tuple[int, int]) -> int:
        # Convert the coordinates into indices relative to this chunk.
        local_index = self.bounds.offset(*coordinates)
        return self.pixels[local_index]

    def __setitem__(self, coordinates: tuple[int, int], value: int) -> None:
        # Convert the coordinates into indices relative to this chunk.
        local_index = self.bounds.offset(*coordinates)
        self.pixels[local_index] = value
43+
44+
45+
@dataclass
class MandelbrotSet:
    """Escape-time evaluation of candidate points of the Mandelbrot set."""

    # Upper limit on the number of iterations tried for each candidate.
    max_iterations: int
    # Magnitude of z above which the sequence counts as escaped.
    escape_radius: float = 2.0

    def __contains__(self, c):
        """Tell whether ``c`` never escaped within the iteration limit."""
        return self.stability(c) == 1

    def stability(self, c, smooth=False, clamp=True):
        """Return the escape count of ``c`` as a fraction of the limit.

        When ``clamp`` is true, the fraction is limited to the range
        between 0.0 and 1.0; otherwise it is returned as computed.
        """
        ratio = self.escape_count(c, smooth) / self.max_iterations
        if not clamp:
            return ratio
        return max(0.0, min(ratio, 1.0))

    def escape_count(self, c, smooth=False):
        """Return the iteration at which ``z = z**2 + c`` escapes.

        If the magnitude of ``z`` never exceeds the escape radius, then
        ``max_iterations`` is returned. With ``smooth`` enabled, the
        integer count is replaced by a fractional value derived from the
        magnitude of ``z`` at the moment of escape.
        """
        z = 0j
        for step in range(self.max_iterations):
            z = z**2 + c
            magnitude = abs(z)
            if magnitude > self.escape_radius:
                if not smooth:
                    return step
                return step + 1 - log(log(magnitude)) / log(2)
        return self.max_iterations
66+
67+
68+
def transform(y: int, x: int) -> complex:
    """Map pixel coordinates onto a point of the complex plane.

    The pixel offsets from the middle of the image are multiplied by SCALE
    and the result is shifted by CENTER. Larger ``y`` values produce a
    smaller imaginary part.
    """
    real = SCALE * (x - IMAGE_WIDTH / 2)
    imaginary = SCALE * (IMAGE_HEIGHT / 2 - y)
    return complex(real, imaginary) + CENTER
73+
74+
75+
def generate_chunk(bounds: Bounds) -> Chunk:
    """Compute the pixel values of the chunk delimited by the given bounds.

    Every point of the bounds is transformed to the complex plane, and its
    smoothed instability (one minus the stability) is scaled to the range
    of 0 to 255 before being stored in the chunk.
    """
    mandelbrot_set = MandelbrotSet(MAX_ITERATIONS, ESCAPE_RADIUS)
    chunk = Chunk(bounds)
    for row, column in bounds:
        point = transform(row, column)
        stability = mandelbrot_set.stability(point, smooth=True)
        chunk[row, column] = int((1 - stability) * 255)
    return chunk
84+
85+
86+
def combine(chunks: Iterable[Chunk]) -> Image.Image:
    """Assemble the chunks into one grayscale image.

    Each chunk's pixels are copied into the region of a blank
    IMAGE_HEIGHT by IMAGE_WIDTH canvas selected by the chunk's bounds.
    """
    canvas = np.zeros((IMAGE_HEIGHT, IMAGE_WIDTH), dtype=np.uint8)
    for piece in chunks:
        region = piece.bounds.slices()
        canvas[region] = piece.pixels
    return Image.fromarray(canvas, mode="L")
92+
93+
94+
def timed(function: Callable) -> Callable:
    """Decorate a function so that every call reports its duration.

    The time elapsed between entering and leaving the wrapped function,
    measured with ``time.perf_counter()``, is printed after the function
    returns. The function's return value is passed through unchanged.
    """

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        started_at = time.perf_counter()
        outcome = function(*args, **kwargs)
        elapsed = time.perf_counter() - started_at
        print(f"{function.__name__}() took {elapsed:.2f} seconds")
        return outcome

    return wrapper
104+
105+
106+
def process_sequentially(bounds_iter: Iterator[Bounds]) -> Iterator[Chunk]:
    """Lazily generate one chunk per bounds in the current process."""
    return (generate_chunk(bounds) for bounds in bounds_iter)
108+
109+
110+
def process_in_parallel(bounds_iter: Iterator[Bounds]) -> list[Chunk]:
    """Generate one chunk per bounds using a pool of worker processes."""
    pool = multiprocessing.Pool()
    with pool:
        chunks = pool.map(generate_chunk, bounds_iter)
    return chunks
113+
114+
115+
@timed
def compute(worker: Callable) -> Image.Image:
    """Render the whole image with the given worker and report the time.

    The image area is split into NUM_CHUNKS bounds, which the worker turns
    into chunks that are then combined into a single image.
    """
    all_bounds = split_multi(NUM_CHUNKS, IMAGE_HEIGHT, IMAGE_WIDTH)
    chunks = worker(all_bounds)
    return combine(chunks)
118+
119+
120+
def main() -> None:
    """Render the image with each worker in turn and show every result."""
    workers = (process_sequentially, process_in_parallel)
    for worker in workers:
        image = compute(worker)
        image.show()


if __name__ == "__main__":
    main()

python-split-list/requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Pillow==9.4.0
2+
more-itertools==9.0.0
3+
numpy==1.24.1
python-split-list/spatial_splitting.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
"""
2+
Split in multiple dimensions.
3+
"""
4+
5+
from dataclasses import dataclass
6+
from functools import cached_property
7+
from itertools import combinations_with_replacement, product, starmap
8+
from math import floor, prod, sqrt
9+
from typing import Iterator
10+
11+
12+
@dataclass
class Bounds:
    """Spatial bounds defined in multiple dimensions.

    The bounds cover the half-open range from ``start_point`` (inclusive)
    to ``end_point`` (exclusive) along every dimension.
    """

    start_point: tuple[int, ...]
    end_point: tuple[int, ...]

    def __post_init__(self) -> None:
        """Validate the bounds.

        Raises:
            AssertionError: If the points differ in the number of
                dimensions, if the start point doesn't compare less than
                the end point, or if the start exceeds the end along any
                dimension.
        """
        assert len(self.start_point) == len(self.end_point), (
            "start_point and end_point must have the same number of "
            "dimensions"
        )
        assert self.start_point < self.end_point, (
            "start_point must compare less than end_point"
        )
        # Tuples compare lexicographically, so the check above alone lets
        # through bounds like (0, 5)..(1, 3), which have a negative extent
        # in the second dimension. Verify every dimension individually.
        assert all(
            start <= end
            for start, end in zip(self.start_point, self.end_point)
        ), "start_point must not exceed end_point in any dimension"

    def __len__(self) -> int:
        """Get the number of points in the bounds (length, area, or volume).

        Example:
        >>> bounds = Bounds((50, 25), (150, 100))
        >>> len(bounds)
        7500
        """
        return prod(
            end - start
            for start, end in zip(self.start_point, self.end_point)
        )

    def __iter__(self) -> Iterator[tuple[int, ...]]:
        """Iterate over the bounds, yielding each point like in an odometer.

        Example:
        >>> bounds = Bounds((50, 25), (150, 100))
        >>> for x, y in bounds:
        ...     print(x, y)  # doctest: +ELLIPSIS
        50 25
        50 26
        50 27
        ...
        149 97
        149 98
        149 99
        """
        return product(*starmap(range, zip(self.start_point, self.end_point)))

    def slices(self) -> tuple[slice, ...]:
        """Return the slice for each dimension.

        Example:
        >>> bounds = Bounds((50, 25), (150, 100))
        >>> bounds.slices()
        (slice(50, 150, None), slice(25, 100, None))
        """
        return tuple(
            slice(start, end)
            for start, end in zip(self.start_point, self.end_point)
        )

    @cached_property
    def size(self) -> tuple[int, ...]:
        """Return the size of the bounds in each dimension.

        Example:
        >>> bounds = Bounds((50, 25), (150, 100))
        >>> width, height = bounds.size
        >>> width
        100
        >>> height
        75
        """
        return tuple(
            self.end_point[i] - self.start_point[i]
            for i in range(self.num_dimensions)
        )

    @cached_property
    def num_dimensions(self) -> int:
        """Return the number of dimensions.

        Example:
        >>> bounds = Bounds((50, 25), (150, 100))
        >>> bounds.num_dimensions
        2
        """
        return len(self.start_point)

    def offset(self, *coordinates: int) -> tuple[int, ...]:
        """Return the offset of the given coordinates from the start point.

        One coordinate is expected per dimension. Any coordinates beyond
        the number of dimensions are ignored.

        Example:
        >>> bounds = Bounds((50, 25), (150, 100))
        >>> for x, y in bounds:
        ...     print(bounds.offset(x, y))  # doctest: +ELLIPSIS
        (0, 0)
        (0, 1)
        (0, 2)
        ...
        (99, 72)
        (99, 73)
        (99, 74)
        """
        return tuple(
            coordinates[i] - self.start_point[i]
            for i in range(self.num_dimensions)
        )
113+
114+
115+
def split_multi(num_chunks: int, *dimensions: int) -> Iterator[Bounds]:
    """Yield the bounds of chunks that a multidimensional space splits into.

    The number of chunks along each axis comes from find_most_even(), and
    every combination of the per-axis slices becomes one Bounds instance.
    """
    chunks_per_axis = find_most_even(num_chunks, len(dimensions))
    slices_per_axis = (
        get_slices(length, count)
        for length, count in zip(dimensions, chunks_per_axis)
    )
    for combination in product(*slices_per_axis):
        starts = tuple(axis_slice.start for axis_slice in combination)
        stops = tuple(axis_slice.stop for axis_slice in combination)
        yield Bounds(start_point=starts, end_point=stops)
125+
126+
127+
def get_slices(length: int, num_chunks: int) -> Iterator[slice]:
    """Yield consecutive slices that split the length into chunks.

    When the length doesn't divide evenly, the leading slices are one
    element longer than the remaining ones.
    """
    base_size, extra = divmod(length, num_chunks)
    start = 0
    for index in range(num_chunks):
        # The first ``extra`` chunks absorb one leftover element each.
        stop = start + base_size + (1 if index < extra else 0)
        yield slice(start, stop)
        start = stop
134+
135+
136+
def find_most_even(number: int, num_factors: int):
    """Return the factors of a number that have the smallest sum.

    Out of all the tuples produced by find_products(), the one whose
    elements add up to the lowest total is picked. When several tuples
    share the same total, the one produced last is kept.
    """
    factors_by_total = {}
    for factors in find_products(number, num_factors):
        factors_by_total[sum(factors)] = factors
    smallest_total = min(factors_by_total)
    return factors_by_total[smallest_total]
143+
144+
145+
def find_products(number: int, num_factors: int) -> Iterator[tuple[int, ...]]:
    """Yield tuples of divisors that multiply to the given number.

    Each tuple has num_factors elements drawn, with repetition allowed,
    from the divisors of the number.
    """
    candidates = combinations_with_replacement(
        find_divisors(number), num_factors
    )
    yield from (
        candidate for candidate in candidates if prod(candidate) == number
    )
151+
152+
153+
def find_divisors(number: int) -> set[int]:
    """Return the set of integer divisors of a number.

    The result always contains one and the number itself. Candidates are
    only tried up to the square root of the number because each divisor
    found there reveals its counterpart at the same time.
    """
    found = {1, number}
    highest_candidate = floor(sqrt(number))
    for candidate in range(2, highest_candidate + 1):
        if number % candidate == 0:
            found.update((candidate, number // candidate))
    return found
162+
163+
164+
if __name__ == "__main__":
    # Run the examples embedded in the docstrings as tests.
    from doctest import testmod

    testmod()

0 commit comments

Comments
 (0)