Commit ffcb550

Lab10: File processing with distributed.
1 parent 6b00d5b commit ffcb550

File tree

4 files changed
+309
-0
lines changed
docs/src/lecture_10/hw.md

Whitespace-only changes.

docs/src/lecture_10/lab.md

Lines changed: 264 additions & 0 deletions
@@ -0,0 +1,264 @@
# [Lab 10: Parallel computing](@id parallel_lab)

## Introduction
- We don't think in parallel.
- We learn to write and reason about programs serially.
- The desire for parallelism often comes _after_ you've written your algorithm (and found it too slow!).
- We are always on the lookout for the end of Moore's Law, but so far the transistor count stays on its exponential trend.
- The number of cores is increasing.
- CPUs' complexity is increasing with the addition of more sophisticated branch predictors.

![42-cpu-trend](./42-years-processor-trend.png)
Image source[^1]

[^1]: Performance metric trends of CPUs over the last 42 years: [https://www.karlrupp.net/2018/02/42-years-of-microprocessor-trend-data/](https://www.karlrupp.net/2018/02/42-years-of-microprocessor-trend-data/)

!!! warning "Shortcomings of parallelism"
    Parallel computing brings its own set of problems and a not-insignificant overhead for data manipulation and communication. Always try to optimize your serial code as much as you can before moving on to parallel acceleration. Moreover, parallel programs are hard to debug.

## Distributed/multi-processing
Wrapping our heads around having multiple running Julia processes:
- how to start them
- how to load code
- how to send code there and back again
- convenience packages

There are two ways to start multiple Julia processes:
- by adding processes at startup using the command line argument `-p ##`
```bash
julia -p 4
```
- by adding processes after startup using the `addprocs(##)` function from the standard library `Distributed`
```julia
using Distributed
addprocs(4) # returns a list of ids of the individual processes
```

The result shown in a process manager such as `htop`:
```
.../julia-1.6.2/bin/julia --project
.../julia-1.6.2/bin/julia -Cnative -J/home/honza/Apps/julia-1.6.2/lib/julia/sys.so -g1 --bind-to 127.0.0.1 --worker
.../julia-1.6.2/bin/julia -Cnative -J/home/honza/Apps/julia-1.6.2/lib/julia/sys.so -g1 --bind-to 127.0.0.1 --worker
.../julia-1.6.2/bin/julia -Cnative -J/home/honza/Apps/julia-1.6.2/lib/julia/sys.so -g1 --bind-to 127.0.0.1 --worker
.../julia-1.6.2/bin/julia -Cnative -J/home/honza/Apps/julia-1.6.2/lib/julia/sys.so -g1 --bind-to 127.0.0.1 --worker
```

Both of these result in 5 running processes in total - 1 controller and 4 workers - with their respective ids accessible via the `myid()` function. Note that the controller process always has id 1 and the other processes are assigned subsequent integers; see for yourself with the `@everywhere` macro, which easily runs code on all processes or on a subset of them.
```julia
@everywhere println(myid())
@everywhere [2,3] println(myid()) # select a subset of workers
```

As we have seen from the `htop/top` output, the added processes are started with specific command line arguments; however, they do not share any aliases that we may have defined, e.g. `julia` ~ `julia --project=.`. Therefore, in order to use an environment, we first have to activate it on all processes:
```julia
@everywhere begin
    using Pkg; Pkg.activate(@__DIR__) # the directory of the current file (equivalent to pwd() when run from the REPL)
    Pkg.instantiate(); Pkg.precompile() # this should not be necessary to call everywhere
end
```
We can also load code from a file on all processes at startup with the `-L <file>` command line option, typically combined with `-p`.
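For example, to start one controller and four workers with a helper file preloaded on every process (a sketch - `pkg_processing.jl` is the helper file used later in this lab):

```shell
# load the file on the controller and on all 4 workers at startup
julia -p 4 -L pkg_processing.jl
```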

There are generally two ways of working with multiple processes:
- using low-level functionality - we specify what/where is loaded, what/where is being run and when we fetch results
    + `@everywhere` macro
    + `@spawnat` macro
    + `fetch` function
    + `myid` function

- using high-level functionality - we define only simple functions and apply them to distributed data structures
    + `DistributedArrays`' `DArray`s
    + `pmap`
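To illustrate the low-level route (a minimal sketch, assuming a fresh session to which two workers are added):

```julia
using Distributed
nprocs() < 3 && addprocs(2)          # make sure workers with ids 2 and 3 exist

# @spawnat schedules the closure on the given worker and returns a Future immediately
f = @spawnat 2 (myid(), sum(1:100))

id, s = fetch(f)                     # fetch blocks until the result is available
@assert id == 2
@assert s == 5050
```

Everything between spawning and fetching runs asynchronously with respect to the controller, which is what the high-level tools like `pmap` build upon.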

### Sum with processes
```@raw html
<div class="admonition is-category-exercise">
<header class="admonition-header">Exercise</header>
<div class="admonition-body">
```
Write an aggregated sum with multiprocessing/distributed:
- without `DistributedArrays`
- with `DistributedArrays`

**HINTS**:
- for the basic functionality, `DistributedArrays`' ability to distribute a preexisting array (the `distribute` function) works well
- you have to consider the possibility that the array may not fit into memory

```@raw html
</div></div>
<details class = "solution-body">
<summary class = "solution-header">Solution:</summary><p>
```

```julia
using BenchmarkTools
using Distributed
addprocs(4)
@everywhere using DistributedArrays

a = rand(10^7);
adist = distribute(a) # distribute the preexisting array across the workers
j_bench_base = @benchmark sum($a)
j_bench_dist = @benchmark sum($adist)
```

```julia
# spawn a partial sum on each worker owning a piece of the DArray, then fetch and combine
function mysum_dist(a::DArray)
    r = Array{Future}(undef, length(procs(a)))
    for (i, id) in enumerate(procs(a))
        r[i] = @spawnat id sum(localpart(a))
    end
    return sum(fetch.(r))
end
j_bench_hand_dist = @benchmark mysum_dist($adist)
```
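If the full array cannot fit into the controller's memory (see the hint above), it can instead be created directly on the workers, so that the controller never holds all the data at once - a sketch using `drand`, `DistributedArrays`' distributed analogue of `rand`:

```julia
using Distributed
nworkers() < 2 && addprocs(2)
@everywhere using DistributedArrays

# each worker allocates and fills only its local part
adist = drand(10^7)
m = sum(adist) / length(adist)   # mean of Uniform(0,1) samples
@assert isapprox(m, 0.5; atol = 0.01)
```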

```@raw html
</p></details>
```

### Distributed file processing
`Distributed` is often used in the processing of files, such as the commonly encountered `mapreduce` jobs run with technologies like [`Hadoop`](https://hadoop.apache.org/) and [`Spark`](http://spark.apache.org/), where the files live on a distributed file system and a typical job requires us to map over all the files and gather some statistics such as histograms, sums and others. We will simulate this situation with the codebase of Julia's package depot, which on a typical user installation can contain up to hundreds of thousands of `.jl` files (depending on how extensively one uses Julia).

```@raw html
<div class="admonition is-category-exercise">
<header class="admonition-header">Exercise</header>
<div class="admonition-body">
```

Write a distributed pipeline for computing a histogram of symbols found in the ASTs obtained by parsing the Julia source files in your `.julia/packages/` directory. We have already implemented most of the code that you will need (available as source code [here](source code)). **TODO**

Your job is to write the `map` and `reduce` steps that will gather the dictionaries from the different workers. There are two ways to map:
- either over the directories inside `.julia/packages/`
- or over all files obtained by concatenating the outputs of `filter_jl` (*NOTE* that this might not be possible if the listing itself is expensive - in speed or memory requirements)

Measure whether the speedup scales linearly with the number of processes by restricting the number of workers inside a `pmap`.

**HINTS**:
- either load `./pkg_processing.jl` on startup with the `-L` and `-p` options, or `include("./pkg_processing.jl")` inside `@everywhere`
- try writing the sequential version first
- use `pmap` to easily iterate in parallel over a collection - the result should be an array of histograms, which have to be merged on the controller node

**BONUS**:
What is the most frequent symbol in your codebase?

```@raw html
</div></div>
<details class = "solution-body">
<summary class = "solution-header">Solution:</summary><p>
```

Let's first implement a sequential version, as it is much easier to debug.
```julia
include("./pkg_processing.jl")

using ProgressMeter
function sequential_histogram(path)
    h = Dict{Symbol, Int}()
    @showprogress for pkg_dir in sample_all_installed_pkgs(path)
        for jl_path in filter_jl(pkg_dir)
            syms = tokenize(jl_path)
            for s in syms
                h[s] = get(h, s, 0) + 1 # increment the count, starting from zero for unseen symbols
            end
        end
    end
    h
end
path = joinpath(DEPOT_PATH[1], "packages") # usually the first entry of DEPOT_PATH
@elapsed h = sequential_histogram(path)
```

First we try to distribute over package folders.
```julia
using Distributed
addprocs(8)

@everywhere begin
    using Pkg; Pkg.activate(@__DIR__)
    # workers do not automatically have access to the functions we have defined,
    # so the helper file has to be included on every process
    include("./pkg_processing.jl")
end

"""
    merge_with!(h1, h2)

Merges the count dictionary `h2` into `h1` by adding the counts.
"""
function merge_with!(h1, h2)
    for s in keys(h2)
        h1[s] = get(h1, s, 0) + h2[s]
    end
    h1
end

using ProgressMeter
function distributed_histogram(path)
    r = @showprogress pmap(sample_all_installed_pkgs(path)) do pkg_dir
        h = Dict{Symbol, Int}()
        for jl_path in filter_jl(pkg_dir)
            syms = tokenize(jl_path)
            for s in syms
                h[s] = get(h, s, 0) + 1
            end
        end
        h
    end
    reduce(merge_with!, r)
end
path = joinpath(DEPOT_PATH[1], "packages")
@elapsed h = distributed_histogram(path)
```

Second, we try to distribute over all the files.
```julia
function distributed_histogram_naive(path)
    jl_files = reduce(vcat, filter_jl(pkg_dir) for pkg_dir in sample_all_installed_pkgs(path))
    r = @showprogress pmap(jl_files) do jl_path
    # r = @showprogress pmap(WorkerPool([2,3,4,5]), jl_files) do jl_path # restrict the number of workers
        h = Dict{Symbol, Int}()
        syms = tokenize(jl_path)
        for s in syms
            h[s] = get(h, s, 0) + 1
        end
        h
    end
    reduce(merge_with!, r)
end
path = joinpath(DEPOT_PATH[1], "packages")
@elapsed h = distributed_histogram_naive(path)
```

**BONUS**: You can do some analysis with `DataFrames`:
```julia
using DataFrames
df = DataFrame(:sym => collect(keys(h)), :count => collect(values(h)));
sort!(df, :count, rev=true);
df[1:50, :]
```

```@raw html
</p></details>
```

## Threading

### Sum with threads
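A minimal sketch of a hand-rolled threaded sum, assuming Julia was started with multiple threads (e.g. `julia -t 4`); the per-thread-accumulator pattern shown here is a common teaching device, though it can suffer from false sharing:

```julia
using Base.Threads

# accumulate partial sums per thread, then combine on the main thread
function threaded_sum(a)
    partials = zeros(eltype(a), nthreads())
    @threads for i in eachindex(a)
        partials[threadid()] += a[i]
    end
    sum(partials)
end

@assert threaded_sum(collect(1:1000)) == 500500
```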

### Multithreaded file processing

## Task switching

### Only if I come up with something interesting

## Summary
- when to use what

## Resources
- parallel computing [course](https://juliacomputing.com/resources/webinars/) by Julia Computing
Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
"""
    sample_all_installed_pkgs(path::AbstractString)

Returns the root folders of all installed packages in the system. A single version of each package is selected.
"""
function sample_all_installed_pkgs(path::AbstractString)
    pkgs = readdir(path)
    # (rand(readdir(joinpath(path, p), join=true)) for p in pkgs) # sampling version
    (readdir(joinpath(path, p), join=true)[1] for p in pkgs) # deterministic version
end

"""
    filter_jl(path)

Recursively walks the directory structure to obtain all `.jl` files.
"""
filter_jl(path) = reduce(vcat, joinpath.(rootpath, filter(endswith(".jl"), files)) for (rootpath, dirs, files) in walkdir(path))

"""
    tokenize(jl_path)

Parses a ".jl" file located at `jl_path` and extracts all symbols from the resulting AST.
"""
function tokenize(jl_path)
    _extract_symbols(x) = Symbol[]
    _extract_symbols(x::Symbol) = [x]
    function _extract_symbols(x::Expr)
        if length(x.args) > 0
            Symbol.(reduce(vcat, _extract_symbols(arg) for arg in x.args))
        else
            Symbol[]
        end
    end

    # wrap the file in a begin...end block so that it parses as a single expression
    scode = "begin\n" * read(jl_path, String) * "end\n"
    try
        code = Meta.parse(scode)
        _extract_symbols(code)
    catch e
        if !isa(e, Meta.ParseError)
            rethrow(e)
        end
        Symbol[] # unparseable files contribute no symbols
    end
end
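To see what `tokenize` extracts, here is its core idea in isolation (a self-contained sketch with a hypothetical `_syms` helper mirroring `_extract_symbols` above):

```julia
# parse a source string and collect every Symbol appearing in the AST
_syms(x) = Symbol[]
_syms(x::Symbol) = [x]
_syms(x::Expr) = isempty(x.args) ? Symbol[] : reduce(vcat, _syms(a) for a in x.args)

code = Meta.parse("begin\nf(x) = x + 1\nend")
syms = _syms(code)
@assert :f in syms && :+ in syms
```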
