
Commit 93e5e29

pevnak authored and janfrancu committed
working on lecture 10
1 parent be8804a commit 93e5e29

File tree: 2 files changed, +262 −12 lines


docs/src/lecture_10/juliaset_p.jl

Lines changed: 36 additions & 2 deletions
@@ -1,4 +1,8 @@
-using Plots, BenchmarkTools, Distributed
+using Plots
+using BenchmarkTools
+using Distributed
+using SharedArrays

 function juliaset_pixel(z₀, c)
     z = z₀
     for i in 1:255
@@ -32,8 +36,38 @@ function juliaset_distributed(x, y, partitions = nworkers(), n = 1000)
     reduce(hcat, slices)
 end

-@btime juliaset_distributed(-0.79, 0.15)
+# @btime juliaset_distributed(-0.79, 0.15)

 # frac = juliaset_distributed(-0.79, 0.15)
 # plot(heatmap(1:size(frac,1), 1:size(frac,2), frac, color=:Spectral))

+####
+# Let's work out the shared array approach
+####
+function juliaset_column!(img, c, n, j)
+    x = -2.0 + (j-1)*4.0/(n-1)
+    for i in 1:n
+        y = -2.0 + (i-1)*4.0/(n-1)
+        @inbounds img[i, j] = juliaset_pixel(x+im*y, c)
+    end
+    nothing
+end
+
+function juliaset_range!(img, c, n, columns)
+    for j in columns
+        juliaset_column!(img, c, n, j)
+    end
+    nothing
+end
+
+function juliaset_shared(x, y, partitions = nworkers(), n = 1000)
+    c = x + y*im
+    columns = Iterators.partition(1:n, div(n, partitions))
+    img = SharedArray{UInt8,2}((n, n))
+    slices = pmap(cols -> juliaset_range!(img, c, n, cols), columns)
+    img
+end
+
+juliaset_shared(-0.79, 0.15)
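The shared-array variant above relies on a `SharedArray` being backed by memory visible to all local workers, so each worker can write its columns in place and no slices have to be copied back. A minimal, self-contained sketch of that mechanism (not part of the committed file; the worker count and array size are arbitrary):

```julia
using Distributed
addprocs(2)
@everywhere using SharedArrays

S = SharedArray{Int}(10)                   # lives in shared memory, visible to all local workers
pmap(i -> (S[i] = myid(); nothing), 1:10)  # workers write into the same array in place
S                                          # now filled with worker ids; nothing was sent back
```

Note that for the `pmap` call in `juliaset_shared` to succeed, the helper functions and `SharedArrays` also have to be available on the workers, for example via `@everywhere` or by loading the script with `julia -p n -L juliaset_p.jl`.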

docs/src/lecture_10/lecture.md

Lines changed: 226 additions & 10 deletions
@@ -5,23 +5,239 @@ Julia offers different levels of parallel programming
 - SIMD instructions
 - Task switching.

-In this lecture, we will focus mainly on the first two, since SIMD instructions are mainly used for optimization of loops, and task switching is not a true paralelism, but allows to run a different task when one task is waiting for example for IO.
+In this lecture, we will focus mainly on the first two, since SIMD instructions are mainly used for low-level optimization (such as writing your own very performant BLAS library), and task switching is not true parallelism, but it allows a different task to run while another task is waiting, for example on IO.

-## controller / worker model of parallel processes
-- the usual remote fetch call
-- solve monte carlo
-- pmap
+## Process-level parallelism
+Process-level parallelism means that Julia runs several independent processes, each with its own compiler and memory. The processes *do not share anything by default*: no libraries and no variables. Everything has to be set up explicitly, but Julia offers tooling for remote execution and communication primitives.

-- how to set up workers,
-  + how to load functions, modules
-  + julia -p 16 -L load_my_script.jl
-- how to send data / how to define variable on remote process
+Out of the box, Julia supports a mode where a single *main* process controls several workers. The main process has `myid() == 1`; worker processes receive higher numbers. Julia can be started with multiple workers from the very beginning using the `-p` switch
+```julia
+julia -p n
+```
+where `n` is the number of workers, or workers can be added after Julia has started by
+```julia
+using Distributed
+addprocs(4)
+```
+(when Julia is started with `-p`, the `Distributed` library is loaded by default on the main process). You can also remove workers using `rmprocs`. Workers can run on the same physical machine or on different machines; Julia offers integration with most scheduling systems via `ClusterManagers.jl`.

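To make the worker management concrete, a small illustrative sketch (the address is a placeholder; remote workers assume passwordless ssh access):

```julia
using Distributed
addprocs(2)                    # two additional local worker processes
addprocs([("10.0.0.1", 2)])    # two workers on a remote machine (placeholder address)
workers()                      # ids of all current workers
rmprocs(workers())             # remove all workers again
```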
+If you want to evaluate a piece of code on all workers including the main process, the convenience macro `@everywhere` is offered.
+```julia
+@everywhere @show myid()
+```
+As we have mentioned, workers start without any libraries loaded. We can see that by running
+```julia
+@everywhere InteractiveUtils.varinfo()
+```
+which fails, but this works:
+```julia
+@everywhere begin
+    using InteractiveUtils
+    println(InteractiveUtils.varinfo())
+end
+```
+
+The `@everywhere` macro allows us to define functions and variables, and to import libraries, on workers:
+```julia
+@everywhere begin
+    foo(x, y) = x * y + sin(y)
+    foo(x) = foo(x, myid())
+end
+@everywhere @show foo(1.0)
+```
+Alternatively, we can put the code into a separate file and load it on all workers using `-L filename.jl`.

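For instance, with a hypothetical file `shared_defs.jl` (the name and contents are only illustrative), every process gets the definitions at startup:

```julia
# shared_defs.jl -- loaded on every process when Julia is started as: julia -p 4 -L shared_defs.jl
using InteractiveUtils
foo(x, y) = x * y + sin(y)
```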
+A real benefit of multiprocessing comes when we can *schedule* the execution of a function and return control immediately, so that we can do something else. A low-level function providing this functionality is `remotecall(fun, worker_id, args...)`. For example
+```julia
+@everywhere begin
+    function delayed_foo(x, y, n)
+        sleep(n)
+        foo(x, y)
+    end
+end
+r = remotecall(delayed_foo, 2, 1, 1, 60)
+```
+returns immediately. `r` does not contain the result of `foo(1, 1)`, but a `Future` struct. The `Future` can be seen as a *handle* for retrieving the result later using `fetch`, which either returns the result or waits until it is available.
+```julia
+fetch(r) == foo(1, 1)
+```
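When the result is needed right away, `remotecall_fetch` combines the two steps; it behaves like `fetch(remotecall(...))` while avoiding some intermediate overhead:

```julia
remotecall_fetch(foo, 2, 1.0, 1.0)   # run foo(1.0, 1.0) on worker 2 and wait for the result
```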
+An interesting feature of `fetch` is that it re-throws an exception raised on a different process.
+```julia
+@everywhere begin
+    function exfoo()
+        throw("Exception from $(myid())")
+    end
+end
+r = @spawnat 2 exfoo()
+```
+where `@spawnat` is an alternative to `remotecall`, which executes a closure around the expression (in this case `exfoo()`) on a specified worker (in this case 2). Fetching the result `r` then throws the exception on the main process.
+```julia
+fetch(r)
+```
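If we want to handle such a failure instead of letting it propagate, the re-thrown error can be caught on the main process, where it arrives as a `RemoteException` (a minimal sketch):

```julia
try
    fetch(r)
catch e
    e isa RemoteException || rethrow()
    println("caught an exception from a worker: ", e)
end
```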

+## Example: Julia sets
+Our example for explaining the mechanisms of distributed computing will be the computation of a Julia set fractal. The computation is easily parallelized, since the value of each pixel is independent of all the others. The example is adapted from [Eric Aubanel](http://www.cs.unb.ca/~aubanel/JuliaMultithreadingNotes.html).
+```julia
+using Plots
+@everywhere begin
+    function juliaset_pixel(z₀, c)
+        z = z₀
+        for i in 1:255
+            abs2(z) > 4.0 && return (i - 1)%UInt8
+            z = z*z + c
+        end
+        return UInt8(255)
+    end
+
+    function juliaset_column!(img, c, n, colj, j)
+        x = -2.0 + (j-1)*4.0/(n-1)
+        for i in 1:n
+            y = -2.0 + (i-1)*4.0/(n-1)
+            @inbounds img[i,colj] = juliaset_pixel(x+im*y, c)
+        end
+        nothing
+    end
+end
+
+function juliaset(x, y, n=1000)
+    c = x + y*im
+    img = Array{UInt8,2}(undef, n, n)
+    for j in 1:n
+        juliaset_column!(img, c, n, j, j)
+    end
+    return img
+end
+
+frac = juliaset(-0.79, 0.15)
+plot(heatmap(1:size(frac,1), 1:size(frac,2), frac, color=:Spectral))
+```

+We can split the computation of the whole image into bands, such that each worker computes a smaller portion.
+```julia
+@everywhere begin
+    function juliaset_columns(c, n, columns)
+        img = Array{UInt8,2}(undef, n, length(columns))
+        for (colj, j) in enumerate(columns)
+            juliaset_column!(img, c, n, colj, j)
+        end
+        img
+    end
+end
+
+function juliaset_spawn(x, y, n = 1000)
+    c = x + y*im
+    columns = Iterators.partition(1:n, div(n, nworkers()))
+    r_bands = [@spawnat w juliaset_columns(c, n, cols) for (w, cols) in enumerate(columns)]
+    slices = map(fetch, r_bands)
+    reduce(hcat, slices)
+end
+```
+We observe some speed-up over the serial version, but it is not linear in the number of workers:
+```julia
+julia> @btime juliaset(-0.79, 0.15);
+  38.699 ms (2 allocations: 976.70 KiB)
+
+julia> @btime juliaset_spawn(-0.79, 0.15);
+  21.521 ms (480 allocations: 1.93 MiB)
+```
+In the above example, we spawn one function on each worker and collect the results. In essence, we are performing a `map` over bands. For this use case Julia offers a parallel version of map, `pmap`. With it, our example can look like
+```julia
+function juliaset_pmap(x, y, n = 1000, np = nworkers())
+    c = x + y*im
+    columns = Iterators.partition(1:n, div(n, np))
+    slices = pmap(cols -> juliaset_columns(c, n, cols), columns)
+    reduce(hcat, slices)
+end
+
+julia> @btime juliaset_pmap(-0.79, 0.15);
+  17.597 ms (451 allocations: 1.93 MiB)
+```
+which has slightly better timing than the version based on `@spawnat` and `fetch`. (As explained below in the section about threads, the parallel computation of the Julia set suffers from each pixel taking a different time to compute, which can be relieved by dividing the work into more parts, e.g. `@btime juliaset_pmap(-0.79, 0.15, 1000, 16);`.)

+## Synchronization / Communication primitives
+The orchestration of a complicated computation might be difficult with relatively low-level remote calls. The producer/consumer paradigm is a synchronization pattern built around queues: a consumer fetches work instructions from one queue and pushes results to another. Julia supports this paradigm with the `Channel` and `RemoteChannel` primitives. Importantly, putting items into and taking them from a queue are atomic operations, hence we do not have to take care of race conditions.
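Before the distributed version, here is a minimal single-process sketch of the same queue pattern using a plain `Channel` (the buffer sizes and the toy workload are arbitrary):

```julia
jobs    = Channel{Int}(32)             # queue with work instructions
results = Channel{Tuple{Int,Int}}(32)  # queue with results

foreach(i -> put!(jobs, i), 1:8)       # the producer fills the job queue
close(jobs)                            # no more jobs will arrive

for j in jobs                          # the consumer takes jobs until the channel is drained
    put!(results, (j, j^2))
end

while isready(results)
    println(take!(results))
end
```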
+The code for a worker might look like this:
+```julia
+@everywhere begin
+    function juliaset_channel_worker(instructions, results)
+        while isready(instructions)
+            c, n, cols = take!(instructions)
+            put!(results, (cols, juliaset_columns(c, n, cols)))
+        end
+        println("finishing juliaset_channel_worker on ", myid())
+    end
+end
+```
+The code for the main process will look like this:
+```julia
+function juliaset_channels(x, y, n = 1000, np = nworkers())
+    c = x + y*im
+    columns = Iterators.partition(1:n, div(n, np))
+    instructions = RemoteChannel(() -> Channel{Tuple}(np))
+    foreach(cols -> put!(instructions, (c, n, cols)), columns)
+    results = RemoteChannel(() -> Channel{Tuple}(np))
+    rfuns = [@spawnat i juliaset_channel_worker(instructions, results) for i in workers()]
+
+    img = Array{UInt8,2}(undef, n, n)
+    while isready(results)
+        cols, impart = take!(results)
+        img[:, cols] .= impart
+    end
+    img
+end
+
+julia> @btime juliaset_channels(-0.79, 0.15);
+  254.151 μs (254 allocations: 987.09 KiB)
+```
+The execution time is much higher than what we have observed in the previous cases, and changing the number of workers does not help much. What went wrong? The reason is that setting up the infrastructure around remote channels is a costly process. Consider the following alternative, where (i) we let the workers run endlessly and (ii) the channel infrastructure is set up once and wrapped into a closure:
+```julia
+@everywhere begin
+    function juliaset_channel_worker(instructions, results)
+        while true
+            c, n, cols = take!(instructions)
+            put!(results, (cols, juliaset_columns(c, n, cols)))
+        end
+    end
+end
+
+function juliaset_init(x, y, n = 1000, np = nworkers())
+    c = x + y*im
+    columns = Iterators.partition(1:n, div(n, np))
+    instructions = RemoteChannel(() -> Channel{Tuple}(np))
+    results = RemoteChannel(() -> Channel{Tuple}(np))
+    foreach(p -> remote_do(juliaset_channel_worker, p, instructions, results), workers())
+    function compute()
+        img = Array{UInt8,2}(undef, n, n)
+        foreach(cols -> put!(instructions, (c, n, cols)), columns)
+        for i in 1:np
+            cols, impart = take!(results)
+            img[:, cols] .= impart
+        end
+        img
+    end
+end
+
+t = juliaset_init(-0.79, 0.15)
+julia> @btime t();
+  17.697 ms (776 allocations: 1.94 MiB)
+```
+with which we obtain comparable speed.
+Instead of `@spawnat`, we can also use `remote_do`, as in `foreach(p -> remote_do(juliaset_channel_worker, p, instructions, results), workers())`, which executes the function `juliaset_channel_worker` on worker `p` with the parameters `instructions` and `results`, but does not return a handle for receiving the eventual result.
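The workers in this version loop forever. One possible way to stop them, sketched here rather than taken from the lecture, is to send a sentinel value through the same instructions channel (this assumes the channel is created as `Channel{Any}` so that it can carry the sentinel):

```julia
@everywhere function juliaset_channel_worker(instructions, results)
    while true
        i = take!(instructions)
        i === :stop && break                  # sentinel value terminates the worker loop
        c, n, cols = i
        put!(results, (cols, juliaset_columns(c, n, cols)))
    end
end

# on the main process, once no more work will be submitted:
# foreach(_ -> put!(instructions, :stop), workers())
```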

-## Synchronization primitives
 - Channels and their guarantees
 - How to orchestrate workers by channels
 - how to kill the remote process with channel

+We can send the indices of the columns that should be calculated to the remote processes over one queue, from which they pick them up and send the results back over the other channel.
+
+## tooling
+- how to set up workers,
+  + how to load functions, modules
+  + julia -p 16 -L load_my_script.jl
+- how to send data / how to define a variable on a remote process

 ## Sending data
 - Do not send `randn(1000, 1000)`
 - Serialization is very time consuming; an efficient conversion to something simple might be worth it
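To make these notes concrete, a standard illustration of the data-movement concern: send a small closure that builds the data remotely instead of shipping the matrix itself.

```julia
# ships the whole 1000×1000 matrix (roughly 8 MB) to worker 2 before summing it there
A = randn(1000, 1000)
remotecall_fetch(sum, 2, A)

# ships only a tiny closure; the matrix is generated and summed on worker 2
remotecall_fetch(() -> sum(randn(1000, 1000)), 2)
```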
